在http://bit1129.iteye.com/blog/2184467一文中对Spark Streaming整合Flume-NG进行了基本的配置,并且Spark Streaming能够监听到来自于Flume的数据输出(通过Sink),不过代码很简单同时也是在单机上(Master和Worker在同一台机器上)进行试验的,因而还有有几个问题没有解决,本文继续Spark Streaming整合Flume-NG
package spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ object SparkFlumeNGWordCount { def main(args : Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGWordCount") val ssc = new StreamingContext(conf, Seconds(10)) //9999端口是由这个Spark负责开启并监听的,Spark Streaming采集写到这个端口的数据 //问题:如果这个代码运行在集群中,那么localhost指的是driver所在的IP,还是每个worker所在的IP //即每个worker都会启动这个端口?这很重要因为它将影响Flume的配置(Flume的Sink就是9999端口) ///答案:只是一个Worker的IP,那么问题是如何知道Receiver在哪个Worker Node上启动?做法时先启动Spark Streaming,然后确定Receiver在哪个Node上启动 val lines = FlumeUtils.createStream(ssc,"localhost",9999) lines.cache(); //lines是DStream对象,它的每个元素是SparkFlumeEvent对象,可以将它转换为字符窜(evt.event.getBody.array()) // 打印显示前10行的字符串 lines.map(evt => { val str = new String(evt.event.getBody.array()) //打印到控制的文本内容 ("string received: "+ str) //print方法是action,也就是让map的转换操作运行,必须调用action }).print() //保存到磁盘文件 lines.map(evt => { val str = new String(evt.event.getBody.array()) //保存到磁盘文件的内容 ("string received: "+ str) }).saveAsTextFiles("file:///home/hadoop/flumeoutput", "suff") lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis() ).print() ssc.start() ssc.awaitTermination(); } }
例子中,Spark Streaming对接收到来自于Flume的输入(通过9999端口)一方面打印到控制台(通过print算子,print只打印前10行),另一方面通过saveAsTextFiles写到磁盘文件中,这验证了一个问题,进行Flume输出的数据进行了包装,包装有headers,有body的JSON串,但是通过SparkFlumeEnvent.event.getBody.array()还是很容易的获取到真正的数据,获取到用户数据后可以对数据进行操作
有几个问题需要解决:
1. FlumeUtils的createStream方法有两个参数,host和port,那么这个host和port只会在Driver所在机器上开启监听,还是在所有Workers上也会监听。这个问题实质上是要回答这样一个问题,如下代码是在哪里执行
val lines = FlumeUtils.createStream(ssc,"localhost",9999)
因为这是main函数,即DriverProgram,我认为上面的代码应该在Driver上运行, 因为Driver的目的,一是构造RDD以及相应的DAG,然后提交作业,作业中的Task是例子中的print()和saveAsTextFiles触发Job提交,然后再划分Stage形成TaskSet创建的,这些Task是与val lines = FlumeUtils.createStream(ssc,"localhost",9999)代码无关的,所以,这个application只会在Driver上开启9999端口
答:这个认为是错误的,即9999是在Worker上监听的,也就是数据直接流向Worer节点了,而输出是在Driver上。这也就可以理解,Spark RDD的数据本地性了,所有的数据都在本地计算。
同时发现了一个现象:
三台虚机(一主两从),提交application的时候,指定的参数--total-executor-cores 为2,当集群启动后,发现Master分配了两个core,两个Slave也分配两个,但是由于物理机只有4个core,因此,两个Slave真正的core个数是每个Slave1个,
之前提到过1个core无法运行spark streaming程序,因为关掉一个虚机,采用一主一从每个分配2个core的方式运行。
是否可以认为--total-executor-cores参数的意义是给集群中的每个节点分配这么core(如果某个节点core不够,那么就有几个分配几个)
关于hostname和9999的详细说明:
1. Spark will listen on the given port for Flume to push data into it.2. When in local mode, it will listen on localhost:9999
3. When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark will launch the Flume receiver on that node (assuming the hostname matching is correct), and list on port 9999, for receiving data from Flume. So only the configured machine will listen on port 9999.
2. 上例中,saveAsTextFiles,是DStream的一个方法。 注意Files是复数形式,即会产生多个文件目录,多个文件是指DStream中的每个RDD都会调用其saveAsFile方法,也就是每个RDD都会产生一个文件目录
//prefix: //suffix: def saveAsTextFiles(prefix: String, suffix: String = "") { ///saveFunc方法,time入参是rdd构造时的时间,因此每个RDD都不同 val saveFunc = (rdd: RDD[T], time: Time) => { ///根据前缀,后缀以及RDD的标识创建RDD的名字 val file = rddToFileName(prefix, suffix, time) ///每个RDD都保存在不同的文件目录中 rdd.saveAsTextFile(file) } //对于DStream中的每个RDD,调用saveFunc函数 this.foreachRDD(saveFunc) }
格式是prefix-时间戳.suff,如下所示
[hadoop@hadoop ~]$ pwd /home/hadoop [hadoop@hadoop ~]$ ls -l drwxrwxr-x 2 hadoop hadoop 41 Feb 20 21:15 flumeoutput-1424484950000.suff drwxrwxr-x 2 hadoop hadoop 41 Feb 20 21:16 flumeoutput-1424484960000.suff drwxrwxr-x 2 hadoop hadoop 41 Feb 20 21:16 flumeoutput-1424484970000.suff drwxrwxr-x 2 hadoop hadoop 80 Feb 20 21:16 flumeoutput-1424484980000.suff drwxrwxr-x 2 hadoop hadoop 80 Feb 20 21:16 flumeoutput-1424484990000.suff drwxrwxr-x 2 hadoop hadoop 119 Feb 20 21:16 flumeoutput-1424485000000.suff drwxrwxr-x 2 hadoop hadoop 41 Feb 20 21:16 flumeoutput-1424485010000.suff
因此在定义prefix时,最后定义为多级目录,不要写成/home/hadoop/flumeoutput,应该写成/home/hadoop/flumeoutput/appname/flume
关于Spark Streaming接收到FlumeNG发送来的Avro数据的处理:
val events = FlumeUtils.createStream(ssc, receiverHostname, receiverPort) val lines = events.map{e => new String(e.event.getBody().array(), "UTF-8")}
events是一个DStream,其中的每个元素是SparkFlumeEvents对象,SparkFlumeEvents.event获取到AvroFlumeEvent对象,AvroFlumeEvent的getBody方法获取到数据Body,array()方法转换为字节数组
相关推荐
spark-streaming-flume-sink_2.11-2.0.0.jar的jar包。
Flume-ng-sql-source是Apache Flume的一个扩展插件,主要功能是允许用户从各种数据库中抽取数据并将其传输到其他目的地,如Apache Kafka。在本案例中,我们讨论的是版本1.5.2的发布包,即"flume-ng-sql-source-...
spark-streaming-flume_2.11-2.1.0.jar
使用spark集成flume,由于flume默认只支持pull消息的方式,不过它可以自定义消息拉取方式,现要使用poll方式,可以使用spark-streaming-flume-sink_2.11-2.1.0.jar包下的org.apache.spark.streaming.flume.sink....
flume与spark streaming结合(pull方式)报错:org.apache.flume....把spark-streaming-flume-sink_2.11-2.2.0.jar复制到flume的lib目录。(使用不同版本的scala和spark请放对应的jar)我这里使用的scala为2.11.8。
flume pull 方式需要的jar包,spark-streaming-flume-sink_2.11_2.1.1.jar
将该jar包上传至flume/lib目录下,并将spark-streaming-flume其他版本jar包删除即可使用,该jar包适用于spark2.1.3版本使用
spark-streaming-kafka-0-8_2.11-2.4.0.jar
sparkstreming结合flume需要的jar包,scala是2.11版本,spark是1.6.2版本。也有其他版本的,需要的留言找我要
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
spark-streaming的flume依赖
3. spark-streaming-flume-sink_2.10-1.6.1.jar:这是 Spark Streaming 与 Flume 对接的关键组件,称为 Flume 收集器(sink)。这个 jar 包实现了将 Flume 接收到的数据流发送到 Spark Streaming 进行处理的接口。...
spark-streaming_2.12-2.4.0.jar包,可以使用
Spark_Streaming整合Flume.md
spark-streaming_2.11-2.4.0-cdh6.1.1.jar
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
此外,书中还讨论了如何设置和配置Spark Streaming应用,包括数据源的接入,如Kafka、Flume和TCP套接字等,以及如何定义和实现数据处理逻辑。Spark Streaming支持多种窗口操作,如时间窗口和滑动窗口,这些操作对于...
该压缩包下commons-lang3-3.3.2.jar,spark-streaming-flume_2.10-1.6.0.jar,scala-compiler-2.10.5.jar用于实现Flume监控文件夹中的内容变化,然后Spark Streaming对数据进行分析。
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
Spark 项目流 org.apache.spark/spark-streaming_2.11/1.2.0/spark-streaming_2.11-1.2.0.jar