第一步,Flume和Kakfa对接,Flume抓取日志,写到Kafka中
第二部,Spark Streaming读取Kafka中的数据,进行实时分析
本文首先使用Kakfa自带的消息处理(脚本)来获取消息,走通Flume和Kafka的对接
1. Flume配置
1. 下载Flume和Kafka集成的插件,下载地址:https://github.com/beyondj2ee/flumeng-kafka-plugin。将package目录中的flumeng-kafka-plugin.jar拷贝到Flume安装目录的lib目录下
2. 将Kakfa安装目录libs目录下的如下jar包拷贝到Flume安装目录的lib目录下
kafka_2.10-0.8.1.1.jar
scala-library-2.10.1.jar
metrics-core-2.2.0.jar
3.添加agent配置
producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq producer.sources.s.type = netcat producer.sources.s.bind = localhost producer.sources.s.port = 44444 producer.sources.s.channels = c # Each sink's type must be defined producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 ##定义Kafka接收消息的Topic的名字 producer.sinks.r.custom.topic.name=test #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 1000
3.1 上面指定了sink的类型为KafkaSink,目的是将日志送往Kafka消息队列,分区类为SinglePartition
3.2 指定topic的名字为test
3.3 指定Flume的消息源来自于netcat,(localhost,44444)
4. 启动Flume
./flume-ng agent -f ../conf/kafka.conf -c . -n producer
指定配置文件和agent的名字
Kafka配置
5. 启动Kafka
./kafka-server-start.sh ../config/server.properties
5.1 启动Kafka依赖的Zookeeper,添加topic名字为test,详见
5.2 启动Kakfa的消息接收进程
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
6.启动telnet,输入netcat接受的数据
telnet localhost 44444
数据流转过程
1. 在telnet终端输入数据,被Flume的source接受
2. Flume将数据写入到Kafka消息队列中,在Flume_Kafka的插件中有向Kafka发送消息的逻辑
3. Kafka消息消费者,监听到Kafka队列中来了消息,那么就在Kakfa的消息接收端看到控制台上有输出
问题:
1. 此处Kafka使用SinglePartition的方式接收消息,如果是Kafka集群,那么Flume如何写入消息到一个topic的多个partition中
2. Flume的消息源是监听端口44444实现的,如何监听日志文件呢?日志文件可以自动增长,另外也会自动的创建新的日志文件,这用Kafka如何处理?
对于监听日志文件,应该使用Flume结合Log4J的方式,有个专门针对Flume的Log4J Appender,可以将写入到文件的内容通过Appender发送给Flume作为数据源,Flume的源收到数据后,就可以通过Channel发送给Sink(此处的Sink是KafkaSingk)
关于Kafka的Partition
1. 第一个问题,SinglePartition的实现
package org.apache.flume.plugins; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SinglePartition implements Partitioner<String> { public SinglePartition(VerifiableProperties props) { } @Override public int partition(String key, int numberOfPartions) { return 0; } }
可见,只要把partition方法实现为 key.hashCode()%numberOfPartitions即可
2. 第二个问题,如何设置Kafka的一个topic几个partition?
在创建topic时,就需要指定partition的个数
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
建立一个分区数为17,复制因为为3的topic,看看zk上记录了哪些信息,
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 17 --topic test
2.1. 报错:也就是说,复制因子不能比brokers的个数大
[hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 17 --topic test_many_partitions Error while executing topic command replication factor: 3 larger than available brokers: 1 kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1 at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155) at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86) at kafka.admin.TopicCommand$.main(TopicCommand.scala:50) at kafka.admin.TopicCommand.main(TopicCommand.scala)
2.2 新建了topic后,Kafka server日志显示
[2015-02-14 02:53:53,526] INFO Completed load of log test_many_partitions-4 with log end offset 0 (kafka.log.Log) [2015-02-14 02:53:53,526] INFO Created log for partition [test_many_partitions,4] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2015-02-14 02:53:53,527] WARN Partition [test_many_partitions,4] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,4] (kafka.cluster.Partition) [2015-02-14 02:53:53,540] INFO Completed load of log test_many_partitions-13 with log end offset 0 (kafka.log.Log) [2015-02-14 02:53:53,541] INFO Created log for partition [test_many_partitions,13] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2015-02-14 02:53:53,541] WARN Partition [test_many_partitions,13] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,13] (kafka.cluster.Partition) [2015-02-14 02:53:53,554] INFO Completed load of log test_many_partitions-1 with log end offset 0 (kafka.log.Log) [2015-02-14 02:53:53,555] INFO Created log for partition [test_many_partitions,1] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2015-02-14 02:53:53,555] WARN Partition [test_many_partitions,1] on broker 0: No checkpointed highwatermark is found for partition [test_many_partitions,1] (kafka.cluster.Partition)
3.3 查看zk上关于具有多partition的topic,结果如下:
17个partition
[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics [test_many_partitions, test] [zk: localhost:2181(CONNECTED) 27] ls /brokers/topics/test_many_partitions [partitions] [zk: localhost:2181(CONNECTED) 28] ls /brokers/topics/test_many_partitions/partitions [15, 16, 13, 14, 11, 12, 3, 2, 1, 10, 0, 7, 6, 5, 4, 9, 8] [zk: localhost:2181(CONNECTED) 29]
1个partition
[zk: localhost:2181(CONNECTED) 30] ls /brokers/topics/test [partitions] [zk: localhost:2181(CONNECTED) 31] ls /brokers/topics/test/partitions [0]
参考:
https://github.com/beyondj2ee/flumeng-kafka-plugin
http://blog.csdn.net/weijonathan/article/details/18301321
http://liyonghui160com.iteye.com/blog/2173235
相关推荐
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...
这里提到的"人工智能-spark"项目,结合了Flume、Kafka、HBase以及Spark-Streaming,构建了一个高效且实时的数据处理管道。下面我们将详细探讨这些组件各自的作用及其相互间的协同工作原理。 1. **Apache Flume**:...
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip ...
本项目"基于spark streaming+flume+kafka+hbase的实时日志处理分析系统"结合了多个关键技术,构建了一个高效、实时的数据处理流水线。下面将详细阐述这些技术及其在系统中的作用。 1. Spark Streaming: Spark ...
这里提到的"基于Flume+kafka+spark大型电商网站日志分析系统(离线+实时)"就是一个这样的解决方案,结合了三个关键组件:Apache Flume、Apache Kafka和Apache Spark。下面将详细介绍这三个技术及其在系统中的作用。...
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
在本项目中,"基于Spark Streaming + Kafka + Flume 实现的日志收集处理系统"是一个高效、可扩展的日志分析解决方案。它结合了三个关键组件:Apache Spark Streaming、Apache Kafka和Apache Flume,以实现实时数据流...
【描述】: 本系统旨在实现一个高效的实时日志处理和分析平台,利用现代大数据技术栈中的关键组件,包括Apache Spark Streaming、Flume、Kafka以及HBase。系统分为控制台版本和基于Web的版本,旨在满足不同场景下的...
基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: https://blog.csdn.net/linge1995/article/details/81326146
生成的数据主要是模拟某学习网站学习视频课程的访问量(其中*以“ / class”开头的表示实战课程,然后通过流水线Flume + Kafka + SparkStreaming进行实时日志的收集,HBase来存储数据)*注意事项(使用的软件工具及...
spark-streaming-kafka-0-8_2.11-2.4.0.jar