tail-to-avro
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure spooldir source1 #agent1.sources.source1.type = spooldir #agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1 #agent1.sources.source1.fileHeader = true # Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -n +0 -F /tmp/log.log agent1.sources.source1.channels = channel1 # Describe/configure nc source1 #agent1.sources.source1.type = netcat #agent1.sources.source1.bind = localhost #agent1.sources.source1.port = 44444 #configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname # Describe logger sink1 #agent1.sinks.sink1.type = logger # Describe avro sink1 agent1.sinks.sink1.type = avro agent1.sinks.sink1.hostname = 172.16.10.175 agent1.sinks.sink1.port = 4545 # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
avro-to-rollfile
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure avro source agent1.sources.source1.type = avro agent1.sources.source1.bind =172.16.10.175 agent1.sources.source1.port = 4545 # Describe logger sink1 #agent1.sinks.sink1.type = logger # Describe file sink1 agent1.sinks.sink1.type = file_roll agent1.sinks.sink1.sink.directory = /var/log/flume # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
启动:
./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.0-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.0-bin/conf/flume-single.properties -n agent1 -Dflume.root.logger=INFO,console
亲自操作如下:
source配置(接收):
# The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'a #agent section producer.sources = s producer.channels = c producer.sinks = r #producer.sources.s.type = seq producer.sources.s.channels = c #producer.sources.s.type = exec #producer.sources.s.command=tail -n +0 -F /usr/local/nginx/nginxlog/access.log producer.sources.s.deletePolicy=never producer.sources.s.type = avro producer.sources.s.bind = localhost producer.sources.s.port = 4545 # Each sink's type must be defined(给谁了) #producer.sinks.r.type = avro #producer.sinks.r.hostname = 10.1.1.100 #producer.sinks.r.port = 20000 producer.sinks.r.type = org.xx.clickstream.sink.kafka.KafkaSink producer.sinks.r.zk.connect = 127.0.0.1:2181 producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partitioner.class=org.xx.clickstream.partition.TypePartitioner producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=1 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 1000000 producer.channels.c.transactionCapacity = 1000000 #producer.channels.c.type=file #producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent #producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen
sink配置(发送):
# The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'a #agent section producer.sources = s producer.channels = c producer.sinks = r #producer.sources.s.type = seq producer.sources.s.channels = c producer.sources.s.type = exec producer.sources.s.command=tail -n +0 -F /usr/local/nginx/nginxlog/access.log producer.sources.s.deletePolicy=never #producer.sources.s.type = avro #producer.sources.s.bind = localhost #producer.sources.s.port = 10000 # Each sink's type must be defined(给谁了) producer.sinks.r.type = avro producer.sinks.r.hostname = localhost producer.sinks.r.port = 4545 #producer.sinks.r.type = org.xx.clickstream.sink.kafka.KafkaSink #producer.sinks.r.zk.connect = 127.0.0.1:2181 #producer.sinks.r.metadata.broker.list=127.0.0.1:9092 #producer.sinks.r.partitioner.class=org.xx.clickstream.partition.TypePartitioner #producer.sinks.r.serializer.class=kafka.serializer.StringEncoder #producer.sinks.r.request.required.acks=1 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 1000000 producer.channels.c.transactionCapacity = 1000000 #producer.channels.c.type=file #producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent #producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen
启动顺序,先启动source接收,再启动sink
#先启动接收source,准备好接收 #./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-avrosource.properties -n producer -Dflume.root.logger=INFO,console #再启动发送sink,发送 #./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-avrosink.properties -n producer -Dflume.root.logger=INFO,console
相关推荐
它被设计用来高效地处理大量数据流,能够从多个源头收集数据并将其传输至不同的存储系统中。Flume支持自定义数据发送端,便于收集不同类型的数据;此外,它还提供了对数据进行初步处理的功能,以及将数据写入到各种...
具体到配置文件的编写,我们先通过实例讲解一个简单的单节点Flume配置。例如,一个名为a1的Agent,它有一个netcat类型的数据源r1,绑定在所有网络接口上的44444端口,一个内存类型的Channel c1以及一个logger类型的...
然而,在后续的 Flume-ng 版本中,去掉了集中式的 Master 和 Zookeeper,使得 Flume 成为一个更纯粹的数据传输工具。在这个版本中,读入和写出数据由不同的工作线程(Runner)处理,提高了系统性能,避免了因写出慢...
它负责将数据从通道移除并传输到目的地,可能是另一个 Flume 实例、HDFS(Hadoop 分布式文件系统)、HBase、Kafka 或其他存储或处理系统。接收器可以是单个实例,也可以是一个集群,以实现高可用性和负载均衡。 **...
主要涵盖了配置exec-avro-new.conf和avro-hdfs.conf两个配置文件、确保所有节点间Jar包版本一致的操作以及启动相应的Agent来实现日志数据流的传输过程。文中特别提到,当完成这些设置以后,在每个参与节点都会创建一...
此外,为了保证服务的高可用性和容错性,可以配置多个副本,使用负载均衡策略,确保即使单个节点故障,数据也能正常流动。 总结来说,“FtpFlumeMysql”项目展示了如何利用Java编程和Apache Flume的特性,构建一个...
- **Flume Avro Client开发**:指导如何开发Flume Avro Client,实现数据发送功能。 - **Flume和Kafka的整合**:解释如何将Flume与Kafka结合使用,构建高效的数据流管道。 #### 八、Zookeeper分布式协调服务 - **...
通过Flume、Kafka、HDFS等技术的结合使用,可以有效地解决大规模日志数据的实时采集、传输、存储和分析问题,为数据分析人员提供强大的数据支持,帮助企业挖掘用户行为模式,制定符合用户习惯的产品或服务策略,从而...