`

Strom JDBC 整合学习笔记

 
阅读更多

Strom JDBC 整合学习笔记

原创编写: 王宇 
2016-11-24


 

 


插入数据库(Inserting into a database)

ConnectionProvider

统一接口 
org.apache.storm.jdbc.common.ConnectionProvider

  1. publicinterfaceConnectionProviderextendsSerializable{
  2. /**
  3. * method must be idempotent.
  4. */
  5. void prepare();
  6. /**
  7. *
  8. * @return a DB connection over which the queries can be executed.
  9. */
  10. Connection getConnection();
  11. /**
  12. * called once when the system is shutting down, should be idempotent.
  13. */
  14. void cleanup();
  15. }

支持: HikariCP 连接池 
org.apache.storm.jdbc.common.HikariCPConnectionProvider

JdbcMapper

org.apache.storm.jdbc.mapper.JdbcMapper

  1. publicinterfaceJdbcMapperextendsSerializable{
  2. List<Column> getColumns(ITuple tuple);
  3. }

getColume方法,定义了一个storm tuple 如何映射一个数据库表的列。 

The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list. 

返回的列表的顺序性是很重要的。总之就是自己在插入数据的时候,各个字段的顺序要对应一致。 
例如:我们提交了一个插入查询语句insert into user(user_id, user_name, create_date) values (?,?, now())那么values()中第一个?就对应了user_id,第二个?对应了usr_name以此类推。getColumns的返回列表也是这样。咱们的jdbc不提供任何不标准的查询语法。

JdbcInsertBolt

  • 要使用JdbcInsertBolt,我们需要用一个ConnectionProvider的实现以及JdbcMapper的实现(该实现将tuple转换成DB的行)来构建一个JdbcInsertBolt实例。
  • 用withTableName方法提供表名
  • withInsertQuery方法提供一个插入查询
  • 设置一个查询超时时间来规定一个查询最多能花多少时间。默认与topology.message.timeout.secs一样大,如果该值为-1那就意味着不设置查询超时。我们可以设置查询超时时间<=topology.message.timeout.secs。
  1. Map hikariConfigMap =Maps.newHashMap();
  2. hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
  3. hikariConfigMap.put("dataSource.url","jdbc:mysql://localhost/test");
  4. hikariConfigMap.put("dataSource.user","root");
  5. hikariConfigMap.put("dataSource.password","password");
  6. ConnectionProvider connectionProvider =newHikariCPConnectionProvider(hikariConfigMap);
  7. String tableName ="user_details";
  8. JdbcMapper simpleJdbcMapper =newSimpleJdbcMapper(tableName, connectionProvider);
  9. JdbcInsertBolt userPersistanceBolt =newJdbcInsertBolt(connectionProvider, simpleJdbcMapper)
  10. .withTableName("user")
  11. .withQueryTimeoutSecs(30);
  12. Or
  13. JdbcInsertBolt userPersistanceBolt =newJdbcInsertBolt(connectionProvider, simpleJdbcMapper)
  14. .withInsertQuery("insert into user values (?,?)")
  15. .withQueryTimeoutSecs(30);

SimpleJdbcMapper

更一般化的JdbcMapper,它可以将tuple与数据库的行进行映射。SimpleJdbcMapper假设tuple字段名与你将要写入的数据库表中的列名是一样的。

JdbcTridentState

We also support a trident persistent state that can be used with trident topologies.

  1. JdbcState.Options options =newJdbcState.Options()
  2. .withConnectionProvider(connectionProvider)
  3. .withMapper(jdbcMapper)
  4. .withTableName("user_details")
  5. .withQueryTimeoutSecs(30);
  6. JdbcStateFactory jdbcStateFactory =newJdbcStateFactory(options);

可以使用 withInsertQuery 设置条件

查询数据库(Lookup from Database)

org.apache.storm.jdbc.mapper.JdbcLookupMapper

  1. void declareOutputFields(OutputFieldsDeclarer declarer);
  2. List<Column> getColumns(ITuple tuple);
  3. List<Values> toTuple(ITuple input,List<Column> columns);
  • declareOutputFields 
    指定输出的tuple中的字段
  • getColumns 
    确定查询中的占位符(?)以及它们的SQL类型和值。
  • toTuple 
    接收一个输入tuple并且表示数据库一行的列字段值列表作为select搜索的结果。

SimpleJdbcLookupMapper

针对单表简单查询

  • SimpleJdbcMapper认为tuple中的字段与你作为占位符的字段名是一致的
  1. Fields outputFields =newFields("user_id","user_name","create_date");
  2. List<Column> queryParamColumns =Lists.newArrayList(newColumn("user_id",Types.INTEGER));
  3. this.jdbcLookupMapper =newSimpleJdbcLookupMapper(outputFields, queryParamColumns);

JdbcLookupBolt

注意超时设置要<=topology.message.timeout.secs

  1. String selectSql ="select user_name from user_details where user_id = ?";
  2. SimpleJdbcLookupMapper lookupMapper =newSimpleJdbcLookupMapper(outputFields, queryParamColumns)
  3. JdbcLookupBolt userNameLookupBolt =newJdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
  4. .withQueryTimeoutSecs(30);

JdbcTridentState for Lookup

  1. JdbcState.Options options =newJdbcState.Options()
  2. .withConnectionProvider(connectionProvider)
  3. .withJdbcLookupMapper(newSimpleJdbcLookupMapper(newFields("user_name"),Lists.newArrayList(newColumn("user_id",Types.INTEGER))))
  4. .withSelectQuery("select user_name from user_details where user_id = ?");
  5. .withQueryTimeoutSecs(30);

例子

maven中加入storm-jdbc和mysql的connector

  1. <dependency>
  2. <groupId>org.apache.storm</groupId>
  3. <artifactId>storm-jdbc</artifactId>
  4. <version>0.10.0</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>mysql</groupId>
  9. <artifactId>mysql-connector-Java</artifactId>
  10. <version>5.1.31</version>
  11. </dependency>

表结构

  1. CREATE TABLE `userinfo`(
  2. `id`int(11) NOT NULL AUTO_INCREMENT,
  3. `user_id` varchar(50) DEFAULT NULL,
  4. `resource_id` varchar(10) DEFAULT NULL,
  5. `create_date` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  6. `count`int(11) DEFAULT NULL,
  7. PRIMARY KEY (`id`)
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Github中的例子

storm/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/ 
storm/examples/storm-jdbc-examples/src/main/java/org/apache/storm/jdbc/

实例代码

 

  1. import java.sql.Types;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import org.apache.storm.guava.collect.Lists;
  6. import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
  7. import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
  8. import org.apache.storm.jdbc.common.Column;
  9. import org.apache.storm.jdbc.common.ConnectionProvider;
  10. import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
  11. import org.apache.storm.jdbc.mapper.JdbcMapper;
  12. import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
  13. import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
  14. import backtype.storm.tuple.Fields;
  15. publicclassPersistentBolt{
  16. privatestaticMap<String,Object> hikariConfigMap =newHashMap<String,Object>(){{
  17. put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
  18. put("dataSource.url","jdbc:mysql://localhost/storm");
  19. put("dataSource.user","user");
  20. put("dataSource.password","password");
  21. }};
  22. publicstaticConnectionProvider connectionProvider =newHikariCPConnectionProvider(hikariConfigMap);
  23. publicstaticJdbcInsertBolt getJdbcInsertBolt(){
  24. //使用tablename进行插入数据,需要指定表中的所有字段
  25. /*String tableName="userinfo";
  26. JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
  27. JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
  28. .withTableName("userinfo")
  29. .withQueryTimeoutSecs(50);*/
  30. //使用schemaColumns,可以指定字段要插入的字段
  31. List<Column> schemaColumns =Lists.newArrayList(newColumn("user_id",Types.VARCHAR),
  32. newColumn("resource_id",Types.VARCHAR),newColumn("count",Types.INTEGER));
  33. JdbcMapper simpleJdbcMapper =newSimpleJdbcMapper(schemaColumns);
  34. JdbcInsertBolt jdbcInsertBolt =newJdbcInsertBolt(connectionProvider,simpleJdbcMapper)
  35. .withInsertQuery("insert into userinfo(id,user_id,resource_id,count) values(?,?,?)")
  36. .withQueryTimeoutSecs(50);
  37. return jdbcInsertBolt;
  38. }
  39. publicstaticJdbcLookupBolt getJdbcLookupBlot(){
  40. //查询
  41. //指定bolt的输出字段
  42. Fields outputFields =newFields("user_id","resource_id","count");
  43. //指定查询条件字段
  44. List<Column> queryColumns =Lists.newArrayList(newColumn("user_id",Types.VARCHAR),newColumn("resource_id",Types.VARCHAR));
  45. String selectSql ="select count from userinfo where user_id=? and resource_id=?";
  46. SimpleJdbcLookupMapper lookupMapper =newSimpleJdbcLookupMapper(outputFields, queryColumns);
  47. JdbcLookupBolt jdbcLookupBolt =newJdbcLookupBolt(connectionProvider, selectSql, lookupMapper);
  48. return jdbcLookupBolt;
  49. }
  50. }
分享到:
评论

相关推荐

    storm+kafka+jdbc整合实例

    在大数据处理领域,Storm、Kafka以及JDBC的整合是一个常见的需求,用于实现实时数据流处理、消息队列和数据库交互。以下是对这个整合实例的详细解释。 首先,让我们来了解一下这三个组件: 1. **Apache Storm**:...

    Flume+kafka+Storm整合

    ### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...

    IT十八掌_Storm阶段学习笔记(课堂笔记与原理图解)

    IT十八掌第三期配套资料!...1、Storm介绍及特点 2、storm的优势与应用 3、storm使用和配置 4、配置storm并发度 5、配置storm完全分布式集群 6、storm开发环境与生产环境 7、storm的topology再平衡 8、分组、自定义分组

    storm-kafka整合代码

    当我们谈论"storm-kafka整合代码"时,这意味着我们要将这两者结合,使得 Storm 可以从 Kafka 中消费数据并进行实时处理。下面将详细介绍 Storm 和 Kafka 的核心概念以及它们整合的关键步骤。 **Apache Storm** ...

    storm+mq整合完整示例

    【标题】"storm+mq整合完整示例"中涉及的知识点主要集中在分布式计算框架Apache Storm与消息队列MQ(如RabbitMQ、Kafka等)的集成应用上,旨在实现数据流的实时处理。以下是对这些知识点的详细阐述: 1. **Apache ...

    storm学习笔记

    根据《get started with storm》.pdf写的storm学习笔记

    storm 实战笔记

    读者将学习如何整合用户行为数据,利用协同过滤、深度学习等方法生成推荐结果,并通过Storm实现实时推送给用户。 七、Storm之开发混合推荐系统实战 混合推荐系统结合了多种推荐策略,以提高推荐的准确性和多样性。...

    STORM学习文档

    Storm学习文档 Storm 是一个分布式实时计算系统,主要用于处理大规模数据流。...本文档对 Storm 的学习笔记,总结了 Storm 的架构、组件、工作流程等关键知识点,对 Storm 的学习和应用具有重要的参考价值。

    kafka和storm整合

    这张图片详细的描述了kafka、和storm的过程。。。。。

    Storm源码走读笔记

    本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...

    Storm笔记-PPT

    **Storm笔记概述** Storm是一个分布式实时计算系统,由Twitter开源并广泛应用于...通过对Storm的学习,我们可以理解实时数据处理的核心原理,掌握如何构建和优化实时数据处理系统,为企业提供更敏捷、高效的决策支持。

    flume,kafka,storm整合

    本文将详细介绍如何整合Flume、Kafka和Storm,以实现大数据的高效收集、传输和处理。在大数据运维解决方案中,这三个组件扮演着关键角色。Flume用于数据采集,Kafka作为中间件提供消息队列服务,而Storm则用于实时...

    storm中文学习资料

    标题中的"storm中文学习资料"表明这是一份关于Apache Storm的中文学习资源,Apache Storm是一个开源的分布式实时计算系统,常用于实时数据处理。在这个压缩包中,我们可以预期找到一系列帮助理解和掌握Storm技术的...

    Storm整合Druid进行实时分析源码

    Druid的摄取层提供了多种方式来摄入数据,如Hadoop批处理、实时流处理(如Tranquility,它是专门为整合Storm等流处理系统设计的)以及索引服务。在整合Storm时,我们通常会利用Tranquility,这是一个轻量级的HTTP...

    netty+kafka+storm

    在Netty、Kafka和Storm的整合过程中,JDBC扮演了数据存储的角色。经过Storm实时处理后的数据,会被写入到MySQL数据库中,以便后续的查询和分析。 整合这些技术的具体步骤可能包括以下部分: 1. 使用Netty创建一个...

    storm集群安装笔记

    storm的集群安装笔记,在我的虚拟机上安装的整个过程,所有注意点都写了。绝对好用

    storm与kafka整合jar包

    Storm可以用于各种实时分析场景,如实时日志分析、社交网络分析、连续计算以及在线机器学习等。它支持多种编程语言,包括Java、Python和Clojure等。 **Apache Kafka** 则是一个分布式消息中间件,它能够以高吞吐量...

    storm 学习资源总结

    "storm 学习资源总结" Storm 是一个免费开源的分布式实时计算系统,利用 storm 可以很容易的做到可靠处理无线数据流。Storm 的架构特点包括编程简单、高性能、低延迟、分布式、可扩展、容错、消息不丢失等。 ...

    storm整合redis的例子源码

    本示例源码“storm整合redis”的重点在于展示如何将这两个强大的工具结合在一起,实现数据的实时捕获、处理和存储。接下来,我们将深入探讨Storm和Redis的相关知识点以及它们的整合方式。 首先,Apache Storm是一个...

    kafka+storm+hbase整合案例

    通过这个案例,我们可以学习到如何构建一个高效的大数据实时处理系统,如何将实时数据流与批处理相结合,以及如何将结果以直观的方式展示出来。这对于理解大数据处理流程和技术选型具有很强的指导意义。

Global site tag (gtag.js) - Google Analytics