`
liyonghui160com
  • 浏览: 778440 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Flumeng和Mysql进行整合 Flumeng批量处理

阅读更多

 

 

package com.iteblog.flume;

	 
	import com.google.common.base.Preconditions;
	import com.google.common.base.Throwables;
	import com.google.common.collect.Lists;
	import org.apache.flume.*;
	import org.apache.flume.conf.Configurable;
	import org.apache.flume.sink.AbstractSink;
	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;
	 
	import java.sql.Connection;
	import java.sql.DriverManager;
	import java.sql.PreparedStatement;
	import java.sql.SQLException;
	import java.util.List;
	 
	public class MysqlSink extends AbstractSink implements Configurable {
	 
	    private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
	    private String hostname;
	    private String port;
	    private String databaseName;
	    private String tableName;
	    private String user;
	    private String password;
	    private PreparedStatement preparedStatement;
	    private Connection conn;
	    private int batchSize;
	 
	    public MysqlSink() {
	        LOG.info("MysqlSink start...");
	    }
	 
	    @Override
	    public void configure(Context context) {
	        hostname = context.getString("hostname");
	        Preconditions.checkNotNull(hostname, "hostname must be set!!");
	        port = context.getString("port");
	        Preconditions.checkNotNull(port, "port must be set!!");
	        databaseName = context.getString("databaseName");
	        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
	        tableName = context.getString("tableName");
	        Preconditions.checkNotNull(tableName, "tableName must be set!!");
	        user = context.getString("user");
	        Preconditions.checkNotNull(user, "user must be set!!");
	        password = context.getString("password");
	        Preconditions.checkNotNull(password, "password must be set!!");
	        batchSize = context.getInteger("batchSize", 100);
	        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
	    }
	 
	    @Override
	    public void start() {
	        super.start();
	        try {
	            //调用Class.forName()方法加载驱动程序
	            Class.forName("com.mysql.jdbc.Driver");
	        } catch (ClassNotFoundException e) {
	            e.printStackTrace();
	        }
	 
	        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
	        //调用DriverManager对象的getConnection()方法,获得一个Connection对象
	 
	        try {
	            conn = DriverManager.getConnection(url, user, password);
	            conn.setAutoCommit(false);
	            //创建一个Statement对象
	            preparedStatement = conn.prepareStatement("insert into " + tableName +
	                                               " (content) values (?)");
	 
	        } catch (SQLException e) {
	            e.printStackTrace();
	            System.exit(1);
	        }
	 
	    }
	 
	    @Override
	    public void stop() {
	        super.stop();
	        if (preparedStatement != null) {
	            try {
	                preparedStatement.close();
	            } catch (SQLException e) {
	                e.printStackTrace();
	            }
	        }
	 
	        if (conn != null) {
	            try {
	                conn.close();
	            } catch (SQLException e) {
	                e.printStackTrace();
	            }
	        }
	    }
	 
	    @Override
	    public Status process() throws EventDeliveryException {
	        Status result = Status.READY;
	        Channel channel = getChannel();
	        Transaction transaction = channel.getTransaction();
	        Event event;
	        String content;
	 
	        List<String> actions = Lists.newArrayList();
	        transaction.begin();
	        try {
	            for (int i = 0; i < batchSize; i++) {
	                event = channel.take();
	                if (event != null) {
	                    content = new String(event.getBody());
	                    actions.add(content);
	                } else {
	                    result = Status.BACKOFF;
	                    break;
	                }
	            }
	 
	            if (actions.size() > 0) {
	                preparedStatement.clearBatch();
	                for (String temp : actions) {
	                    preparedStatement.setString(1, temp);
						preparedStatement.addBatch();
	                }
	                preparedStatement.executeBatch();
	 
	                conn.commit();
	            }
	            transaction.commit();
	        } catch (Throwable e) {
	            try {
	                transaction.rollback();
	            } catch (Exception e2) {
	                LOG.error("Exception in rollback. Rollback might not have been" +
	                        "successful.", e2);
	            }
	            LOG.error("Failed to commit transaction." +
	                    "Transaction rolled back.", e);
	            Throwables.propagate(e);
	        } finally {
	            transaction.close();
	        }
 
	        return result;
	    }
	}

 

 

pom文件中的依赖:

<dependencies>
	        <dependency>
	            <groupId>org.apache.flume</groupId>
	            <artifactId>flume-ng-core</artifactId>
	        </dependency>
	 
	        <dependency>
	            <groupId>org.apache.flume</groupId>
	            <artifactId>flume-ng-configuration</artifactId>
	        </dependency>
	 
	        <dependency>
	            <groupId>mysql</groupId>
	            <artifactId>mysql-connector-java</artifactId>
	            <version>5.1.25</version>
	        </dependency>
	 
	        <dependency>
	            <groupId>org.slf4j</groupId>
	            <artifactId>slf4j-api</artifactId>
	        </dependency>
	 
	        <dependency>
	            <groupId>org.slf4j</groupId>
	            <artifactId>slf4j-log4j12</artifactId>
	            <scope>test</scope>
	        </dependency>
	</dependencies>

 

 

运行程序时,先在Mysql中创建一个表

mysql> create table mysqltest(
	    -> id int(11) NOT NULL AUTO_INCREMENT,
	    -> content varchar(50000) NOT NULL,
	    -> PRIMARY KEY (`id`)
	    -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8; 
	Query OK, 0 rows affected, 1 warning (0.05 sec)

 

 

然后在flume中创建以下配置

 

agent.sinks.mysqlSink.type = com.iteblog.flume.MysqlSink
agent.sinks.mysqlSink.hostname=localhost
agent.sinks.mysqlSink.port=3306
agent.sinks.mysqlSink.databaseName=ngmonitor
agent.sinks.mysqlSink.tableName=mysqltest
agent.sinks.mysqlSink.user=root
agent.sinks.mysqlSink.password=123456
agent.sinks.mysqlSink.channel = c1

 

 

用下面的命令就可以启动:

bin/flume-ng agent -c conf/ -f conf/mysql_test.conf  -n agent

 

 

再看下Mysql中的情况:

mysql> select count(*) from mysqltest;
	+----------+
	| count(*) |
	+----------+
	|    98300 |
	+----------+

 

 

 

分享到:
评论

相关推荐

    利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka

    标题中的“利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka”是一项数据集成任务,涉及Apache Flume、MySQL数据库、Hadoop Distributed File System (HDFS) 和Apache Kafka这四个关键技术。Flume是Apache的一...

    flume-mysql.zip

    《Flume、MySQL与Kafka的数据同步实践》 在大数据处理领域,实时数据流...这一过程对于构建实时数据管道,实现大数据的实时分析和处理具有重要意义。理解并熟练掌握这一流程,将有助于提升大数据处理的效率和灵活性。

    Flume采集MySQL数据所需jar包.zip

    "Flume 采集 MySQL 数据所需 jar 包.zip" 文件提供了实现这一目标所需的两个关键组件:flume-ng-sql-source-1.5.2.jar 和 mysql-connector-java.jar。 首先,`flume-ng-sql-source-1.5.2.jar` 是 Flume 的 SQL 源...

    flume-ng-sql-source-1.5.2

    这一特性使得Flume可以方便地整合各种结构化数据源,如MySQL、PostgreSQL、Oracle等,用于实时数据流处理和分析。 1. **Flume基础**: - **架构**:Flume由三个主要组件构成:Sources、Channels和Sinks。Sources...

    Flume 抽取MYSQL Oracle数据 JSON格式 推送Kafka

    Flume二次开发,支持抽取MYSQL Oracle数据库数据 以JSON格式推送至Kafka。 demo: sql_json.sources.sql_source.type = com.hbn.rdb.source.SQLSource sql_json.sources.sql_source.connectionurl = jdbc:oracle:...

    flume-ng-sql-source-1.5.3.jar

    flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...

    Flume-ng-1.6.0-cdh.zip

    Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。

    flume-ng-sql-source-mysql

    flume抽取mysql工具包,利用flume工具,自定义工具包,实现连接mysql

    flume sink直写mysql

    flume 自定义sink组件 实现sink直接写入mysql数据库

    flume+mysql+zookeeper.zip

    在大数据处理领域,Flume、MySQL、Zookeeper和Hadoop是四个至关重要的组件。Flume是Apache开发的一款分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。MySQL是一款广泛使用的开源关系型数据库,而...

    mysql批量修改语句

    mysql批量语句,传入list 批量修改mysql批量语句,传入list 批量修改mysql批量语句,传入list 批量修改

    flume-ng-sql-source-1.5.2.jar

    flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包

    golang(gin)整合mysql,整合redis通用脚手架

    本项目主要探讨的是如何将Gin与MySQL数据库和Redis缓存系统进行整合,构建一个通用的后端脚手架。 首先,我们需要了解Gin框架。Gin基于Martini设计,但性能更优,因为它使用了httprouter,这是一个高性能的路由树,...

    flume+kafka+flink+mysql数据统计

    在大数据处理领域,Flume、Kafka、Flink 和 MySQL 是四个非常重要的组件,它们各自承担着不同的职责,共同构建了一套高效的数据流处理系统。本文将深入探讨这些技术及其在"flume+kafka+flink+mysql数据统计"中的应用...

    使用批处理对MySQL进行数据批量操作

    批处理是一种非交互式运行MySQL程序的方法,如同您在MySQL中使用的命令一样,你仍然将使用这些命令。本文介绍了使用批处理对MySQL进行数据批量操作的方法。

    mysql批量导出建表语句.zip

    Kettle是一款强大的ETL(提取、转换、加载)工具,常用于数据处理和整合。 首先,让我们了解一下MySQL建表语句。在SQL中,`CREATE TABLE`语句用于创建一个新的数据库表。它定义了表的结构,包括字段名、数据类型、...

    Java实现批量向mysql写入数据的方法

    本文中提供了一个完整的示例代码,演示了Java实现批量向mysql写入数据的方法,包括JDBC连接mysql数据库、批量向mysql写入数据和基本的异常处理等操作。该示例代码可以作为Java程序设计的参考,帮助读者更好地理解...

    flume-ng-sql-source-1.4.1

    flume-ng-sql-source-1.4.1 flume采集mysql数据到kudu

    mysql批量增加字段.txt

    MySQL批量对表增加指定字段,会快速实现批量字段的添加。写了2个存储存储过程,执行即可。方便快速!

Global site tag (gtag.js) - Google Analytics