`
Kevin12
  • 浏览: 236068 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Flume push数据到SparkStreaming

阅读更多
上节http://kevin12.iteye.com/blog/2305946将flume的环境搭建好,并测试了flume的故障转移功能,这节编码实现Flume推送数据到Spark Streaming中。
下面的例子我只在master1上配置flume,worker1,worker2不进行配置了。
1.配置
master1上修改配置文件root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1

#set source
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir =/usr/local/flume/tmp/TestDir
agent1.sources.r1.channels = c1
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = timestamp

# set sink to hdfs
#agent1.sinks.k1.type=hdfs
#agent1.sinks.k1.hdfs.path=hdfs://master1:9000/library/flume
#agent1.sinks.k1.hdfs.fileType=DataStream
#agent1.sinks.k1.hdfs.writerFormat=TEXT
#agent1.sinks.k1.hdfs.roolInterval=1
#agent1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
#agent1.sinks.k1.channel=c1

#set sink to Spark Streaming
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = master1
agent1.sinks.k1.port = 9999

#set channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir=/usr/local/flume/tmp/checkpointDir
agent2.channels.c1.dataDirs=/usr/local/flume/tmp/dataDirs


注意:此时还不能启动flume,如果启动会报错。

2.编写代码
package com.imf.spark.SparkApps.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

/**
 * 
 * @Description:Flume推送数据到SparkStreaming例子
 * @Author: lujinyong168
 * @Date: 2016年6月19日 上午11:40:13
 */
public class FlumePushDate2SparkStreaming {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local[4]").
                setAppName("FlumePushDate2SparkStreaming");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));

        JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jsc,"master1", 9999);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() { //如果是Scala,由于SAM转换,所以可以写成val words = lines.flatMap { line => line.split(" ")}

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(SparkFlumeEvent event) throws Exception {
                String line = new String(event.event().getBody().array());
                return Arrays.asList(line.split(" "));
            }
        });

        
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordsCount.print();
        jsc.start();

        jsc.awaitTermination();
        jsc.close();

    }

}


3.pom.xml 配置,见附件

4.编译成可执行的jar
maven没配置的自己去配置。
maven命令:mvn clean package

5.调度脚本
将打好的大架包SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到虚拟机的/usr/local/sparkApps/FlumePushDate2SparkStreaming目录中,并编写调度脚本:
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class com.imf.spark.SparkApps.sparkstreaming.FlumePushDate2SparkStreaming \
--master spark://master1:7077 \
/usr/local/sparkApps/FlumePushDate2SparkStreaming/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar

并启动run.sh脚本,运行spark streaming,再将flume启动起来,命令:
root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf# flume-ng agent -n agent1 -c conf -f flume-conf.properties -Dflume.root.logger=DEBUG,console


6.测试
创建一个测试文件,内容如下:
root@master1:/usr/local/flume/tmp# cat test_5.log 
Spark Spark Spark
Hadoop Hadoop
Java
Scala Scala

将该文件拷贝到TestDir目录中
root@master1:/usr/local/flume/tmp# cp test_5.log TestDir/


观察flume控制台日志信息如下(截取关键部分):
16/06/19 11:46:33 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
16/06/19 11:46:33 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/test_5.log to /usr/local/flume/tmp/TestDir/test_5.log.COMPLETED
16/06/19 11:46:50 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/tmp/checkpointDir/checkpoint, elements to sync = 4
16/06/19 11:46:50 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1466307980457, queueSize: 0, queueHead: 5
16/06/19 11:46:50 INFO file.Log: Updated checkpoint for file: /root/.flume/file-channel/data/log-4 position: 569 logWriteOrderID: 1466307980457
16/06/19 11:46:50 INFO file.LogFile: Closing RandomReader /root/.flume/file-channel/data/log-1
16/06/19 11:46:50 INFO file.LogFile: Closing RandomReader /root/.flume/file-channel/data/log-2

观察spark streaming控制台,日志信息如下(截取关键部分):
16/06/19 11:46:30 INFO scheduler.JobScheduler: Finished job streaming job 1466307990000 ms.0 from job set of time 1466307990000 ms
16/06/19 11:46:30 INFO scheduler.JobScheduler: Total delay: 0.646 s for time 1466307990000 ms (execution: 0.559 s)
16/06/19 11:46:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/06/19 11:46:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 
16/06/19 11:46:36 INFO storage.MemoryStore: Block input-0-1466307996600 stored as bytes in memory (estimated size 352.0 B, free 91.3 KB)
16/06/19 11:46:36 INFO storage.BlockManagerInfo: Added input-0-1466307996600 in memory on localhost:33925 (size: 352.0 B, free: 517.4 MB)
16/06/19 11:46:36 WARN storage.BlockManager: Block input-0-1466307996600 replicated to only 0 peer(s) instead of 1 peers
16/06/19 11:46:36 INFO receiver.BlockGenerator: Pushed block input-0-1466307996600
16/06/19 11:47:00 INFO scheduler.JobScheduler: Starting job streaming job 1466308020000 ms.0 from job set of time 1466308020000 ms
16/06/19 11:47:00 INFO spark.SparkContext: Starting job: print at FlumePushDate2SparkStreaming.java:123
16/06/19 11:47:00 INFO scheduler.JobScheduler: Added jobs for time 1466308020000 ms
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Registering RDD 7 (mapToPair at FlumePushDate2SparkStreaming.java:89)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Got job 3 (print at FlumePushDate2SparkStreaming.java:123) with 1 output partitions
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (print at FlumePushDate2SparkStreaming.java:123)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 5)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 5 (MapPartitionsRDD[7] at mapToPair at FlumePushDate2SparkStreaming.java:89), which has no missing parents
16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.4 KB, free 94.8 KB)
16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1983.0 B, free 96.7 KB)
16/06/19 11:47:00 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:33925 (size: 1983.0 B, free: 517.4 MB)
16/06/19 11:47:00 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 5 (MapPartitionsRDD[7] at mapToPair at FlumePushDate2SparkStreaming.java:89)
16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 1 tasks
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, partition 0,NODE_LOCAL, 2100 bytes)
16/06/19 11:47:00 INFO executor.Executor: Running task 0.0 in stage 5.0 (TID 5)
16/06/19 11:47:00 INFO storage.BlockManager: Found block input-0-1466307996600 locally
16/06/19 11:47:00 INFO executor.Executor: Finished task 0.0 in stage 5.0 (TID 5). 1161 bytes result sent to driver
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 87 ms on localhost (1/1)
16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 
16/06/19 11:47:00 INFO scheduler.DAGScheduler: ShuffleMapStage 5 (mapToPair at FlumePushDate2SparkStreaming.java:89) finished in 0.077 s
16/06/19 11:47:00 INFO scheduler.DAGScheduler: looking for newly runnable stages
16/06/19 11:47:00 INFO scheduler.DAGScheduler: running: Set(ResultStage 0)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 6)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: failed: Set()
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103), which has no missing parents
16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 99.7 KB)
16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1802.0 B, free 101.4 KB)
16/06/19 11:47:00 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:33925 (size: 1802.0 B, free: 517.4 MB)
16/06/19 11:47:00 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103)
16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, partition 0,PROCESS_LOCAL, 1988 bytes)
16/06/19 11:47:00 INFO executor.Executor: Running task 0.0 in stage 6.0 (TID 6)
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 11 ms
16/06/19 11:47:00 INFO executor.Executor: Finished task 0.0 in stage 6.0 (TID 6). 1161 bytes result sent to driver
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 19 ms on localhost (1/1)
16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 
16/06/19 11:47:00 INFO scheduler.DAGScheduler: ResultStage 6 (print at FlumePushDate2SparkStreaming.java:123) finished in 0.010 s
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Job 3 finished: print at FlumePushDate2SparkStreaming.java:123, took 0.229024 s
16/06/19 11:47:00 INFO spark.SparkContext: Starting job: print at FlumePushDate2SparkStreaming.java:123
16/06/19 11:47:00 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 146 bytes
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Got job 4 (print at FlumePushDate2SparkStreaming.java:123) with 3 output partitions
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 8 (print at FlumePushDate2SparkStreaming.java:123)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Missing parents: List()
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting ResultStage 8 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103), which has no missing parents
16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.9 KB, free 104.4 KB)
16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 1802.0 B, free 106.1 KB)
16/06/19 11:47:00 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:33925 (size: 1802.0 B, free: 517.4 MB)
16/06/19 11:47:00 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 8 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103)
16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Adding task set 8.0 with 3 tasks
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 8.0 (TID 7, localhost, partition 1,NODE_LOCAL, 1988 bytes)
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 8.0 (TID 8, localhost, partition 2,NODE_LOCAL, 1988 bytes)
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 8.0 (TID 9, localhost, partition 3,PROCESS_LOCAL, 1988 bytes)
16/06/19 11:47:00 INFO executor.Executor: Running task 0.0 in stage 8.0 (TID 7)
16/06/19 11:47:00 INFO executor.Executor: Running task 1.0 in stage 8.0 (TID 8)
16/06/19 11:47:00 INFO executor.Executor: Running task 2.0 in stage 8.0 (TID 9)
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/06/19 11:47:00 INFO executor.Executor: Finished task 2.0 in stage 8.0 (TID 9). 1161 bytes result sent to driver
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
16/06/19 11:47:00 INFO executor.Executor: Finished task 0.0 in stage 8.0 (TID 7). 1336 bytes result sent to driver
16/06/19 11:47:00 INFO executor.Executor: Finished task 1.0 in stage 8.0 (TID 8). 1334 bytes result sent to driver
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 8.0 (TID 9) in 27 ms on localhost (1/3)
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 7) in 38 ms on localhost (2/3)
16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 8.0 (TID 8) in 40 ms on localhost (3/3)
16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
16/06/19 11:47:00 INFO scheduler.DAGScheduler: ResultStage 8 (print at FlumePushDate2SparkStreaming.java:123) finished in 0.012 s
16/06/19 11:47:00 INFO scheduler.DAGScheduler: Job 4 finished: print at FlumePushDate2SparkStreaming.java:123, took 0.078575 s
-------------------------------------------
Time: 1466308020000 ms
-------------------------------------------
(Spark,3)
(Hadoop,2)
(Java,1)
(Scala,2)

16/06/19 11:47:00 INFO scheduler.JobScheduler: Finished job streaming job 1466308020000 ms.0 from job set of time 1466308020000 ms
16/06/19 11:47:00 INFO scheduler.JobScheduler: Total delay: 0.381 s for time 1466308020000 ms (execution: 0.332 s)
16/06/19 11:47:00 INFO rdd.ShuffledRDD: Removing RDD 4 from persistence list
16/06/19 11:47:00 INFO rdd.MapPartitionsRDD: Removing RDD 3 from persistence list
16/06/19 11:47:00 INFO rdd.MapPartitionsRDD: Removing RDD 2 from persistence list
16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 4
16/06/19 11:47:00 INFO rdd.BlockRDD: Removing RDD 1 from persistence list
16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 3
16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 2
16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 1
16/06/19 11:47:00 INFO flume.FlumeInputDStream: Removing blocks of RDD BlockRDD[1] at createStream at FlumePushDate2SparkStreaming.java:66 of time 1466308020000 ms
16/06/19 11:47:00 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/06/19 11:47:00 INFO scheduler.InputInfoTracker: remove old batch metadata: 


从控制台上可以看到flume将文件push到了spark streaming中,spark streaming将该文件单词进行统计。完毕!

  • pom.zip (962 Bytes)
  • 描述: pom
  • 下载次数: 6
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics