- 浏览: 74300 次
文章列表
最近在玩spark streaming, 感觉到了他的强大。 然后看 StreamingContext的源码去理解spark是怎么完成计算的。 大部分的源码比较容易看懂, 但是这个
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
还是花了不少时间。 主要还是由于对spark不熟悉造成的吧, 还好基本弄明白了。
总的来说SparkStreaming提供这个方法主要是出于效率考虑。 比如说我要每10秒计算一下前15秒的内容,(每个batch 5秒), 可以想象每十秒计算出来的结果和前一次计算的结果其实中间有5秒的时间值是重复的。
那么就是通过 ...
最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境
用的是CDH 版本5.9.x
hdfs组成: 2 namenode HA, 6 datanode
kafka: 3 台kafka server
zookeeper: 3台
flume: 1台
spark: 6台 每台32G内存
数据流程是有远端终端向我们数据处理服务器(Gengo)发送, 再由Gengo向kafka还有flume发送同样数据
flume那份数据最后会存储到hdfs上, 万一哪天spark或者kafka挂了, 我们还可以通过离线Job处理hdfs上的数据, 保证数据完整
kafka的数据直接有我们spark s ...
之前看了那么些源码, 大致对整个Yarn的运行过程有了一个了解, 总结一下
首先每个Yarn集群都有一个Resource Manager 以及若干个NodeManager
Resource Manager主要有两个对象, 一个就是Scheduler, 还有一个就是Applications Manager ASM
Scheduler有FIFO和Fair等, 主要作用就是根据Node Manager的资源使用状况来分配container。
当然还有一种Uber模式, 具体就是满足7个条件后就会只用一个container去跑整个Job
Applications Manager主要是管理所有的A ...
中间隔了国庆, 好不容易才看明白了MRAppMaster如何启动其他container以及如何在NodeManager上面运行Task的。
上回写到了AM启动到最后其实是运行的MRAppMaster的main方法, 那么我们就从这里开始看他是如何启动其他container的, 首先看一下main方法:
public static void main(String[] args) {
try {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
...
继续上一篇文章, 那时候AM Allocation已经生成, 就等着NM 的心跳来找到有资源的NM, 再去启动, 那么假设一个NM 心跳, 然后走的就是RMNodeImpl的状态机的RMNodeEventType.STATUS_UPDATE事件, 看一下事件定义:
private static final StateMachineFactory<RMNodeImpl,
NodeState,
RMNodeEve ...
参考了一篇文章, 才看懂了Yarnrunner的整个流程:
http://blog.csdn.net/caodaoxi/article/details/12970993
网上很多文章写的不是很清楚, 有些到AM的启动的时候就错了, 结合最近俩星期看的Yarnrunner部分的源码 我把我的理解写下来, 打算分三部分
上: SubmitJob到schduler为ApplicationMaster申请Container
中:AM到NodeManager启动container 至在NM端执行APPMaster的脚本
下:启动container去执行Job的过程
可能中间也会有一些理解的错误, 导致 ...
http://blog.csdn.net/caodaoxi/article/details/12970993
http://www.cnblogs.com/shenh062326/p/3586510.html
这两篇文章不错, 这两天在看Yarnrunner的整个流程, 回头看完了后写几篇文章出来记录一下
前面一片文章写了MR怎么写, 然后添加的主要功能怎么用, 像partitioner, combiner等, 这周看了一下MR执行的时候Job提交以及Task运行的过程, 记录一下整个源码执行步骤, 量太大就不写详细了, 只是一步一步跟踪下去, 具体是在做什么就稍微解释一下, 跟多还是要靠自己看上下文理解了, 首先Job是通过job.waitForCompletion(true) 来提交的, 里面是通过submit()来提交的:
public boolean waitForCompletion(boolean verbose
...
Mapreduce在执行的时候首先会解析成KV键值对传送到Map方法里面, 在Mapper类的run里面有这么一段代码:
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
就是只要还有KV键值对, 就不停的调用Map方法。
在Map端处理完成后我们通过Context.write输出, 然后到reducer端相同的Key的value组成到迭代器里面通过reduce代码处理。
这是默认最简单的方式, ...
之前花了点时间玩spark, 现在开始学一下hadoop
前面花了几天时间搭建Hadoop环境和开发环境, 具体就不多说了, 今天开始了第一个Map/Reduce程序, 经典的wordcount。
使用的Hadoop版本是2.6.3, 所以我会用最新的API, 大部分都是在org.apache.hadoop.mapreduce这个包下面的。 (mapred是老的api)
我的sample文件是:
hdfs://<IP>:<Port>/words.txt
内容:
1
2
1
a
b
a
v
c
c
c
c
输出路径为:
hdfs://<IP>:< ...
之前看了Job怎么submit 以及最后run的, 然后也看了blockmanager是怎么工作的, 那么接下来就是要看spark是如何从blockManager中读写数据的。
首先每个计算都是在对应机器的executor的task上面运行的, 那么计算完后也是从executor端开始 ...
前一段时间看了如何划分stage以及如何提交Job, 最后把结果返回到Driver端的过程, 中间也涉及到了通过blockManager来获取Data等过程。 这两天花了点时间看了一下blockmanager是如何工作的, 在这里记录一下。
看了一下源代码, 这里有几个主要的对象:
1.BlockManager
2.BlockManagerMaster
3.BlockManagerMasterEndpoint
4.BlockManagerSlaveEndpoint
5.SparkEnv
BlockManagerMaster是在Driver端, 用来管理整个集群的BlockManager的, ...
前文: http://humingminghz.iteye.com/blog/2314269
前面先看到了从action入口到如何切分stage, 随后submit stage的过程, 那么既然stage被submit了, 接下来就应该是cluster manager去分配各个任务到prefer location的executor上面去执行了.
submitstage的方法, 最终会把当前stage相关的所有祖先stage都提交(isActive=false),并把当前stage放到waiting的stage里面, 等所有前部stage执行完后, 再执行当前stage。 每个stage都有 ...
之前看了Spark Streaming和Spark SQL, 自己还花了一些时间去玩了些machine learning的算法, 像 线性回归, kmeans, 协同过滤等。
现在回过头来, 打算看一下spark core部分代码, 就先找了下saveAsTextFile这个方法作为入口, 看一下是怎么保存文档到hadoop中,并且怎么切分stage以及提交Task。 中间也会触碰到DAGScheduler, 也能明白为什么大家都说DAGScheduler是作业调度的核心了
看一下saveAsTextFile代码:
def saveAsTextFile(path: String): U ...
在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或者avg或者min的值:
malePPL.agg(Map("height" -> "max", "sex" -> "count")).show
数据是
身高 性别
这样的一个组合大概有几百万个值
刚开始是使用reducebykey去做计算, 后来发现网上有agg里面直接进行排序获取值的做法, 特地看了一下为什么传进去一个Map(column -> Expression)就能得到想要的结果
首先还是直接进到agg的方法里面: ...