- 浏览: 723630 次
- 性别:
- 来自: 大连
-
最新评论
-
lixuanbin:
iteye已经快要tj了吧。。
iteye为什么不支持markdown? -
haorengoodman:
Tachyon 能在做数据分类吗?例如我有一坨hdfs文件,将 ...
tachyon与hdfs,以及spark整合 -
lee3836:
求源码,大牛
clover分布式任务调度系统 -
cfan37:
...
sparksql与hive整合 -
greemranqq:
9.9 送上,希望博客长久~。~
【【【【【#####>>>>>【关于我】【您·的·支·持·是·我·最·大·的·动·力】<<<<<#####】】】】】
文章列表
首先从SparkContext中TaskScheduler实例的创建开始:
进入taskScheduler.start()方法内部:
进入其实现者TaskSchedulerImpl内部:
可以发现在start具体实现的内部首先是有个backend.start方法:
其最终具体的实现类为:
RDD的核心方法:
首先看一下getPartitions方法的源码:
getPartitions返回的是一系列partitions的集合,即一个Partition类型的数组
我们就想进入HadoopRDD实现:
1、getJobConf():用来获取job Confi
接着上一篇文章继续分析代码:
3.1.3.3.3.1、进入TaskSet 方法:
3.1.3.3.3.2、进入taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) 方法:
RDD源码的count方法:
从上面代码可以看出来,count方法触发SparkContext的runJob方法的调用:
进入 runJob(rdd, func, 0 until rdd.partitions.size, false)方法:
进一步跟踪runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)方法:
继续跟踪进入runJob[T, U](rdd, func, partitions, allowLocal, (index, ...
在SparkContext中可以看到初始化UI代码:
// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
env.securityManager,appName))
} else {
// For tests, ...
当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:
进入其构造函数中:
可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。
LiveListenerBus:
MapOutputTrackerMaster:
TaskScheduler实例对象启动源代码如下所示:
从上面代码可以看出来,taskScheduler的启动是在SparkContext
找到TaskSchedulerImpl实现类中的start方法实现:
1、从上代码看到,先启动CoarseGrainedSchedulerBackend,
从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会先运行preStart()方法
从上面代码可以看到,初始
SparkContext在获得了一系列的初始化信息后开始创建并启动TaskScheduler实例:
进入createTaskScheduler方法:
我们看一下其Standalone的方式:
在上述代码中首先实例化一个TaskSchedulerImpl:
然后构建出了masterUrls:
接着创建出关键的backend:
spark的任务调度系统如下所示:
从上图中可以看出来由RDD Objects产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每组的tasks都是一个state,每当遇到shuffle就会产生新的state,可以看出上图一共有三个state;DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时需寻找task的最优化调度,例如数据本地性等;DAGScheduler还要监视因为shuffle输出导致的失败,如果发生这种失败,可能就需要重新提交该state ...
spark核心组件如下所示:
在SparkContext初始化的时候,会初始化一系列内容:
查看内存使用情况:
创建和启动scheduler:
集群核心组件中的Block tracker是用于block和partition对应关系的管理。
集群核心组件中的shuffle tracker是用于记录shuffle操作的过程细节。
从集群中也可以看出,Executor在执行任务的时候是采用多线程的方式执行的并能够在HDFS或者HBase等系统上读取数据。
而在实际的Driver Program运行的时候每个partition都会由一个tas ...
spark内核揭秘-02-spark集群概览
- 博客分类:
- spark
Spark集群预览:
官方文档对spark集群的初步描述如下,这是一个典型的主从结构:
官方文档对spark集群中的一些关键点给出详细的指导:
其Worker的定义如下所示:
需要注意的是Spark Driver所在的集群需要和Spark集群最好位于同一个网络环境中,因为Driver中的SparkContext实例需发送任务给不同Worker Node的Executor并接受Executor的一些执行结果信息,一般而言,在企业实际的生产环境中Driver所在的机器是的配置往往都是比较不错的,尤其是其CPU的处理能力要很强悍。
Application:
Application是创建了SparkContext实例对象的spark用户,包含了Driver程序:
Spark-shell是一个应用程序,因为spark-shell在启动的时候创建了一个SparkContext对象,其名称为sc:
Job:
和Spark的action相对应,每一个action例如count、saveAsTextFile等都会对应一个job实例,该job实例包含多任务的并行计算。
Driver Program:
运行main函数并且创建SparkContext实例的程序
Cluster Manager:
集群资源的管理外部服务 ...
启动spark-shell:
简单的RDD:
上述代码中使用的sc,这是Spark-Shell帮助我们自动生成的SparkContext的实例:
我们把生成的RDD的每个元素都乘以3:
上述的操作都是transformations我们需要触发一个action才能执行:
RDD操作例子:
RDD的依赖和运行时
Hadoop MapReduce:
MapReduce在每次执行的时候都要从磁盘读数据,计算完毕后都要把数据放到磁盘
spark map reduce:
RDD is everything for dev: