Kevin12

SparkStreaming pull data from Flume

Spark Streaming + Flume Integration Guide:http://spark.apache.org/docs/latest/streaming-flume-integration.html

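This example is an experiment in which Spark Streaming pulls data from Flume.
1. Configure Flume
First, download the required JARs (they are listed in the guide linked above) and copy them into /usr/local/flume/apache-flume-1.6.0-bin/lib.
The JARs can be downloaded from the official site or taken from the attachment to this post.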

Configure Flume on master1
On master1, edit the configuration file root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1

#set source
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir =/usr/local/flume/tmp/TestDir
agent1.sources.r1.channels = c1
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = timestamp

# set sink to hdfs
#agent1.sinks.k1.type=hdfs
#agent1.sinks.k1.hdfs.path=hdfs://master1:9000/library/flume
#agent1.sinks.k1.hdfs.fileType=DataStream
#agent1.sinks.k1.hdfs.writeFormat=Text
#agent1.sinks.k1.hdfs.rollInterval=1
#agent1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
#agent1.sinks.k1.channel=c1

#set sink to Spark Streaming
#agent1.sinks.k1.type = avro
#agent1.sinks.k1.channel = c1
#agent1.sinks.k1.hostname = master1
#agent1.sinks.k1.port = 9999

#set sink Spark Streaming pull data from flume
agent1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.k1.hostname = master1
agent1.sinks.k1.port = 9999
agent1.sinks.k1.channel = c1

#set channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir=/usr/local/flume/tmp/checkpointDir
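# the prefix must be agent1; under any other agent name this key is not applied
# and the file channel falls back to its default data directory
agent1.channels.c1.dataDirs=/usr/local/flume/tmp/dataDirs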
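
Every key in this file must start with the agent name that is later passed to flume-ng with -n (agent1 here); a key written under another prefix is not applied to agent1. The following is a minimal sketch of a check for that, assuming the file content is available as a string. The class and method names are hypothetical, and the sample input deliberately contains one key with the prefix agent2 to show what the check reports.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Flume: lists the agent-name prefixes used in a config.
class FlumeAgentNameCheck {

    /** Returns the distinct agent names (the text before the first '.') used as key prefixes. */
    static Set<String> agentNames(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText)); // '#' comment lines are skipped
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> names = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            int dot = key.indexOf('.');
            names.add(dot < 0 ? key : key.substring(0, dot));
        }
        return names;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
            "agent1.channels = c1",
            "agent1.channels.c1.type = file",
            "agent1.channels.c1.checkpointDir=/usr/local/flume/tmp/checkpointDir",
            "agent2.channels.c1.dataDirs=/usr/local/flume/tmp/dataDirs");
        System.out.println(agentNames(conf)); // prints [agent1, agent2]
    }
}
```

If the result contains more than one name, at least one key is addressed to a different agent and is worth a second look.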


2. Write the source code
package com.imf.spark.SparkApps.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

/**
 * 
 * @Description: Spark Streaming pulls data from Flume
 * @Author: lujinyong168
 * @Date: 2016-06-19 15:37:01
 */
public class SparkStreamingPullDataFromFlume {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingPullDataFromFlume for Java");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
        // Push mode (Flume pushes data to Spark Streaming), kept for comparison:
        // JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jsc, "master1", 9999);
        // Pull mode: Spark Streaming polls the SparkSink listening on master1:9999.
        JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createPollingStream(jsc, "master1", 9999);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() { 
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(SparkFlumeEvent event) throws Exception {
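                // Decode the event body with an explicit charset rather than the platform default.
                String line = new String(event.event().getBody().array(), java.nio.charset.StandardCharsets.UTF_8);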
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { // sum the values of each key (reduces both locally and at the reducer level)

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordsCount.print();

        jsc.start();

        jsc.awaitTermination();
        jsc.close();

    }

}
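
The flatMap step above turns the event body, a ByteBuffer, into a String. ByteBuffer.array() returns the whole backing array regardless of the buffer's position and limit, so a more defensive decoding could look like the sketch below. It is plain Java with no Spark or Flume dependency; the class name is hypothetical, and the sliced buffer in main is a constructed case, not something observed in this experiment.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes only the readable part of a buffer as UTF-8.
class EventBodyDecoder {

    /** Decodes the bytes between position and limit without moving the caller's position. */
    static String decode(ByteBuffer body) {
        // duplicate() shares the content but has its own position and limit.
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] backing = "xxHello Sparkxx".getBytes(StandardCharsets.UTF_8);
        // A buffer that exposes only the 11 bytes of "Hello Spark" inside the backing array.
        ByteBuffer body = ByteBuffer.wrap(backing, 2, 11);
        System.out.println(decode(body)); // prints Hello Spark
        System.out.println(new String(body.array(), StandardCharsets.UTF_8)); // prints xxHello Sparkxx
    }
}
```

When the buffer covers its whole backing array, both approaches give the same string.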


3. Write the launch script
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class com.imf.spark.SparkApps.sparkstreaming.SparkStreamingPullDataFromFlume \
--master spark://master1:7077 \
/usr/local/sparkApps/SparkStreamingPullDataFromFlume/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar


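4. Start
Start the Hadoop cluster first. The Spark cluster does not need to be running: setMaster("local[4]") in the code takes precedence over the --master flag given to spark-submit, so the job runs in local mode.
Start Flume
Command: root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf# flume-ng agent -n agent1 -c conf -f flume-conf.properties -Dflume.root.logger=DEBUG,console
Check the console output (excerpt):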
16/06/19 16:52:27 INFO node.Application: Starting Sink k1
16/06/19 16:52:27 INFO sink.SparkSink: Starting Spark Sink: k1 on port: 9999 and interface: master1 with pool size: 10 and transaction timeout: 60.
16/06/19 16:52:27 INFO node.Application: Starting Source r1
16/06/19 16:52:27 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /usr/local/flume/tmp/TestDir
16/06/19 16:52:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
16/06/19 16:52:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started


Run the launch script for the Spark Streaming job and watch the console; a batch is scheduled every 30 seconds.
root@master1:/usr/local/sparkApps/SparkStreamingPullDataFromFlume# ./run.sh
16/06/19 16:59:11 INFO spark.SparkContext: Running Spark version 1.6.0
16/06/19 16:59:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/19 16:59:12 WARN spark.SparkConf: 
SPARK_CLASSPATH was detected (set to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

16/06/19 16:59:12 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:' as a work-around.
16/06/19 16:59:12 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:' as a work-around.
16/06/19 16:59:13 INFO spark.SecurityManager: Changing view acls to: root
16/06/19 16:59:13 INFO spark.SecurityManager: Changing modify acls to: root
16/06/19 16:59:13 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/06/19 16:59:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 32969.
16/06/19 16:59:14 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/06/19 16:59:14 INFO Remoting: Starting remoting
16/06/19 16:59:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.112.130:46574]
16/06/19 16:59:15 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 46574.
16/06/19 16:59:15 INFO spark.SparkEnv: Registering MapOutputTracker
16/06/19 16:59:15 INFO spark.SparkEnv: Registering BlockManagerMaster
16/06/19 16:59:15 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-5b79546a-c1c2-466c-b72f-cef9cae03ffb
16/06/19 16:59:15 INFO storage.MemoryStore: MemoryStore started with capacity 517.4 MB
16/06/19 16:59:15 INFO spark.SparkEnv: Registering OutputCommitCoordinator
16/06/19 16:59:15 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/06/19 16:59:15 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/06/19 16:59:15 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
16/06/19 16:59:15 INFO ui.SparkUI: Started SparkUI at http://192.168.112.130:4040
16/06/19 16:59:15 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/httpd-2322a15e-b95e-4b4d-8bdb-fbb37e7d6c16
16/06/19 16:59:15 INFO spark.HttpServer: Starting HTTP Server
16/06/19 16:59:16 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/06/19 16:59:16 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:34178
16/06/19 16:59:16 INFO util.Utils: Successfully started service 'HTTP file server' on port 34178.
16/06/19 16:59:17 INFO spark.SparkContext: Added JAR file:/usr/local/sparkApps/SparkStreamingPullDataFromFlume/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1466326757806
16/06/19 16:59:18 INFO executor.Executor: Starting executor ID driver on host localhost
16/06/19 16:59:18 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40258.
16/06/19 16:59:18 INFO netty.NettyBlockTransferService: Server created on 40258
16/06/19 16:59:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/06/19 16:59:18 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:40258 with 517.4 MB RAM, BlockManagerId(driver, localhost, 40258)
16/06/19 16:59:18 INFO storage.BlockManagerMaster: Registered BlockManager
16/06/19 16:59:19 INFO scheduler.EventLoggingListener: Logging events to hdfs://master1:9000/historyserverforSpark/local-1466326757943
16/06/19 16:59:20 INFO scheduler.ReceiverTracker: Starting 1 receivers
16/06/19 16:59:20 INFO scheduler.ReceiverTracker: ReceiverTracker started
16/06/19 16:59:20 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO dstream.MappedDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: metadataCleanupDelay = -1
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Initialized and validated org.apache.spark.streaming.flume.FlumePollingInputDStream@538a1d89
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1e794193
16/06/19 16:59:20 INFO dstream.MappedDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.MappedDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.MappedDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2647c258
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@f9cdfc5
16/06/19 16:59:20 INFO dstream.ForEachDStream: Slide time = 30000 ms
16/06/19 16:59:20 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/06/19 16:59:20 INFO dstream.ForEachDStream: Checkpoint interval = null
16/06/19 16:59:20 INFO dstream.ForEachDStream: Remember duration = 30000 ms
16/06/19 16:59:20 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@cf9fdea
16/06/19 16:59:20 INFO scheduler.ReceiverTracker: Receiver 0 started
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Got job 0 (start at SparkStreamingPullDataFromFlume.java:63) with 1 output partitions
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (start at SparkStreamingPullDataFromFlume.java:63)
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 16:59:20 INFO util.RecurringTimer: Started timer for JobGenerator at time 1466326770000
16/06/19 16:59:20 INFO scheduler.JobGenerator: Started JobGenerator at 1466326770000 ms
16/06/19 16:59:20 INFO scheduler.JobScheduler: Started JobScheduler
16/06/19 16:59:20 INFO streaming.StreamingContext: StreamingContext started
16/06/19 16:59:20 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588), which has no missing parents
16/06/19 16:59:21 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.0 KB, free 61.0 KB)
16/06/19 16:59:21 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.5 KB, free 81.6 KB)
16/06/19 16:59:21 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40258 (size: 20.5 KB, free: 517.4 MB)
16/06/19 16:59:21 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/06/19 16:59:21 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588)
16/06/19 16:59:21 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/06/19 16:59:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 3106 bytes)
16/06/19 16:59:21 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
16/06/19 16:59:21 INFO executor.Executor: Fetching http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1466326757806
16/06/19 16:59:21 INFO util.Utils: Fetching http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar to /tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/userFiles-fcba890a-719c-4351-bad0-38118b66a90c/fetchFileTemp3397272035245474294.tmp
16/06/19 16:59:23 INFO executor.Executor: Adding file:/tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/userFiles-fcba890a-719c-4351-bad0-38118b66a90c/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar to class loader
16/06/19 16:59:23 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1466326763400
16/06/19 16:59:23 INFO receiver.BlockGenerator: Started BlockGenerator
16/06/19 16:59:23 INFO receiver.BlockGenerator: Started block pushing thread
16/06/19 16:59:23 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 192.168.112.130:32969
16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Starting receiver
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads..
16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
16/06/19 16:59:30 INFO scheduler.JobScheduler: Added jobs for time 1466326770000 ms
16/06/19 16:59:30 INFO scheduler.JobScheduler: Starting job streaming job 1466326770000 ms.0 from job set of time 1466326770000 ms
16/06/19 16:59:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at SparkStreamingPullDataFromFlume.java:41)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Got job 1 (print at SparkStreamingPullDataFromFlume.java:61) with 1 output partitions
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 84.5 KB)
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1805.0 B, free 86.3 KB)
16/06/19 16:59:30 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40258 (size: 1805.0 B, free: 517.4 MB)
16/06/19 16:59:30 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 1)
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
16/06/19 16:59:30 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 1). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO scheduler.DAGScheduler: ResultStage 2 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.086 s
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 1) in 71 ms on localhost (1/1)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Job 1 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.159471 s
16/06/19 16:59:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 16:59:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 82 bytes
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Got job 2 (print at SparkStreamingPullDataFromFlume.java:61) with 3 output partitions
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 89.2 KB)
16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1805.0 B, free 91.0 KB)
16/06/19 16:59:30 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:40258 (size: 1805.0 B, free: 517.4 MB)
16/06/19 16:59:30 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 4 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 3 tasks
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 2, localhost, partition 1,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 3, localhost, partition 2,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 4, localhost, partition 3,PROCESS_LOCAL, 1988 bytes)
16/06/19 16:59:30 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 2)
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/06/19 16:59:30 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 2). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO executor.Executor: Running task 2.0 in stage 4.0 (TID 4)
16/06/19 16:59:30 INFO executor.Executor: Running task 1.0 in stage 4.0 (TID 3)
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 16 ms on localhost (1/3)
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
16/06/19 16:59:30 INFO executor.Executor: Finished task 1.0 in stage 4.0 (TID 3). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO executor.Executor: Finished task 2.0 in stage 4.0 (TID 4). 1161 bytes result sent to driver
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 41 ms on localhost (2/3)
16/06/19 16:59:30 INFO scheduler.DAGScheduler: ResultStage 4 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.041 s
16/06/19 16:59:30 INFO scheduler.DAGScheduler: Job 2 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.113834 s
16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 42 ms on localhost (3/3)
16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
-------------------------------------------
Time: 1466326770000 ms
-------------------------------------------

16/06/19 16:59:30 INFO scheduler.JobScheduler: Finished job streaming job 1466326770000 ms.0 from job set of time 1466326770000 ms
16/06/19 16:59:30 INFO scheduler.JobScheduler: Total delay: 0.480 s for time 1466326770000 ms (execution: 0.373 s)
16/06/19 16:59:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/06/19 16:59:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 
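
In the log above the context starts at 16:59:20, yet the JobGenerator timer starts at 1466326770000 ms and the first batch runs at 16:59:30, which suggests that batch times are aligned to multiples of the 30-second interval. A minimal sketch of that arithmetic follows. The class name and the clock reading are hypothetical, and the Asia/Shanghai zone is an assumption chosen because it matches the wall-clock times in the log.

```java
import java.time.Instant;
import java.time.ZoneId;

// Hypothetical illustration of batch-time alignment; not Spark's own code.
class BatchBoundary {

    /** Returns the next multiple of periodMs that is strictly after nowMs. */
    static long nextBatchTime(long nowMs, long periodMs) {
        return (Math.floorDiv(nowMs, periodMs) + 1) * periodMs;
    }

    public static void main(String[] args) {
        long period = 30_000L;      // Durations.seconds(30) in the job
        long now = 1466326760500L;  // assumed clock reading, about 16:59:20.5
        long first = nextBatchTime(now, period);
        System.out.println(first);  // prints 1466326770000
        // Assumed time zone; prints 16:59:30
        System.out.println(Instant.ofEpochMilli(first).atZone(ZoneId.of("Asia/Shanghai")).toLocalTime());
    }
}
```

The same calculation explains why later batch timestamps, such as 1466327310000 ms, also fall on 30-second boundaries.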


After the Spark Streaming job has started, the following lines appear on the Flume console, showing that Spark Streaming and Flume are communicating.
16/06/19 16:52:27 INFO sink.SparkSink: Starting Avro server for sink: k1
16/06/19 16:52:27 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run..
16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] OPEN
16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] BOUND: /192.168.112.130:9999
16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] CONNECTED: /192.168.112.130:51610
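
The OPEN, BOUND and CONNECTED lines show a TCP connection reaching the sink on port 9999. When they do not appear, a quick way to narrow the problem down is to test whether the port accepts connections at all. The sketch below does that in plain Java. The class name and the timeout value are hypothetical, and it only checks TCP reachability, not the Avro protocol spoken by the sink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something accepts TCP connections on host:port.
class SinkPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host name not resolvable
        }
    }

    public static void main(String[] args) {
        // master1 and 9999 are the hostname and port configured for sink k1.
        System.out.println(isListening("master1", 9999, 2000));
    }
}
```

A result of false while the Flume agent is running points at the hostname, port or firewall rather than at the Spark job.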


Test data
Create the file test_7.log and copy it into the TestDir directory.
root@master1:/usr/local/flume/tmp# cat test_7.log 
Hello Java Java
Hello Hadoop
Hello Spark Spark Spark
root@master1:/usr/local/flume/tmp# cp test_7.log TestDir/

Check the Flume console output:
16/06/19 17:08:11 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
16/06/19 17:08:11 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/test_7.log to /usr/local/flume/tmp/TestDir/test_7.log.COMPLETED
16/06/19 17:08:26 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/tmp/checkpointDir/checkpoint, elements to sync = 3
16/06/19 17:08:26 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1466326346643, queueSize: 0, queueHead: 7
16/06/19 17:08:26 INFO file.Log: Updated checkpoint for file: /root/.flume/file-channel/data/log-9 position: 1060 logWriteOrderID: 1466326346643


Check the job console output:
16/06/19 17:08:30 INFO scheduler.JobScheduler: Starting job streaming job 1466327310000 ms.0 from job set of time 1466327310000 ms
16/06/19 17:08:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Registering RDD 75 (mapToPair at SparkStreamingPullDataFromFlume.java:41)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Got job 37 (print at SparkStreamingPullDataFromFlume.java:61) with 1 output partitions
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 74 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 73)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 73)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 73 (MapPartitionsRDD[75] at mapToPair at SparkStreamingPullDataFromFlume.java:41), which has no missing parents
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_39 stored as values in memory (estimated size 3.5 KB, free 99.4 KB)
16/06/19 17:08:30 INFO scheduler.JobScheduler: Added jobs for time 1466327310000 ms
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_39_piece0 stored as bytes in memory (estimated size 1984.0 B, free 101.3 KB)
16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_39_piece0 in memory on localhost:40258 (size: 1984.0 B, free: 517.4 MB)
16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 39 from broadcast at DAGScheduler.scala:1006
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 73 (MapPartitionsRDD[75] at mapToPair at SparkStreamingPullDataFromFlume.java:41)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 73.0 with 1 tasks
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 73.0 (TID 75, localhost, partition 0,NODE_LOCAL, 2100 bytes)
16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 73.0 (TID 75)
16/06/19 17:08:30 INFO storage.BlockManager: Found block input-0-1466326763359 locally
16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 73.0 (TID 75). 1161 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 73.0 (TID 75) in 18 ms on localhost (1/1)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 73.0, whose tasks have all completed, from pool 
16/06/19 17:08:30 INFO scheduler.DAGScheduler: ShuffleMapStage 73 (mapToPair at SparkStreamingPullDataFromFlume.java:41) finished in 0.014 s
16/06/19 17:08:30 INFO scheduler.DAGScheduler: looking for newly runnable stages
16/06/19 17:08:30 INFO scheduler.DAGScheduler: running: Set(ResultStage 0)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 74)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: failed: Set()
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ResultStage 74 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_40 stored as values in memory (estimated size 2.9 KB, free 104.3 KB)
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 1807.0 B, free 106.1 KB)
16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_40_piece0 in memory on localhost:40258 (size: 1807.0 B, free: 517.4 MB)
16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 40 from broadcast at DAGScheduler.scala:1006
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 74 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 74.0 with 1 tasks
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 74.0 (TID 76, localhost, partition 0,PROCESS_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 74.0 (TID 76)
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 74.0 (TID 76). 1161 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 74.0 (TID 76) in 3 ms on localhost (1/1)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 74.0, whose tasks have all completed, from pool 
16/06/19 17:08:30 INFO scheduler.DAGScheduler: ResultStage 74 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.001 s
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Job 37 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.054738 s
16/06/19 17:08:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61
16/06/19 17:08:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 18 is 147 bytes
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Got job 38 (print at SparkStreamingPullDataFromFlume.java:61) with 3 output partitions
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 76 (print at SparkStreamingPullDataFromFlume.java:61)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 75)
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ResultStage 76 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_41 stored as values in memory (estimated size 2.9 KB, free 109.0 KB)
16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_41_piece0 stored as bytes in memory (estimated size 1807.0 B, free 110.8 KB)
16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_41_piece0 in memory on localhost:40258 (size: 1807.0 B, free: 517.4 MB)
16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 41 from broadcast at DAGScheduler.scala:1006
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 76 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 76.0 with 3 tasks
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 76.0 (TID 77, localhost, partition 1,NODE_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 76.0 (TID 78, localhost, partition 2,NODE_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 76.0 (TID 79, localhost, partition 3,PROCESS_LOCAL, 1988 bytes)
16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 76.0 (TID 77)
16/06/19 17:08:30 INFO executor.Executor: Running task 2.0 in stage 76.0 (TID 79)
16/06/19 17:08:30 INFO executor.Executor: Running task 1.0 in stage 76.0 (TID 78)
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
16/06/19 17:08:30 INFO executor.Executor: Finished task 2.0 in stage 76.0 (TID 79). 1161 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 76.0 (TID 79) in 7 ms on localhost (1/3)
16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 76.0 (TID 77). 1336 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 76.0 (TID 77) in 18 ms on localhost (2/3)
16/06/19 17:08:30 INFO executor.Executor: Finished task 1.0 in stage 76.0 (TID 78). 1334 bytes result sent to driver
16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 76.0 (TID 78) in 18 ms on localhost (3/3)
16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 76.0, whose tasks have all completed, from pool 
16/06/19 17:08:30 INFO scheduler.DAGScheduler: ResultStage 76 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.003 s
16/06/19 17:08:30 INFO scheduler.DAGScheduler: Job 38 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.046156 s
-------------------------------------------
Time: 1466327310000 ms
-------------------------------------------
(Spark,3)
(Hadoop,1)
(Hello,3)
(Java,2)

16/06/19 17:08:30 INFO scheduler.JobScheduler: Finished job streaming job 1466327310000 ms.0 from job set of time 1466327310000 ms
16/06/19 17:08:30 INFO scheduler.JobScheduler: Total delay: 0.123 s for time 1466327310000 ms (execution: 0.114 s)
16/06/19 17:08:30 INFO rdd.ShuffledRDD: Removing RDD 72 from persistence list
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 72
16/06/19 17:08:30 INFO rdd.MapPartitionsRDD: Removing RDD 71 from persistence list
16/06/19 17:08:30 INFO rdd.MapPartitionsRDD: Removing RDD 70 from persistence list
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 71
16/06/19 17:08:30 INFO rdd.BlockRDD: Removing RDD 69 from persistence list
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 70
16/06/19 17:08:30 INFO flume.FlumePollingInputDStream: Removing blocks of RDD BlockRDD[69] at createPollingStream at SparkStreamingPullDataFromFlume.java:30 of time 1466327310000 ms
16/06/19 17:08:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1466327250000 ms)
16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 69
16/06/19 17:08:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 1466327250000 ms


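The Flume and job console output shows that when test_7.log is copied into TestDir, Flume processes the file and checkpoints it; when the next batch fires, the job pulls that data and processes it.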
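
To confirm that the counts printed by the job are what the test file should produce, the same split-and-sum logic can be run without Spark. This is a minimal sketch with a hypothetical class name; it uses a sorted map, so the order of the entries differs from the order in the job output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone check of the word counts for test_7.log.
class WordCountCheck {

    /** Splits each line on single spaces and adds 1 per occurrence of a word. */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The three lines of test_7.log.
        List<String> lines = Arrays.asList("Hello Java Java", "Hello Hadoop", "Hello Spark Spark Spark");
        System.out.println(count(lines)); // prints {Hadoop=1, Hello=3, Java=2, Spark=3}
    }
}
```

The four pairs match (Spark,3), (Hadoop,1), (Hello,3) and (Java,2) from the batch at 1466327310000 ms.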