`

flume与kafka集成

阅读更多
1、flume配置文件

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /opt/soft/tomcatloging/logs/test.log
agent1.sources.r1.batchSize = 10
agent1.sources.r1.channels= c1

agent1.sinks.k1.type = org.apache.flume.sink.KafkaSink
agent1.sinks.k1.channel = c1
agent1.sinks.k1.metadata.broker.list = 172.18.90.51:9092
agent1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent1.sinks.k1.request.required.acks = 1
agent1.sinks.k1.custom.topic.name = test22

agent1.channels.c1.type=memory
agent1.channels.c1.capacity=10000
agent1.channels.c1.transactionCapacity=500
agent1.channels.c1.keep-alive=30


2、KafkaSink类

package org.apache.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable{
	
	private static final Logger logger = LoggerFactory.getLogger(AbstractSink.class);
	
	public static final String PARTITION_KEY_NAME = "custom.partition.key";
    public static final String ENCODING_KEY_NAME = "custom.encoding";
    public static final String DEFAULT_ENCODING = "UTF-8";
    public static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
    public static final String CUSTOME_CONSUMER_THREAD_COUNT_KEY_NAME = "custom.thread.per.consumer";

	private Properties parameters;
	private Producer<String, String> producer;
	
	@Override
	public synchronized void start() {
		super.start();
		ProducerConfig config = new ProducerConfig(parameters);
		this.producer = new Producer<String,String>(config);
	}
	
	@Override
	public Status process() throws EventDeliveryException {
		
		Status status = null;
		
		//start transaction
		Channel ch = getChannel();
		Transaction tx = ch.getTransaction();
		tx.begin();
		
		try{
			Event event = ch.take();
			
			String partitionKey = (String)parameters.get(PARTITION_KEY_NAME);
			String encoding = StringUtils.defaultIfEmpty((String)parameters.get(ENCODING_KEY_NAME), DEFAULT_ENCODING);
			String topic = Preconditions.checkNotNull((String)parameters.get(CUSTOME_TOPIC_KEY_NAME), "custom.topic.name is required");
			
			String eventData = new String(event.getBody(),encoding);
			
			KeyedMessage<String, String> data;
			
			if(StringUtils.isEmpty(partitionKey)){
				data = new KeyedMessage<String, String>(topic, eventData);
			}else{
				data = new KeyedMessage<String, String>(topic,partitionKey, eventData);
			}
			
			if(logger.isInfoEnabled()){
				logger.info("Send Message to Kafka : [" + eventData + "] -- [" + EventHelper.dumpEvent(event) + "]");
			}
			
			producer.send(data);
			tx.commit();
			status = Status.READY;
		}catch(Throwable t){
			tx.rollback();
			status = Status.BACKOFF;
			if(t instanceof Error){
				throw (Error)t;
			}
		}finally{
			tx.close();
		}
		
		return status;
	}

	@Override
	public void configure(Context context) {
		ImmutableMap<String, String> props = context.getParameters();
		
		parameters = new Properties();
		for(String key : props.keySet()){
			String value = props.get(key);
			parameters.put(key, value);
		}
	}

	
	@Override
	public synchronized void stop() {
		producer.close();
	}

}



3、把相关的kafka及scala包导入到flume的lib中。
分享到:
评论

相关推荐

    flume与kafka整合需要的jar包

    为了确保Flume与Kafka的稳定集成,还需要注意以下几点: 1. 确保Flume和Kafka版本兼容。不同版本之间可能存在API不兼容问题,选择相匹配的版本能避免潜在错误。 2. 配置好Kafka服务器的地址和端口(如`brokerList`)...

    企业大数据处理:Spark、Druid、Flume与Kafka应用实践(超清完整版).pdf

    ### 企业大数据处理:Spark、Druid、Flume与Kafka应用实践 #### Spark **Spark** 是一种用于大规模数据处理的开源分布式计算系统。它提供了比Hadoop MapReduce更快的性能,支持实时数据流处理、机器学习、图计算等...

    flume-kafka流程

    ### Flume-Kafka集成流程详解 #### 一、Flume与Kafka简介 - **Flume**:Flume是一款高可靠、高性能的日志采集、聚合和传输系统,支持在日志系统中定制各类数据发送方无缝地接入。 - **Kafka**:Kafka是一个分布式...

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    《LNMP环境构建与Flume+Kafka+Storm+HDFS实时系统集成详解》 在当前的互联网时代,数据量的急剧增长使得大数据处理成为一项关键任务。本篇将深入探讨如何在Linux环境下搭建LNMP(Linux + Nginx + MySQL + PHP)...

    Flume+Kafka环境构建和实战.zip

    3. **Flume与Kafka集成**: - **配置Flume Source**:将Kafka作为Flume的数据源,需要配置为`kafka.Source`类型,指定Kafka的broker列表、topic等信息。 - **配置Flume Sink**:设置Flume将数据发送到Kafka,配置...

    log4j+flume+kafka+storm

    在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...

    flume-kafka之日志数据模拟

    为了将Flume与Kafka对接,我们需要创建一个Flume配置文件,其中定义了源(Source)、通道(Channel)和 sink(Sink)。在这个例子中,源应该是监控日志文件的File Tailer Source,它能够监听指定目录下文件的变化。...

    flume+kafka搭建.docx

    Flume 的安装配置也是关键步骤之一,特别是为了与 Kafka 进行集成,选择合适的版本至关重要。 1. **下载安装 Flume**: - 建议下载 Flume 最新的 1.6.0 版本,因为此版本已内置了与 Kafka 集成的插件包。 - 下载...

    flume-kafka-storm源程序

    "flume-kafka-storm源程序"这个压缩包很可能是包含这三个组件的集成示例或者源代码,用于帮助开发者理解和实践这三者之间的协同工作。 **Flume**: Flume是Apache Hadoop项目的一个子项目,专门用于高效、可靠、...

    flume+kafka+storm搭建

    此外,搭建过程中也需要注意各个组件之间的网络通信问题,如Zookeeper与Kafka之间的通信、Kafka集群内部的通信,以及Flume和Kafka之间的数据交互。确保通信顺畅需要做好相应的网络安全和权限设置。 总结来看,利用...

    老男孩大数据kafka视频教程

    |____kafka第01天-09.flume与kafka集成.avi |____kafka第01天-08.通过java API编程实现kafka消息消费者.avi |____kafka第01天-07.通过java API编程实现kafka消息生产者.avi |____kafka第01天-06.kafka手动分区再平衡...

    flumeng-kafka-plugin:flumeng-kafka-plugin

    《Flume与Kafka集成:深入理解flumeng-kafka-plugin》 在大数据处理领域,Apache Flume 和 Apache Kafka 都扮演着至关重要的角色。Flume 是一款用于收集、聚合和移动大量日志数据的工具,而 Kafka 则是一个分布式流...

    flume-kafka:此插件用于将flume及kafka集成,其中flume支持的版本为flume-ng 1.3.1及以上, kafka为2.10_0.8.2.0及以上

    Flume-Kafka插件是Apache Flume与Apache Kafka之间的数据集成工具,主要用于高效地将Flume收集的日志数据传输到Kafka主题中。这个插件适用于Flume的新一代(ng)版本1.3.1及以上,同时兼容Kafka的发行版2.10及0.8....

    flume kafak实验报告.docx

    2. **Flume 与 Spark Streaming 集成**:在 DEA(可能是开发环境或服务器)上,通过 Maven 管理项目,将 `spark-streaming-flume-sink_2.11-2.4.2.jar` 添加到 Flume 安装目录的 `lib` 文件夹。由于文件权限问题,...

    全国首份接地气流处理文档,kafka,flume,整合

    #### 三、测试Flume-Kafka集成 - 在Flume所在的机器上创建一个测试文件,并将其移至Flume监控的目录下: ```bash echo "有问题咨询lsz2012bj@163.com" &gt; cs.txt mv cs.txt /home/hadoop/hh ``` - 观察Kafka消费...

    16:Flume+HBase+Kafka集成开发.rar

    总之,Flume+Kafka+HBase的集成能够构建一个稳定、高效的实时大数据处理系统,它既能处理海量数据的输入,又能快速地将数据存储到分布式数据库中,适用于各种大数据实时分析场景。在实践中,我们需要根据具体需求...

    java大数据内容_7Flume、Kafka、Sqoop、Lucene

    ### Java大数据内容_7Flume、Kafka、Sqoop、Lucene #### 一、Flume 入门 ##### 1.1 Flume 概述 Flume 是一个分布式的、可靠的、高可用的日志采集系统,主要用于收集、汇总和移动大量的日志数据。它由 Cloudera ...

    基于Spark+Flume+Kafka+Hbase的实时日志分析系统.zip

    《基于Spark+Flume+Kafka+...综上所述,"基于Spark+Flume+Kafka+Hbase的实时日志分析系统"是一个强大的日志处理框架,它将数据采集、实时处理、消息传输和数据存储紧密集成,为电商系统提供了高效的数据分析解决方案。

    快速学习-Flume 对接 Kafka

    通过Flume与Kafka的结合,我们可以实现大规模日志数据的实时流处理。 首先,我们需要配置Flume来连接到Kafka。在配置文件`flume-kafka.conf`中,我们定义了三个关键组件:源(source)、通道(channel)和接收器...

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    《构建基于Spark+Flume+Kafka+HBase的实时日志处理分析系统》 在当前大数据时代,实时数据处理已经成为企业决策与运营的关键。本项目以“基于Spark+Flume+Kafka+HBase的实时日志处理分析系统”为主题,旨在提供一个...

Global site tag (gtag.js) - Google Analytics