`
gaojingsong
  • 浏览: 1201369 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【实战Apache-Flume采集DB数据到kafka】

阅读更多

Flume是一个优秀的数据采集组件,有些重量级,其本质也是根据SQL语句的查询结果组装成opencsv格式的数据,默认的分隔符号是逗号(,),可以重写opencsv某些类进行修改

 

1、下载

[root@hadoop0 bigdata]# wget  http://apache.fayea.com/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

 

2、解压缩

[root@hadoop0 bigdata]# tar -zxvf apache-flume-1.6.0-bin.tar.gz 

[root@hadoop0 bigdata]# ls

apache-flume-1.6.0-bin         apache-hive-2.0.1-bin.tar.gz  hadoop272    hbase-1.1.5-bin.tar.gz  kafka        sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz  taokeeper-monitor.tar.gz  zookeeper

apache-flume-1.6.0-bin.tar.gz  apache-tomcat-7.0.69.zip      hbase-1.1.5  hive2.0                 sqoop-1.4.6  stomr096                                    tomcat7                   zookeeper.out

 

 

3、编译flume-ng-sql.jar

flume-ng-sql-source-develop_1.2.1 作者 :@author Luis Lázaro <lalazaro@keedio.com>

 

 <groupId>org.keedio.flume.flume-ng-sources</groupId>

 <artifactId>flume-ng-sql-source</artifactId>

 <version>1.2.1-SNAPSHOT</version>



 

 

4、配置数据源(两个作者的FlumeSink)

[root@hadoop0 apache-flume-1.6.0-bin]# vi conf/agent.conf 

agent.sources = sql-source

agent.channels=c1

agent.sinks=r

 

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

# URL to connect to database (currently only mysql is supported)

agent.sources.sql-source.connection.url = jdbc:mysql://192.168.1.100:3306/test

# Database connection properties

agent.sources.sql-source.user = root

agent.sources.sql-source.password = 123

agent.sources.sql-source.table = sdfs

agent.sources.sql-source.database = database

# Columns to import to kafka (default * import entire row)

agent.sources.sql-source.columns.to.select = *

# Increment column properties

agent.sources.sql-source.incremental.column.name = id

# Increment value is from you want to start taking data from tables (0 will import entire table)

agent.sources.sql-source.incremental.value = 0

# Query delay, each configured milisecond the query will be sent

agent.sources.sql-source.run.query.delay=10000

# Status file is used to save last readed row

agent.sources.sql-source.status.file.path = /tmp

agent.sources.sql-source.status.file.name = sql-source.status

#Custom query

agent.sources.sql-source.custom.query = SELECT * FROM users WHERE 1=1 AND @

agent.sources.sql-source.batch.size = 1000

agent.sources.sql-source.max.rows = 10000

 

 

agent.channels.c1.type = memory

agent.channels.c1.capacity = 100

agent.channels.c1.transactionCapacity = 100

agent.channels.c1.byteCapacityBufferPercentage = 20

agent.channels.c1.byteCapacity = 800

 

#flume-ng-kafka-sink-1.6.0.jar

#agent.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink

#agent.sinks.r.brokerList=localhost:9092

#agent.sinks.r.batchSize=1

#agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

#agent.sinks.r.serializer.class=kafka.serializer.StringEncoder

#agent.sinks.r.requiredAcks=0

#agent.sinks.r.topic=test

 

 

#gitHub  beyondj2ee  flumeng-kafka-plugin.jar

agent.sinks.r.type = org.apache.flume.plugins.KafkaSink

agent.sinks.r.metadata.broker.list=localhost:9092

agent.sinks.r.partition.key=0

agent.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

agent.sinks.r.serializer.class=kafka.serializer.StringEncoder

agent.sinks.r.request.required.acks=0

agent.sinks.r.max.message.size=1000000

agent.sinks.r.producer.type=sync

agent.sinks.r.custom.encoding=UTF-8

agent.sinks.r.custom.topic.name=test

 

 

 

5、准备好数据库



 

6、启动zookeeper

[root@hadoop0 ~]# cd /opt/bigdata/

[root@hadoop0 bigdata]# ls

apache-flume-1.6.0-bin         apache-hive-2.0.1-bin.tar.gz  hadoop272    hbase-1.1.5-bin.tar.gz  kafka        sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz  taokeeper-monitor.tar.gz  zookeeper

apache-flume-1.6.0-bin.tar.gz  apache-tomcat-7.0.69.zip      hbase-1.1.5  hive2.0                 sqoop-1.4.6  stomr096                                    tomcat7                   zookeeper.out

[root@hadoop0 bigdata]# cd zookeeper/bin/

[root@hadoop0 bin]# ./zkServer.sh start 

JMX enabled by default

Using config: /opt/bigdata/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

 

 

7、启动kafka

[root@hadoop0 bin]# cd ../../kafka/bin/

[root@hadoop0 bin]# ./kafka-server-start.sh  ../config/server.properties  &

[1] 32613

[root@hadoop0 bin]# [1999-05-25 12:34:44,651] INFO KafkaConfig values: 

        request.timeout.ms = 30000

        log.roll.hours = 168

        inter.broker.protocol.version = 0.9.0.X

        log.preallocate = false

        security.inter.broker.protocol = PLAINTEXT

        controller.socket.timeout.ms = 30000

        broker.id.generation.enable = true

        ssl.keymanager.algorithm = SunX509

        ssl.key.password = null

        log.cleaner.enable = true

        ssl.provider = null

[root@hadoop0 bin]# ./kafka-topics.sh  --zookeeper localhost --list

test

 

 

8、启动Flume

[root@hadoop0 apache-flume-1.6.0-bin]# rm -rf /tmp/sql-source.status 

[root@hadoop0 apache-flume-1.6.0-bin]# ./bin/flume-ng agent -n agent  -c conf -f conf/agent.conf -Dflume.root.logger=INFO,console

Info: Including Hadoop libraries found via (/opt/bigdata/hadoop272/bin/hadoop) for HDFS access

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath

Info: Including HBASE libraries found via (/opt/bigdata/hbase-1.1.5/bin/hbase) for HBASE access

Info: Excluding /opt/bigdata/hbase-1.1.5/lib/slf4j-api-1.7.7.jar from classpath

Info: Excluding /opt/bigdata/hbase-1.1.5/lib/slf4j-log4j12-1.7.5.jar from classpath

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath

Info: Excluding /opt/bigdata/hadoop272/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath

Info: Including Hive libraries found via (/opt/bigdata/hive2.0) for Hive access



 

 

9、准备消费者消费数据

[root@hadoop0 bin]# ./kafka-console-consumer.sh  --zookeeper localhost --topic test --from-beginning

test-message

gaojs

杞欢璁捐

1

2

gaojs

nihao

tesdhdhsdhgf

vdxgdgsdg

dfhfdhd

gaojs

gaojingsong

2015-09-02342

535435353

"1","zhangsan","12","17-May-2016 20:06:38"

"3","444","23","17-May-2016 20:06:38"

"4","wan-flume","23","17-May-2016 20:06:38"

"5","gaojs-flume","23","17-May-2016 20:06:38"

"1","zhangsan","12","17-May-2016 20:06:38"

"3","444","23","17-May-2016 20:06:38"

"4","wan-flume","23","17-May-2016 20:06:38"

"5","gaojs-flume","23","17-May-2016 20:06:38"

 

10、结果验证



 

 

 

启动Flume消费过程中日志

 



 

 

  • 描述: flume-ng-kafka-sink-1.6.0.jar 消费结果
  • 大小: 34.8 KB
  • 描述: MYSQL数据库
  • 大小: 47.2 KB
  • 描述: RES
  • 大小: 7.6 KB
  • 描述: 启动Flume
  • 大小: 15.6 KB
  • 描述: 启动Flume消费过程中日志
  • 大小: 64.3 KB
  • 描述: 编译flume-ng-sql.jar
  • 大小: 17.7 KB
  • thirdJar.rar (1.7 MB)
  • 描述: 第三方jar
  • 下载次数: 49
0
10
分享到:
评论
3 楼 sa123574 2017-02-21  
navygame 写道
我知道密码了

navygame 写道
解压密码是多少啊

2 楼 navygame 2016-11-16  
我知道密码了
1 楼 navygame 2016-11-16  
解压密码是多少啊

相关推荐

    相关软件安装文档.docx

    本文档详细介绍了在Linux环境下搭建一套全面的大数据处理生态系统的过程,包括Hadoop、Zookeeper、Flume、Kafka、MySQL、Hive、Redis、Elasticsearch、RabbitMQ、HBase、Spark和Storm以及Azkaban等组件。 首先,...

    tail-file-source:用于拖尾文件的 Flume NG 源

    Flume NG提供了一种灵活、容错性强的架构,能够处理不同来源的数据,并将数据可靠地传输到各种目的地,如HDFS、HBase、Kafka等。 **二、Tail-File-Source特性** 1. **实时性**:Tail-File-Source能实时监控指定文件...

    CDH-HDP-MAPR-DKH-星环组件比较.docx

    7. Kafka和Flume处理数据流入流出,Kafka更侧重消息队列,Flume则用于数据收集。 8. Oozie是工作流管理系统,监控和调度集群任务。 9. ESSQL是基于Elasticsearch的SQL工具,提供对搜索数据的SQL查询支持。 10. DK...

    大数据现状与趋势分析.pdf

    综上所述,大数据市场正在不断演进,解决方案日益多样化,涵盖了从数据采集到分析的全过程。随着新技术的涌现,如边缘计算、人工智能等,大数据将进一步融入到各行各业,为企业决策、产品创新和社会治理提供更深入的...

    Flink 在易车落地应用与实践-Flink Forward Asia 2021.pdf

    全链路监控包括数据的采集、传输、处理和输出的各个环节,如Flume、Kafka、Flink、Hulk等组件的监控。监控指标涵盖了数据条数、延迟、消费堆积、任务状态等关键信息,以确保数据流动的顺畅。 ### 多版本支持 面对...

    CDH-HDP-MAPR-DKH-星环组件比较.pdf

    20. **DK-SPIDER**:大快科技的分布式爬虫系统,用于网络数据采集。 21. **DKM**:大快科技的集群管理工具,类似于Cloudera Manager。 22. **DK-DMYSQL**:大快科技对MySQL的分布式改造,提升分布式环境下的数据库...

    大数据平台技术框架选型分析.pdf

    例如,Twitter Scribe和Cloudera Flume是实时数据处理的工具,它们可以将实时数据流发送到Hadoop存储系统中。 最后,大数据框架中的消息队列服务也非常重要,如Apache Kafka、RabbitMQ、ZeroMQ等。这些消息队列能够...

    星环大数据方案介绍.pptx

    - **Apache Projects**:包括Stream流处理引擎、HBase、Zookeeper、Kafka、Oozie、Flume、Elasticsearch、Sqoop、Hue等,构建全面的大数据生态。 3. **流处理框架**: - 支持Exactly Once语义,确保数据处理的...

    尚硅谷大数据技术之 Hadoop1

    Hadoop的生态系统还包括Flume用于数据收集,Kafka处理实时流数据,Storm进行实时计算,Spark提供更高效的内存计算,以及Hive用于数据分析和数据仓库建模,ETL(抽取、转换、加载)工程师负责数据清洗,算法工程师...

    CDHHDPMAPRDKH星环组件比较.docx

    【Sqoop】用于数据迁移,帮助用户将关系型数据库的数据导入到Hadoop集群,或者从Hadoop导出到关系型数据库。 【Flume】是Cloudera开发的数据收集系统,用于高效、可靠地聚合、聚合和移动大量日志数据。 【Oozie】...

    星环大数据方案介绍.pdf

    包括Inceptor(交互式分析引擎)、Stream流处理引擎、Hyperbase NoSQL数据库、Transwarp Manager(资源管理)、Apache Projects(如Zookeeper、Kafka、Oozie、Flume、Elastic Search、Sqoop、HUE等),以及数据挖掘...

Global site tag (gtag.js) - Google Analytics