package com.iteblog.flume; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; public class MysqlSink extends AbstractSink implements Configurable { private Logger LOG = LoggerFactory.getLogger(MysqlSink.class); private String hostname; private String port; private String databaseName; private String tableName; private String user; private String password; private PreparedStatement preparedStatement; private Connection conn; private int batchSize; public MysqlSink() { LOG.info("MysqlSink start..."); } @Override public void configure(Context context) { hostname = context.getString("hostname"); Preconditions.checkNotNull(hostname, "hostname must be set!!"); port = context.getString("port"); Preconditions.checkNotNull(port, "port must be set!!"); databaseName = context.getString("databaseName"); Preconditions.checkNotNull(databaseName, "databaseName must be set!!"); tableName = context.getString("tableName"); Preconditions.checkNotNull(tableName, "tableName must be set!!"); user = context.getString("user"); Preconditions.checkNotNull(user, "user must be set!!"); password = context.getString("password"); Preconditions.checkNotNull(password, "password must be set!!"); batchSize = context.getInteger("batchSize", 100); Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!"); } @Override public void start() { super.start(); try { //调用Class.forName()方法加载驱动程序 Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName; //调用DriverManager对象的getConnection()方法,获得一个Connection对象 try { conn = DriverManager.getConnection(url, user, password); conn.setAutoCommit(false); //创建一个Statement对象 preparedStatement = conn.prepareStatement("insert into " + tableName + " (content) values (?)"); } catch (SQLException e) { e.printStackTrace(); System.exit(1); } } @Override public void stop() { super.stop(); if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } @Override public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event; String content; List<String> actions = Lists.newArrayList(); transaction.begin(); try { for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) { content = new String(event.getBody()); actions.add(content); } else { result = Status.BACKOFF; break; } } if (actions.size() > 0) { preparedStatement.clearBatch(); for (String temp : actions) { preparedStatement.setString(1, temp); preparedStatement.addBatch(); } preparedStatement.executeBatch(); conn.commit(); } transaction.commit(); } catch (Throwable e) { try { transaction.rollback(); } catch (Exception e2) { LOG.error("Exception in rollback. Rollback might not have been" + "successful.", e2); } LOG.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); } finally { transaction.close(); } return result; } }
pom文件中的依赖:
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-configuration</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> </dependencies>
运行程序时,先在Mysql中创建一个表
mysql> create table mysqltest( -> id int(11) NOT NULL AUTO_INCREMENT, -> content varchar(50000) NOT NULL, -> PRIMARY KEY (`id`) -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8; Query OK, 0 rows affected, 1 warning (0.05 sec)
然后在flume中创建以下配置
agent.sinks.mysqlSink.type = com.iteblog.flume.MysqlSink agent.sinks.mysqlSink.hostname=localhost agent.sinks.mysqlSink.port=3306 agent.sinks.mysqlSink.databaseName=ngmonitor agent.sinks.mysqlSink.tableName=mysqltest agent.sinks.mysqlSink.user=root agent.sinks.mysqlSink.password=123456 agent.sinks.mysqlSink.channel = c1
用下面的命令就可以启动:
bin/flume-ng agent -c conf/ -f conf/mysql_test.conf -n agent
再看下Mysql中的情况:
mysql> select count(*) from mysqltest; +----------+ | count(*) | +----------+ | 98300 | +----------+
相关推荐
标题中的“利用Flume将MySQL表数据准实时抽取到HDFS、MySQL、Kafka”是一项数据集成任务,涉及Apache Flume、MySQL数据库、Hadoop Distributed File System (HDFS) 和Apache Kafka这四个关键技术。Flume是Apache的一...
《Flume、MySQL与Kafka的数据同步实践》 在大数据处理领域,实时数据流...这一过程对于构建实时数据管道,实现大数据的实时分析和处理具有重要意义。理解并熟练掌握这一流程,将有助于提升大数据处理的效率和灵活性。
"Flume 采集 MySQL 数据所需 jar 包.zip" 文件提供了实现这一目标所需的两个关键组件:flume-ng-sql-source-1.5.2.jar 和 mysql-connector-java.jar。 首先,`flume-ng-sql-source-1.5.2.jar` 是 Flume 的 SQL 源...
Flume二次开发,支持抽取MYSQL Oracle数据库数据 以JSON格式推送至Kafka。 demo: sql_json.sources.sql_source.type = com.hbn.rdb.source.SQLSource sql_json.sources.sql_source.connectionurl = jdbc:oracle:...
这一特性使得Flume可以方便地整合各种结构化数据源,如MySQL、PostgreSQL、Oracle等,用于实时数据流处理和分析。 1. **Flume基础**: - **架构**:Flume由三个主要组件构成:Sources、Channels和Sinks。Sources...
flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...
Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。
flume抽取mysql工具包,利用flume工具,自定义工具包,实现连接mysql
flume 自定义sink组件 实现sink直接写入mysql数据库
在大数据处理领域,Flume、MySQL、Zookeeper和Hadoop是四个至关重要的组件。Flume是Apache开发的一款分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。MySQL是一款广泛使用的开源关系型数据库,而...
mysql批量语句,传入list 批量修改mysql批量语句,传入list 批量修改mysql批量语句,传入list 批量修改
flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
本项目主要探讨的是如何将Gin与MySQL数据库和Redis缓存系统进行整合,构建一个通用的后端脚手架。 首先,我们需要了解Gin框架。Gin基于Martini设计,但性能更优,因为它使用了httprouter,这是一个高性能的路由树,...
批处理是一种非交互式运行MySQL程序的方法,如同您在MySQL中使用的命令一样,你仍然将使用这些命令。本文介绍了使用批处理对MySQL进行数据批量操作的方法。
Kettle是一款强大的ETL(提取、转换、加载)工具,常用于数据处理和整合。 首先,让我们了解一下MySQL建表语句。在SQL中,`CREATE TABLE`语句用于创建一个新的数据库表。它定义了表的结构,包括字段名、数据类型、...
本文中提供了一个完整的示例代码,演示了Java实现批量向mysql写入数据的方法,包括JDBC连接mysql数据库、批量向mysql写入数据和基本的异常处理等操作。该示例代码可以作为Java程序设计的参考,帮助读者更好地理解...
在大数据处理领域,Flume、Kafka、Flink 和 MySQL 是四个非常重要的组件,它们各自承担着不同的职责,共同构建了一套高效的数据流处理系统。本文将深入探讨这些技术及其在"flume+kafka+flink+mysql数据统计"中的应用...
MySQL批量对表增加指定字段,会快速实现批量字段的添加。写了2个存储存储过程,执行即可。方便快速!
10. **维护与升级**:随着 Flume 和数据库技术的发展,SQL Source 插件也会进行更新,以支持更多功能和改进性能。定期检查并更新到最新版本可以充分利用新特性并解决已知问题。 综上所述,Flume 的 SQL Source 插件...
利用flume将mysql的数据同步到kafak,flume是基于CDH6.2.0安装的。解决已下问题:java.lang.NoSuchMethodError: org.apache.flume.Context.getSubProperties(Ljava/lang/String;)Lcom/google/common/collect/...