`
liyonghui160com
  • 浏览: 777122 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

FlumeNG与Kafka整合

阅读更多

 

 

 

1,作为Producer的Flume端配置,其中是以netcat为source数据源,sink是kafka

 

 #agent section  
producer.sources = s  
producer.channels = c  
producer.sinks = r  
  
#source section  
#producer.sources.s.type = seq  
producer.sources.s.type = netcat  
producer.sources.s.bind = localhost  
producer.sources.s.port = 44444  
producer.sources.s.channels = c  
  
# Each sink's type must be defined  
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink  
producer.sinks.r.metadata.broker.list=127.0.0.1:9092  
producer.sinks.r.partition.key=0  
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition  
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder  
producer.sinks.r.request.required.acks=0  
producer.sinks.r.max.message.size=1000000  
producer.sinks.r.producer.type=sync  
producer.sinks.r.custom.encoding=UTF-8  
producer.sinks.r.custom.topic.name=test  
  
#Specify the channel the sink should use  
producer.sinks.r.channel = c  
  
# Each channel's type is defined.  
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000  

 

 

2,配置consumer,source是Kafka,sink是logger

 

 

 consumer.sources = s  
consumer.channels = c  
consumer.sinks = r  
  
consumer.sources.s.type = seq  
consumer.sources.s.channels = c  
consumer.sinks.r.type = logger  
  
consumer.sinks.r.channel = c  
consumer.channels.c.type = memory  
consumer.channels.c.capacity = 100  
  
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource  
consumer.sources.s.zookeeper.connect=127.0.0.1:2181  
consumer.sources.s.group.id=testGroup  
consumer.sources.s.zookeeper.session.timeout.ms=400  
consumer.sources.s.zookeeper.sync.time.ms=200  
consumer.sources.s.auto.commit.interval.ms=1000  
consumer.sources.s.custom.topic.name=test  
consumer.sources.s.custom.thread.per.consumer=4  

 

 

3,分别运行着两个agent

 

bin/flume-ng agent --conf conf  --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console

 

bin/flume-ng agent --conf conf  --conf-file conf/comsumer1.properties   --name consumer -Dflume.root.logger=INFO,console

 

4,这时telnet上端口44444

 

 

 hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444  
Trying ::1...  
Trying 127.0.0.1...  
Connected to localhost.  
Escape character is '^]'.  
1111111111111111  
OK  
kak^Hfkakakkakakakkakkakkaakaknnnm  
OK  
abcdefghijklmnopqrstuvwxyz  
OK 

 

 

两个agent都有信息输出了

 

org.apache.flume.plugins的代码参考 :https://github.com/baniuyao/flume-kafka上面也有详细的使用方法

 

 

 

 

分享到:
评论

相关推荐

    Flume+kafka+Storm整合

    ### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...

    flume与kafka整合需要的jar包

    整合Flume与Kafka的关键在于Flume的类路径中包含正确的jar包,这样才能让Flume理解如何与Kafka通信。以下是可能需要的jar包列表: 1. `flume-kafka-sink.jar`:这是Flume Kafka Sink的实现,它提供了Flume与Kafka...

    flume与kafka整合高可靠教程

    Flume 与 Kafka 整合高可靠教程 本篇教程旨在指导读者如何将 Flume 与 Kafka 整合,以实现高可靠的数据传输。在本教程中,我们将从安装 Kafka 和 Flume 开始,然后配置 Flume,使其将数据传输到 Kafka。 一、安装 ...

    flume-kafka流程

    通过上述步骤,我们成功实现了Flume与Kafka的集成,具体包括了ZooKeeper的安装与启动、Kafka服务的配置与启动、Flume Agent的配置与启动、数据的生成与发送以及数据的消费等环节。这种集成方式不仅能够实现大规模的...

    flume,kafka,storm整合

    - Flume Kafka插件:https://github.com/beyondj2ee/flumeng-kafka-plugin - Storm版本:0.9.7 - Zookeeper版本:3.4.5 接下来是各个软件的安装步骤: 1. **Flume安装**: - 下载Apache Flume 1.5.0的tar.gz包。 ...

    Flume采集数据到Kafka,然后从kafka取数据存储到HDFS的方法思路和完整步骤

    ### Flume采集数据到Kafka,然后从Kafka取数据存储到HDFS的方法思路和完整步骤 #### 一、概述 随着大数据技术的发展,高效的数据采集、处理与存储变得尤为重要。本文将详细介绍如何利用Flume采集日志数据,并将其...

    log4j+flume+kafka+storm

    本文详细介绍了如何将Log4j、Flume与Kafka进行整合,使得日志数据能够从Log4j经由Flume传输至Kafka。这一过程不仅涉及具体的配置细节,还包括了环境准备、测试验证等多个方面,确保了整个系统能够稳定高效地运行。...

    flume-ng-1.6.0-cdh5.12.0.tar.gz

    3. **实时流处理**:Flume NG与Kafka结合,可以构建实时数据流处理平台,实现快速的数据处理和分析。 4. **Hadoop集成**:Flume NG可以直接将数据写入HDFS,为Hadoop MapReduce或Hive提供数据输入,简化大数据处理...

    kafka+flume 实时采集oracle数据到hive中.docx

    然后,需要启动Flume Agent,使用命令./flume-ng agent -n a1 -c conf -f conf/flume.conf。 三、实时采集Oracle数据到Hive中 为了实时采集Oracle数据到Hive中,需要使用Kafka和Flume。首先,需要使用Kafka ...

    flume-ng整合hbase

    Flume可以和许多的系统进行整合,包括了Hadoop、Spark、Kafka、Hbase等等;当然,强悍的Flume也是可以和Mysql进行整合,将分析好的日志存储到Mysql(当然,你也可以存放到pg、oracle等等关系型数据库)。

    apache-flume-1.9.0-bin.tar,kafka_2.11-0.10.1.0,zookeeper-3.3.6_.tar

    这些组件在大数据生态系统中的整合使用,可以创建一个强大的数据处理链路:Flume收集和传输日志数据,Kafka作为中间层进行数据缓冲和分发,ZooKeeper提供一致性服务来管理整个流程。这样的架构允许实时数据流处理,...

    flume 安装和使用

    对于更复杂的配置,Flume 允许一个 Source 对应多个 Sinks,这意味着数据可以被发送到多个目的地,例如 Kafka。在配置中,你可以设置不同的参数,如 `rollInterval` 控制文件滚动的时间间隔,`type` 定义不同的 ...

    Flume安装包、安装文档

    4. **启动Flume**:使用`flume-ng agent`命令启动Flume Agent,命令格式通常为`flume-ng agent --conf conf --conf-file <config_file> --name <agent_name> -Dflume.root.logger=,console`。 5. **监控和调试**:...

    apache-flume-1.5.2-bin.zip

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量...在部署和使用过程中,理解其核心组件、配置文件的编写以及与其他系统的整合是关键,这样才能充分利用 Flume 的优势,实现高效的数据管理和分析。

    格力大数据项目工作说明书.docx

    - **非结构化数据**:使用FlumeNG进行采集。 - **结构化数据**:使用Sqoop从Oracle/SQLServer/MySQL等关系数据库中收集。 - **实时流数据**:采用Kafka等消息队列组件采集。 - **数据处理与分析**: - **离线...

    storm-miclog:近乎实时的日志监控系统。 它有一些基本组件

    "storm-miclog"就是这样一个专为实现近乎实时日志监控而设计的系统,它巧妙地融合了多个开源组件,如Flume-ng、Kafka、Storm以及Zookeeper,打造了一套高效、可靠的解决方案。 首先,我们来了解一下这个系统的核心...

    大数据技术在物联网平台的应用及研究.pdf

    对于离线数据,则需要采用FlumeNG等系统进行数据的收集和储存。 2. 数据清洗与计算 数据清洗的目的是从海量的数据中提取出高质量的数据。这通常需要根据一定的逻辑对数据进行筛选和过滤,保留有用的数据,去除无关...

    hadoop生态系统搭建

    软件版本列表包括了搭建Hadoop生态系统所需的关键组件版本,例如JDK 1.7.0_67、MySQL 5.1、Hadoop 2.3.0、HBase 0.96、Hive 0.12、SolrCloud 4.4、Storm 0.92、Kafka 2.8.0、Flume-ng 1.4.0、Sqoop 1.4.4等。...

    大数据平台及在推荐广告的应用20(ppt版).ppt

    数据从爬虫、SDK、用户行为日志等来源被收集,经过FlumeNG、Storm、Spark Streaming等实时处理,再通过离线计算进行深入分析,最后应用于推荐广告、搜索、BI等领域。 6. **推荐系统** 推荐系统的核心是根据用户的...

Global site tag (gtag.js) - Google Analytics