1、flume配置文件
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /opt/soft/tomcatloging/logs/test.log
agent1.sources.r1.batchSize = 10
agent1.sources.r1.channels= c1
agent1.sinks.k1.type = org.apache.flume.sink.KafkaSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.metadata.broker.list = 172.18.90.51:9092
agent1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent1.sinks.k1.request.required.acks = 1
agent1.sinks.k1.custom.topic.name = test22
agent1.channels.c1.type=memory
agent1.channels.c1.capacity=10000
agent1.channels.c1.transactionCapacity=500
agent1.channels.c1.keep-alive=30
2、KafkaSink类
package org.apache.flume.sink;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
public class KafkaSink extends AbstractSink implements Configurable{
private static final Logger logger = LoggerFactory.getLogger(AbstractSink.class);
public static final String PARTITION_KEY_NAME = "custom.partition.key";
public static final String ENCODING_KEY_NAME = "custom.encoding";
public static final String DEFAULT_ENCODING = "UTF-8";
public static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
public static final String CUSTOME_CONSUMER_THREAD_COUNT_KEY_NAME = "custom.thread.per.consumer";
private Properties parameters;
private Producer<String, String> producer;
@Override
public synchronized void start() {
super.start();
ProducerConfig config = new ProducerConfig(parameters);
this.producer = new Producer<String,String>(config);
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
//start transaction
Channel ch = getChannel();
Transaction tx = ch.getTransaction();
tx.begin();
try{
Event event = ch.take();
String partitionKey = (String)parameters.get(PARTITION_KEY_NAME);
String encoding = StringUtils.defaultIfEmpty((String)parameters.get(ENCODING_KEY_NAME), DEFAULT_ENCODING);
String topic = Preconditions.checkNotNull((String)parameters.get(CUSTOME_TOPIC_KEY_NAME), "custom.topic.name is required");
String eventData = new String(event.getBody(),encoding);
KeyedMessage<String, String> data;
if(StringUtils.isEmpty(partitionKey)){
data = new KeyedMessage<String, String>(topic, eventData);
}else{
data = new KeyedMessage<String, String>(topic,partitionKey, eventData);
}
if(logger.isInfoEnabled()){
logger.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
}
producer.send(data);
tx.commit();
status = Status.READY;
}catch(Throwable t){
tx.rollback();
status = Status.BACKOFF;
if(t instanceof Error){
throw (Error)t;
}
}finally{
tx.close();
}
return status;
}
@Override
public void configure(Context context) {
ImmutableMap<String, String> props = context.getParameters();
parameters = new Properties();
for(String key : props.keySet()){
String value = props.get(key);
parameters.put(key, value);
}
}
@Override
public synchronized void stop() {
producer.close();
}
}
3、把相关的kafka及scala包导入到flume的lib中。
分享到:
相关推荐
为了确保Flume与Kafka的稳定集成,还需要注意以下几点: 1. 确保Flume和Kafka版本兼容。不同版本之间可能存在API不兼容问题,选择相匹配的版本能避免潜在错误。 2. 配置好Kafka服务器的地址和端口(如`brokerList`)...
### 企业大数据处理:Spark、Druid、Flume与Kafka应用实践 #### Spark **Spark** 是一种用于大规模数据处理的开源分布式计算系统。它提供了比Hadoop MapReduce更快的性能,支持实时数据流处理、机器学习、图计算等...
在这个数据统计与分析系统中,Flink 作为处理引擎,可以从 Kafka 消费数据,执行实时分析任务,如计算每分钟的访问量、最热门的页面、平均响应时间等。Flink 的事件时间处理和窗口功能使其在处理延迟数据和动态数据...
### Flume-Kafka集成流程详解 #### 一、Flume与Kafka简介 - **Flume**:Flume是一款高可靠、高性能的日志采集、聚合和传输系统,支持在日志系统中定制各类数据发送方无缝地接入。 - **Kafka**:Kafka是一个分布式...
《LNMP环境构建与Flume+Kafka+Storm+HDFS实时系统集成详解》 在当前的互联网时代,数据量的急剧增长使得大数据处理成为一项关键任务。本篇将深入探讨如何在Linux环境下搭建LNMP(Linux + Nginx + MySQL + PHP)...
3. **Flume与Kafka集成**: - **配置Flume Source**:将Kafka作为Flume的数据源,需要配置为`kafka.Source`类型,指定Kafka的broker列表、topic等信息。 - **配置Flume Sink**:设置Flume将数据发送到Kafka,配置...
在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...
为了将Flume与Kafka对接,我们需要创建一个Flume配置文件,其中定义了源(Source)、通道(Channel)和 sink(Sink)。在这个例子中,源应该是监控日志文件的File Tailer Source,它能够监听指定目录下文件的变化。...
Flume 的安装配置也是关键步骤之一,特别是为了与 Kafka 进行集成,选择合适的版本至关重要。 1. **下载安装 Flume**: - 建议下载 Flume 最新的 1.6.0 版本,因为此版本已内置了与 Kafka 集成的插件包。 - 下载...
"flume-kafka-storm源程序"这个压缩包很可能是包含这三个组件的集成示例或者源代码,用于帮助开发者理解和实践这三者之间的协同工作。 **Flume**: Flume是Apache Hadoop项目的一个子项目,专门用于高效、可靠、...
此外,搭建过程中也需要注意各个组件之间的网络通信问题,如Zookeeper与Kafka之间的通信、Kafka集群内部的通信,以及Flume和Kafka之间的数据交互。确保通信顺畅需要做好相应的网络安全和权限设置。 总结来看,利用...
|____kafka第01天-09.flume与kafka集成.avi |____kafka第01天-08.通过java API编程实现kafka消息消费者.avi |____kafka第01天-07.通过java API编程实现kafka消息生产者.avi |____kafka第01天-06.kafka手动分区再平衡...
《Flume与Kafka集成:深入理解flumeng-kafka-plugin》 在大数据处理领域,Apache Flume 和 Apache Kafka 都扮演着至关重要的角色。Flume 是一款用于收集、聚合和移动大量日志数据的工具,而 Kafka 则是一个分布式流...
Flume-Kafka插件是Apache Flume与Apache Kafka之间的数据集成工具,主要用于高效地将Flume收集的日志数据传输到Kafka主题中。这个插件适用于Flume的新一代(ng)版本1.3.1及以上,同时兼容Kafka的发行版2.10及0.8....
2. **Flume 与 Spark Streaming 集成**:在 DEA(可能是开发环境或服务器)上,通过 Maven 管理项目,将 `spark-streaming-flume-sink_2.11-2.4.2.jar` 添加到 Flume 安装目录的 `lib` 文件夹。由于文件权限问题,...
#### 三、测试Flume-Kafka集成 - 在Flume所在的机器上创建一个测试文件,并将其移至Flume监控的目录下: ```bash echo "有问题咨询lsz2012bj@163.com" > cs.txt mv cs.txt /home/hadoop/hh ``` - 观察Kafka消费...
总之,Flume+Kafka+HBase的集成能够构建一个稳定、高效的实时大数据处理系统,它既能处理海量数据的输入,又能快速地将数据存储到分布式数据库中,适用于各种大数据实时分析场景。在实践中,我们需要根据具体需求...
### Java大数据内容_7Flume、Kafka、Sqoop、Lucene #### 一、Flume 入门 ##### 1.1 Flume 概述 Flume 是一个分布式的、可靠的、高可用的日志采集系统,主要用于收集、汇总和移动大量的日志数据。它由 Cloudera ...
《基于Spark+Flume+Kafka+...综上所述,"基于Spark+Flume+Kafka+Hbase的实时日志分析系统"是一个强大的日志处理框架,它将数据采集、实时处理、消息传输和数据存储紧密集成,为电商系统提供了高效的数据分析解决方案。
通过Flume与Kafka的结合,我们可以实现大规模日志数据的实时流处理。 首先,我们需要配置Flume来连接到Kafka。在配置文件`flume-kafka.conf`中,我们定义了三个关键组件:源(source)、通道(channel)和接收器...