- 浏览: 235333 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
lwb314:
你的这个是创建的临时的hive表,数据也是通过文件录入进去的, ...
Spark SQL操作Hive数据库 -
yixiaoqi2010:
你好 我的提交上去 总是报错,找不到hive表,可能是哪里 ...
Spark SQL操作Hive数据库 -
bo_hai:
target jvm版本也要选择正确。不能选择太高。2.10对 ...
eclipse开发spark程序配置本地运行
上节http://kevin12.iteye.com/blog/2305946将flume的环境搭建好,并测试了flume的故障转移功能,这节编码实现Flume推送数据到Spark Streaming中。
下面的例子我只在master1上配置flume,worker1,worker2不进行配置了。
1.配置
master1上修改配置文件root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
注意:此时还不能启动flume,如果启动会报错。
2.编写代码
3.pom.xml 配置,见附件
4.编译成可执行的jar
maven没配置的自己去配置。
maven命令:mvn clean package
5.调度脚本
将打好的大架包SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到虚拟机的/usr/local/sparkApps/FlumePushDate2SparkStreaming目录中,并编写调度脚本:
并启动run.sh脚本,运行spark streaming,再将flume启动起来,命令:
6.测试
创建一个测试文件,内容如下:
将该文件拷贝到TestDir目录中
观察flume控制台日志信息如下(截取关键部分):
从控制台上可以看到flume将文件push到了spark streaming中,spark streaming将该文件单词进行统计。完毕!
下面的例子我只在master1上配置flume,worker1,worker2不进行配置了。
1.配置
master1上修改配置文件root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties
#agent1 name agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 #set source agent1.sources.r1.type = spooldir agent1.sources.r1.spoolDir =/usr/local/flume/tmp/TestDir agent1.sources.r1.channels = c1 agent1.sources.r1.fileHeader = false agent1.sources.r1.interceptors = i1 agent1.sources.r1.interceptors.i1.type = timestamp # set sink to hdfs #agent1.sinks.k1.type=hdfs #agent1.sinks.k1.hdfs.path=hdfs://master1:9000/library/flume #agent1.sinks.k1.hdfs.fileType=DataStream #agent1.sinks.k1.hdfs.writerFormat=TEXT #agent1.sinks.k1.hdfs.roolInterval=1 #agent1.sinks.k1.hdfs.filePrefix=%Y-%m-%d #agent1.sinks.k1.channel=c1 #set sink to Spark Streaming agent1.sinks.k1.type = avro agent1.sinks.k1.channel = c1 agent1.sinks.k1.hostname = master1 agent1.sinks.k1.port = 9999 #set channel agent1.channels.c1.type = file agent1.channels.c1.checkpointDir=/usr/local/flume/tmp/checkpointDir agent2.channels.c1.dataDirs=/usr/local/flume/tmp/dataDirs
注意:此时还不能启动flume,如果启动会报错。
2.编写代码
package com.imf.spark.SparkApps.sparkstreaming; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.flume.FlumeUtils; import org.apache.spark.streaming.flume.SparkFlumeEvent; import scala.Tuple2; /** * * @Description:Flume推送数据到SparkStreaming例子 * @Author: lujinyong168 * @Date: 2016年6月19日 上午11:40:13 */ public class FlumePushDate2SparkStreaming { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[4]"). setAppName("FlumePushDate2SparkStreaming"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30)); JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(jsc,"master1", 9999); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() { //如果是Scala,由于SAM转换,所以可以写成val words = lines.flatMap { line => line.split(" ")} private static final long serialVersionUID = 1L; @Override public Iterable<String> call(SparkFlumeEvent event) throws Exception { String line = new String(event.event().getBody().array()); return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordsCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
3.pom.xml 配置,见附件
4.编译成可执行的jar
maven没配置的自己去配置。
maven命令:mvn clean package
5.调度脚本
将打好的大架包SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到虚拟机的/usr/local/sparkApps/FlumePushDate2SparkStreaming目录中,并编写调度脚本:
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \ --class com.imf.spark.SparkApps.sparkstreaming.FlumePushDate2SparkStreaming \ --master spark://master1:7077 \ /usr/local/sparkApps/FlumePushDate2SparkStreaming/SparkApps-0.0.1-SNAPSHOT-jar-with-dependencies.jar
并启动run.sh脚本,运行spark streaming,再将flume启动起来,命令:
root@master1:/usr/local/flume/apache-flume-1.6.0-bin/conf# flume-ng agent -n agent1 -c conf -f flume-conf.properties -Dflume.root.logger=DEBUG,console
6.测试
创建一个测试文件,内容如下:
root@master1:/usr/local/flume/tmp# cat test_5.log Spark Spark Spark Hadoop Hadoop Java Scala Scala
将该文件拷贝到TestDir目录中
root@master1:/usr/local/flume/tmp# cp test_5.log TestDir/
观察flume控制台日志信息如下(截取关键部分):
16/06/19 11:46:33 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one. 16/06/19 11:46:33 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/test_5.log to /usr/local/flume/tmp/TestDir/test_5.log.COMPLETED 16/06/19 11:46:50 INFO file.EventQueueBackingStoreFile: Start checkpoint for /usr/local/flume/tmp/checkpointDir/checkpoint, elements to sync = 4 16/06/19 11:46:50 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1466307980457, queueSize: 0, queueHead: 5 16/06/19 11:46:50 INFO file.Log: Updated checkpoint for file: /root/.flume/file-channel/data/log-4 position: 569 logWriteOrderID: 1466307980457 16/06/19 11:46:50 INFO file.LogFile: Closing RandomReader /root/.flume/file-channel/data/log-1 16/06/19 11:46:50 INFO file.LogFile: Closing RandomReader /root/.flume/file-channel/data/log-2 观察spark streaming控制台,日志信息如下(截取关键部分): 16/06/19 11:46:30 INFO scheduler.JobScheduler: Finished job streaming job 1466307990000 ms.0 from job set of time 1466307990000 ms 16/06/19 11:46:30 INFO scheduler.JobScheduler: Total delay: 0.646 s for time 1466307990000 ms (execution: 0.559 s) 16/06/19 11:46:30 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer() 16/06/19 11:46:30 INFO scheduler.InputInfoTracker: remove old batch metadata: 16/06/19 11:46:36 INFO storage.MemoryStore: Block input-0-1466307996600 stored as bytes in memory (estimated size 352.0 B, free 91.3 KB) 16/06/19 11:46:36 INFO storage.BlockManagerInfo: Added input-0-1466307996600 in memory on localhost:33925 (size: 352.0 B, free: 517.4 MB) 16/06/19 11:46:36 WARN storage.BlockManager: Block input-0-1466307996600 replicated to only 0 peer(s) instead of 1 peers 16/06/19 11:46:36 INFO receiver.BlockGenerator: Pushed block input-0-1466307996600 16/06/19 11:47:00 INFO scheduler.JobScheduler: Starting job streaming job 1466308020000 ms.0 from job set of time 1466308020000 ms 16/06/19 11:47:00 INFO spark.SparkContext: Starting job: print at FlumePushDate2SparkStreaming.java:123 16/06/19 11:47:00 INFO scheduler.JobScheduler: Added jobs for time 1466308020000 ms 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Registering RDD 7 (mapToPair at FlumePushDate2SparkStreaming.java:89) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Got job 3 (print at FlumePushDate2SparkStreaming.java:123) with 1 output partitions 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (print at FlumePushDate2SparkStreaming.java:123) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 5) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 5) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 5 (MapPartitionsRDD[7] at mapToPair at FlumePushDate2SparkStreaming.java:89), which has no missing parents 16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.4 KB, free 94.8 KB) 16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1983.0 B, free 96.7 KB) 16/06/19 11:47:00 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:33925 (size: 1983.0 B, free: 517.4 MB) 16/06/19 11:47:00 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 5 (MapPartitionsRDD[7] at mapToPair at FlumePushDate2SparkStreaming.java:89) 16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Adding task set 5.0 with 1 tasks 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 5, localhost, partition 0,NODE_LOCAL, 2100 bytes) 16/06/19 11:47:00 INFO executor.Executor: Running task 0.0 in stage 5.0 (TID 5) 16/06/19 11:47:00 INFO storage.BlockManager: Found block input-0-1466307996600 locally 16/06/19 11:47:00 INFO executor.Executor: Finished task 0.0 in stage 5.0 (TID 5). 1161 bytes result sent to driver 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 5) in 87 ms on localhost (1/1) 16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 16/06/19 11:47:00 INFO scheduler.DAGScheduler: ShuffleMapStage 5 (mapToPair at FlumePushDate2SparkStreaming.java:89) finished in 0.077 s 16/06/19 11:47:00 INFO scheduler.DAGScheduler: looking for newly runnable stages 16/06/19 11:47:00 INFO scheduler.DAGScheduler: running: Set(ResultStage 0) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 6) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: failed: Set() 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103), which has no missing parents 16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 99.7 KB) 16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1802.0 B, free 101.4 KB) 16/06/19 11:47:00 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:33925 (size: 1802.0 B, free: 517.4 MB) 16/06/19 11:47:00 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103) 16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Adding task set 6.0 with 1 tasks 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, partition 0,PROCESS_LOCAL, 1988 bytes) 16/06/19 11:47:00 INFO executor.Executor: Running task 0.0 in stage 6.0 (TID 6) 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 11 ms 16/06/19 11:47:00 INFO executor.Executor: Finished task 0.0 in stage 6.0 (TID 6). 1161 bytes result sent to driver 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 19 ms on localhost (1/1) 16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 16/06/19 11:47:00 INFO scheduler.DAGScheduler: ResultStage 6 (print at FlumePushDate2SparkStreaming.java:123) finished in 0.010 s 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Job 3 finished: print at FlumePushDate2SparkStreaming.java:123, took 0.229024 s 16/06/19 11:47:00 INFO spark.SparkContext: Starting job: print at FlumePushDate2SparkStreaming.java:123 16/06/19 11:47:00 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 146 bytes 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Got job 4 (print at FlumePushDate2SparkStreaming.java:123) with 3 output partitions 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 8 (print at FlumePushDate2SparkStreaming.java:123) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 7) 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Missing parents: List() 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting ResultStage 8 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103), which has no missing parents 16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.9 KB, free 104.4 KB) 16/06/19 11:47:00 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 1802.0 B, free 106.1 KB) 16/06/19 11:47:00 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:33925 (size: 1802.0 B, free: 517.4 MB) 16/06/19 11:47:00 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 8 (ShuffledRDD[8] at reduceByKey at FlumePushDate2SparkStreaming.java:103) 16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Adding task set 8.0 with 3 tasks 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 8.0 (TID 7, localhost, partition 1,NODE_LOCAL, 1988 bytes) 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 8.0 (TID 8, localhost, partition 2,NODE_LOCAL, 1988 bytes) 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 8.0 (TID 9, localhost, partition 3,PROCESS_LOCAL, 1988 bytes) 16/06/19 11:47:00 INFO executor.Executor: Running task 0.0 in stage 8.0 (TID 7) 16/06/19 11:47:00 INFO executor.Executor: Running task 1.0 in stage 8.0 (TID 8) 16/06/19 11:47:00 INFO executor.Executor: Running task 2.0 in stage 8.0 (TID 9) 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 16/06/19 11:47:00 INFO executor.Executor: Finished task 2.0 in stage 8.0 (TID 9). 1161 bytes result sent to driver 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 16/06/19 11:47:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms 16/06/19 11:47:00 INFO executor.Executor: Finished task 0.0 in stage 8.0 (TID 7). 1336 bytes result sent to driver 16/06/19 11:47:00 INFO executor.Executor: Finished task 1.0 in stage 8.0 (TID 8). 1334 bytes result sent to driver 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 8.0 (TID 9) in 27 ms on localhost (1/3) 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 7) in 38 ms on localhost (2/3) 16/06/19 11:47:00 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 8.0 (TID 8) in 40 ms on localhost (3/3) 16/06/19 11:47:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 16/06/19 11:47:00 INFO scheduler.DAGScheduler: ResultStage 8 (print at FlumePushDate2SparkStreaming.java:123) finished in 0.012 s 16/06/19 11:47:00 INFO scheduler.DAGScheduler: Job 4 finished: print at FlumePushDate2SparkStreaming.java:123, took 0.078575 s ------------------------------------------- Time: 1466308020000 ms ------------------------------------------- (Spark,3) (Hadoop,2) (Java,1) (Scala,2) 16/06/19 11:47:00 INFO scheduler.JobScheduler: Finished job streaming job 1466308020000 ms.0 from job set of time 1466308020000 ms 16/06/19 11:47:00 INFO scheduler.JobScheduler: Total delay: 0.381 s for time 1466308020000 ms (execution: 0.332 s) 16/06/19 11:47:00 INFO rdd.ShuffledRDD: Removing RDD 4 from persistence list 16/06/19 11:47:00 INFO rdd.MapPartitionsRDD: Removing RDD 3 from persistence list 16/06/19 11:47:00 INFO rdd.MapPartitionsRDD: Removing RDD 2 from persistence list 16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 4 16/06/19 11:47:00 INFO rdd.BlockRDD: Removing RDD 1 from persistence list 16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 3 16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 2 16/06/19 11:47:00 INFO storage.BlockManager: Removing RDD 1 16/06/19 11:47:00 INFO flume.FlumeInputDStream: Removing blocks of RDD BlockRDD[1] at createStream at FlumePushDate2SparkStreaming.java:66 of time 1466308020000 ms 16/06/19 11:47:00 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer() 16/06/19 11:47:00 INFO scheduler.InputInfoTracker: remove old batch metadata:
从控制台上可以看到flume将文件push到了spark streaming中,spark streaming将该文件单词进行统计。完毕!
- pom.zip (962 Bytes)
- 描述: pom
- 下载次数: 6
发表评论
-
SparkStreaming pull data from Flume
2016-06-19 17:29 1230Spark Streaming + Flume Integra ... -
Flume的安装和测试故障转移
2016-06-19 14:56 33881.实现功能 配置Flume监控本地文件夹变化,将变化的文件 ... -
Spark Streaming 统计单词的例
2016-06-19 14:55 3测试Spark Streaming 统计单词的例子 1.准 ... -
Flume的安装和测试故障转移
2016-06-19 12:48 17181.实现功能 配置Flume监控本地文件夹变化,将变化的文件上 ... -
Spark Streaming 统计单词的例子
2016-06-19 12:29 3684测试Spark Streaming 统计单词的例子 1.准备 ... -
Spark SQL窗口函数
2016-04-22 07:18 2562窗口函数又叫着窗口分析函数,Spark 1.4版本SparkS ... -
Spark SQL内置函数应用
2016-04-22 07:00 8656简单说明 使用Spark SQL中的内置函数对数据进行 ... -
Spark SQL操作Hive数据库
2016-04-13 22:37 17604本次例子通过scala编程实现Spark SQL操作Hive数 ... -
Spark SQL on hive配置和实战
2016-03-26 18:40 5569spark sql 官网:http://spark ... -
Spark RDD弹性表现和来源
2016-02-09 20:12 3859hadoop 的MapReduce是基于数 ... -
Spark内核架构
2016-02-07 12:24 10141.在将spark内核架构前,先了解一下Hadoop的MR,H ... -
spark集群HA搭建
2016-01-31 08:50 4526spark集群的HA图: 搭建spark的HA需要安装z ... -
Spark集群中WordCount运行原理
2016-01-31 07:05 2511以数据流动的视角解释一下wordcount运行的原理 pa ... -
eclipse开发spark程序配置在集群上运行
2016-01-27 08:08 9366这篇bolg讲一下,IDE开发的spark程序如何提交到集群上 ... -
eclipse开发spark程序配置本地运行
2016-01-27 07:58 12413今天简单讲一下在local模式下用eclipse开发一个简单的 ... -
spark1.6.0搭建(基于hadoop2.6.0分布式)
2016-01-24 10:11 5975本文是基于hadoop2.6.0的分布式环境搭建spark1. ...
相关推荐
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
本压缩包中的 jar 包是为了解决 Flume 与 Spark Streaming 的集成问题,确保数据能够从 Flume 无缝流转到 Spark Streaming 进行实时分析。 Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用的服务,它...
基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: https://blog.csdn.net/linge1995/article/details/81326146
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
对于离线分析,Spark SQL 或 DataFrames 可以用于结构化数据的处理,而 Spark Streaming 则用于处理实时数据流,它可以以微批处理的方式高效地处理数据流,提供近实时的分析结果。 - **Spark SQL**:用于离线分析...
在本设计中,Spark Streaming扮演数据处理的角色,它连接到Kafka,实时读取数据,并利用DStream(Discretized Stream)接口进行复杂的计算操作,如过滤、窗口聚合等,以满足实时分析的需求。Spark Streaming的优势...
基于Flume+Kafka+SparkStreaming打造企业大数据流处理平台视频教程.txt
该压缩包下commons-lang3-3.3.2.jar,spark-streaming-flume_2.10-1.6.0.jar,scala-compiler-2.10.5.jar用于实现Flume监控文件夹中的内容变化,然后Spark Streaming对数据进行分析。
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
Kafka可以作为Flume的数据目的地,将Flume收集到的数据存储为消息主题,供其他消费者如Spark Streaming消费。 Spark Streaming是Spark框架的一部分,专为实时数据处理设计。它利用Spark的微批处理能力处理连续的...
spark-streaming-flume_2.11-2.1.0.jar
项目架构: 主要是基于Flume+Kafka+Sparkstreaming +HBase+ES来实现实时的用户信息存储轨迹查询任务。 1、资源内容: 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、...