Integrating Storm with Redis
Original author: 王宇
2016-11-07
Reference: http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm provides several standard bolts for Redis
RedisLookupBolt example
RedisStoreBolt example
Non-simple bolts
RedisLookupBolt source code, for understanding WordCountRedisLookupMapper
Kafka + Storm + Redis complete example
Dependencies
Compile and run
Reference: http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-redis.html
Storm provides several standard bolts for Redis
- RedisLookupBolt: lookup
- RedisStoreBolt: store
- AbstractRedisBolt: base class for implementing custom logic
RedisLookupBolt example
Look up the count of each word from Redis.
// imports needed by this snippet (org.apache.storm package layout)
import java.util.List;

import com.google.common.collect.Lists;  // Guava
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;

class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
- Understanding the example
  As the code above shows, JedisPoolConfig supplies the Redis connection settings (host, port) to the RedisLookupBolt.
  The RedisLookupMapper maps the relevant data structure for the RedisLookupBolt. For example:
  - getKeyFromTuple() tells the RedisLookupBolt which key to extract from the input tuple for the Redis lookup.
  - declareOutputFields() defines the output format of the RedisLookupBolt.
  A minimal wiring sketch follows below.
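For illustration, here is one way the lookupBolt built above might be wired into a topology. This is a minimal sketch, not part of storm-redis: WordSpout is a hypothetical spout assumed to emit tuples with a single "word" field.

TopologyBuilder builder = new TopologyBuilder();
// hypothetical spout emitting tuples with a "word" field
builder.setSpout("word-spout", new WordSpout());
// the lookup bolt queries Redis and emits ("wordName", "count") tuples downstream
builder.setBolt("redis-lookup", lookupBolt).shuffleGrouping("word-spout");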
RedisStoreBolt example
// imports needed by this snippet (org.apache.storm package layout)
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.tuple.ITuple;

class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
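Since the mapper declares the HASH data type with additional key "wordCount", the RedisStoreBolt effectively performs HSET wordCount <word> <count> for each incoming tuple, so every word becomes a field of the wordCount hash.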
Non-simple bolts
Extend AbstractRedisBolt to implement more complex logic.
public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
    private static final Random RANDOM = new Random();

    public LookupWordTotalCountBolt(JedisPoolConfig config) {
        super(config);
    }

    public LookupWordTotalCountBolt(JedisClusterConfig config) {
        super(config);
    }

    @Override
    public void execute(Tuple input) {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = getInstance();
            String wordName = input.getStringByField("word");
            String countStr = jedisCommands.get(wordName);
            if (countStr != null) {
                int count = Integer.parseInt(countStr);
                this.collector.emit(new Values(wordName, count));

                // print lookup result with low probability
                if (RANDOM.nextInt(1000) > 995) {
                    LOG.info("Lookup result - word : " + wordName + " / count : " + count);
                }
            } else {
                // skip
                LOG.warn("Word not found in Redis - word : " + wordName);
            }
        } finally {
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
            this.collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // wordName, count
        declarer.declare(new Fields("wordName", "count"));
    }
}
RedisLookupBolt source code, for understanding WordCountRedisLookupMapper
Note in the code below how execute() chooses the Redis read command (get, lpop, hget, zscore, ...) based on the data type declared by the mapper.
package org.apache.storm.redis.bolt;

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import redis.clients.jedis.JedisCommands;

import java.util.List;

/**
 * Basic bolt for querying from Redis and emits response as tuple.
 * <p/>
 * Various data types are supported: STRING, LIST, HASH, SET, SORTED_SET, HYPER_LOG_LOG, GEO
 */
public class RedisLookupBolt extends AbstractRedisBolt {
    private final RedisLookupMapper lookupMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    /**
     * Constructor for single Redis environment (JedisPool)
     * @param config configuration for initializing JedisPool
     * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
     */
    public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
        super(config);

        this.lookupMapper = lookupMapper;

        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    /**
     * Constructor for Redis Cluster environment (JedisCluster)
     * @param config configuration for initializing JedisCluster
     * @param lookupMapper mapper containing which datatype, query key, output key that Bolt uses
     */
    public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
        super(config);

        this.lookupMapper = lookupMapper;

        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Tuple input) {
        String key = lookupMapper.getKeyFromTuple(input);
        Object lookupValue;

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            switch (dataType) {
                case STRING:
                    lookupValue = jedisCommand.get(key);
                    break;
                case LIST:
                    lookupValue = jedisCommand.lpop(key);
                    break;
                case HASH:
                    lookupValue = jedisCommand.hget(additionalKey, key);
                    break;
                case SET:
                    lookupValue = jedisCommand.scard(key);
                    break;
                case SORTED_SET:
                    lookupValue = jedisCommand.zscore(additionalKey, key);
                    break;
                case HYPER_LOG_LOG:
                    lookupValue = jedisCommand.pfcount(key);
                    break;
                case GEO:
                    lookupValue = jedisCommand.geopos(additionalKey, key);
                    break;
                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            List<Values> values = lookupMapper.toTuple(input, lookupValue);
            for (Values value : values) {
                collector.emit(input, value);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            returnInstance(jedisCommand);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        lookupMapper.declareOutputFields(declarer);
    }
}
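Tracing the word-count case through this source: WordCountRedisLookupMapper declares the HASH data type with additional key "wordCount", so execute() reaches the HASH branch and calls jedisCommand.hget("wordCount", key), where key is the tuple's "word" field returned by getKeyFromTuple(). The looked-up value is then passed to toTuple(), which emits it as a ("wordName", "count") tuple.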
Kafka + Storm + Redis complete example
- Scenario
  Take the word-count example from "Storm and Kafka Integration Notes" (《Storm与Kafka整合记录》) as the starting point, and save the word-count results to Redis.
- Modify the topology to add a RedisStoreBolt
// imports assume the org.apache.storm package layout (storm-core, storm-kafka, storm-redis)
import java.util.UUID;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaStormSample {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setDebug(true);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        String zkConnString = "localhost:2181";
        String topic = "my-first-topic";
        BrokerHosts hosts = new ZkHosts(zkConnString);

        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
        kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
        // kafkaSpoutConfig.forceFromStart = true;
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Creating RedisStoreBolt
        String host = "localhost";
        int port = 6379;
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

        // Assemble with topology
        // SplitBolt and CountBolt come from the Storm-Kafka integration example
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");
        builder.setBolt("redis-store-bolt", storeBolt).shuffleGrouping("word-counter");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
- Modify CountBolt to emit to the RedisStoreBolt
// imports needed by this snippet (org.apache.storm package layout)
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class CountBolt implements IRichBolt {
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }

        // emit to redis-store-bolt
        Integer result = counters.get(str);
        this.collector.emit(new Values(str, String.valueOf(result)));

        collector.ack(input);
    }

    @Override
    public void cleanup() {
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
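Note that CountBolt emits the count as a String (String.valueOf(result)); this matches WordCountStoreMapper, whose getValueFromTuple() reads the value with getStringByField("count").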
Dependencies
jedis-2.9.0.jar
commons-pool2-2.4.2.jar
Add the two jars above to the CLASSPATH.
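For example, one way to do it (a sketch; the paths are assumptions, adjust them to where the jars actually live):

$ export CLASSPATH="$CLASSPATH:./jedis-2.9.0.jar:./commons-pool2-2.4.2.jar"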
Compile and run
- Start the services
  - Start ZooKeeper
    $ cd /opt/zookeeper
    $ ./bin/zkServer.sh start
  - Start Redis
    $ redis-server
  - Start Kafka
    $ cd /opt/kafka
    $ ./bin/kafka-server-start.sh config/server.properties
  - Start Storm
    $ cd /opt/storm
    $ ./bin/storm nimbus
    $ ./bin/storm supervisor
- Input data
  $ cd /opt/kafka
  $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic
  Hello
  My first message
  My second message
- Run the example
  $ java KafkaStormSample
- Query the result in redis-cli
  $ redis-cli
  127.0.0.1:6379> hvals wordCount