Flume-ng 聚集和移动大量日志数据的分布式的,有效的服务。这里我们解释怎样配置 Flume -ng 和 Spa rk Streaming 来从 Flume 获取数据。这里 介绍第二种方法 。
1. 使用自定义 Sink 的拉方式
不是 Flume 直接推送数据到 SparkStreaming ,这种方法运行了一个如下所示的 Flume Sink 。
1. Flume 将数据推送到 Sink 中,然后数据在此处缓存。
2. Spark Streaming 使用一个可靠的 Flume 接收器和操作从 Sink 中拉取数据。只有在 Spark Streaming 接收到数据并且把数据复制后才认为操作成功。
这个方法比前面的方法提供了强大的可靠性和容错保证。然而,这需要配置 Flume 运行一个自定义 Sink 。下面是配置步骤。
1.1 一般需求
选择一台在 Flume 代理中运行自定义 Sink 的机器。 Flume 其余的管道被配置为向那个代理发送数据。 Spark 集群中的机器都能连接到运行自定义 Sink 的那台机器上。
1.2 配置 Flume
在选定的机器上配置 Flume 需要如下的两步。
A . 添加如下的 JAR 包到要运行自定义 Sink 的机器中的 Flume 的 classpath 中 (这里我把如下 jar 放在 /usr/lib/flume-ng/lib/ 目录下)
spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar
commons-lang3 -3.3.2.jar
B. 配置文件:在那台机器上,通过下面的配置文件配置 Flume 代理发送数据到一个 Avro Sink 中。
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = 41444
agent.sinks.spark.channel = memoryChannel
2. 配置 Spark Streaming 程序
2.1 编程:在流处理程序的代码中,引入 FlumeUtils 并如下创建一个输入 DStream 流。 这里给出 Spark Java 程序例子
import java.util.Arrays ;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction ;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
public class SparkStreamingFlume2 {
public SparkStreamingFlume2() {
}
public static void main(String[] args) {
if (args. length != 2) {
System. err .println( "Usage: JavaFlumeEventCount1 <host> <port>" );
System. exit (1);
}
StreamingExamples. setStreamingLogLevels ();
String host = args[0];
int port = Integer. parseInt (args[1]);
Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName( "JavaFlumeEventCount" );
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
StorageLevel storagelevel = StorageLevel. MEMORY_ONLY ();
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils. createPollingStream (ssc, host, port,storagelevel);
flumeStream.count();
flumeStream.count().map( new Function<Long, String>() {
@Override
public String call(Long in) {
return "Received " + in + " flume events." ;
}
}).print();
ssc.start();
ssc.awaitTermination();
}
2.2 启动 Flume 这里主要需要添加 spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar 和 commons-lang3-3.3.2.jar 到 $FLUME_HOME/lib 目录下
F lume-ng agent – c /etc/flume-ng/conf – f /etc/flume-ng/conf/flume.conf – Dflume.root.logger=DEBUG,console – n agent02
2.3 提交 Spark ,这里需要注意的添加必要的 jar 包,可以在提交的时候加上 --jars 来指定相关的 jar 包,也可以在 sc 中调用 addJar() 添加
spark-submit --master spark://udh-spark-test-04:7077 --class SparkStreamingFlume2 --jars /root/spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar,/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.4.3.jar,/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.3-hadoop2.6.0-cdh5.4.3.jar,/usr/lib/spark/lib/spark-assembly-1.3.0-cdh5.4.3-hadoop2.6.0-cdh5.4.3.jar /root/flume-test02.jar udh-spark-test-03 41444
http://udn.yyuap.com/doc/ae/1511892.html
相关推荐
spark-streaming-flume-sink_2.11-2.0.0.jar的jar包。
Kafka作为一个高吞吐量的分布式消息系统,能够很好地处理来自Flume的数据,使得这些数据可以进一步被实时处理引擎(如Spark Streaming或Flink)消费,进行实时分析或存储到数据仓库。 Flume-ng-sql-source的版本...
spark-streaming-flume_2.11-2.1.0.jar
使用spark集成flume,由于flume默认只支持pull消息的方式,不过它可以自定义消息拉取方式,现要使用poll方式,可以使用spark-streaming-flume-sink_2.11-2.1.0.jar包下的org.apache.spark.streaming.flume.sink....
flume与spark streaming结合(pull方式)报错:org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink....
flume pull 方式需要的jar包,spark-streaming-flume-sink_2.11_2.1.1.jar
将该jar包上传至flume/lib目录下,并将spark-streaming-flume其他版本jar包删除即可使用,该jar包适用于spark2.1.3版本使用
3. spark-streaming-flume-sink_2.10-1.6.1.jar:这是 Spark Streaming 与 Flume 对接的关键组件,称为 Flume 收集器(sink)。这个 jar 包实现了将 Flume 接收到的数据流发送到 Spark Streaming 进行处理的接口。...
spark-streaming-kafka-0-8_2.11-2.4.0.jar
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
sparkstreming结合flume需要的jar包,scala是2.11版本,spark是1.6.2版本。也有其他版本的,需要的留言找我要
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.1.1.jar
spark-streaming的flume依赖
spark streaming 链接kafka必用包,欢迎大家下载与使用
总结起来,`spark-streaming-kafka-0-8-assembly_2.11-2.4.5`是连接Spark Streaming到Kafka的重要组件,用于实时处理大数据流。在实际应用中,需要正确配置和使用这个库,以便从Kafka高效地消费和处理数据。
spark3.0.0版本对接kafka数据源需要的jar包,最新的版本导致maven的阿里云仓库不能直接下载下来,所以需要手动导入jar包进行操作,有需要的朋友可以免费下载
spakr streaming的kafka依赖
KafkaUtils所依赖的jar包,导入文件中KafkaUtils报错,需要导入spark-streaming-kafka_2.10-1.6.0.jar包