Lesson 5: Walking through the runtime source code of the Spark Streaming framework via a case study


Part 1: Hands-on case demonstration

 

package com.dt.spark.sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Use Spark Streaming + Spark SQL to compute, online and continuously, the hottest items within each
  * category of an e-commerce site, e.g. the three hottest phones in the "phone" category, the three
  * hottest TVs in the "TV" category, and so on. This example is highly relevant to real production systems.
  *
  * @author DT大数据梦工厂
  * Sina Weibo: http://weibo.com/ilovepains/
  *
  * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use ML, SQL, GraphX and
  * the other Spark libraries is that interfaces such as foreachRDD and transform operate directly on RDDs.
  * With RDDs as the common foundation, all other Spark functionality can be used as easily as calling an API.
  *
  * The assumed input format is: user item category, e.g. "Rocky Samsung Android".
  */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
    /**
      * Step 1: Create the Spark configuration object SparkConf and set the runtime configuration of the
      * Spark program. For example, setMaster sets the URL of the Master of the Spark cluster the program
      * connects to; setting it to "local" runs the program locally, which is convenient for beginners on
      * machines with very limited resources (e.g. only 1 GB of memory).
      */
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") // the application name shown in the monitoring UI
    //    conf.setMaster("spark://Master:7077") // with this setting the program runs on the Spark cluster
    conf.setMaster("local[6]")
    // Set batchDuration to control how often Jobs are generated, and create the entry point for Spark Streaming execution
    val ssc = new StreamingContext(conf, Seconds(5))

    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")


    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
      (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

    //    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,
    //      (v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))

    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
      _-_, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

        categoryItemDF.registerTempTable("categoryItemTable")

        val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        resultDataFrame.show()

        val resultRowRDD = resultDataFrame.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty){
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement();
              stmt.executeUpdate(sql);

            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse

          }
        }
        }
      }


    }
    }



    /**
      * Inside StreamingContext.start, the JobScheduler's start method is invoked, which starts its message loop.
      * Within JobScheduler.start, a JobGenerator and a ReceiverTracker are constructed and their start methods are called:
      *   1. Once started, JobGenerator keeps generating Jobs according to the batchDuration.
      *   2. Once started, ReceiverTracker first launches the Receivers in the Spark cluster (strictly speaking, it first
      *      starts a ReceiverSupervisor on the Executor). When a Receiver gets data, the ReceiverSupervisor stores it on
      *      the Executor and sends the metadata of the data to the ReceiverTracker on the Driver, which manages the
      *      received metadata through a ReceivedBlockTracker.
      * Each BatchInterval produces a concrete Job. This Job is not a Job in the Spark Core sense; it is only the DAG of
      * RDDs generated from the DStreamGraph, roughly the equivalent of a Runnable instance in Java terms. To actually run
      * the Job it must be handed to the JobScheduler, which uses a thread pool to find a separate thread in which to
      * submit the Job to the cluster (the real Spark job is triggered inside that thread by the RDD action). Why a thread pool?
      *   1. Jobs are generated continuously, so a thread pool improves efficiency, much like Tasks being executed through
      *      a thread pool inside an Executor.
      *   2. The Jobs may be configured with FAIR scheduling, which also requires multi-threading support.
      */
    ssc.start()
    ssc.awaitTermination()

  }
}
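
The listing above calls a ConnectionPool helper that is not shown. A minimal sketch of what such a helper might look like (a hand-rolled pool for illustration only; it assumes a MySQL JDBC driver on the classpath, and the URL and credentials are placeholders):

package com.dt.spark.sparkstreaming

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

/**
 * Minimal connection pool used only for illustration; a real job would
 * typically rely on an established pool such as DBCP or HikariCP.
 */
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  Class.forName("com.mysql.jdbc.Driver") // assumes the MySQL driver is on the classpath

  def getConnection(): Connection = {
    val cached = pool.poll()
    if (cached != null && !cached.isClosed) cached
    else DriverManager.getConnection("jdbc:mysql://Master:3306/spark", "root", "password") // placeholder URL/credentials
  }

  def returnConnection(connection: Connection): Unit = {
    if (connection != null && !connection.isClosed) pool.offer(connection)
  }
}

To drive the example, a socket server must be listening on Master:9999 (for instance `nc -lk 9999`) and be fed lines in the `user item category` format such as `Rocky Samsung Android`, and the target table categorytop3(category, item, client_count) must already exist in the database.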

Part 2: Source code analysis

Step 1: Create the StreamingContext

 

val ssc = new StreamingContext(conf, Seconds(5))

 1. The StreamingContext constructors involved are as follows:

 

 

/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

/**
 * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
 * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 * @param appName a name for your job, to display on the cluster web UI
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()) = {
  this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
       null, batchDuration)
}

 

   2. The first argument of `this` creates the SparkContext: Spark Streaming is just an application program running on top of Spark Core.

 

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
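
To make this concrete, here is a short sketch of the two equivalent ways of obtaining a StreamingContext (the values are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingOnSparkCore").setMaster("local[2]")

// 1) Let StreamingContext create the SparkContext internally (the path used in the case study)
val ssc = new StreamingContext(conf, Seconds(5))

// 2) Or hand it an already-created SparkContext, since Spark Streaming is just an
//    application on top of Spark Core (only one SparkContext may exist per JVM,
//    so this variant is shown commented out):
// val sc  = new SparkContext(conf)
// val ssc = new StreamingContext(sc, Seconds(5))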

 Step 2: Obtain the input data source

 

 

    

   val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

   1. socketTextStream receives the socket data stream.

 

 

   /**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

   2. socketStream creates a SocketInputDStream instance.

 

    

   /**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
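
socketTextStream is therefore just socketStream with SocketReceiver.bytesToLines as the converter. For illustration only, a custom converter that parses each line of the assumed `user item category` format into a tuple could be supplied instead (hypothetical helper, reusing the ssc from the case study):

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel

// Hypothetical converter: turn the raw byte stream into (user, item, category) tuples.
// Assumes well-formed, space-separated lines.
def linesToTriples(in: InputStream): Iterator[(String, String, String)] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null).map { line =>
    val Array(user, item, category) = line.split(" ")
    (user, item, category)
  }
}

val clickTriples = ssc.socketStream[(String, String, String)](
  "Master", 9999, linesToTriples, StorageLevel.MEMORY_AND_DISK_SER_2)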

   3. Data is received through a SocketReceiver.

 

 

   private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
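
getReceiver is the extension point: any custom input source follows the same contract that SocketReceiver implements, namely start a background thread in onStart and push records to Spark with store. A minimal, hypothetical example:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative receiver that emits the same word once per second
class ConstantReceiver(word: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    new Thread("Constant Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store(word)        // hands the record to the ReceiverSupervisor
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {} // the loop above exits once isStopped() returns true
}

// Plugged in exactly like the built-in sources:
// val words = ssc.receiverStream(new ConstantReceiver("ping"))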

 4. In SocketReceiver, the onStart method starts a thread that calls the receive method.

 

   

   def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

  5. The receive method opens a network connection and receives the data coming over the network.

 

 

   /** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
// the connection was opened with the given host and port; turn its byte stream into an iterator of records
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}

    6. The received data is turned into a DStream, and internally a DStream wraps its data as RDDs.

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
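
Because a DStream is just this time-indexed sequence of RDDs, any RDD-level API can be reached through transform in the same way the case study uses foreachRDD; a small illustrative sketch (reusing ssc from above):

// Each batch hands transform the RDD stored in generatedRDDs for that batch time
val lines = ssc.socketTextStream("Master", 9999)
val distinctPerBatch = lines.transform(rdd => rdd.distinct())
distinctPerBatch.print()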

   The call chain through which socketTextStream reads data is shown in the figure below:

 

 Step 3: Apply transformation operations according to your own business logic.

 

 Step 4: Call the start method.

 

/**
  * Inside StreamingContext.start, the JobScheduler's start method is invoked, which starts its message loop.
  * Within JobScheduler.start, a JobGenerator and a ReceiverTracker are constructed and their start methods are called:
  *   1. Once started, JobGenerator keeps generating Jobs according to the batchDuration.
  *   2. Once started, ReceiverTracker first launches the Receivers in the Spark cluster (strictly speaking, it first
  *      starts a ReceiverSupervisor on the Executor). When a Receiver gets data, the ReceiverSupervisor stores it on
  *      the Executor and sends the metadata of the data to the ReceiverTracker on the Driver, which manages the
  *      received metadata through a ReceivedBlockTracker.
  * Each BatchInterval produces a concrete Job. This Job is not a Job in the Spark Core sense; it is only the DAG of
  * RDDs generated from the DStreamGraph, roughly the equivalent of a Runnable instance in Java terms. To actually run
  * the Job it must be handed to the JobScheduler, which uses a thread pool to find a separate thread in which to
  * submit the Job to the cluster (the real Spark job is triggered inside that thread by the RDD action). Why a thread pool?
  *   1. Jobs are generated continuously, so a thread pool improves efficiency, much like Tasks being executed through
  *      a thread pool inside an Executor.
  *   2. The Jobs may be configured with FAIR scheduling, which also requires multi-threading support.
  */

ssc.start()

 1. The source of start is as follows:

 

 

   /**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
// Thread-local properties: each thread has its own private properties, so setting them here does not affect other threads.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
// call JobScheduler's start method
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
// While one StreamingContext is running, no new StreamingContext may be started,
// because Spark does not yet support multiple SparkContexts running at the same time.
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
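
The ACTIVE branch above is why only one StreamingContext can run per JVM. Applications commonly account for this with getActiveOrCreate and a graceful stop; a hedged sketch (the timeout value is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("OnlineTheTop3ItemForEachCategory2DB").setMaster("local[6]")
  val ssc = new StreamingContext(conf, Seconds(5))
  // ... define the DStream graph here, as in Part 1 ...
  ssc
}

val ssc = StreamingContext.getActiveOrCreate(createContext _)
ssc.start()
ssc.awaitTerminationOrTimeout(10 * 60 * 1000)             // wait up to 10 minutes
ssc.stop(stopSparkContext = true, stopGracefully = true)  // drain the remaining batches before exiting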

 2. Following scheduler.start into JobScheduler, its start method is as follows.

 

 

  The startup of JobScheduler mainly performs the following steps:

 

1. Create an anonymous EventLoop implementation that handles the various JobScheduler events.

 

 

   def start(): Unit = synchronized {

  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
// get the InputDStreams
    inputDStream <- ssc.graph.getInputStreams
// the rateController can control the input rate
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
// start the StreamingListenerBus, mainly used to update the Streaming tab in the Spark UI
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
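
The rateController attached to each input stream here is the hook used by Spark Streaming's backpressure mechanism; it is enabled purely through configuration. An illustrative sketch (the maxRate value is an arbitrary example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("OnlineTheTop3ItemForEachCategory2DB")
  .set("spark.streaming.backpressure.enabled", "true") // let the rate controller adapt the ingestion rate
  .set("spark.streaming.receiver.maxRate", "10000")    // upper bound, in records per second per receiver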

 3. JobScheduler is the concrete class responsible for dynamic job scheduling.

 

 

JobScheduler is the scheduler of all the Jobs; it uses a single event-loop thread to listen for events such as a Job starting, a Job completing, or a failure:

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
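
JobScheduler, JobGenerator and ReceiverTracker all rely on this single-threaded event-loop pattern. Reduced to its essentials it looks roughly like the following sketch (an illustration of the idea, not Spark's own EventLoop class):

import java.util.concurrent.LinkedBlockingQueue

// A queue plus one daemon thread: events are posted from anywhere and handled serially
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (true) onReceive(queue.take())
    }
  }
  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}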

 

The source of receiverTracker's start method is as follows.

ReceiverTracker's role is to handle data reception, data caching, and Block generation.

ReceiverTracker starts the receivers by submitting a Job to the Executors in the cluster.

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

ReceiverTrackerEndpoint is used to receive messages from the Receivers.

Its receive method handles these messages; the receivers themselves are started by launching a Job (the StartAllReceivers case below).

/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

  private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

  @volatile private var active: Boolean = true

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        // the executors on which this receiver is to be started
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        startReceiver(receiver, executors)
      }
    case RestartReceiver(receiver) =>
      // Old scheduled executors minus the ones that are not active any more
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
          // Try global scheduling again
          oldScheduledExecutors
        } else {
          val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
          // Clear "scheduledLocations" to indicate we are going to do local scheduling
          val newReceiverInfo = oldReceiverInfo.copy(
            state = ReceiverState.INACTIVE, scheduledLocations = None)
          receiverTrackingInfos(receiver.streamId) = newReceiverInfo
          schedulingPolicy.rescheduleReceiver(
            receiver.streamId,
            receiver.preferredLocation,
            receiverTrackingInfos,
            getExecutors)
        }
      // Assume there is one receiver restarting at one time, so we don't need to update
      // receiverTrackingInfos
      startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    // Remote messages
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
  }

startReceiver is then called to launch the receivers on the Executors; the receiver startup logic is wrapped in the function startReceiverFunc.

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

Inside startReceiver, the startReceiverFunc shown above starts the supervisor:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

start first calls the onStart() method; what actually runs is the subclass's onStart.

/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }

That is, ReceiverSupervisorImpl's onStart method:

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

BlockGenerator's start method starts the blockIntervalTimer and the blockPushingThread:

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
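
The blockIntervalTimer started here cuts the received records into blocks every spark.streaming.blockInterval (200 ms by default), and the number of blocks per batch becomes the number of tasks for that batch. A small illustrative tuning sketch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // a smaller block interval => more blocks per batch => more partitions/tasks per batch
  .set("spark.streaming.blockInterval", "100ms")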

   Returning to the supervisor's start method above, we now look at the call to ReceiverSupervisor.startReceiver.

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

The onReceiverStart method is overridden in the subclass ReceiverSupervisorImpl; it sends a RegisterReceiver message to the ReceiverTrackerEndpoint.

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

When the ReceiverTrackerEndpoint receives this message it calls the registerReceiver method.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })

 At this point the startup of the ReceiverTracker is complete. We now return to our original code:

   In JobScheduler's start method:

receiverTracker.start()
jobGenerator.start()

   Next, the JobGenerator is started. JobGenerator is responsible for initializing the DStreamGraph, converting DStreams into RDDs, generating Jobs, and submitting them for execution.

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter
// eventLoop is the communication endpoint that receives JobGeneratorEvent messages
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

   processEvent is then called to handle the events that are posted at each batch interval.

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

    The time passed to generateJobs is exactly the batchDuration we specified.

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
// allocate the Blocks received during this batch interval to the batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
// generateJobs generates the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
// if the jobs were generated successfully, submit this JobSet to the JobScheduler
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
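
Note that generateJobs also posts a DoCheckpoint event after every batch. For stateful operators such as the reduceByKeyAndWindow used in Part 1, the checkpoint interval of a DStream can additionally be tuned by hand; a small sketch (the interval chosen is illustrative, and must be a multiple of the DStream's slide duration):

// checkpoint every other slide (40 s) instead of every slide (20 s)
categoryUserClickLogsDStream.checkpoint(Seconds(40))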

    submitJobSet submits the Jobs.

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
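
The jobExecutor used by submitJobSet is a fixed-size thread pool whose size comes from spark.streaming.concurrentJobs (1 by default); together with FAIR scheduling this is what allows several output operations to run concurrently, as mentioned in the comment above ssc.start(). An illustrative sketch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.concurrentJobs", "2") // size of the job-submission thread pool
  .set("spark.scheduler.mode", "FAIR")        // let concurrently submitted jobs share the cluster fairly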

    The Jobs we submit are wrapped in a JobHandler.

 private class JobHandler(job: Job) extends Runnable with Logging {

    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }

 

The overall flow is shown in the figure below:



 The InputDStream inheritance hierarchy is shown in the figure below:



 
Addendum:

When Spark launches the receivers it first runs a job: the runDummySparkJob function ensures that the Receivers are not all concentrated on a single node.

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

 

 

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")

  // provided the resources are available, ask the ReceiverTrackerEndpoint (endpoint)
  // to start all the receivers
  endpoint.send(StartAllReceivers(receivers))
}

 

Statement:

The content above is based on the videos recorded by [DT大数据梦工厂]. For more resources see the Baidu netdisk link: http://pan.baidu.com/s/1cFqjQu

(If the link is broken or you need further resources, contact QQ 460507491, WeChat DT1219477246, or phone 18610086859.)