Strom JDBC 整合学习笔记
原创编写: 王宇
2016-11-24
插入数据库(Inserting into a database)
ConnectionProvider
JdbcMapper
JdbcInsertBolt
SimpleJdbcMapper
JdbcTridentState
查询数据库(Lookup from Database)
SimpleJdbcLookupMapper
JdbcLookupBolt
JdbcTridentState for Lookup
例子
maven中加入storm-jdbc和mysql的connector
表结构
Github中的例子
实例代码
插入数据库(Inserting into a database)
ConnectionProvider
统一接口
org.apache.storm.jdbc.common.ConnectionProvider
publicinterfaceConnectionProviderextendsSerializable{
/**
* method must be idempotent.
*/
void prepare();
/**
*
* @return a DB connection over which the queries can be executed.
*/
Connection getConnection();
/**
* called once when the system is shutting down, should be idempotent.
*/
void cleanup();
}
支持: HikariCP 连接池
org.apache.storm.jdbc.common.HikariCPConnectionProvider
JdbcMapper
org.apache.storm.jdbc.mapper.JdbcMapper
publicinterfaceJdbcMapperextendsSerializable{
List<Column> getColumns(ITuple tuple);
}
getColume方法,定义了一个storm tuple 如何映射一个数据库表的列。
The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.
返回的列表的顺序性是很重要的。总之就是自己在插入数据的时候,各个字段的顺序要对应一致。
例如:我们提交了一个插入查询语句insert into user(user_id, user_name, create_date) values (?,?, now())那么values()中第一个?就对应了user_id,第二个?对应了usr_name以此类推。getColumns的返回列表也是这样。咱们的jdbc不提供任何不标准的查询语法。
JdbcInsertBolt
- 要使用JdbcInsertBolt,我们需要用一个ConnectionProvider的实现以及JdbcMapper的实现(该实现将tuple转换成DB的行)来构建一个JdbcInsertBolt实例。
- 用withTableName方法提供表名
- withInsertQuery方法提供一个插入查询
- 设置一个查询超时时间来规定一个查询最多能花多少时间。默认与topology.message.timeout.secs一样大,如果该值为-1那就意味着不设置查询超时。我们可以设置查询超时时间<=topology.message.timeout.secs。
Map hikariConfigMap =Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url","jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider =newHikariCPConnectionProvider(hikariConfigMap);
String tableName ="user_details";
JdbcMapper simpleJdbcMapper =newSimpleJdbcMapper(tableName, connectionProvider);
JdbcInsertBolt userPersistanceBolt =newJdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("user")
.withQueryTimeoutSecs(30);
Or
JdbcInsertBolt userPersistanceBolt =newJdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withInsertQuery("insert into user values (?,?)")
.withQueryTimeoutSecs(30);
SimpleJdbcMapper
更一般化的JdbcMapper,它可以将tuple与数据库的行进行映射。SimpleJdbcMapper假设tuple字段名与你将要写入的数据库表中的列名是一样的。
JdbcTridentState
We also support a trident persistent state that can be used with trident topologies.
JdbcState.Options options =newJdbcState.Options()
.withConnectionProvider(connectionProvider)
.withMapper(jdbcMapper)
.withTableName("user_details")
.withQueryTimeoutSecs(30);
JdbcStateFactory jdbcStateFactory =newJdbcStateFactory(options);
可以使用 withInsertQuery 设置条件
查询数据库(Lookup from Database)
org.apache.storm.jdbc.mapper.JdbcLookupMapper
void declareOutputFields(OutputFieldsDeclarer declarer);
List<Column> getColumns(ITuple tuple);
List<Values> toTuple(ITuple input,List<Column> columns);
- declareOutputFields
指定输出的tuple中的字段 - getColumns
确定查询中的占位符(?)以及它们的SQL类型和值。 - toTuple
接收一个输入tuple并且表示数据库一行的列字段值列表作为select搜索的结果。
SimpleJdbcLookupMapper
针对单表简单查询
- SimpleJdbcMapper认为tuple中的字段与你作为占位符的字段名是一致的
Fields outputFields =newFields("user_id","user_name","create_date");
List<Column> queryParamColumns =Lists.newArrayList(newColumn("user_id",Types.INTEGER));
this.jdbcLookupMapper =newSimpleJdbcLookupMapper(outputFields, queryParamColumns);
JdbcLookupBolt
注意超时设置要<=topology.message.timeout.secs
String selectSql ="select user_name from user_details where user_id = ?";
SimpleJdbcLookupMapper lookupMapper =newSimpleJdbcLookupMapper(outputFields, queryParamColumns)
JdbcLookupBolt userNameLookupBolt =newJdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
.withQueryTimeoutSecs(30);
JdbcTridentState for Lookup
JdbcState.Options options =newJdbcState.Options()
.withConnectionProvider(connectionProvider)
.withJdbcLookupMapper(newSimpleJdbcLookupMapper(newFields("user_name"),Lists.newArrayList(newColumn("user_id",Types.INTEGER))))
.withSelectQuery("select user_name from user_details where user_id = ?");
.withQueryTimeoutSecs(30);
例子
maven中加入storm-jdbc和mysql的connector
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-jdbc</artifactId>
<version>0.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-Java</artifactId>
<version>5.1.31</version>
</dependency>
表结构
CREATE TABLE `userinfo`(
`id`int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(50) DEFAULT NULL,
`resource_id` varchar(10) DEFAULT NULL,
`create_date` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`count`int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Github中的例子
storm/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/
storm/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/
实例代码
import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.guava.collect.Lists;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import backtype.storm.tuple.Fields;
publicclassPersistentBolt{
privatestaticMap<String,Object> hikariConfigMap =newHashMap<String,Object>(){{
put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
put("dataSource.url","jdbc:mysql://localhost/storm");
put("dataSource.user","user");
put("dataSource.password","password");
}};
publicstaticConnectionProvider connectionProvider =newHikariCPConnectionProvider(hikariConfigMap);
publicstaticJdbcInsertBolt getJdbcInsertBolt(){
//使用tablename进行插入数据,需要指定表中的所有字段
/*String tableName="userinfo";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("userinfo")
.withQueryTimeoutSecs(50);*/
//使用schemaColumns,可以指定字段要插入的字段
List<Column> schemaColumns =Lists.newArrayList(newColumn("user_id",Types.VARCHAR),
newColumn("resource_id",Types.VARCHAR),newColumn("count",Types.INTEGER));
JdbcMapper simpleJdbcMapper =newSimpleJdbcMapper(schemaColumns);
JdbcInsertBolt jdbcInsertBolt =newJdbcInsertBolt(connectionProvider,simpleJdbcMapper)
.withInsertQuery("insert into userinfo(id,user_id,resource_id,count) values(?,?,?)")
.withQueryTimeoutSecs(50);
return jdbcInsertBolt;
}
publicstaticJdbcLookupBolt getJdbcLookupBlot(){
//查询
//指定bolt的输出字段
Fields outputFields =newFields("user_id","resource_id","count");
//指定查询条件字段
List<Column> queryColumns =Lists.newArrayList(newColumn("user_id",Types.VARCHAR),newColumn("resource_id",Types.VARCHAR));
String selectSql ="select count from userinfo where user_id=? and resource_id=?";
SimpleJdbcLookupMapper lookupMapper =newSimpleJdbcLookupMapper(outputFields, queryColumns);
JdbcLookupBolt jdbcLookupBolt =newJdbcLookupBolt(connectionProvider, selectSql, lookupMapper);
return jdbcLookupBolt;
}
}
相关推荐
在大数据处理领域,Storm、Kafka以及JDBC的整合是一个常见的需求,用于实现实时数据流处理、消息队列和数据库交互。以下是对这个整合实例的详细解释。 首先,让我们来了解一下这三个组件: 1. **Apache Storm**:...
### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...
IT十八掌第三期配套资料!...1、Storm介绍及特点 2、storm的优势与应用 3、storm使用和配置 4、配置storm并发度 5、配置storm完全分布式集群 6、storm开发环境与生产环境 7、storm的topology再平衡 8、分组、自定义分组
当我们谈论"storm-kafka整合代码"时,这意味着我们要将这两者结合,使得 Storm 可以从 Kafka 中消费数据并进行实时处理。下面将详细介绍 Storm 和 Kafka 的核心概念以及它们整合的关键步骤。 **Apache Storm** ...
【标题】"storm+mq整合完整示例"中涉及的知识点主要集中在分布式计算框架Apache Storm与消息队列MQ(如RabbitMQ、Kafka等)的集成应用上,旨在实现数据流的实时处理。以下是对这些知识点的详细阐述: 1. **Apache ...
根据《get started with storm》.pdf写的storm学习笔记
读者将学习如何整合用户行为数据,利用协同过滤、深度学习等方法生成推荐结果,并通过Storm实现实时推送给用户。 七、Storm之开发混合推荐系统实战 混合推荐系统结合了多种推荐策略,以提高推荐的准确性和多样性。...
Storm学习文档 Storm 是一个分布式实时计算系统,主要用于处理大规模数据流。...本文档对 Storm 的学习笔记,总结了 Storm 的架构、组件、工作流程等关键知识点,对 Storm 的学习和应用具有重要的参考价值。
这张图片详细的描述了kafka、和storm的过程。。。。。
本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...
**Storm笔记概述** Storm是一个分布式实时计算系统,由Twitter开源并广泛应用于...通过对Storm的学习,我们可以理解实时数据处理的核心原理,掌握如何构建和优化实时数据处理系统,为企业提供更敏捷、高效的决策支持。
本文将详细介绍如何整合Flume、Kafka和Storm,以实现大数据的高效收集、传输和处理。在大数据运维解决方案中,这三个组件扮演着关键角色。Flume用于数据采集,Kafka作为中间件提供消息队列服务,而Storm则用于实时...
标题中的"storm中文学习资料"表明这是一份关于Apache Storm的中文学习资源,Apache Storm是一个开源的分布式实时计算系统,常用于实时数据处理。在这个压缩包中,我们可以预期找到一系列帮助理解和掌握Storm技术的...
Druid的摄取层提供了多种方式来摄入数据,如Hadoop批处理、实时流处理(如Tranquility,它是专门为整合Storm等流处理系统设计的)以及索引服务。在整合Storm时,我们通常会利用Tranquility,这是一个轻量级的HTTP...
在Netty、Kafka和Storm的整合过程中,JDBC扮演了数据存储的角色。经过Storm实时处理后的数据,会被写入到MySQL数据库中,以便后续的查询和分析。 整合这些技术的具体步骤可能包括以下部分: 1. 使用Netty创建一个...
storm的集群安装笔记,在我的虚拟机上安装的整个过程,所有注意点都写了。绝对好用
Storm可以用于各种实时分析场景,如实时日志分析、社交网络分析、连续计算以及在线机器学习等。它支持多种编程语言,包括Java、Python和Clojure等。 **Apache Kafka** 则是一个分布式消息中间件,它能够以高吞吐量...
"storm 学习资源总结" Storm 是一个免费开源的分布式实时计算系统,利用 storm 可以很容易的做到可靠处理无线数据流。Storm 的架构特点包括编程简单、高性能、低延迟、分布式、可扩展、容错、消息不丢失等。 ...
本示例源码“storm整合redis”的重点在于展示如何将这两个强大的工具结合在一起,实现数据的实时捕获、处理和存储。接下来,我们将深入探讨Storm和Redis的相关知识点以及它们的整合方式。 首先,Apache Storm是一个...
通过这个案例,我们可以学习到如何构建一个高效的大数据实时处理系统,如何将实时数据流与批处理相结合,以及如何将结果以直观的方式展示出来。这对于理解大数据处理流程和技术选型具有很强的指导意义。