`
wbj0110
  • 浏览: 1611077 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark源码系列(五)分布式缓存

阅读更多

这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。

复制代码
  def persist(newLevel: StorageLevel): this.type = { // StorageLevel不能随意更改 if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    // 注册清理方法 sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }
复制代码

它调用SparkContext去缓存这个RDD,追杀下去。

  private[spark] def persistRDD(rdd: RDD[_]) { persistentRdds(rdd.id) = rdd
  }

它居然是用一个HashMap来存的,具体看这个map的类型是TimeStampedWeakValueHashMap[Int, RDD[_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。

CacheManager

现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方法里面,具体的我们可以看ResultTask,它调用了RDD的iterator方法。

复制代码
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }
复制代码

一旦设置了StorageLevel,就要从SparkEnv的cacheManager取数据。

复制代码
  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
    val key = RDDBlockId(rdd.id, split.index)
    blockManager.get(key) match { case Some(values) => // 已经有了,直接返回就可以了 new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => // loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了 loading.synchronized { if (loading.contains(key)) { while (loading.contains(key)) { try {
                loading.wait()
              } catch { case e: Exception => logWarning(s"Got an exception while waiting for another thread to load $key", e)
              }
            } // 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次  blockManager.get(key) match { case Some(values) => return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => loading.add(key)
            }
          } else {
            loading.add(key)
          }
        } try { // 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了 val computedValues = rdd.computeOrReadCheckpoint(split, context) // 如果是本地运行的,就没必要缓存了,直接返回即可 if (context.runningLocally) { return computedValues
          } // 跟踪blocks的更新状态 var updatedBlocks = Seq[(BlockId, BlockStatus)]()
          val returnValue: Iterator[T] = { if (storageLevel.useDisk && !storageLevel.useMemory) { /* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager
               * 然后把结果直接返回,它不需要把结果一下子全部加载进内存
               * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */ updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
              blockManager.get(key) match { case Some(values) => values.asInstanceOf[Iterator[T]] case None => throw new Exception("Block manager failed to return persisted valued")
              }
            } else { // 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份 val elements = new ArrayBuffer[Any]
              elements ++= computedValues
              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
              elements.iterator.asInstanceOf[Iterator[T]]
            }
          } // 更新task的监控参数 val metrics = context.taskMetrics
          metrics.updatedBlocks = Some(updatedBlocks) new InterruptibleIterator(context, returnValue)

        } finally { // 改完了,释放锁 loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }
复制代码

1、如果blockManager当中有,直接从blockManager当中取。

2、如果blockManager没有,就先用RDD的compute函数得到出来一个Iterable接口。

3、如果StorageLevel是只保存在硬盘的话,就把值存在blockManager当中,然后从blockManager当中取出一个Iterable接口,这样的好处是不会一次把数据全部加载进内存。

4、如果StorageLevel是需要使用内存的情况,就把结果添加到一个ArrayBuffer当中一次返回,另外在blockManager存上一份,下次直接从blockManager取。

对StorageLevel说明一下吧,贴一下它的源码。

复制代码
class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1)

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
复制代码

大家注意看它那几个参数,useDisk_、useMemory_、useOffHeap_、deserialized_、replication_ 在具体的类型的时候是传的什么值。

下面我们的目标要放到blockManager。

BlockManager

BlockManager这个类比较大,我们从两方面开始看吧,putBytes和get方法。先从putBytes说起,之前说过Task运行结束之后,结果超过10M的话,会用BlockManager缓存起来。

env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

putBytes内部又掉了另外一个方法doPut,方法很大呀,先折叠起来。

复制代码
  private def doPut(
      blockId: BlockId,
      data: Values,
      level: StorageLevel,
      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] // 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。 // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容 val putBlockInfo = {
      val tinfo = new BlockInfo(level, tellMaster) // 如果不存在,就添加到blockInfo里面 val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) if (oldBlockOpt.isDefined) { // 如果已经存在了,就不需要重复添加了 if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
        } // 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍  oldBlockOpt.get
      } else {
        tinfo
      }
    }

    val startTimeMs = System.currentTimeMillis // 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator, // 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的 // 时候,我们就必须依赖返回一个Iterator var valuesAfterPut: Iterator[Any] = null // Ditto for the bytes after the put var bytesAfterPut: ByteBuffer = null // Size of the block in bytes var size = 0L // 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的 val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) { // duplicate并不是复制这些数据,只是做了一个包装 val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
      Future { // 把block复制到别的机器上去  replicate(blockId, bufferView, level)
      }
    } else { null }

    putBlockInfo.synchronized {

      var marked = false try { if (level.useMemory) { // 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘 // 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法 val res = data match { case IteratorValues(iterator) => memoryStore.putValues(blockId, iterator, level, true) case ArrayBufferValues(array) => memoryStore.putValues(blockId, array, level, true) case ByteBufferValues(bytes) => bytes.rewind()
              memoryStore.putBytes(blockId, bytes, level)
          }
          size = res.size // 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer  res.data match { case Right(newBytes) => bytesAfterPut = newBytes case Left(newIterator) => valuesAfterPut = newIterator
          } // 把被置换到硬盘的blocks记录到updatedBlocks上 res.droppedBlocks.foreach { block => updatedBlocks += block }
        } else if (level.useOffHeap) { // 保存到Tachyon上. val res = data match { case IteratorValues(iterator) => tachyonStore.putValues(blockId, iterator, level, false) case ArrayBufferValues(array) => tachyonStore.putValues(blockId, array, level, false) case ByteBufferValues(bytes) => bytes.rewind()
              tachyonStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match { case Right(newBytes) => bytesAfterPut = newBytes case _ => }
        } else { // 直接保存到硬盘,不要复制到其它节点的就别返回数据了. val askForBytes = level.replication > 1 val res = data match { case IteratorValues(iterator) => diskStore.putValues(blockId, iterator, level, askForBytes) case ArrayBufferValues(array) => diskStore.putValues(blockId, array, level, askForBytes) case ByteBufferValues(bytes) => bytes.rewind()
              diskStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match { case Right(newBytes) => bytesAfterPut = newBytes case _ => }
        } // 通过blockId获得当前的block状态 val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { // 成功了,把该block标记为ready,通知BlockManagerMaster marked = true putBlockInfo.markReady(size) if (tellMaster) {
            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
          }
          updatedBlocks += ((blockId, putBlockStatus))
        }
      } finally { // 如果没有标记成功,就把该block信息清除 if (!marked) {
          blockInfo.remove(blockId)
          putBlockInfo.markFailure()
        }
      }
    } // 把数据发送到别的节点做备份 if (level.replication > 1) {
      data match { case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf) case _ => {
          val remoteStartTime = System.currentTimeMillis // 把Iterator里面的数据序列化之后,发送到别的节点 if (bytesAfterPut == null) { if (valuesAfterPut == null) { throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, level)
        }
      }
    } // 销毁bytesAfterPut  BlockManager.dispose(bytesAfterPut)
    updatedBlocks
  }
复制代码

从上面的的来看:

1、存储的时候按照不同的存储级别分了3种情况来处理:存在内存当中(包括MEMORY字样的),存在tachyon上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。

2、存储完成之后会根据存储级别决定是否发送到别的节点,在名字上最后带2字的都是这种,2表示一个block会在两个节点上保存。

3、存储完毕之后,会向BlockManagerMaster汇报block的情况。

4、这里面的序列化其实是先压缩后序列化,默认使用的是LZF压缩,可以通过spark.io.compression.codec设定为snappy或者lzo,序列化方式通过spark.serializer设置,默认是JavaSerializer。

接下来我们再看get的情况。

 val local = getLocal(blockId) if (local.isDefined) return local
    val remote = getRemote(blockId) if (remote.isDefined) return remote
    None 

先从本地取,本地没有再去别的节点取,都没有,返回None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的节点去,它们是一个什么样子的关系?

我们先看getRemote方法

复制代码
  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
    val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) {
      val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { if (asValues) { return Some(dataDeserialize(blockId, data))
        } else { return Some(data)
        }
      }
    }
    None
  }
复制代码

这个方法包括两个步骤:

1、用blockId通过master的getLocations方法找到它的位置。

2、通过BlockManagerWorker.syncGetBlock到指定的节点获取数据。

ok,下面就重点讲BlockManager和BlockManagerMaster之间的关系,以及BlockManager之间是如何相互传输数据。

BlockManager与BlockManagerMaster的关系

BlockManager我们使用的时候是从SparkEnv.get获得的,我们观察了一下SparkEnv,发现它包含了我们运行时候常用的那些东东。那它创建是怎么创建的呢,我们找到SparkEnv里面的create方法,右键FindUsages,就会找到两个地方调用了,一个是SparkContext,另一个是Executor。在SparkEnv的create方法里面会实例化一个BlockManager和BlockManagerMaster。这里我们需要注意看BlockManagerMaster的实例化方法,里面调用了registerOrLookup方法。

复制代码
    def registerOrLookup(name: String, newActor: => Actor): ActorRef = { if (isDriver) {
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf)
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }
复制代码

所以从这里可以看出来,除了Driver之后的actor都是,都是持有的Driver的引用ActorRef。梳理一下,我们可以得出以下结论:

1、SparkContext持有一个BlockManager和BlockManagerMaster。

2、每一个Executor都持有一个BlockManager和BlockManagerMaster。

3、Executor和SparkContext的BlockManagerMaster通过BlockManagerMasterActor来通信。

接下来,我们看看BlockManagerMasterActor里的三组映射关系。

  // 1、BlockManagerId和BlockManagerInfo的映射关系 private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] // 2、Executor ID 和 Block manager ID的映射关系 private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] // 3、BlockId和保存它的BlockManagerId的映射关系 private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

看到这三组关系,前面的getLocations方法不用看它的实现,我们都应该知道是怎么找了。

BlockManager相互传输数据

BlockManager之间发送数据和接受数据是通过BlockManagerWorker的syncPutBlock和syncGetBlock方法来实现。看BlockManagerWorker的注释,说是BlockManager的网络接口,采用的是事件驱动模型。

再仔细看这两个方法,它传输的数据包装成BlockMessage之后,通过ConnectionManager的sendMessageReliablySync方法来传输。

接下来的故事就是nio之间的发送和接收了,就简单说几点吧:

1、ConnectionManager内部实例化一个selectorThread线程来接收消息,具体请看run方法。

2、Connection发送数据的时候,是一次把消息队列的message全部发送,不是一个一个message发送,具体看SendConnection的write方法,与之对应的接收看ReceivingConnection的read方法。

3、read完了之后,调用回调函数ConnectionManager的receiveMessage方法,它又调用了handleMessage方法,handleMessage又调用了BlockManagerWorker的onBlockMessageReceive方法。传说中的事件驱动又出现了。

复制代码
  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
    blockMessage.getType match { case BlockMessage.TYPE_PUT_BLOCK => {
        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) putBlock(pB.id, pB.data, pB.level)
        None
      } case BlockMessage.TYPE_GET_BLOCK => {
        val gB = new GetBlock(blockMessage.getId) val buffer = getBlock(gB.id) Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
      } case _ => None
    }
  }
复制代码

根据BlockMessage的类型进行处理,put类型就保存数据,get类型就从本地把block读出来返回给它。

 

注:BlockManagerMasterActor是存在于BlockManagerMaster内部,画在外面只是因为它在通信的时候起了关键的作用的,Executor上持有的BlockManagerMasterActor均是Driver节点的Actor的引用。

广播变量

先回顾一下怎么使用广播变量:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

看了一下实现调用的是broadcastFactory的newBroadcast方法。

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = { broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

默认的broadcastFactory是HttpBroadcastFactory,内部还有另外一个实现TorrentBroadcastFactory,先说HttpBroadcastFactory的newBroadcast方法。

它直接new了一个HttpBroadcast。

复制代码
 HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  } if (!isLocal) {
    HttpBroadcast.write(id, value_)
  }
复制代码

它的内部做了两个操作,把数据保存到driver端的BlockManager并且写入到硬盘。

TorrentBroadcast和HttpBroadcast都把数据存进了BlockManager做备份,但是TorrentBroadcast接着并没有把数据写入文件,而是采用了下面这种方式:

复制代码
 def sendBroadcast() { // 把数据给切分了,每4M一个分片 val tInfo = TorrentBroadcast.blockifyObject(value_)
    totalBlocks = tInfo.totalBlocks
    totalBytes = tInfo.totalBytes
    hasBlocks = tInfo.totalBlocks // 把分片的信息存到BlockManager,并通知Master val metaId = BroadcastBlockId(id, "meta")
    val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.putSingle(
        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    } // 遍历所有分片,存到BlockManager上面,并通知Master for (i <- 0 until totalBlocks) {
      val pieceId = BroadcastBlockId(id, "piece" + i)
      TorrentBroadcast.synchronized {
        SparkEnv.get.blockManager.putSingle(
          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
      }
    }
  }
复制代码

1、把数据序列化之后,每4M切分一下。

2、切分完了之后,把所有分片写入BlockManager。

但是找不到它们是怎么传播的??只是写入到BlockManager,但是tellMaster为false的话,就相当于存在本地了,别的BlockManager是没法获取到的。

这时候我注意到它内部有两个方法,readObject和writeObject,会不会和这两个方法有关呢?它们做的操作就是给value赋值。

为了检验这个想法,我亲自调试了一下,在反序列化任务的时候,readObject这个方法是被ObjectInputStream调用了。这块的知识大家可以百度下ObjectInputStream和ObjectOutputStream。

具体操作如下:

1、打开BroadcastSuite这个类,找到下面这段代码,图中的地方原来是512, 被我改成256了,之前一直运行不起来。

复制代码
  test("Accessing TorrentBroadcast variables in a local cluster") {
    val numSlaves = 4 sc = new SparkContext("local-cluster[%d, 1, 256]".format(numSlaves), "test", torrentConf)
    val list = List[Int](1, 2, 3, 4)
    val broadcast = sc.broadcast(list)
    val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
    assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
  }
复制代码

2、找到TorrentBroadcast,在readObject方法上打上断点。

3、开始调试吧。

之前讲过,Task是被序列化之后包装在消息里面发送给Worker去运行的,所以在运行之前必须把Task进行反序列化,具体在TaskRunner的run方法里面:

task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

Ok,告诉大家入口了,剩下的大家去尝试吧。前面介绍了怎么切分的,到TorrentBroadcast的readObject里面就很容易理解了。

1、先通过MetaId从BlockManager里面取出来Meta信息。

2、通过Meta信息,构造分片id,去BlockManager里面取。

3、获得分片之后,把分片写入到本地的BlockManager当中。

4、全部取完之后,通过下面的方法反向赋值。

if (receiveBroadcast()) {
     value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)  SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

5、把value_又顺手写入到BlockManager当中。(这里相当于写了两份进去,大家要注意了哈,内存消耗还是大大地。幸好是MEMORY_AND_DISK的)

这么做是有好处的,这是一种类似BT的做法,把数据切分成一小块一小块,容易传播,从不同的机器上获取一小块一小块的数据,最后组装成完整的。

把完整的value写入BlockManager是为了使用的时候方便,不需要再次组装。

相关参数

复制代码
// BlockManager的最大内存 spark.storage.memoryFraction 默认值0.6 // 文件保存的位置 spark.local.dir 默认是系统变量java.io.tmpdir的值 // tachyon保存的地址 spark.tachyonStore.url 默认值tachyon://localhost:19998 // 默认不启用netty来传输shuffle的数据 spark.shuffle.use.netty 默认值是false spark.shuffle.sender.port 默认值是0 // 一个reduce抓取map中间结果的最大的同时抓取数量大小(to avoid over-allocating memory for receiving shuffle outputs) spark.reducer.maxMbInFlight 默认值是48*1024*1024 // TorrentBroadcast切分数据块的分片大小 spark.broadcast.blockSize 默认是4096 // 广播变量的工厂类 spark.broadcast.factory 默认是org.apache.spark.broadcast.HttpBroadcastFactory,也可以设置为org.apache.spark.broadcast.TorrentBroadcastFactory // 压缩格式 spark.io.compression.codec 默认是LZF,可以设置成Snappy或者Lzo 

http://www.luobo360.com/article/139

 
 
分享到:
评论

相关推荐

    Spark源码剖析

    《Spark源码剖析》PDF 文件很可能会深入到这些技术细节,包括类结构、算法实现以及关键代码的解析,帮助读者更好地理解和优化 Spark 应用。通过深入学习 Spark 源码,开发者可以更好地掌握 Spark 内部工作原理,从而...

    spark 源码解读迷你书

    《Spark源码解读迷你书》是一本专注于深入理解Apache Spark核心源码的书籍,适合对大数据处理技术有深厚兴趣并且想要探索Spark内部机制的读者。在阅读这本书之前,建议先搭建好Spark开发环境,比如使用Intelij IDEA...

    大数据Spark源码

    《深入解析Spark源码:大数据处理的基石》 Spark,作为大数据处理领域的明星框架,以其高效、易用和可扩展性赢得了广泛的认可。Spark源码的学习对于深入理解其内部机制,提升开发效率,以及解决实际问题具有至关...

    spark源码分析.pdf

    Spark源码分析是一项深入理解Apache Spark内部工作机制的重要途径。Apache Spark是一个快速、通用、可扩展的大数据处理平台,拥有着内存计算的特性,它提供了RDD(弹性分布式数据集)、DAG(有向无环图)、stage、...

    spark源码以及官方的示例(方便阅读源码学习)

    Spark源码提供了深入了解其内部工作原理的机会,这对于开发者优化应用性能、解决技术问题或者进行二次开发至关重要。官方示例则为初学者提供了一个快速上手和理解Spark功能的平台。 在Spark源码中,我们可以深入...

    Apache Spark源码剖析

    Apache Spark 是一个流行的开源大...通过深入研究Apache Spark源码,开发者可以更好地优化应用程序性能,解决特定场景下的问题,甚至为Spark贡献代码,推动项目发展。源码剖析是提升技术深度和解决问题能力的重要途径。

    Apache-Spark2.20源码中文注释

    当用户提交任务时,Spark 将一系列操作转换成 DAG,并通过调度器进行任务分解和资源分配。 3. **存储系统**:Spark 支持内存和磁盘混合存储,通过缓存机制实现数据复用,提升计算效率。Tachyon 和 HDFS 是常见的...

    深入理解Spark 核心思想与源码分析

    通过对Spark源码的深入分析,可以了解到其内部的优化策略和设计原则,这对于开发高效的大数据应用和优化性能至关重要。Spark的设计理念不仅适用于大规模数据处理,还为实时分析、机器学习等领域提供了强大的支持。...

    spark最新源码以及二次开发教程

    1. **环境搭建**:安装Openfire服务器,获取Spark源码,设置开发环境(如IDE、构建工具等)。 2. **代码阅读**:理解Spark源码的组织结构,重点关注与目标功能相关的部分。 3. **设计与实现**:根据需求设计新功能...

    Spark SQL源码概览.zip

    Spark SQL利用Spark的内存模型来缓存数据,以提高查询性能。它采用Tungsten项目实现的Columnar存储,减少了数据序列化和反序列化的开销。 10. **扩展性**: 开发者可以通过继承Spark SQL的抽象类,比如...

    spark-2.2.0.tgz源码

    这个版本包含了丰富的功能改进和性能优化,对于理解Spark的工作原理、开发分布式应用程序或者进行性能调优来说,研究源码是非常有价值的。 1. **Spark核心组件**:Spark的核心组件包括了Spark Core、Spark SQL、...

    Spark源码包(编译完成)

    **Spark源码结构与编译** Spark的源码组织结构主要包含以下几个部分: 1. `core`:Spark的基础功能,包括RDD、调度、内存管理和持久化等。 2. `sql`:Spark SQL相关代码,包括DataFrame和Dataset API。 3. `...

    spark2.02源码

    总的来说,Spark 2.0.2的源码包含了许多关键的改进和优化,对于学习大数据处理、分布式计算和机器学习有着极高的价值。通过深入研究源码,开发者可以更好地理解和定制Spark,提升其在特定业务场景下的效能。

    Spark2.6.3源码

    首先,Java目录是Spark源码的核心部分,它包含了Spark的大部分核心组件和功能的实现。以下是一些主要的子目录及其相关的知识点: 1. **core**:这是Spark的基础模块,包含了任务调度、内存管理、容错机制以及与...

    spark源码阅读笔记

    ### Spark源码解析:Master与Worker机制 #### Spark概述及特点 Spark是一个高效的数据处理框架,它由加州大学伯克利分校的AMP实验室研发。该框架支持多种编程语言(包括Java、Scala、Python和R等),使开发者可以...

    spark源码阅读笔记(详)

    ### Spark源码解析要点 #### 一、Spark概述与特性 **Spark** 是一款由加州大学伯克利分校AMP实验室研发的数据处理框架,它极大简化了大数据应用的开发流程,支持多种编程语言如Java、Scala、Python和R,使得开发者...

Global site tag (gtag.js) - Google Analytics