Storm database operations

 
The first version wires a UserSpout to a JdbcLookupBolt that enriches each user tuple with its dept_name looked up from MySQL, then to a JdbcInsertBolt that persists the result into the user table; a Trident variant follows further below.

package com.num.one.storm.learn.jdbc;

import java.sql.Types;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class StormJDBC {
	protected static final String TABLE_NAME = "user";
	protected static final String JDBC_CONF = "jdbc.conf";
	private static final List<String> setupSqls = Lists.newArrayList("drop table if exists user",
			"drop table if exists department", "drop table if exists user_department",
			"create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
			"create table if not exists department (dept_id integer, dept_name varchar(100))",
			"create table if not exists user_department (user_id integer, dept_id integer)",
			"insert into department values (1, 'R&D')", "insert into department values (2, 'Finance')",
			"insert into department values (3, 'HR')", "insert into department values (4, 'Sales')",
			"insert into user_department values (1, 1)", "insert into user_department values (2, 2)",
			"insert into user_department values (3, 3)", "insert into user_department values (4, 4)");
	
	protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
            " and user_department.user_id = ?";
	private static final String USER_SPOUT = "USER_SPOUT";
    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
    private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT";
	
	
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
		Map<String, Object> map = Maps.newHashMap();
		map.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");// 
		map.put("dataSource.url", "jdbc:mysql://localhost/test");// 
		map.put("dataSource.user", "root");// root
		map.put("dataSource.password", "root");// password

		Config config = new Config();
		config.put(JDBC_CONF, map);

		ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
		connectionProvider.prepare();
		int queryTimeoutSecs = 60;
		JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
		for (String sql : setupSqls) {
			jdbcClient.executeSql(sql);
		}
		connectionProvider.cleanup();
		connectionProvider = new HikariCPConnectionProvider(map);
		
		UserSpout userSpout = new UserSpout();
        String topoName = "stom-jdbc";
        
        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
        // SimpleJdbcLookupMapper needs both the output fields and the query parameter columns:
        // queryParamColumns are taken from the incoming tuple and bound to the "?" placeholders, outputFields describe the emitted tuple.
        // Each output field is first looked up in the input tuple. UserSpout emits three fields, new Fields("user_id", "user_name", "create_date"),
        // all of which appear in outputFields, so their values are copied straight through.
        // The remaining field, dept_name, is not in the input tuple, so it is resolved from the columns returned by the lookup query,
        // which happens to contain dept_name, and that value becomes the dept_name output field.
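        // e.g. an input tuple (1, "peter", <millis>) whose lookup returns dept_name = 'R&D'
        // (user_id 1 -> dept_id 1 in the setup data) is emitted as (1, "peter", "R&D", <millis>)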
        JdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, jdbcLookupMapper);
        
        //must specify column schema when providing custom query.
        List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE),
                new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR));
        // JdbcMapper defines how columns of the database table map to fields of the tuple;
        // SimpleJdbcMapper is the provided implementation of JdbcMapper.
        // Creating it as new SimpleJdbcMapper(tableName, connectionProvider) assumes the tuple to be inserted
        // already carries a value for every column of the table tableName.
        // If you supply your own insert statement, you must initialize SimpleJdbcMapper explicitly, e.g. for
        // insert into user (user_id, user_name) values (?,?)
        // List<Column> columnSchema = Lists.newArrayList(new Column("user_id", java.sql.Types.INTEGER), new Column("user_name", java.sql.Types.VARCHAR)); // column order matches the placeholders
        // JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
        JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);

        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper)
                .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");

        // userSpout ==> jdbcBolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(USER_SPOUT, userSpout, 1);
        builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT);
        builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT);
        
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology(topoName, config, builder.createTopology());
		try {
			Thread.sleep(3000); // give the local topology a few seconds to process tuples
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		cluster.shutdown();
	}
}
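
The topology above runs in a LocalCluster only so the example is self-contained. On a real cluster the same topology would be handed to StormSubmitter, which is why the class already imports it and main() declares AlreadyAliveException, InvalidTopologyException and AuthorizationException. A minimal sketch, reusing the topoName, config and builder variables from main():

		// submit to the cluster configured in storm.yaml instead of running locally
		StormSubmitter.submitTopology(topoName, config, builder.createTopology());

The UserSpout that feeds the topology: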
package com.num.one.storm.learn.jdbc;

import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import com.google.common.collect.Lists;

public class UserSpout implements IRichSpout{

	private static final long serialVersionUID = 2517357980152641508L;

    SpoutOutputCollector collector;
    public static final List<Values> rows = Lists.newArrayList(
            new Values(1,"peter",System.currentTimeMillis()),
            new Values(2,"bob",System.currentTimeMillis()),
            new Values(3,"alice",System.currentTimeMillis()));
    Random random;

	public void ack(Object msgId) {
		// no-op: this demo spout does not track emitted tuples
	}

	public void activate() {
	}

	public void close() {
	}

	public void deactivate() {
	}

	public void fail(Object msgId) {
		// no-op: failed tuples are not replayed by this demo spout
	}

	public void nextTuple() {
		// emit one of the sample rows at random
		collector.emit(rows.get(random.nextInt(rows.size())));
	}

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
		random = new Random();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("user_id", "user_name", "create_date"));
	}

	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

    

}
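
ack() and fail() above stay empty because nextTuple() emits tuples without a message id, so Storm never calls them. A minimal sketch of an anchored emit inside nextTuple(), assuming the user_id is reused as the message id:

		Values row = rows.get(random.nextInt(rows.size()));
		collector.emit(row, row.get(0)); // the second argument is the message id handed back to ack()/fail()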

 

 

Using Trident

The same flow expressed with Trident: a JdbcStateFactory backs both a stateQuery (with JdbcQuery) that looks up dept_name and a partitionPersist (with JdbcUpdater) that writes the enriched tuples back to the user table.

package com.num.one.storm.learn.jdbc;

import java.sql.Types;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.jdbc.trident.state.JdbcQuery;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.jdbc.trident.state.JdbcUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class TridentJDBC {

	protected static final String TABLE_NAME = "user";
	protected static final String JDBC_CONF = "jdbc.conf";
	private static final List<String> setupSqls = Lists.newArrayList("drop table if exists user",
			"drop table if exists department", "drop table if exists user_department",
			"create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
			"create table if not exists department (dept_id integer, dept_name varchar(100))",
			"create table if not exists user_department (user_id integer, dept_id integer)",
			"insert into department values (1, 'R&D')", "insert into department values (2, 'Finance')",
			"insert into department values (3, 'HR')", "insert into department values (4, 'Sales')",
			"insert into user_department values (1, 1)", "insert into user_department values (2, 2)",
			"insert into user_department values (3, 3)", "insert into user_department values (4, 4)");
	
	protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
            " and user_department.user_id = ?";
	public static void main(String[] args) {
		Map<String, Object> map = Maps.newHashMap();
		map.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");// 
		map.put("dataSource.url", "jdbc:mysql://localhost/test");// 
		map.put("dataSource.user", "root");// root
		map.put("dataSource.password", "root");// password

		Config config = new Config();
		config.put(JDBC_CONF, map);

		ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
		connectionProvider.prepare();
		int queryTimeoutSecs = 60;
		JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
		for (String sql : setupSqls) {
			jdbcClient.executeSql(sql);
		}
		connectionProvider.cleanup();
		connectionProvider = new HikariCPConnectionProvider(map);
		
		UserSpout userSpout = new UserSpout();
        String topoName = "stom-jdbc";
        
        JdbcMapper jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, connectionProvider);
        
        TridentTopology topology = new TridentTopology();
        JdbcState.Options options = new JdbcState.Options()
                .withConnectionProvider(connectionProvider)
                .withMapper(jdbcMapper)
                .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
                .withTableName(TABLE_NAME)
                .withSelectQuery(SELECT_QUERY);

        JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
        
        Stream stream = topology.newStream("userSpout", userSpout);
        TridentState state = topology.newStaticState(jdbcStateFactory);
        // look up dept_name for each user via SELECT_QUERY and append it to the stream
        stream = stream.stateQuery(state, new Fields("user_id", "user_name", "create_date"), new JdbcQuery(), new Fields("dept_name"));
        // persist the enriched tuples into the user table
        stream.partitionPersist(jdbcStateFactory, new Fields("user_id", "user_name", "dept_name", "create_date"), new JdbcUpdater(), new Fields());
        
        
        LocalCluster cluster = new LocalCluster();
		cluster.submitTopology(topoName, config, topology.build());
		try {
			Thread.sleep(3000); // give the local topology a few seconds to process tuples
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		cluster.shutdown();
	}
}
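
To verify what the topology wrote, the same JdbcClient helper used for the setup statements can read the user table back after cluster.shutdown(). A minimal sketch, assuming it is appended at the end of main() and that JdbcClient's select(query, params) is called with an empty parameter list:

		ConnectionProvider verifyProvider = new HikariCPConnectionProvider(map);
		verifyProvider.prepare();
		JdbcClient verifyClient = new JdbcClient(verifyProvider, queryTimeoutSecs);
		// read back everything the topology persisted into the user table
		List<List<Column>> persisted = verifyClient.select("select * from user", Lists.<Column>newArrayList());
		System.out.println("rows persisted: " + persisted.size());
		verifyProvider.cleanup();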

 
