1,作为Producer的Flume端配置,其中是以netcat为source数据源,sink是kafka
#agent section producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq producer.sources.s.type = netcat producer.sources.s.bind = localhost producer.sources.s.port = 44444 producer.sources.s.channels = c # Each sink's type must be defined producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=test #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 1000
2,配置consumer,source是Kafka,sink是logger
consumer.sources = s consumer.channels = c consumer.sinks = r consumer.sources.s.type = seq consumer.sources.s.channels = c consumer.sinks.r.type = logger consumer.sinks.r.channel = c consumer.channels.c.type = memory consumer.channels.c.capacity = 100 consumer.sources.s.type = org.apache.flume.plugins.KafkaSource consumer.sources.s.zookeeper.connect=127.0.0.1:2181 consumer.sources.s.group.id=testGroup consumer.sources.s.zookeeper.session.timeout.ms=400 consumer.sources.s.zookeeper.sync.time.ms=200 consumer.sources.s.auto.commit.interval.ms=1000 consumer.sources.s.custom.topic.name=test consumer.sources.s.custom.thread.per.consumer=4
3,分别运行着两个agent
bin/flume-ng agent --conf conf --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/comsumer1.properties --name consumer -Dflume.root.logger=INFO,console
4,这时telnet上端口44444
hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444 Trying ::1... Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. 1111111111111111 OK kak^Hfkakakkakakakkakkakkaakaknnnm OK abcdefghijklmnopqrstuvwxyz OK
两个agent都有信息输出了
org.apache.flume.plugins的代码参考 :https://github.com/baniuyao/flume-kafka上面也有详细的使用方法
相关推荐
### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...
整合Flume与Kafka的关键在于Flume的类路径中包含正确的jar包,这样才能让Flume理解如何与Kafka通信。以下是可能需要的jar包列表: 1. `flume-kafka-sink.jar`:这是Flume Kafka Sink的实现,它提供了Flume与Kafka...
Flume 与 Kafka 整合高可靠教程 本篇教程旨在指导读者如何将 Flume 与 Kafka 整合,以实现高可靠的数据传输。在本教程中,我们将从安装 Kafka 和 Flume 开始,然后配置 Flume,使其将数据传输到 Kafka。 一、安装 ...
通过上述步骤,我们成功实现了Flume与Kafka的集成,具体包括了ZooKeeper的安装与启动、Kafka服务的配置与启动、Flume Agent的配置与启动、数据的生成与发送以及数据的消费等环节。这种集成方式不仅能够实现大规模的...
### Flume采集数据到Kafka,然后从Kafka取数据存储到HDFS的方法思路和完整步骤 #### 一、概述 随着大数据技术的发展,高效的数据采集、处理与存储变得尤为重要。本文将详细介绍如何利用Flume采集日志数据,并将其...
- Flume Kafka插件:https://github.com/beyondj2ee/flumeng-kafka-plugin - Storm版本:0.9.7 - Zookeeper版本:3.4.5 接下来是各个软件的安装步骤: 1. **Flume安装**: - 下载Apache Flume 1.5.0的tar.gz包。 ...
本文详细介绍了如何将Log4j、Flume与Kafka进行整合,使得日志数据能够从Log4j经由Flume传输至Kafka。这一过程不仅涉及具体的配置细节,还包括了环境准备、测试验证等多个方面,确保了整个系统能够稳定高效地运行。...
3. **实时流处理**:Flume NG与Kafka结合,可以构建实时数据流处理平台,实现快速的数据处理和分析。 4. **Hadoop集成**:Flume NG可以直接将数据写入HDFS,为Hadoop MapReduce或Hive提供数据输入,简化大数据处理...
Flume可以和许多的系统进行整合,包括了Hadoop、Spark、Kafka、Hbase等等;当然,强悍的Flume也是可以和Mysql进行整合,将分析好的日志存储到Mysql(当然,你也可以存放到pg、oracle等等关系型数据库)。
这些组件在大数据生态系统中的整合使用,可以创建一个强大的数据处理链路:Flume收集和传输日志数据,Kafka作为中间层进行数据缓冲和分发,ZooKeeper提供一致性服务来管理整个流程。这样的架构允许实时数据流处理,...
对于更复杂的配置,Flume 允许一个 Source 对应多个 Sinks,这意味着数据可以被发送到多个目的地,例如 Kafka。在配置中,你可以设置不同的参数,如 `rollInterval` 控制文件滚动的时间间隔,`type` 定义不同的 ...
4. **启动Flume**:使用`flume-ng agent`命令启动Flume Agent,命令格式通常为`flume-ng agent --conf conf --conf-file <config_file> --name <agent_name> -Dflume.root.logger=,console`。 5. **监控和调试**:...
Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量...在部署和使用过程中,理解其核心组件、配置文件的编写以及与其他系统的整合是关键,这样才能充分利用 Flume 的优势,实现高效的数据管理和分析。
- **非结构化数据**:使用FlumeNG进行采集。 - **结构化数据**:使用Sqoop从Oracle/SQLServer/MySQL等关系数据库中收集。 - **实时流数据**:采用Kafka等消息队列组件采集。 - **数据处理与分析**: - **离线...
"storm-miclog"就是这样一个专为实现近乎实时日志监控而设计的系统,它巧妙地融合了多个开源组件,如Flume-ng、Kafka、Storm以及Zookeeper,打造了一套高效、可靠的解决方案。 首先,我们来了解一下这个系统的核心...
对于离线数据,则需要采用FlumeNG等系统进行数据的收集和储存。 2. 数据清洗与计算 数据清洗的目的是从海量的数据中提取出高质量的数据。这通常需要根据一定的逻辑对数据进行筛选和过滤,保留有用的数据,去除无关...
软件版本列表包括了搭建Hadoop生态系统所需的关键组件版本,例如JDK 1.7.0_67、MySQL 5.1、Hadoop 2.3.0、HBase 0.96、Hive 0.12、SolrCloud 4.4、Storm 0.92、Kafka 2.8.0、Flume-ng 1.4.0、Sqoop 1.4.4等。...
数据从爬虫、SDK、用户行为日志等来源被收集,经过FlumeNG、Storm、Spark Streaming等实时处理,再通过离线计算进行深入分析,最后应用于推荐广告、搜索、BI等领域。 6. **推荐系统** 推荐系统的核心是根据用户的...
具体来说,日志收集通过Logstash、Kafka、Flume-ng实现,离线处理使用HDFS、HBase和Hive进行数据存储与分析,而实时处理则依托于Storm和Spark Streaming技术。此外,为了提高用户体验,Mobike还特别强调实时搜索服务...