package com.num.one.storm.learn.jdbc; import java.sql.Types; import java.util.List; import java.util.Map; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.jdbc.bolt.JdbcInsertBolt; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.HikariCPConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.apache.storm.jdbc.mapper.JdbcLookupMapper; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.apache.storm.jdbc.trident.state.JdbcQuery; import org.apache.storm.jdbc.trident.state.JdbcState; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import org.apache.storm.jdbc.trident.state.JdbcUpdater; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.tuple.Fields; import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class StormJDBC { protected static final String TABLE_NAME = "user"; protected static final String JDBC_CONF = "jdbc.conf"; private static final List<String> setupSqls = Lists.newArrayList("drop table if exists user", "drop table if exists department", "drop table if exists user_department", "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)", "create table if not exists department (dept_id integer, dept_name varchar(100))", "create table if not exists user_department (user_id integer, dept_id integer)", "insert into department values (1, 'R&D')", "insert into department values (2, 'Finance')", "insert into department values (3, 'HR')", "insert into department values (4, 'Sales')", "insert into user_department values (1, 1)", "insert into user_department values (2, 2)", "insert into user_department values (3, 3)", "insert into user_department values (4, 4)"); protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" + " and user_department.user_id = ?"; private static final String USER_SPOUT = "USER_SPOUT"; private static final String LOOKUP_BOLT = "LOOKUP_BOLT"; private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT"; public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { Map<String, Object> map = Maps.newHashMap(); map.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");// map.put("dataSource.url", "jdbc:mysql://localhost/test");// map.put("dataSource.user", "root");// root map.put("dataSource.password", "root");// password Config config = new Config(); config.put(JDBC_CONF, map); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map); connectionProvider.prepare(); int queryTimeoutSecs = 60; JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs); for (String sql : setupSqls) { jdbcClient.executeSql(sql); } connectionProvider.cleanup(); connectionProvider = new HikariCPConnectionProvider(map); UserSpout userSpout = new UserSpout(); String topoName = "stom-jdbc"; Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date"); List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER)); //SimpleJdbcLookupMapper 必须要指定输出字段和输入字段。其中queryParamColumns为输入字段,outputFields为输出字段 //outputFields输出字段首先会从输入tuple中查找,比如我们UserSpout输出了三个字段,new Fields("user_id","user_name","create_date"),其中这三个字段都出现在输出字段中,所以直接获取作为输出字段 //另外还有个字段dept_name没有在输入字段中,则会去查询返回的参数中查找,发现刚好有dept_name这个字段,于是将这个字段作为输出字段dept_name。 JdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, jdbcLookupMapper); //must specify column schema when providing custom query. List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE), new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR)); //JdbcMapper 用于定义数据库表中的字段和tuple中字段的对应关系 //其中SimpleJdbcMapper是JDBCMapper的实现方式 //如果直接创建new SimpleJdbcMapper(tableName, connectionProvider),会认为要插入的tuple已经具有了对应数据库表tableName 列的所有值。 //如果定义了自己的插入语句,必须显示的初始化SimpleJdbcMapper //Insert into user (user_id, user_name) values (?,?) //List<Column> columnSchema = Lists.newArrayList(new Column("user_id", java.sql.Types.INTEGER),new Column("user_name", java.sql.Types.VARCHAR));//顺序和上面对应 //JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns); JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper) .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)"); // userSpout ==> jdbcBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(USER_SPOUT, userSpout, 1); builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT); builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topoName, config, builder.createTopology()); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } cluster.shutdown(); } }
package com.num.one.storm.learn.jdbc; import java.util.List; import java.util.Map; import java.util.Random; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; public class UserSpout implements IRichSpout{ /** * */ private static final long serialVersionUID = 2517357980152641508L; SpoutOutputCollector collector; public static final List<Values> rows = Lists.newArrayList( new Values(1,"peter",System.currentTimeMillis()), new Values(2,"bob",System.currentTimeMillis()), new Values(3,"alice",System.currentTimeMillis())); Random random; public void ack(Object arg0) { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object arg0) { // TODO Auto-generated method stub } public void nextTuple() { // TODO Auto-generated method stub collector.emit(rows.get(random.nextInt(rows.size()-1))); } public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) { // TODO Auto-generated method stub this.collector = arg2; random = new Random(); } public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub arg0.declare(new Fields("user_id","user_name","create_date")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
使用Trident
相关推荐
Bolt则用来接收spout或其他bolt发射的数据,对数据进行处理,如过滤、聚合、查询数据库等操作。处理后的数据可以继续传输给其他bolt进一步处理,或者将最终结果输出到存储系统中。 Storm设计上的特性包括: 1. ...
在Go语言中,ORM(Object-Relational Mapping)框架是一种工具,它允许开发者用面向对象的方式来操作数据库,而无需直接编写SQL语句。...通过学习和使用这个框架,你可以提升Go语言开发中的数据库操作效率和代码质量。
Apache Storm(apache-storm...Apache Storm 与您已经使用的队列和数据库技术集成。Apache Storm 拓扑使用数据流并以任意复杂的方式处理这些流,根据需要在计算的每个阶段之间对流进行重新分区。在教程中阅读更多内容。
Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,就像处理数据库查询一样简单。Storm在实时处理领域的应用广泛,涵盖了社交媒体分析、实时仪表盘、物联网(IoT)数据处理等多个领域。 二...
Apache Storm(apache-storm...Apache Storm 与您已经使用的队列和数据库技术集成。Apache Storm 拓扑使用数据流并以任意复杂的方式处理这些流,根据需要在计算的每个阶段之间对流进行重新分区。在教程中阅读更多内容。
- **Spout**:负责生成数据流,可以是从数据库、消息队列或其他数据源读取。 - **Bolt**:执行数据处理逻辑,如过滤、聚合、转换等操作。 - **Trident**:Storm的一个高级API,提供了一种更强大且状态化的数据处理...
2. **Bolt**:数据处理组件,执行实际的业务逻辑,如清洗、聚合、过滤等操作。 3. **Topology**:拓扑结构,定义了Spout和Bolt之间的数据流关系,是Storm程序的基本执行单元。 4. **Stream Groupings**:数据分发...
7. **Spout**:数据源,负责产生数据流,可以是数据库、消息队列或任何其他数据源。 安装Apache Storm 1.0.6的步骤如下: 1. **下载**:访问Apache Storm官方网站下载最新稳定版(1.0.6)的压缩包,通常为tar.gz...
Storm 是一个实时处理系统,用于处理持续的数据流,而 HBase 则是基于 Hadoop 的分布式列式数据库,适用于大规模数据存储和随机读写操作。将 Storm 与 HBase 集成,可以实现高效、实时的数据流处理与存储。 一、...
2. **Split Bolt**:接收到Spout发送的Tuples后,Split Bolt负责对每个Tuple中的文本进行分词操作,将单词分离出来。这个Bolt会创建新的Tuples,每个Tuple包含一个单词,然后将这些Tuples发送到下一个阶段。 3. **...
1. Spout:Spout是Apache Storm的数据输入组件,负责从外部数据源(如Kafka、Twitter或数据库)读取数据并生成数据流。在源码中,`backtype.storm.spout`包包含了各种Spout的实现,如`KafkaSpout`用于从Kafka消费...
在实际应用中,Spout可以是从消息队列、数据库或其他数据源拉取数据的组件。在"weekend-storm"项目中,Spout可能会模拟一个实时数据生成器,例如生成随机日志事件或者从某个API获取实时数据。编写Spout时,需要实现`...
**Storm** 是一款强大的实时数据流处理系统,其设计目的是为了处理持续不断涌入的数据流,并将其处理后输出至数据库或其它系统。相较于 **Hadoop** 主要用于批处理,**Storm** 专注于实时处理。 ##### Storm的核心...
为了确保结果的持久性,可以定期将计数结果写入数据库或其他持久化存储。 5. **运行和监控** 一旦Topology定义完成,就可以提交到Storm集群运行。Storm提供了命令行工具和API来管理Topologies的生命周期。同时,...
1. `Optional_Storm.exe.config`:这是Storm主执行文件的配置文件,用于定义程序运行时的环境设置,如数据库连接字符串、日志配置、性能调整等。 2. `Storm.UI.dll`:这是用户界面相关的组件,包含了 Storm 软件的...
**Storm编程实例** Storm是一个开源的分布式实时计算系统,它被设计用来处理大规模的数据流,具有高吞吐量和容错性。...通过实际操作,你可以深入理解Storm如何处理实时数据流,以及如何用Maven构建和管理这样的项目。
3. **实时处理**:Storm拓扑接收到数据后,根据业务逻辑进行实时处理,例如清洗、聚合、过滤等操作。 4. **数据持久化**:处理后的结果通过HBase的Java API或者HBase的Storm连接器写入到HBase表中,以便后续的查询和...
在大数据处理领域,Storm、Kafka以及JDBC的整合是一个常见的需求,用于实现实时数据流处理、消息队列和数据库交互。以下是对这个整合实例的详细解释。 首先,让我们来了解一下这三个组件: 1. **Apache Storm**:...
标题 "storm实现井字棋游戏源码" 涉及的核心技术是Apache Storm,这是一个分布式实时计算系统,常用于处理大规模的数据流。井字棋游戏(Tic-Tac-Toe)是一个简单的两人对战游戏,这里用Storm实现,可能是为了展示...
它可以是从数据库、消息队列或任何其他数据源读取数据的组件。 2. **Bolt**:Bolt是处理数据的组件,执行如过滤、聚合、转换等操作。Bolts可以连接在一起形成复杂的处理逻辑。 3. **Topology**:Topology是Spouts和...