`
raymond.chen
  • 浏览: 1425662 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Flume使用范例

 
阅读更多

范例主要的处理流程说明如下:

    1、web应用的日志信息实时输出到ActiveMQ

    2、Flume的Source从MQ指定队列中获取消息,并提交到内存型Channel中

    3、自定义Sink从Channel提取event,对event进行转换处理,并写入到Oracle数据库


 开发自定义的Sink,编译打包成jar包,并上传到/opt/apache-flume-1.6.0-bin/plugins.d/flume-oracle-sink/lib,自定义sink的存放目录规范请查阅官方相关文档。

 

自定义Sink的代码骨架如下:

public class OracleSink extends AbstractSink implements Configurable {
	private int batchSize;
	private String charset;
	
	public void configure(Context context) {
		this.batchSize = context.getInteger(OracleSinkConfiguration.BATCH_SIZE, new Integer(OracleSinkConfiguration.BATCH_SIZE_DEFAULT)).intValue();
		this.charset = context.getString(OracleSinkConfiguration.CHARSET, OracleSinkConfiguration.CHARSET_DEFAULT);
	}
	
	public synchronized void start() {
		super.start();
	}

	public Status process() throws EventDeliveryException {
	    Sink.Status result = Sink.Status.READY;
	    Channel channel = getChannel();
	    Transaction transaction = null;
	    Event event = null;
	    
	    try{
	    	transaction = channel.getTransaction();
	      	transaction.begin();

	      	for(int i=0; i<this.batchSize; i++){
		        event = channel.take();
		        //Map headers = event.getHeaders();
		        
	      		if(event == null){
	      			break;
	      		}else{
		      		byte[] eventBody = event.getBody();
					String bodyString = new String(eventBody, this.charset);
					
					//something code here
	      		}
	      	}
	        
	        transaction.commit();
	    } catch (Exception ex) {
	    	System.out.println(ex.toString());x
	    	result = Sink.Status.BACKOFF;
	    	if (transaction != null) {
		        try {
		        	transaction.rollback();
		        } catch (Exception e) {
		        	throw Throwables.propagate(e);
		        }
	    	}
	    } finally {
	    	if (transaction != null) {
	    		transaction.close();
	    	}
	    }
	    
	    return result;
	}
	
	public synchronized void stop() {
		super.stop();
	}
}

 

 在Flume根目录的config文件夹内新建一个flume config文件,内容如下:

# Name the components on this agent
agent.sources = source1
agent.sinks = sink1 sink2
agent.channels = channel1


#source
agent.sources.source1.type = jms
agent.sources.source1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
agent.sources.source1.connectionFactory = ConnectionFactory
agent.sources.source1.providerURL = failover://(tcp://192.168.247.2:61616?tcpNoDelay=true)
agent.sources.source1.destinationType = QUEUE
agent.sources.source1.destinationName = my_queue
agent.sources.source1.batchSize = 200
agent.sources.source1.pollTimeout = 2000

 
#channel
#agent.channels.channel1.type = file   
#agent.channels.channel1.checkpointDir = /tmp/flume/loadcheckpoint  
#agent.channels.channel1.dataDirs = /tmp/flume/loaddata
#agent1.channels.channel1.capacity = 1000
#agent1.channels.channel1.transactionCapactiy = 100

agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 10000
agent.channels.channel1.transactionCapacity = 200


#sinkgroups
agent.sinkgroups = g1
#agent.sinkgroups.g1.sinks = sink1 sink2
#agent.sinkgroups.g1.processor.type = failover
#agent.sinkgroups.g1.processor.priority.sink1 = 10
#agent.sinkgroups.g1.processor.priority.sink2 = 5
#agent.sinkgroups.g1.processor.maxpenalty = 10000

agent.sinkgroups.g1.sinks = sink1 sink2
agent.sinkgroups.g1.processor.type = load_balance
agent.sinkgroups.g1.processor.backoff = true
agent.sinkgroups.g1.processor.selector = round_robin

# Describe the sink
agent.sinks.sink1.type = com.cjm.flume.oraclesink.OracleSink
agent.sinks.sink1.username = cjm
agent.sinks.sink1.password = 111

agent.sinks.sink2.type = com.cjm.flume.oraclesink.OracleSink
agent.sinks.sink2.username = cjm
agent.sinks.sink2.password = 222
agent.sinks.sink2.batchSize= 100
agent.sinks.sink2.charset = UTF-8


# Bind the source and sink to the channel
agent.sources.source1.channels = channel1
agent.sinks.sink1.channel = channel1
agent.sinks.sink2.channel = channel1

 

启动ActiveMQ

 

启动Flume

    cd /opt/apache-flume-1.6.0-bin

    bin/flume-ng agent -c conf -f conf/flume-agent.conf -n agent -Dflume.root.logger=INFO,console

 

    后台服务方式启动 nohup bin/flume-ng agent -c conf -f conf/flume-agent.conf -n agent  >flume.log &

 

模拟发送信息到MQ

public class UMProducer {
	private MessageProducer producer = null;
	private Connection connection = null;
	private Session session = null;
	private String data = null;
	
	public void start(){
		try{
			InputStream in = UMProducer.class.getResourceAsStream("/data.txt");
			this.data = IOUtils.toString(in);
			
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
	                ActiveMQConnection.DEFAULT_USER,  
	                ActiveMQConnection.DEFAULT_PASSWORD,  
	                "failover://(tcp://192.168.247.2:61616?tcpNoDelay=true)"); 

			connection = connectionFactory.createConnection();  
			((ActiveMQConnection)connection).setUseAsyncSend(true);
			connection.start(); 
			
			session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
			Destination destination = session.createQueue("my_queue");
			
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			producer.setTimeToLive(1000 * 60 * 60 * 24);
			
			while(true){
				TextMessage message = session.createTextMessage(this.data);
		        producer.send(message);
			}
			
		}catch(Exception ex){
			ex.printStackTrace();
		}
	}
	
	public void stop(){
		try{
			if(session != null) {
				session.close();  
			}
			
			if(connection != null) {
				connection.close();
			}
			
			if(producer != null){
				producer.close();
			}
		}catch(Exception ex){
			ex.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		UMProducer producer = new UMProducer();
		producer.start();
	}
}

 

  • 大小: 13.3 KB
分享到:
评论

相关推荐

    星环大数据平台_Flume使用方法.pdf

    星环大数据平台_Flume使用方法,基于星环的大数据平台开发手册,flume使用方法,hadoop大数据平台,内部培训文档

    flume集群搭建与使用文档

    Flume使用 1. Flume Agent启动:使用flume-agent.sh脚本启动Flume Agent。 2. 数据传输:Flume Agent从source读取日志数据,并将其传输到sink。 3. 数据处理:sink将日志数据处理后写入HDFS。 4. 负载均衡:Flume ...

    让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 文档

    让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...

    flume-ng安装

    使用 wget 命令下载 Flume-NG 的安装包: `wget http://mirror.esocc.com/apache/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz` 3. 解压 Flume-NG 使用 tar 命令解压 Flume-NG 的安装包: `tar -xzvf apache-flume-...

    flume 安装和使用

    Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定制各类数据发送方用于收集数据,同时Flume提供对数据的简单处理,并将数据处理结果写入各种数据接收方...

    尚硅谷大数据技术之Flume

    如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 1.2.4 Sink ...

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    在本文中,我们将介绍如何在 Windows 环境下搭建 Flume-ng,并使用 Log4j 将日志输出到 HDFS。 一、Flume-ng 安装与配置 首先,需要下载 Flume-ng 并解压到指定目录。然后,需要设置环境变量,新建 FLUME_HOME ...

    Flume安装详细步骤

    Flume安装详细步骤 Flume是一款基于Java的分布式日志收集系统,主要用于收集和传输大规模日志数据。下面是Flume安装的详细步骤: Step 1: 安装JDK环境 ...现在,我们可以使用Flume来收集和传输日志数据了。

    Flume1.8安装部署

    1. 启动 Flume:使用命令 `cd $FLUME_HOME/conf`,然后使用命令 `flume-ng a` 启动 Flume。 五、Flume-Ng 组件概述 1. Flume-Ng:Flume 的下一代版本,提供了更好的性能和可扩展性。 2. Flume-Ng 组件:包括 Agent...

    大数据教程-Flume安装使用实录.pdf

    大数据教程-Flume安装使用实录,安装、部署细节详细步骤

    flume自学文档.pdf

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集...由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

    windows下flume1.7使用文档

    在windows端使用flume1.7同步上传数据到hdfs的说明

    Flume1.6.0入门:安装、部署、及flume的案例

    Flume1.6.0入门:安装、部署、及flume的案例

    Flume 安装搭建使用.

    Flume 安装搭建使用 Flume 是一个分布式的、可靠的、高效的日志收集、聚集、移动服务。它可以实时处理大数据量的日志,提供了一个灵活的架构来处理大数据。 一、Flume 简介 Flume 是一个基于流式架构的服务,能够...

    springboot_log4j2_flume

    Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...

    Flume集群搭建1

    启动命令位于 Flume 安装目录的 `bin` 子目录下,使用 `flume-ng agent` 命令,指定配置文件启动服务。 为了验证 Flume 集群的工作,可以在 hadoop12 的 `logs` 目录下创建一个测试文件,然后观察 hadoop13 上 `...

    Flume构建高可用、可扩展的海量日志采集系统

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集...由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

    集群flume详细安装步骤

    使用以下命令启动 Flume -Agent: ``` bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console ``` 测试 Flume 创建一个文件 `1.log` 并写入 `hello flume`,...

    Flume1.7.0用户指南

    Avro 客户端与Flume的AvroSource配合使用,而Thrift客户端则提供另一种选择。开发者可以根据需求选择合适的通信方式来构建自定义客户端。 总之,Apache Flume 1.7.0 是一个强大的日志管理和传输工具,其核心特性...

Global site tag (gtag.js) - Google Analytics