`
brandNewUser
  • 浏览: 457106 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm存储结果至Redis

阅读更多
 
原有的事务支持使用MemcachedState来进行,现在需要将其迁移至Redis,并且需要记录所有key值列表,因为在redis中虽然可以使用keys *操作,但不是被推荐的方式,所以把所有结果存在Redis中的一个HASH格式字段中。
 
关于Redis与Storm集成的相关文档,可以参考:
 
 
由于Redis中也有着较多种类型的数据结构,这也为我们提供了可能,将所有的key至统一放置到set中,或其他更为合适的数据结构中。
 
搭建启动Redis
 
目前,分配过来的4台服务器,只有135剩余内存较多,分出1G用来作为Redis存储使用,搭建一台单机Redis服务,用于记录所有的查询日志。
 
 
启动该服务:
 
sudo bin/redis-server conf/redis.6388.conf
 
 
Storm集成Redis
 
添加maven依赖:
 
<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>${storm.version}</version>
        </dependency>
 
 
对于正常的Bolt来说,storm-redis提供了基本的bolt实现,RedisLookupBolt和RedisStoreBolt,
 


 
 
 
其中使用了策略模式,将实际要查询/保存相关的key设置以及策略放到了RedisLookup/StoreMapper中,在LookupBolt和StoreBolt中进行实际的查找、保存操作,根据RedisDataType的不同,支持Redis的各种数据类型:STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG。
 
从对应传输过来的Tuple中查找、保存相应字段的值,在RedisLookupBolt中,根据不同的key值,从key值/或者additionalKey中使用不同的方法来get得到对应的值。
 
@Override
    public void execute(Tuple input) {
        String key = lookupMapper.getKeyFromTuple(input);
        Object lookupValue;

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            switch (dataType) {
                case STRING:
                    lookupValue = jedisCommand.get(key);
                    break;

                case LIST:
                    lookupValue = jedisCommand.lpop(key);
                    break;

                case HASH:
                    lookupValue = jedisCommand.hget(additionalKey, key);
                    break;

                case SET:
                    lookupValue = jedisCommand.scard(key);
                    break;

                case SORTED_SET:
                    lookupValue = jedisCommand.zscore(additionalKey, key);
                    break;

                case HYPER_LOG_LOG:
                    lookupValue = jedisCommand.pfcount(key);
                    break;

                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            List<Values> values = lookupMapper.toTuple(input, lookupValue);
            for (Values value : values) {
                collector.emit(input, value);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            returnInstance(jedisCommand);
        }
 
 
 
Redis TridentState支持
 
此外,storm-redis中还支持trident state:
 
RedisState and RedisMapState, which provide Jedis interface just for single redis.

RedisClusterState and RedisClusterMapState, which provide JedisCluster interface, just for redis cluster.
 
由于我们使用的是single redis模式(非集群),在下面的UML图中会有所体现:
 
 


 
 
 
使用RedisDataTypeDescription来定义保存到Redis的数据类型和额外的key,其中支持两种数据类型:STRING和HASH。如果使用HASH类型,则需要定义额外的key,因为hash属于两层的,我们定义的additionalKey为最外层的key类型。
 
例如我们需要保存结果至Redis的Hash数据结构中,则需要定义RedisDataTypeDescription.RedisDataType.HASH,定义hash的key:"controller:5min”,根据key进行group by操作,当前使用非事务型(对数据正确性敏感度不高)。
            Options<Object> fiveMinitesOptions = new Options<>();
            fiveMinitesOptions.dataTypeDescription = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,
                    "controller:5min");
            logStream.each(new Fields("logObject"), new Log5MinGroupFunction(), new Fields("key"))
                    .groupBy(new Fields("key"))
                    .persistentAggregate(RedisMapState.nonTransactional(poolConfig, fiveMinitesOptions), new Fields("logObject"),
                            new LogCombinerAggregator(), new Fields("statistic"));
 
 
最后在Redis中保存的值为:
 
controller:5min
          Log5MinGroupFunction生成的key,LogCombinerAggregator合并完成后的value;
 
 
Log5MinGroupFunction生成的key会经过KeyFactory.build(List<Object> key)方法转换,可以考虑自定义生成的key;最终的value会通过Serializer的序列化以及反序列化方法转换成byte[]存放至Redis中,默认是通过JSON的格式。
 
在AbstractRedisMapState中,对于传过来的keys进行统一KeyFactory.get操作,而实际获取值和持久化值是通过 retrieveValuesFromRedis以及updateStatesToRedis两个方法来实现的
@Override public List<T> multiGet(List<List<Object>> keys) {
        if (keys.size() == 0) {
            return Collections.emptyList();
        }

        List<String> stringKeys = buildKeys(keys);
        List<String> values = retrieveValuesFromRedis(stringKeys);

        return deserializeValues(keys, values);
    }

private List<String> buildKeys(List<List<Object>> keys) {
        List<String> stringKeys = new ArrayList<String>();
        for (List<Object> key : keys) {
            stringKeys.add(getKeyFactory().build(key));
        }
        return stringKeys;
    }

@Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        if (keys.size() == 0) {
            return;
        }

        Map<String, String> keyValues = new HashMap<String, String>();
        for (int i = 0; i < keys.size(); i++) {
            String val = new String(getSerializer().serialize(vals.get(i)));
            String redisKey = getKeyFactory().build(keys.get(i));
            keyValues.put(redisKey, val);
        }

        updateStatesToRedis(keyValues);
    }
 
 
在RedisMapState中,从Redis中获取值的方法:
 
@Override
    protected List<String> retrieveValuesFromRedis(List<String> keys) {
        String[] stringKeys = keys.toArray(new String[keys.size()]);

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();

            RedisDataTypeDescription description = this.options.dataTypeDescription;
            switch (description.getDataType()) {
            case STRING:
                return jedis.mget(stringKeys);

            case HASH:
                return jedis.hmget(description.getAdditionalKey(), stringKeys);
 
 
可以看出,支持两种类型STRING以及HASH,可以通过批量获取的API获取多个keys值,update的过程也比较类似,如果是STRING类型,通过pipeline的方式(分布式不支持)可以极大提高查找效率;如果为hash类型,直接通过hmget即可。
protected void updateStatesToRedis(Map<String, String> keyValues) {
        Jedis jedis = null;

        try {
            jedis = jedisPool.getResource();

            RedisDataTypeDescription description = this.options.dataTypeDescription;
            switch (description.getDataType()) {
            case STRING:
                String[] keyValue = buildKeyValuesList(keyValues);
                jedis.mset(keyValue);
                if(this.options.expireIntervalSec > 0){
                    Pipeline pipe = jedis.pipelined();
                    for(int i = 0; i < keyValue.length; i += 2){
                        pipe.expire(keyValue[i], this.options.expireIntervalSec);
                    }
                    pipe.sync();
                }
                break;

            case HASH:
                jedis.hmset(description.getAdditionalKey(), keyValues);
                if (this.options.expireIntervalSec > 0) {
                    jedis.expire(description.getAdditionalKey(), this.options.expireIntervalSec);
                }
                break;
 
 
 
  • 大小: 39.3 KB
  • 大小: 76.6 KB
分享到:
评论

相关推荐

    storm+kafka+redis+mysql

    在本实例中,Storm可能被用作实时数据处理引擎,接收来自Kafka的消息,进行实时分析或转换,并将结果存储到Redis或MySQL。 **Apache Kafka** Apache Kafka是一个高吞吐量的分布式消息队列,通常用于构建实时数据...

    zk-kafka-redis-storm安装

    在IT行业中,流处理组件是大数据处理的关键技术之一,它涉及到多个开源项目,如Zookeeper(zk)、Kafka、Redis和Storm。这些组件各有其独特的功能,并常常协同工作以实现高效的数据管理和处理。 Zookeeper(动物园...

    storm集成Redis操作示例代码.zip

    本示例代码旨在展示如何将Storm与Redis进行集成,以实现高效的数据处理和存储。 在Storm中,通常会使用Spout作为数据源,Bolt进行数据处理,最后通过某种方式将结果持久化或传递到下一个处理阶段。在本示例中,...

    storm整合redis的例子源码

    整合Storm与Redis,主要目标是利用Storm处理实时数据,并将结果有效地存储到Redis中,以便后续快速访问或进一步分析。以下是一些关键步骤: 1. **创建Spout**:Spout是Storm数据流的源头,通常从外部数据源(如...

    storm和Redis.zip

    综合这两个组件,用户可能正在搭建一个实时数据处理系统,其中Storm负责数据的实时处理,而Redis则作为中间件提供快速的数据存储和检索。这样的组合在大数据分析、实时监控和快速响应系统中非常常见。

    storm-kafka整合代码

    在分布式计算领域,Apache Storm 和 Apache Kafka 是两个非常重要的组件。Storm 用于实时数据处理,而 Kafka 则是一个高吞吐量的分布式消息系统。当我们谈论"storm-kafka整合代码"时,这意味着我们要将这两者结合,...

    storm-redis:Storm Bolt 状态管理

    卡夫卡风暴Redis 该项目实施了一种在实现螺栓状态的方法。 ###项目使用以下开源项目:### ###解释### ... ####Bolts#### Redis用作内存数据库来存储螺栓的中间状态。 该项目为具有容错状态的螺

    storm入门.pdf

    处理后的数据可以继续传输给其他bolt进一步处理,或者将最终结果输出到存储系统中。 Storm设计上的特性包括: 1. 分布式:它可以在多台机器上分布式执行。 2. 可靠性:能够保证每个消息至少被处理一次。 3. 容错性...

    storm流式计算(实时系统)

    例如,它可以存储Storm处理后的结果数据,提供快速查询服务,或者在Storm拓扑中充当消息队列,帮助协调不同组件之间的数据传输。 **日志系统** 日志系统在实时系统中扮演着重要角色,它记录了系统的运行状态和事件...

    Redis集群演化的心路历程.pdf

    另外,为了支持实时计算和准实时计算,Redis集群还需要与Hadoop/Hive、Storm/Spark等数据处理平台进行有效的集成。 通过持续的迭代和优化,Redis集群在保持其高性能特性的同时,也在安全性方面做出了很多努力。安全...

    order_storm.zip

    Apache Storm是一个开源的分布式实时计算系统,它能够处理无界的数据流,确保每个事件都能被正确处理,这在订单分析这种实时性强的场景中至关重要。Storm通过将数据流分解为多个小的处理单元(称为 bolts),并将其...

    storm实时单词计数

    Jedis是Java的Redis客户端,Redis是一个内存中的数据结构存储系统,可以作为数据库、缓存和消息代理。在Storm中,Jedis常用来实现Bolt之间的共享状态,例如在我们的例子中存储单词计数。 4. **Aggregator Bolt (可...

    Redis集群HA架构+双写一致性解决方案、Nginx+Storm负载均衡策略、Hyst-shop-detail.zip

    而Redis Cluster则是将数据分布到多个节点,每个节点都存储一部分数据,通过槽映射实现数据的定位和分发,同时具备自动故障转移的能力。 双写一致性解决方案在分布式环境中是为了确保数据的一致性,避免在主从复制...

    基于Storm构建实时热力分布项目实战.txt

    - **状态管理**:为了保证数据处理的正确性和一致性,需要合理设计状态存储策略,如使用Redis或Zookeeper等工具进行状态管理。 - **性能调优**:通过对Storm集群参数的调整以及算法优化等方式提升整体性能表现。 ##...

    Building Python Real-Time Applications with Storm - Kartik Bhatnagar.pdf.pdf

    第五部分讨论了数据持久化的问题,涉及到使用Redis和MongoDB存储处理结果。这部分提供了如何发现排名靠前的话题的方法,这对于构建一个能够对流数据进行存储和分析的实时应用是至关重要的。 整体而言,本书提供了从...

    01_流式计算基础_第1天(Storm是什么、Storm核心组件、Storm编程模型).docx

    这些阶段可能包括数据接入(如Flume)、数据存储(如Kafka)、实时计算(如Storm)以及结果缓存和持久化(如Redis、MySQL)。这样的架构设计能够保证数据从产生到处理的全程实时性,为业务决策提供快速响应的能力。 ...

    存储能力详细方案-参考.docx

    平台Redis包含7个Master和7个Slave节点,每个节点内存512GB,预留30%空间,可用存储容量为2508.8G。前置Redis包含4个节点,同样是512GB内存,预留30%空间,可用存储容量为1433.6G。 MySQL作为关系型数据库,用于...

    大数据技术之Storm.doc

    - **Redis**:用于实时结果的缓存。 - **MySQL**:持久化存储计算结果。 **1.3 Storm简介** Storm是一个开源的分布式实时计算框架,主要用于处理无界的数据流,类似于Hadoop对数据进行批处理的方式。Storm的主要...

    基于Storm的车联网数据实时分析系统.pdf

    - **低延迟(lowlatency)**:指数据从一个处理流程的开始到结束所需时间非常短,对于实时系统而言至关重要。 - **高吞吐(highthroughput)**:指系统每单位时间内能够处理和分析的数据量很大,对于处理大规模数据...

Global site tag (gtag.js) - Google Analytics