范例主要的处理流程说明如下:
1、web应用的日志信息实时输出到ActiveMQ
2、Flume的Source从MQ指定队列中获取消息,并提交到内存型Channel中
3、自定义Sink从Channel提取event,对event进行转换处理,并写入到Oracle数据库
开发自定义的Sink,编译打包成jar包,并上传到/opt/apache-flume-1.6.0-bin/plugins.d/flume-oracle-sink/lib,自定义sink的存放目录规范请查阅官方相关文档。
自定义Sink的代码骨架如下:
public class OracleSink extends AbstractSink implements Configurable { private int batchSize; private String charset; public void configure(Context context) { this.batchSize = context.getInteger(OracleSinkConfiguration.BATCH_SIZE, new Integer(OracleSinkConfiguration.BATCH_SIZE_DEFAULT)).intValue(); this.charset = context.getString(OracleSinkConfiguration.CHARSET, OracleSinkConfiguration.CHARSET_DEFAULT); } public synchronized void start() { super.start(); } public Status process() throws EventDeliveryException { Sink.Status result = Sink.Status.READY; Channel channel = getChannel(); Transaction transaction = null; Event event = null; try{ transaction = channel.getTransaction(); transaction.begin(); for(int i=0; i<this.batchSize; i++){ event = channel.take(); //Map headers = event.getHeaders(); if(event == null){ break; }else{ byte[] eventBody = event.getBody(); String bodyString = new String(eventBody, this.charset); //something code here } } transaction.commit(); } catch (Exception ex) { System.out.println(ex.toString());x result = Sink.Status.BACKOFF; if (transaction != null) { try { transaction.rollback(); } catch (Exception e) { throw Throwables.propagate(e); } } } finally { if (transaction != null) { transaction.close(); } } return result; } public synchronized void stop() { super.stop(); } }
在Flume根目录的config文件夹内新建一个flume config文件,内容如下:
# Name the components on this agent agent.sources = source1 agent.sinks = sink1 sink2 agent.channels = channel1 #source agent.sources.source1.type = jms agent.sources.source1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory agent.sources.source1.connectionFactory = ConnectionFactory agent.sources.source1.providerURL = failover://(tcp://192.168.247.2:61616?tcpNoDelay=true) agent.sources.source1.destinationType = QUEUE agent.sources.source1.destinationName = my_queue agent.sources.source1.batchSize = 200 agent.sources.source1.pollTimeout = 2000 #channel #agent.channels.channel1.type = file #agent.channels.channel1.checkpointDir = /tmp/flume/loadcheckpoint #agent.channels.channel1.dataDirs = /tmp/flume/loaddata #agent1.channels.channel1.capacity = 1000 #agent1.channels.channel1.transactionCapactiy = 100 agent.channels.channel1.type = memory agent.channels.channel1.capacity = 10000 agent.channels.channel1.transactionCapacity = 200 #sinkgroups agent.sinkgroups = g1 #agent.sinkgroups.g1.sinks = sink1 sink2 #agent.sinkgroups.g1.processor.type = failover #agent.sinkgroups.g1.processor.priority.sink1 = 10 #agent.sinkgroups.g1.processor.priority.sink2 = 5 #agent.sinkgroups.g1.processor.maxpenalty = 10000 agent.sinkgroups.g1.sinks = sink1 sink2 agent.sinkgroups.g1.processor.type = load_balance agent.sinkgroups.g1.processor.backoff = true agent.sinkgroups.g1.processor.selector = round_robin # Describe the sink agent.sinks.sink1.type = com.cjm.flume.oraclesink.OracleSink agent.sinks.sink1.username = cjm agent.sinks.sink1.password = 111 agent.sinks.sink2.type = com.cjm.flume.oraclesink.OracleSink agent.sinks.sink2.username = cjm agent.sinks.sink2.password = 222 agent.sinks.sink2.batchSize= 100 agent.sinks.sink2.charset = UTF-8 # Bind the source and sink to the channel agent.sources.source1.channels = channel1 agent.sinks.sink1.channel = channel1 agent.sinks.sink2.channel = channel1
启动ActiveMQ
启动Flume
cd /opt/apache-flume-1.6.0-bin
bin/flume-ng agent -c conf -f conf/flume-agent.conf -n agent -Dflume.root.logger=INFO,console
后台服务方式启动 nohup bin/flume-ng agent -c conf -f conf/flume-agent.conf -n agent >flume.log &
模拟发送信息到MQ
public class UMProducer { private MessageProducer producer = null; private Connection connection = null; private Session session = null; private String data = null; public void start(){ try{ InputStream in = UMProducer.class.getResourceAsStream("/data.txt"); this.data = IOUtils.toString(in); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover://(tcp://192.168.247.2:61616?tcpNoDelay=true)"); connection = connectionFactory.createConnection(); ((ActiveMQConnection)connection).setUseAsyncSend(true); connection.start(); session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue("my_queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setTimeToLive(1000 * 60 * 60 * 24); while(true){ TextMessage message = session.createTextMessage(this.data); producer.send(message); } }catch(Exception ex){ ex.printStackTrace(); } } public void stop(){ try{ if(session != null) { session.close(); } if(connection != null) { connection.close(); } if(producer != null){ producer.close(); } }catch(Exception ex){ ex.printStackTrace(); } } public static void main(String[] args) { UMProducer producer = new UMProducer(); producer.start(); } }
相关推荐
星环大数据平台提供的Flume使用方法文档是一份面向数据工程师的培训材料,旨在教授如何安装和使用Flume进行数据的分布式采集。文档详细介绍了Flume组件和配置,并通过实验步骤,帮助工程师理解和掌握数据采集的流程...
启动 Flume Agent 使用 `bin/flume-ng agent` 命令,指定配置文件、Agent 名称以及日志级别。在上面的例子中,命令是 `bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=...
Flume使用 1. Flume Agent启动:使用flume-agent.sh脚本启动Flume Agent。 2. 数据传输:Flume Agent从source读取日志数据,并将其传输到sink。 3. 数据处理:sink将日志数据处理后写入HDFS。 4. 负载均衡:Flume ...
3. **重命名解压后的文件夹**:为了方便管理和使用,通常需要将解压后的文件夹重命名为一个简短易记的名字,如文档中提到的使用命令`mv apache-flume-1.6.0-bin flume`将文件夹重命名为`flume`。 4. **设置环境变量...
使用 wget 命令下载 Flume-NG 的安装包: `wget http://mirror.esocc.com/apache/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz` 3. 解压 Flume-NG 使用 tar 命令解压 Flume-NG 的安装包: `tar -xzvf apache-flume-...
让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...
- **池伟的博客:** 池伟先生在CSDN上发表了多篇关于Flume的详细教程和经验分享,如链接所示,这些文章详细讲解了Flume的安装配置、架构设计以及使用中碰到的问题和解决方案。 ### 知识点总结: 1. **Flume发展历史...
### Windows 下 Flume 1.7 的使用指南 #### Apache Flume 概述 Apache Flume 是一个分布式的、可靠的且高可用的系统,用于从不同的数据源收集、汇总和传输大量的日志数据到集中式的数据存储中心。Flume 的设计目的...
在使用前,你需要根据你的需求配置Flume的配置文件,例如`conf/flume.conf`,定义数据流的源(source)、通道(channel)和接收器(sink)。 在部署Flume时,有几点需要注意: 1. **配置**: 配置文件定义了数据流动...
在本文中,我们将介绍如何在 Windows 环境下搭建 Flume-ng,并使用 Log4j 将日志输出到 HDFS。 一、Flume-ng 安装与配置 首先,需要下载 Flume-ng 并解压到指定目录。然后,需要设置环境变量,新建 FLUME_HOME ...
如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 1.2.4 Sink ...
由于文件权限问题,可能需要给予该 jar 包 `777` 权限,以确保 Flume 能够正确启动并使用 Spark Streaming。 3. **Flume 的配置**:在 Flume 的配置文件中,定义 SparkSink,指定 Spark 集群的相关参数,如 master ...
- 引入必要的pom.xml依赖,并使用 `mvn package` 命令打包JAR文件,放入Flume的lib目录下。 3. **编写Storm代码:** - 开发一个Storm拓扑,该拓扑将从Kafka中消费数据,并进行实时分析。 4. **启动服务:** - ...
### Flume的安装与使用详解 #### 一、Flume简介 Apache Flume 是一个分布式的、可靠的、高可用的服务,用于有效地收集、聚合和移动大量日志数据。Flume 支持简单灵活的配置,这使得它可以适用于各种场景中的数据...
- **容错性**:Flume使用持久化通道(如文件通道)来确保在故障时不会丢失数据。 - **数据聚合**:Flume可以集成HDFS、HBase等大数据存储系统,实现数据的实时聚合和存储。 7. **集成其他服务**: Flume可以...
Flume安装详细步骤 Flume是一款基于Java的分布式日志收集系统,主要用于收集和传输大规模日志数据。下面是Flume安装的详细步骤: Step 1: 安装JDK环境 ...现在,我们可以使用Flume来收集和传输日志数据了。
1. 启动 Flume:使用命令 `cd $FLUME_HOME/conf`,然后使用命令 `flume-ng a` 启动 Flume。 五、Flume-Ng 组件概述 1. Flume-Ng:Flume 的下一代版本,提供了更好的性能和可扩展性。 2. Flume-Ng 组件:包括 Agent...
将这些库添加到 Flume 的类路径中,可以让 Flume 正确识别和使用新的 API。 4. **配置 Flume**: 修改 Flume 配置文件,确保所有配置项(如 HBase 表名、列族、主机名等)都正确无误,同时设置正确的连接参数,以便 ...
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集...由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。