- 浏览: 236151 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
lwb314:
你的这个是创建的临时的hive表,数据也是通过文件录入进去的, ...
Spark SQL操作Hive数据库 -
yixiaoqi2010:
你好 我的提交上去 总是报错,找不到hive表,可能是哪里 ...
Spark SQL操作Hive数据库 -
bo_hai:
target jvm版本也要选择正确。不能选择太高。2.10对 ...
eclipse开发spark程序配置本地运行
Spark Streaming + Flume Integration Guide:http://spark.apache.org/docs/latest/streaming-flume-integration.html
本例子是做一个Spark Streaming 从flume中拉取数据的实验。
1.面配置flume
1.首先下载必须的jar:在上面的连接中有。并将其拷贝到/usr/local/flume/apache-flume-1.6.0-bin/lib目录下。
jar可以从官网上下载,也可以下载附件中的jar;
配置master1上的flume
master1上修改配置文件root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
2.源码编写
3.编写启动脚本
4.启动
先启动Hadoop集群,Spark集群可以不用启动,使用standalone模式。
启动flume
命令:root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf# flume-ng agent -n agent1 -c conf -f flume-conf.properties -Dflume.root.logger=DEBUG,console
查看控制台信息(截取部分信息):
启动spark Streaming job的调度脚本,并查看控制台,job30秒调度一次
Spark Streaming 的job启动后,从flume控制台上可以看到下面的日志,说明Spark Streaming 和Flume通信成功。
测试数据
创建文件test_7.log,并拷贝到TestDir目录中。
查看flume控制台信息:
查看job控制台信息:
从flume控制台和job控制台打印的信息可以看到,当test_7.log文件拷贝到TestDir目录时,flume会处理文件并将其checkpoint,当job触发后会拉取该数据进行处理。
本例子是做一个Spark Streaming 从flume中拉取数据的实验。
1.面配置flume
1.首先下载必须的jar:在上面的连接中有。并将其拷贝到/usr/local/flume/apache-flume-1.6.0-bin/lib目录下。
jar可以从官网上下载,也可以下载附件中的jar;
配置master1上的flume
master1上修改配置文件root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
#agent1 name agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 #set source agent1.sources.r1.type = spooldir agent1.sources.r1.spoolDir =/usr/local/flume/tmp/TestDir agent1.sources.r1.channels = c1 agent1.sources.r1.fileHeader = false agent1.sources.r1.interceptors = i1 agent1.sources.r1.interceptors.i1.type = timestamp # set sink to hdfs #agent1.sinks.k1.type=hdfs #agent1.sinks.k1.hdfs.path=hdfs://master1:9000/library/flume #agent1.sinks.k1.hdfs.fileType=DataStream #agent1.sinks.k1.hdfs.writerFormat=TEXT #agent1.sinks.k1.hdfs.roolInterval=1 #agent1.sinks.k1.hdfs.filePrefix=%Y-%m-%d #agent1.sinks.k1.channel=c1 #set sink to Spark Streaming #agent1.sinks.k1.type = avro #agent1.sinks.k1.channel = c1 #agent1.sinks.k1.hostname = master1 #agent1.sinks.k1.port = 9999 #set sink Spark Streaming pull data from flume agent1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink agent1.sinks.k1.hostname = master1 agent1.sinks.k1.port = 9999 agent1.sinks.k1.channel = c1 #set channel agent1.channels.c1.type = file agent1.channels.c1.checkpointDir=/usr/local/flume/tmp/checkpointDir agent2.channels.c1.dataDirs=/usr/local/flume/tmp/dataDirs
2.源码编写
package com.imf.spark.SparkApps.sparkstreaming; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.flume.FlumeUtils; import org.apache.spark.streaming.flume.SparkFlumeEvent; import scala.Tuple2; /** * * @Description:Spark Streaming 从flume中拉取数据 * @Author: lujinyong168 * @Date: 2016年6月19日 下午3:37:01 */ public class SparkStreamingPullDataFromFlume { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingPullDataFromFlume for Java"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30)); // JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jsc,"master1", 9999); flume push data to Spark Streaming JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createPollingStream(jsc,"master1", 9999);//Spark Streaming pull data from flume JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(SparkFlumeEvent event) throws Exception { String line = new String(event.event().getBody().array()); return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordsCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
3.编写启动脚本
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \ --class com.imf.spark.SparkApps.sparkstreaming.SparkStreamingPullDataFromFlume \ --master spark://master1:7077 \ /usr/local/sparkApps/SparkStreamingPullDataFromFlume/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar
4.启动
先启动Hadoop集群,Spark集群可以不用启动,使用standalone模式。
启动flume
命令:root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf# flume-ng agent -n agent1 -c conf -f flume-conf.properties -Dflume.root.logger=DEBUG,console
查看控制台信息(截取部分信息):
16/06/19 16:52:27 INFO node.Application: Starting Sink k1 16/06/19 16:52:27 INFO sink.SparkSink: Starting Spark Sink: k1 on port: 9999 and interface: master1 with pool size: 10 and transaction timeout: 60. 16/06/19 16:52:27 INFO node.Application: Starting Source r1 16/06/19 16:52:27 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /usr/local/flume/tmp/TestDir 16/06/19 16:52:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean. 16/06/19 16:52:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
启动spark Streaming job的调度脚本,并查看控制台,job30秒调度一次
root@master1:/usr/local/sparkApps/SparkStreamingPullDataFromFlume# ./run.sh 16/06/19 16:59:11 INFO spark.SparkContext: Running Spark version 1.6.0 16/06/19 16:59:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/06/19 16:59:12 WARN spark.SparkConf: SPARK_CLASSPATH was detected (set to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:'). This is deprecated in Spark 1.0+. Please instead use: - ./spark-submit with --driver-class-path to augment the driver classpath - spark.executor.extraClassPath to augment the executor classpath 16/06/19 16:59:12 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:' as a work-around. 16/06/19 16:59:12 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar:' as a work-around. 16/06/19 16:59:13 INFO spark.SecurityManager: Changing view acls to: root 16/06/19 16:59:13 INFO spark.SecurityManager: Changing modify acls to: root 16/06/19 16:59:13 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 16/06/19 16:59:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 32969. 16/06/19 16:59:14 INFO slf4j.Slf4jLogger: Slf4jLogger started 16/06/19 16:59:14 INFO Remoting: Starting remoting 16/06/19 16:59:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.112.130:46574] 16/06/19 16:59:15 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 46574. 16/06/19 16:59:15 INFO spark.SparkEnv: Registering MapOutputTracker 16/06/19 16:59:15 INFO spark.SparkEnv: Registering BlockManagerMaster 16/06/19 16:59:15 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-5b79546a-c1c2-466c-b72f-cef9cae03ffb 16/06/19 16:59:15 INFO storage.MemoryStore: MemoryStore started with capacity 517.4 MB 16/06/19 16:59:15 INFO spark.SparkEnv: Registering OutputCommitCoordinator 16/06/19 16:59:15 INFO server.Server: jetty-8.y.z-SNAPSHOT 16/06/19 16:59:15 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 16/06/19 16:59:15 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 16/06/19 16:59:15 INFO ui.SparkUI: Started SparkUI at http://192.168.112.130:4040 16/06/19 16:59:15 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/httpd-2322a15e-b95e-4b4d-8bdb-fbb37e7d6c16 16/06/19 16:59:15 INFO spark.HttpServer: Starting HTTP Server 16/06/19 16:59:16 INFO server.Server: jetty-8.y.z-SNAPSHOT 16/06/19 16:59:16 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:34178 16/06/19 16:59:16 INFO util.Utils: Successfully started service 'HTTP file server' on port 34178. 16/06/19 16:59:17 INFO spark.SparkContext: Added JAR file:/usr/local/sparkApps/SparkStreamingPullDataFromFlume/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1466326757806 16/06/19 16:59:18 INFO executor.Executor: Starting executor ID driver on host localhost 16/06/19 16:59:18 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40258. 16/06/19 16:59:18 INFO netty.NettyBlockTransferService: Server created on 40258 16/06/19 16:59:18 INFO storage.BlockManagerMaster: Trying to register BlockManager 16/06/19 16:59:18 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:40258 with 517.4 MB RAM, BlockManagerId(driver, localhost, 40258) 16/06/19 16:59:18 INFO storage.BlockManagerMaster: Registered BlockManager 16/06/19 16:59:19 INFO scheduler.EventLoggingListener: Logging events to hdfs://master1:9000/historyserverforSpark/local-1466326757943 16/06/19 16:59:20 INFO scheduler.ReceiverTracker: Starting 1 receivers 16/06/19 16:59:20 INFO scheduler.ReceiverTracker: ReceiverTracker started 16/06/19 16:59:20 INFO dstream.ForEachDStream: metadataCleanupDelay = -1 16/06/19 16:59:20 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1 16/06/19 16:59:20 INFO dstream.MappedDStream: metadataCleanupDelay = -1 16/06/19 16:59:20 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1 16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: metadataCleanupDelay = -1 16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Slide time = 30000 ms 16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Checkpoint interval = null 16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Remember duration = 30000 ms 16/06/19 16:59:20 INFO flume.FlumePollingInputDStream: Initialized and validated org.apache.spark.streaming.flume.FlumePollingInputDStream@538a1d89 16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Slide time = 30000 ms 16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Checkpoint interval = null 16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Remember duration = 30000 ms 16/06/19 16:59:20 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@1e794193 16/06/19 16:59:20 INFO dstream.MappedDStream: Slide time = 30000 ms 16/06/19 16:59:20 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 16/06/19 16:59:20 INFO dstream.MappedDStream: Checkpoint interval = null 16/06/19 16:59:20 INFO dstream.MappedDStream: Remember duration = 30000 ms 16/06/19 16:59:20 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2647c258 16/06/19 16:59:20 INFO dstream.ShuffledDStream: Slide time = 30000 ms 16/06/19 16:59:20 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1) 16/06/19 16:59:20 INFO dstream.ShuffledDStream: Checkpoint interval = null 16/06/19 16:59:20 INFO dstream.ShuffledDStream: Remember duration = 30000 ms 16/06/19 16:59:20 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@f9cdfc5 16/06/19 16:59:20 INFO dstream.ForEachDStream: Slide time = 30000 ms 16/06/19 16:59:20 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 16/06/19 16:59:20 INFO dstream.ForEachDStream: Checkpoint interval = null 16/06/19 16:59:20 INFO dstream.ForEachDStream: Remember duration = 30000 ms 16/06/19 16:59:20 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@cf9fdea 16/06/19 16:59:20 INFO scheduler.ReceiverTracker: Receiver 0 started 16/06/19 16:59:20 INFO scheduler.DAGScheduler: Got job 0 (start at SparkStreamingPullDataFromFlume.java:63) with 1 output partitions 16/06/19 16:59:20 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (start at SparkStreamingPullDataFromFlume.java:63) 16/06/19 16:59:20 INFO scheduler.DAGScheduler: Parents of final stage: List() 16/06/19 16:59:20 INFO scheduler.DAGScheduler: Missing parents: List() 16/06/19 16:59:20 INFO util.RecurringTimer: Started timer for JobGenerator at time 1466326770000 16/06/19 16:59:20 INFO scheduler.JobGenerator: Started JobGenerator at 1466326770000 ms 16/06/19 16:59:20 INFO scheduler.JobScheduler: Started JobScheduler 16/06/19 16:59:20 INFO streaming.StreamingContext: StreamingContext started 16/06/19 16:59:20 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588), which has no missing parents 16/06/19 16:59:21 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.0 KB, free 61.0 KB) 16/06/19 16:59:21 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.5 KB, free 81.6 KB) 16/06/19 16:59:21 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40258 (size: 20.5 KB, free: 517.4 MB) 16/06/19 16:59:21 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006 16/06/19 16:59:21 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588) 16/06/19 16:59:21 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 16/06/19 16:59:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 3106 bytes) 16/06/19 16:59:21 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 16/06/19 16:59:21 INFO executor.Executor: Fetching http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1466326757806 16/06/19 16:59:21 INFO util.Utils: Fetching http://192.168.112.130:34178/jars/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar to /tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/userFiles-fcba890a-719c-4351-bad0-38118b66a90c/fetchFileTemp3397272035245474294.tmp 16/06/19 16:59:23 INFO executor.Executor: Adding file:/tmp/spark-63eb086d-39bf-4416-b21b-24acaf38d99c/userFiles-fcba890a-719c-4351-bad0-38118b66a90c/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar to class loader 16/06/19 16:59:23 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1466326763400 16/06/19 16:59:23 INFO receiver.BlockGenerator: Started BlockGenerator 16/06/19 16:59:23 INFO receiver.BlockGenerator: Started block pushing thread 16/06/19 16:59:23 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 192.168.112.130:32969 16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Starting receiver 16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads.. 16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads.. 16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads.. 16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads.. 16/06/19 16:59:23 INFO flume.FlumePollingReceiver: Starting Flume Polling Receiver worker threads.. 16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart 16/06/19 16:59:23 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped 16/06/19 16:59:30 INFO scheduler.JobScheduler: Added jobs for time 1466326770000 ms 16/06/19 16:59:30 INFO scheduler.JobScheduler: Starting job streaming job 1466326770000 ms.0 from job set of time 1466326770000 ms 16/06/19 16:59:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at SparkStreamingPullDataFromFlume.java:41) 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Got job 1 (print at SparkStreamingPullDataFromFlume.java:61) with 1 output partitions 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (print at SparkStreamingPullDataFromFlume.java:61) 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1) 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Missing parents: List() 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents 16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 84.5 KB) 16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1805.0 B, free 86.3 KB) 16/06/19 16:59:30 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40258 (size: 1805.0 B, free: 517.4 MB) 16/06/19 16:59:30 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51) 16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 1988 bytes) 16/06/19 16:59:30 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 1) 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms 16/06/19 16:59:30 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 1). 1161 bytes result sent to driver 16/06/19 16:59:30 INFO scheduler.DAGScheduler: ResultStage 2 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.086 s 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 1) in 71 ms on localhost (1/1) 16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Job 1 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.159471 s 16/06/19 16:59:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61 16/06/19 16:59:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 82 bytes 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Got job 2 (print at SparkStreamingPullDataFromFlume.java:61) with 3 output partitions 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at SparkStreamingPullDataFromFlume.java:61) 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3) 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Missing parents: List() 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents 16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 89.2 KB) 16/06/19 16:59:30 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1805.0 B, free 91.0 KB) 16/06/19 16:59:30 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:40258 (size: 1805.0 B, free: 517.4 MB) 16/06/19 16:59:30 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 4 (ShuffledRDD[4] at reduceByKey at SparkStreamingPullDataFromFlume.java:51) 16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 3 tasks 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 2, localhost, partition 1,PROCESS_LOCAL, 1988 bytes) 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 3, localhost, partition 2,PROCESS_LOCAL, 1988 bytes) 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 4, localhost, partition 3,PROCESS_LOCAL, 1988 bytes) 16/06/19 16:59:30 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 2) 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 16/06/19 16:59:30 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 2). 1161 bytes result sent to driver 16/06/19 16:59:30 INFO executor.Executor: Running task 2.0 in stage 4.0 (TID 4) 16/06/19 16:59:30 INFO executor.Executor: Running task 1.0 in stage 4.0 (TID 3) 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 16 ms on localhost (1/3) 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 16/06/19 16:59:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms 16/06/19 16:59:30 INFO executor.Executor: Finished task 1.0 in stage 4.0 (TID 3). 1161 bytes result sent to driver 16/06/19 16:59:30 INFO executor.Executor: Finished task 2.0 in stage 4.0 (TID 4). 1161 bytes result sent to driver 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 41 ms on localhost (2/3) 16/06/19 16:59:30 INFO scheduler.DAGScheduler: ResultStage 4 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.041 s 16/06/19 16:59:30 INFO scheduler.DAGScheduler: Job 2 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.113834 s 16/06/19 16:59:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 42 ms on localhost (3/3) 16/06/19 16:59:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool ------------------------------------------- Time: 1466326770000 ms ------------------------------------------- 16/06/19 16:59:30 INFO scheduler.JobScheduler: Finished job streaming job 1466326770000 ms.0 from job set of time 1466326770000 ms 16/06/19 16:59:30 INFO scheduler.JobScheduler: Total delay: 0.480 s for time 1466326770000 ms (execution: 0.373 s) 16/06/19 16:59:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer() 16/06/19 16:59:30 INFO scheduler.InputInfoTracker: remove old batch metadata:
Spark Streaming 的job启动后,从flume控制台上可以看到下面的日志,说明Spark Streaming 和Flume通信成功。
16/06/19 16:52:27 INFO sink.SparkSink: Starting Avro server for sink: k1 16/06/19 16:52:27 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run.. 16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] OPEN 16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] BOUND: /192.168.112.130:9999 16/06/19 16:52:47 INFO ipc.NettyServer: [id: 0x976e21b0, /192.168.112.130:51610 => /192.168.112.130:9999] CONNECTED: /192.168.112.130:51610
测试数据
创建文件test_7.log,并拷贝到TestDir目录中。
root@master1:/usr/local/flume/tmp# cat test_7.log Hello Java Java Hello Hadoop Hello Spark Spark Spark root@master1:/usr/local/flume/tmp# cp test_7.log TestDir/
查看flume控制台信息:
16/06/19 17:08:11 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one. 16/06/19 17:08:11 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/test_7.log to /usr/local/flume/tmp/TestDir/test_7.log.COMPLETED 16/06/19 17:08:26 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/tmp/checkpointDir/checkpoint, elements to sync = 3 16/06/19 17:08:26 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1466326346643, queueSize: 0, queueHead: 7 16/06/19 17:08:26 INFO file.Log: Updated checkpoint for file: /root/.flume/file-channel/data/log-9 position: 1060 logWriteOrderID: 1466326346643
查看job控制台信息:
16/06/19 17:08:30 INFO scheduler.JobScheduler: Starting job streaming job 1466327310000 ms.0 from job set of time 1466327310000 ms 16/06/19 17:08:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Registering RDD 75 (mapToPair at SparkStreamingPullDataFromFlume.java:41) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Got job 37 (print at SparkStreamingPullDataFromFlume.java:61) with 1 output partitions 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 74 (print at SparkStreamingPullDataFromFlume.java:61) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 73) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 73) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 73 (MapPartitionsRDD[75] at mapToPair at SparkStreamingPullDataFromFlume.java:41), which has no missing parents 16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_39 stored as values in memory (estimated size 3.5 KB, free 99.4 KB) 16/06/19 17:08:30 INFO scheduler.JobScheduler: Added jobs for time 1466327310000 ms 16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_39_piece0 stored as bytes in memory (estimated size 1984.0 B, free 101.3 KB) 16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_39_piece0 in memory on localhost:40258 (size: 1984.0 B, free: 517.4 MB) 16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 39 from broadcast at DAGScheduler.scala:1006 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 73 (MapPartitionsRDD[75] at mapToPair at SparkStreamingPullDataFromFlume.java:41) 16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 73.0 with 1 tasks 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 73.0 (TID 75, localhost, partition 0,NODE_LOCAL, 2100 bytes) 16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 73.0 (TID 75) 16/06/19 17:08:30 INFO storage.BlockManager: Found block input-0-1466326763359 locally 16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 73.0 (TID 75). 1161 bytes result sent to driver 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 73.0 (TID 75) in 18 ms on localhost (1/1) 16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 73.0, whose tasks have all completed, from pool 16/06/19 17:08:30 INFO scheduler.DAGScheduler: ShuffleMapStage 73 (mapToPair at SparkStreamingPullDataFromFlume.java:41) finished in 0.014 s 16/06/19 17:08:30 INFO scheduler.DAGScheduler: looking for newly runnable stages 16/06/19 17:08:30 INFO scheduler.DAGScheduler: running: Set(ResultStage 0) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 74) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: failed: Set() 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ResultStage 74 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents 16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_40 stored as values in memory (estimated size 2.9 KB, free 104.3 KB) 16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 1807.0 B, free 106.1 KB) 16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_40_piece0 in memory on localhost:40258 (size: 1807.0 B, free: 517.4 MB) 16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 40 from broadcast at DAGScheduler.scala:1006 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 74 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51) 16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 74.0 with 1 tasks 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 74.0 (TID 76, localhost, partition 0,PROCESS_LOCAL, 1988 bytes) 16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 74.0 (TID 76) 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 74.0 (TID 76). 1161 bytes result sent to driver 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 74.0 (TID 76) in 3 ms on localhost (1/1) 16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 74.0, whose tasks have all completed, from pool 16/06/19 17:08:30 INFO scheduler.DAGScheduler: ResultStage 74 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.001 s 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Job 37 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.054738 s 16/06/19 17:08:30 INFO spark.SparkContext: Starting job: print at SparkStreamingPullDataFromFlume.java:61 16/06/19 17:08:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 18 is 147 bytes 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Got job 38 (print at SparkStreamingPullDataFromFlume.java:61) with 3 output partitions 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 76 (print at SparkStreamingPullDataFromFlume.java:61) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 75) 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Missing parents: List() 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting ResultStage 76 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51), which has no missing parents 16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_41 stored as values in memory (estimated size 2.9 KB, free 109.0 KB) 16/06/19 17:08:30 INFO storage.MemoryStore: Block broadcast_41_piece0 stored as bytes in memory (estimated size 1807.0 B, free 110.8 KB) 16/06/19 17:08:30 INFO storage.BlockManagerInfo: Added broadcast_41_piece0 in memory on localhost:40258 (size: 1807.0 B, free: 517.4 MB) 16/06/19 17:08:30 INFO spark.SparkContext: Created broadcast 41 from broadcast at DAGScheduler.scala:1006 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 76 (ShuffledRDD[76] at reduceByKey at SparkStreamingPullDataFromFlume.java:51) 16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Adding task set 76.0 with 3 tasks 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 76.0 (TID 77, localhost, partition 1,NODE_LOCAL, 1988 bytes) 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 76.0 (TID 78, localhost, partition 2,NODE_LOCAL, 1988 bytes) 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 76.0 (TID 79, localhost, partition 3,PROCESS_LOCAL, 1988 bytes) 16/06/19 17:08:30 INFO executor.Executor: Running task 0.0 in stage 76.0 (TID 77) 16/06/19 17:08:30 INFO executor.Executor: Running task 2.0 in stage 76.0 (TID 79) 16/06/19 17:08:30 INFO executor.Executor: Running task 1.0 in stage 76.0 (TID 78) 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks 16/06/19 17:08:30 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms 16/06/19 17:08:30 INFO executor.Executor: Finished task 2.0 in stage 76.0 (TID 79). 1161 bytes result sent to driver 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 76.0 (TID 79) in 7 ms on localhost (1/3) 16/06/19 17:08:30 INFO executor.Executor: Finished task 0.0 in stage 76.0 (TID 77). 1336 bytes result sent to driver 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 76.0 (TID 77) in 18 ms on localhost (2/3) 16/06/19 17:08:30 INFO executor.Executor: Finished task 1.0 in stage 76.0 (TID 78). 1334 bytes result sent to driver 16/06/19 17:08:30 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 76.0 (TID 78) in 18 ms on localhost (3/3) 16/06/19 17:08:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 76.0, whose tasks have all completed, from pool 16/06/19 17:08:30 INFO scheduler.DAGScheduler: ResultStage 76 (print at SparkStreamingPullDataFromFlume.java:61) finished in 0.003 s 16/06/19 17:08:30 INFO scheduler.DAGScheduler: Job 38 finished: print at SparkStreamingPullDataFromFlume.java:61, took 0.046156 s ------------------------------------------- Time: 1466327310000 ms ------------------------------------------- (Spark,3) (Hadoop,1) (Hello,3) (Java,2) 16/06/19 17:08:30 INFO scheduler.JobScheduler: Finished job streaming job 1466327310000 ms.0 from job set of time 1466327310000 ms 16/06/19 17:08:30 INFO scheduler.JobScheduler: Total delay: 0.123 s for time 1466327310000 ms (execution: 0.114 s) 16/06/19 17:08:30 INFO rdd.ShuffledRDD: Removing RDD 72 from persistence list 16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 72 16/06/19 17:08:30 INFO rdd.MapPartitionsRDD: Removing RDD 71 from persistence list 16/06/19 17:08:30 INFO rdd.MapPartitionsRDD: Removing RDD 70 from persistence list 16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 71 16/06/19 17:08:30 INFO rdd.BlockRDD: Removing RDD 69 from persistence list 16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 70 16/06/19 17:08:30 INFO flume.FlumePollingInputDStream: Removing blocks of RDD BlockRDD[69] at createPollingStream at SparkStreamingPullDataFromFlume.java:30 of time 1466327310000 ms 16/06/19 17:08:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1466327250000 ms) 16/06/19 17:08:30 INFO storage.BlockManager: Removing RDD 69 16/06/19 17:08:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 1466327250000 ms
从flume控制台和job控制台打印的信息可以看到,当test_7.log文件拷贝到TestDir目录时,flume会处理文件并将其checkpoint,当job触发后会拉取该数据进行处理。
- commons-lang3-3.3.2.jar (403.1 KB)
- 下载次数: 0
- spark-streaming-flume-sink_2.10-1.6.1.jar (84 KB)
- 下载次数: 0
- scala-library-2.10.5.jar (6.8 MB)
- 下载次数: 0
发表评论
-
Flume push数据到SparkStreaming
2016-06-19 15:16 1948上节http://kevin12.iteye.com/blog ... -
Flume的安装和测试故障转移
2016-06-19 14:56 33941.实现功能 配置Flume监控本地文件夹变化,将变化的文件 ... -
Spark Streaming 统计单词的例
2016-06-19 14:55 3测试Spark Streaming 统计单词的例子 1.准 ... -
Flume的安装和测试故障转移
2016-06-19 12:48 17261.实现功能 配置Flume监控本地文件夹变化,将变化的文件上 ... -
Spark Streaming 统计单词的例子
2016-06-19 12:29 3690测试Spark Streaming 统计单词的例子 1.准备 ... -
Spark SQL窗口函数
2016-04-22 07:18 2565窗口函数又叫着窗口分析函数,Spark 1.4版本SparkS ... -
Spark SQL内置函数应用
2016-04-22 07:00 8672简单说明 使用Spark SQL中的内置函数对数据进行 ... -
Spark SQL操作Hive数据库
2016-04-13 22:37 17608本次例子通过scala编程实现Spark SQL操作Hive数 ... -
Spark SQL on hive配置和实战
2016-03-26 18:40 5581spark sql 官网:http://spark ... -
Spark RDD弹性表现和来源
2016-02-09 20:12 3863hadoop 的MapReduce是基于数 ... -
Spark内核架构
2016-02-07 12:24 10201.在将spark内核架构前,先了解一下Hadoop的MR,H ... -
spark集群HA搭建
2016-01-31 08:50 4536spark集群的HA图: 搭建spark的HA需要安装z ... -
Spark集群中WordCount运行原理
2016-01-31 07:05 2515以数据流动的视角解释一下wordcount运行的原理 pa ... -
eclipse开发spark程序配置在集群上运行
2016-01-27 08:08 9373这篇bolg讲一下,IDE开发的spark程序如何提交到集群上 ... -
eclipse开发spark程序配置本地运行
2016-01-27 07:58 12415今天简单讲一下在local模式下用eclipse开发一个简单的 ... -
spark1.6.0搭建(基于hadoop2.6.0分布式)
2016-01-24 10:11 5982本文是基于hadoop2.6.0的分布式环境搭建spark1. ...
相关推荐
《Spark Streaming + Kafka + Flume 日志收集处理系统的构建与应用》 在大数据处理领域,实时数据流处理已经成为不可或缺的一部分。Spark Streaming、Kafka和Flume作为三个关键组件,共同构建了一个高效、可靠且可...
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
在大数据处理领域,Flume 和 Spark Streaming 是两个重要的工具,它们分别用于数据收集与实时流处理。本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark ...
spark-streaming-flume_2.11-2.1.0.jar
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: https://blog.csdn.net/linge1995/article/details/81326146
Spring + Spark + SparkStreaming + Kafka + Flume 简单的电影推荐系统案例。.zip项目工程资源经过严格测试可直接运行成功且功能正常的情况才上传,可轻松复刻,拿到资料包后可轻松复现出一样的项目,本人系统开发...
使用spark集成flume,由于flume默认只支持pull消息的方式,不过它可以自定义消息拉取方式,现要使用poll方式,可以使用spark-streaming-flume-sink_2.11-2.1.0.jar包下的org.apache.spark.streaming.flume.sink....
spark-streaming-flume-sink_2.11-2.0.0.jar的jar包。
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
spark-streaming的flume依赖
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
sparkstreming结合flume需要的jar包,scala是2.11版本,spark是1.6.2版本。也有其他版本的,需要的留言找我要
flume与spark streaming结合(pull方式)报错:org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink....