`

Storm 与 Redis 整合

 
阅读更多

Storm 与 Redis 整合

原创编写: 王宇 
2016-11-07


 

 


参考资料:http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html

Strom为Redis提供几个标准Bolt

  • RedisLookupBolt: 查询
  • RedisStoreBolt: 存储
  • AbstractRedisBolt: 存储

RedisLookupBolt 例子

从Redis中,查询单词的计算数量

  1. classWordCountRedisLookupMapperimplementsRedisLookupMapper{
  2. privateRedisDataTypeDescription description;
  3. privatefinalString hashKey ="wordCount";
  4. publicWordCountRedisLookupMapper(){
  5. description =newRedisDataTypeDescription(
  6. RedisDataTypeDescription.RedisDataType.HASH, hashKey);
  7. }
  8. @Override
  9. publicList<Values> toTuple(ITuple input,Object value){
  10. String member = getKeyFromTuple(input);
  11. List<Values> values =Lists.newArrayList();
  12. values.add(newValues(member, value));
  13. return values;
  14. }
  15. @Override
  16. publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
  17. declarer.declare(newFields("wordName","count"));
  18. }
  19. @Override
  20. publicRedisDataTypeDescription getDataTypeDescription(){
  21. return description;
  22. }
  23. @Override
  24. publicString getKeyFromTuple(ITuple tuple){
  25. return tuple.getStringByField("word");
  26. }
  27. @Override
  28. publicString getValueFromTuple(ITuple tuple){
  29. returnnull;
  30. }
  31. }
  32. JedisPoolConfig poolConfig =newJedisPoolConfig.Builder()
  33. .setHost(host).setPort(port).build();
  34. RedisLookupMapper lookupMapper =newWordCountRedisLookupMapper();
  35. RedisLookupBolt lookupBolt =newRedisLookupBolt(poolConfig, lookupMapper);
  • 对例子理解


  •  
    如上图,JedisPoolConfig的作用是,提供Redis相关的配置,给RedisLookupBolt 
    RedisLookupMapper的作用是,绘制相应的数据结构给RedisLookupBolt. 
    例如
    • getKeyFromTuple()方法, 告诉RedisLookupBolt,从输入的Tuple中,取什么样子的Key,让它去Redis中查询。
    • declareOutputFields()的作用是,定义RedisLookupBolt的输出格式。

RedisStoreBoltBolt 例子

  1. classWordCountStoreMapperimplementsRedisStoreMapper{
  2. privateRedisDataTypeDescription description;
  3. privatefinalString hashKey ="wordCount";
  4. publicWordCountStoreMapper(){
  5. description =newRedisDataTypeDescription(
  6. RedisDataTypeDescription.RedisDataType.HASH, hashKey);
  7. }
  8. @Override
  9. publicRedisDataTypeDescription getDataTypeDescription(){
  10. return description;
  11. }
  12. @Override
  13. publicString getKeyFromTuple(ITuple tuple){
  14. return tuple.getStringByField("word");
  15. }
  16. @Override
  17. publicString getValueFromTuple(ITuple tuple){
  18. return tuple.getStringByField("count");
  19. }
  20. }
  21. JedisPoolConfig poolConfig =newJedisPoolConfig.Builder()
  22. .setHost(host).setPort(port).build();
  23. RedisStoreMapper storeMapper =newWordCountStoreMapper();
  24. RedisStoreBolt storeBolt =newRedisStoreBolt(poolConfig, storeMapper);

非简单Bolt

采用 AbstractRedisBolt 实现更为复杂的逻辑

  1. publicstaticclassLookupWordTotalCountBoltextendsAbstractRedisBolt{
  2. privatestaticfinalLogger LOG =LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
  3. privatestaticfinalRandom RANDOM =newRandom();
  4. publicLookupWordTotalCountBolt(JedisPoolConfig config){
  5. super(config);
  6. }
  7. publicLookupWordTotalCountBolt(JedisClusterConfig config){
  8. super(config);
  9. }
  10. @Override
  11. publicvoid execute(Tuple input){
  12. JedisCommands jedisCommands =null;
  13. try{
  14. jedisCommands = getInstance();
  15. String wordName = input.getStringByField("word");
  16. String countStr = jedisCommands.get(wordName);
  17. if(countStr !=null){
  18. int count =Integer.parseInt(countStr);
  19. this.collector.emit(newValues(wordName, count));
  20. // print lookup result with low probability
  21. if(RANDOM.nextInt(1000)>995){
  22. LOG.info("Lookup result - word : "+ wordName +" / count : "+ count);
  23. }
  24. }else{
  25. // skip
  26. LOG.warn("Word not found in Redis - word : "+ wordName);
  27. }
  28. }finally{
  29. if(jedisCommands !=null){
  30. returnInstance(jedisCommands);
  31. }
  32. this.collector.ack(input);
  33. }
  34. }
  35. @Override
  36. publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
  37. // wordName, count
  38. declarer.declare(newFields("wordName","count"));
  39. }
  40. }

RedisLookupBolt 源代码,用于理解WordCountRedisLookupMapper

注意下列代码中的:

  • get
  1. package org.apache.storm.redis.bolt;
  2. import org.apache.storm.topology.OutputFieldsDeclarer;
  3. import org.apache.storm.tuple.Tuple;
  4. import org.apache.storm.tuple.Values;
  5. import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
  6. import org.apache.storm.redis.common.mapper.RedisLookupMapper;
  7. import org.apache.storm.redis.common.config.JedisClusterConfig;
  8. import org.apache.storm.redis.common.config.JedisPoolConfig;
  9. import redis.clients.jedis.JedisCommands;
  10. import java.util.List;
  11. /**
  12. * Basic bolt for querying from Redis and emits response as tuple.
  13. * <p/>
  14. * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO
  15. */
  16. publicclassRedisLookupBoltextendsAbstractRedisBolt{
  17. privatefinalRedisLookupMapper lookupMapper;
  18. privatefinalRedisDataTypeDescription.RedisDataType dataType;
  19. privatefinalString additionalKey;
  20. /**
  21. * Constructor for single Redis environment (JedisPool)
  22. * @param config configuration for initializing JedisPool
  23. * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
  24. */
  25. publicRedisLookupBolt(JedisPoolConfig config,RedisLookupMapper lookupMapper){
  26. super(config);
  27. this.lookupMapper = lookupMapper;
  28. RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
  29. this.dataType = dataTypeDescription.getDataType();
  30. this.additionalKey = dataTypeDescription.getAdditionalKey();
  31. }
  32. /**
  33. * Constructor for Redis Cluster environment (JedisCluster)
  34. * @param config configuration for initializing JedisCluster
  35. * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
  36. */
  37. publicRedisLookupBolt(JedisClusterConfig config,RedisLookupMapper lookupMapper){
  38. super(config);
  39. this.lookupMapper = lookupMapper;
  40. RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
  41. this.dataType = dataTypeDescription.getDataType();
  42. this.additionalKey = dataTypeDescription.getAdditionalKey();
  43. }
  44. /**
  45. * {@inheritDoc}
  46. */
  47. @Override
  48. publicvoid execute(Tuple input){
  49. String key = lookupMapper.getKeyFromTuple(input);
  50. Object lookupValue;
  51. JedisCommands jedisCommand =null;
  52. try{
  53. jedisCommand = getInstance();
  54. switch(dataType){
  55. case STRING:
  56. lookupValue = jedisCommand.get(key);
  57. break;
  58. case LIST:
  59. lookupValue = jedisCommand.lpop(key);
  60. break;
  61. case HASH:
  62. lookupValue = jedisCommand.hget(additionalKey, key);
  63. break;
  64. case SET:
  65. lookupValue = jedisCommand.scard(key);
  66. break;
  67. case SORTED_SET:
  68. lookupValue = jedisCommand.zscore(additionalKey, key);
  69. break;
  70. case HYPER_LOG_LOG:
  71. lookupValue = jedisCommand.pfcount(key);
  72. break;
  73. case GEO:
  74. lookupValue = jedisCommand.geopos(additionalKey, key);
  75. break;
  76. default:
  77. thrownewIllegalArgumentException("Cannot process such data type: "+ dataType);
  78. }
  79. List<Values> values = lookupMapper.toTuple(input, lookupValue);
  80. for(Values value : values){
  81. collector.emit(input, value);
  82. }
  83. collector.ack(input);
  84. }catch(Exception e){
  85. this.collector.reportError(e);
  86. this.collector.fail(input);
  87. }finally{
  88. returnInstance(jedisCommand);
  89. }
  90. }
  91. /**
  92. * {@inheritDoc}
  93. */
  94. @Override
  95. publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
  96. lookupMapper.declareOutputFields(declarer);
  97. }
  98. }

Kafka + Storm + Redis 完整例子

  • 场景 
    参考 《Storm与Kafka整合记录》中,统计单词数量的例子。在这个例子基础之上,将统计到单词结果保存到Redis中。
  • 修改Topology,增加RedisStoreBolt
  1. publicclassKafkaStormSample{
  2. publicclassKafkaStormSample{
  3. publicstaticvoid main(String[] args)throwsException{
  4. Config config =newConfig();
  5. config.setDebug(true);
  6. config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
  7. String zkConnString ="localhost:2181";
  8. String topic ="my-first-topic";
  9. BrokerHosts hosts =newZkHosts(zkConnString);
  10. SpoutConfig kafkaSpoutConfig =newSpoutConfig(hosts, topic,"/"+ topic, UUID.randomUUID().toString());
  11. kafkaSpoutConfig.bufferSizeBytes =1024*1024*4;
  12. kafkaSpoutConfig.fetchSizeBytes =1024*1024*4;
  13. // kafkaSpoutConfig.forceFromStart = true;
  14. kafkaSpoutConfig.scheme =newSchemeAsMultiScheme(newStringScheme());
  15. // Creating RedisStoreBolt
  16. String host ="localhost";
  17. int port =6379;
  18. JedisPoolConfig poolConfig =newJedisPoolConfig.Builder().setHost(host).setPort(port).build();
  19. RedisStoreMapper storeMapper =newWordCountStoreMapper();
  20. RedisStoreBolt storeBolt =newRedisStoreBolt(poolConfig, storeMapper);
  21. // Assemble with topology
  22. TopologyBuilder builder =newTopologyBuilder();
  23. builder.setSpout("kafka-spout",newKafkaSpout(kafkaSpoutConfig));
  24. builder.setBolt("word-spitter",newSplitBolt()).shuffleGrouping("kafka-spout");
  25. builder.setBolt("word-counter",newCountBolt()).shuffleGrouping("word-spitter");
  26. builder.setBolt("redis-store-bolt", storeBolt).shuffleGrouping("word-counter");
  27. LocalCluster cluster =newLocalCluster();
  28. cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
  29. Thread.sleep(10000);
  30. cluster.shutdown();
  31. }
  32. }
  • 修改 CountBolt, emit to RedisStoreBolt
  1. publicclassCountBoltimplementsIRichBolt{
  2. Map<String,Integer> counters;
  3. privateOutputCollector collector;
  4. @Override
  5. publicvoid prepare(Map stormConf,TopologyContext context,
  6. OutputCollector collector){
  7. this.counters =newHashMap<String,Integer>();
  8. this.collector = collector;
  9. }
  10. @Override
  11. publicvoid execute(Tuple input){
  12. String str = input.getString(0);
  13. if(!counters.containsKey(str)){
  14. counters.put(str,1);
  15. }else{
  16. Integer c = counters.get(str)+1;
  17. counters.put(str, c);
  18. }
  19. // emit to redis-store-bolt
  20. Integer result = counters.get(str);
  21. this.collector.emit(newValues(str,String.valueOf(result)));
  22. collector.ack(input);
  23. }
  24. @Override
  25. publicvoid cleanup(){
  26. for(Map.Entry<String,Integer> entry:counters.entrySet()){
  27. System.out.println(entry.getKey()+" : "+ entry.getValue());
  28. }
  29. }
  30. @Override
  31. publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
  32. declarer.declare(newFields("word","count"));
  33. }
  34. @Override
  35. publicMap<String,Object> getComponentConfiguration(){
  36. returnnull;
  37. }
  38. }

依赖包

jedis-2.9.0.jar 
commons-pool2-2.4.2.jar

将以上两个包,增加到CLASSPATH路径中

编译执行

  • 启动服务

    • 启动 zookeeper
      1. $ cd /opt/zookeeper
      2. $ ./bin/zkServer.sh start
    • 启动 redis
      1. $ redis-server
    • 启动 kafka
      1. $ cd /opt/kafka
      2. $ ./bin/kafka-server-start.sh config/server.properties
    • 启动 storm
      1. $ cd /opt/storm
      2. $./bin/storm nimbus
      3. $./bin/storm supervisor
  • 输入数据

  1. $cd /opt/kafka
  2. $./bin/kafka-console-producer.sh --broker-list localhost:9092--topic my-first-topic
  3. Hello
  4. My first message
  5. My second message
  • 执行例子
  1. $ java KafkaStormSample
  • 在Redis-cli 中查询结果

 

  1. $ redis-cli
  2. 127.0.0.1:6379> hvals wordCount
  • 大小: 46 KB
0
0
分享到:
评论

相关推荐

    storm整合redis的例子源码

    整合Storm与Redis,主要目标是利用Storm处理实时数据,并将结果有效地存储到Redis中,以便后续快速访问或进一步分析。以下是一些关键步骤: 1. **创建Spout**:Spout是Storm数据流的源头,通常从外部数据源(如...

    zk-kafka-redis-storm安装

    与Hadoop处理批量数据不同,Storm专注于实时数据流的处理,能够快速地处理和响应新产生的数据。 在"zk-kafka-redis-storm安装"过程中,首先需要安装Zookeeper,确保集群的协调和数据一致性。接着是Kafka的部署,...

    Flume+kafka+Storm整合

    ### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...

    storm+kafka+redis+mysql

    标题中的"storm+kafka+redis+mysql"是一个典型的大数据实时处理架构,广泛应用于日志收集、数据分析等领域。下面将详细介绍这些技术组件及其在实际应用中的作用。 **Apache Storm** Apache Storm是一个开源的分布式...

    storm-kafka整合代码

    当我们谈论"storm-kafka整合代码"时,这意味着我们要将这两者结合,使得 Storm 可以从 Kafka 中消费数据并进行实时处理。下面将详细介绍 Storm 和 Kafka 的核心概念以及它们整合的关键步骤。 **Apache Storm** ...

    妳那伊抹微笑_Flume+Kafka+Storm分布式环境整合Eclipse工程_20140714.rar

    该文档与教程http://blog.csdn.net/u012185296/article/details/37762793配套使用的,主要讲的是Flume+Kafka+Storm的环境整合,并且全部都是最新版本 、、、你也可以到博客地址http://blog.csdn.net/u012185296中去...

    Java高级架构线路图学习路线内含Dubbo、Redis、设计模式、Netty、zookeeper、Spring cloud、分布式、 高并发等架构技术

    Java高级架构线路图学习路线内含Dubbo、Redis、设计模式、Netty、zookeeper、Spring cloud、分布式、 高并发等架构技术(nginx+redis+zookeeper+activemq+storm+dubbo+netty+jvm+并发编程锁+项目实战)点击链接加入...

    大数据实时处理系统技术方案.docx

    Storm与Redis的整合涉及将处理后的数据结构化,并高效地写入Redis缓存。 **实时处理系统架构**: 整个系统架构由三个主要部分组成:Flume集群负责数据采集,Kafka集群作为数据中转站,Storm集群执行实时处理,而...

    Spring boot集成Kafka+Storm的示例代码

    通过这种整合,我们可以使用Spring Boot来管理Kafka、Storm、Redis等所需要的bean,並通过其他服务日志收集至Kafka,Kafka实时发送日志至Storm,在Storm bolt时进行相应的处理操作。 使用工具及环境配置 在本篇...

    分布式集群技术.pdf

    NoSQL 数据库 Redis 是分布式集群技术的基础,Redis 的集群部署、Redis 的 Java 编程接口、Storm 整合 Redis 等内容将为读者提供了 NoSQL 数据库 Redis 的基础知识。 Storm Trident 是分布式集群技术的基础,...

    大数据课程体系.docx

    Storm作为实时计算框架,课程深入讨论了其常用组件、编程API、分组策略、消息可靠性、事务处理、Storm与Hadoop的整合(Storm on Yarn),以及与Kafka的协同工作。 Scala作为Spark的主要编程语言,课程涵盖了其解释...

    妳那伊抹微笑_云计算之Hadoop-2.2.0+Hbaase-0.96.2 +Hive-0.13.1完全分布式环境整合安装文档V1.0.0.docx

    这个文档是《云计算之Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark 技术文档分享V1.0.0》系列的一部分,涵盖了多种云计算技术。 首先,Hadoop-2.2.0是一个开源的分布式计算框架,其核心由HDFS(Hadoop ...

    大数据课程体系

    - **Storm和Hadoop2.x的整合**:整合Storm与Hadoop2.x,实现更强大的实时和批处理能力。 以上就是“大数据课程体系”中所涉及的主要知识点和技术细节,涵盖了从基础的Java编程到复杂的分布式计算框架等多个方面,为...

    油田开发生产实时数据处理及联动技术研究与应用.pdf

    为处理这些大数据量,油田采用了分布式云计算技术,特别是Kafka、Storm和Redis技术栈来构建实时计算架构。这种架构能够提供良好的伸缩性,对采油、注水、集输、设备、电力等数据进行实时计算,支持业务运行。 3. ...

    bigwork2.zip

    《bigwork2.zip》是一个包含多个技术组件的压缩文件,主要涉及Kafka、Redis、Java、可视化和Storm等关键技术和工具。以下将详细介绍这些技术及其相互关系。 **1. Kafka** Kafka是一种分布式流处理平台,由Apache...

    大数据课程体系(20210925082704).pdf

    另外,还会涉及Storm与Kafka、Storm Trident和DRPC的整合。 这些课程内容为学员提供了全面的大数据处理知识,从基础到高级,涵盖了大数据生态中的关键技术,为从事大数据相关工作奠定了坚实的基础。

    大数据工程师简历3份.docx

    3. **Storm与RPC**:掌握Storm流处理技术,用于实时数据处理。对分布式RPC(Remote Procedure Call)有一定认识,能够在处理定时批量任务时保证系统间通信的高效和稳定。 4. **Spark与Scala**:对Spark的底层运行...

Global site tag (gtag.js) - Google Analytics