- 浏览: 61001 次
- 性别:
- 来自: 西安
最新评论
-
gmd2009:
...
memstore的flush流程分析 -
hongs_yang:
iteye上好像格式没法复制过来。可以去csdn上看,那上面有 ...
memstore的flush流程分析 -
hongs_yang:
复制到word上去看,
memstore的flush流程分析 -
qindongliang1922:
注意格式擦
memstore的flush流程分析 -
hongs_yang:
可以有一些意见发表的,后期我会慢慢发布一些hadoop/yar ...
hbase put 流程分析regionserver端
文章列表
mapPartitions/mapPartitionsWithIndex
这 两个transform中:mapPartitions与map的区别是map中是对每个partition中的iterator执行map操作,对 map过程中的每一条record进行传入的function的处理,而mapPartitions是把partition中整个iterator传给 function进行处理.如果是map操作,你并不能知道这个iterator什么时候结束,但mapPartitions时给你的是一个 iterator,所以你的函数中知道这个iterator什么时候会结束.而mapPartitions ...
Sample是对rdd中的数据集进行采样,并生成一个新的RDD,这个新的RDD只有原来RDD的部分数据,这个保留的数据集大小由fraction来进行控制,这个分析中,不分析sample的两个算法的具体实现,如果后期有必要时,可以分析这两个算法的具体的实现.
首先,先看看sample的实现代码:
def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] = withScope {
reduceByKey 通过PairRDDFunctions进行的实现,reduceByKey的操作是把两个V类型的值进行处理,并最终返回的还是一个V类型的结果(V类型 就是value的类型).针对一个reduceByKey的操作,需要执行shuffle的操作,也就是说如果包含有reduceByKey时,会生成两 个执行的stage,第一个stage会根据shuffle的partition与分区的算子,对数据重新进行分区操作,第二个stage去读取重新分区 的数据.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.with ...
该 函数主要功能:通过指定的排序规则与进行排序操作的分区个数,对当前的RDD中的数据集按KEY进行排序,并生成一个SHUFFLEdrdd的实例,这个 过程会执行shuffle操作,在执行排序操作前,sortBy操作会执行一次到两次的数据取样的操作,取出RDD中每个PARTITION的部分数据, 并根据进行分区的partition的个数,按key的compare大小把某个范围内的key放到一个指定的partition中进行排序.
该函数的操作示例:
import org.apache.spark.SparkContext._** val rdd: RDD[(String, Int)] ...
这个操作的作用根据相同的key的所有的value存储到一个集合中的一个玩意.
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self))}
在 做groupByKey的操作时,由于需要根据key对数据进行重新的分区操作,因此这个操作需要有一个partitioner的实例.默认是hash算 子.这个操作根据当前操作的RDD中是否有partitioner,同时这个partitioner与当前的传入的partitioner的实例是否相同 来判断是否需要执行shuf ...
关于Hbase的cache配置
在hbase中的hfilecache中,0.96版本中新增加了bucket cache,
bucket cache通过把hbase.offheapcache.percentage配置为0来启用,
如果hbase.offheapcache.percentage的配置值大于0时,直接使用堆外内存来管理hbase的cache,
通过把hfi ...
HADOOP HA配置
hadoop2.x的ha配置,此文档中描述有hdfs与yarn的ha配置。
此文档的假定条件是zk已经安装并配置完成,其实也没什么安装的。
hdfs ha配置
首先,先配置core-site.xml配置文件:
<property>
在老版本中使用mr1时,还可能使用fs.default.name来进行配置
ReduceTask的运行
Reduce处理程序中需要执行三个类型的处理,
1.copy,从各map中copy数据过来
2.sort,对数据进行排序操作。
3.reduce,执行业务逻辑的处理。
ReduceTask的运行也是通过run方法开始,
通过mapreduce.job.reduce.shuffle.consumer.plugin.class配置shuffle的
MapTask运行通过执行.run方法:
1.生成TaskAttemptContextImpl实例,此实例中的Configuration就是job本身。
2.得到用户定义的Mapper实现类,也就是map函数的类。
HFileV2文件
HFileV2文件写入通过StoreFile.Writer-->HFileWriterV2进行写入。
文件格式通过hfile.format.version配置。默认为2,也只有2这个值在0.96可用。
可通过cf中配置
关于MemStore的补充
在通过HStore.add向store中添加一个kv时,首先把数据写入到memstore中。这一点没有什么说明;
publiclong add(final KeyValue kv) {
lock.readLock().lock();
try {
return
spark shuffle流程分析
回到ShuffleMapTask.runTask函数
现在回到ShuffleMapTask.runTask函数中:
override def runTask(context: TaskContext): MapStatus = {
首先得到要reduce的task的个数。
valnumOutputSplits = dep.partitioner
Task的执行过程分析
Task的执行通过Worker启动时生成的Executor实例进行,
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort).
Spark中的Scheduler
scheduler分成两个类型,一个是TaskScheduler与其实现,一个是DAGScheduler。
TaskScheduler:主要负责各stage中传入的task的执行与调度。
DAGScheduler:主要负责对JOB中的各种依赖进行解析,根据RDD的依赖生成stage并通知TaskScheduler执行。
实例生成
TaskScheduler实例生成:
scheduler实例生成,我目前主要是针对on yarn的spark进行的相关分析,
在appmaster启动后,通过调用startU ...
RDD的依赖关系
Rdd之间的依赖关系通过rdd中的getDependencies来进行表示,
在提交job后,会通过在 DAGShuduler.submitStage-->getMissingParentStages