`
brandNewUser
  • 浏览: 457077 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm集成Kafka的Trident实现

阅读更多
 
原本打算将storm直接与flume直连,发现相应组件支持比较弱,topology任务对应的supervisor也不一定在哪个节点上,只能采用统一的分布式消息服务Kafka。
 
原本打算将结构设置为:


 
最后结构更改为:
 


  

集成Kafka

 
storm中已经写好了KafkaSpout用来接收Kafka中间件上的消息,并发射到Bolt中,只需要依赖 storm-kafka即可:
 
<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>
 
 
调用org.apache.storm.kafka.KafkaSpout,  需要传递一个SpoutConfig用来配置kafka对应的zookeeper以及topic:
 
String zks = "192.168.1.1xx:2181,192.168.1.1xx:2181,192.168.1.1xx:2181/kafka";
        String topic = "log-storm";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/"+ topic, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.zkServers = Arrays.asList("192.168.1.1xx","192.168.1.1xx","192.168.1.1xx");
        spoutConfig.zkPort = 2181;
 
 
建立KafkaSpout(spoutConfig)即可。
 
需要注意的是,我们在Bolt中需要对收到的消息进行主动ack/fail,否则会出现消息重复发送的情况,一般情况下Bolt的写法类似下面,在prepare中缓存collector,executor中通过try/catch块决定是否确认消息(以通知Spout是否需要对消息进行重发),declareOutputFields中声明需要输出的字段。
 
private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String msgBody = input.getString(0);
            int traceIndex = msgBody.indexOf(TRACE_CONST);
            if (traceIndex >= 0) {
                String completeLog = msgBody.substring(traceIndex + TRACE_CONST.length());
                collector.emit(new Values(completeLog));
            }
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }
 
 
Storm中需要有一个main函数,用于构建和启动topology,以便将spout,bolt等组件连接起来,代码类似下面:
 
TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpoutNoMetrics(spoutConfig), 3);

        builder.setBolt("log-extractor", new LogExtractorBolt(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("log-splitter", new LogSplitterBolt(), 2).shuffleGrouping("log-extractor");
        builder.setBolt("memcached-store", new MemcachedBolt()).fieldsGrouping("log-splitter", new Fields("md"));

        Config config = new Config();
        String name = "LogStormProcessor";

        config.setNumWorkers(1);
        StormSubmitter.submitTopologyWithProgressBar(name, config, builder.createTopology());
 
 
 

使用Storm Trident

 
Trident在Storm上进行了高级抽象,例如事务处理和状态管理的细节,可以让一批tuple进行离散的事务处理,还允许topology在数据上执行函数功能、过滤和聚合操作。
 


 
 
 
使用Trident,我们需要使用TridentTopology替换原有的TopologyBuilder构造Storm的拓扑图。在Trident中的spout引入了数据批次(batch)的概念,不像Strom中的spout,Trident Spout必须成批地发送tuple。
 
在Trident中,spout并没有真正发射tuple,而是把这项工作分解给了BatchCoordinator和Emitter方法,Emitter方法负责发送tuple,BatchCoordinator负责管理批次和元数据,Emitter需要依靠元数据来恰当地进行批次的数据重放。
 
首先,需要根据ITridentSpout新建一个数据流,
 
       
 Stream stream = tridentTopology.newStream("event", kafkaSpout);
 
在使用KafkaSpout作为TridentSpout时,其默认的输出字段名称为str,
 
 
Exception in thread "main" java.lang.IllegalArgumentException: Trying to select non-existent field: 'event' from stream containing fields fields: <[str]>
    at org.apache.storm.trident.Stream.projectionValidation(Stream.java:853)
    at org.apache.storm.trident.Stream.each(Stream.java:320)
    at com.zhen.log.processor.trident.Main.main(Main.java:48)
 
 
我们原来使用的KafkaSpout,虽然可以将其直接用于newStream方法,但是运行时会出现错误:
 


 
 
原因就在于进行适配的过程中,注册方法registerMetric只能够被宰ISpout::open()方法中被调用,虽然可以进行合理地改造(由于有一些包访问控制权限的相关依赖,新建一个同名package,并将其中的registerMetric方法删除),但是其事务性不能得到保证,在本人测试的过程中,Kafka的消息不能被正常消费,每次重启服务都会读到完整的所有数据。
 
但是这并不是推荐的用法,storm-kafka中存在另外一个实现:org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout,可以满足要求:
 
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic);
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

        TridentTopology tridentTopology = new TridentTopology();
        Stream stream = tridentTopology.newStream("event", kafkaSpout);
 
 
而使用OpaqueTridentKafkaSpout时,默认的输出名称为“bytes”,其输出格式也并不是String字符串而是byte[],需要根据编码格式将其转换为String。
 
在Spout编写完成后,就可以加入后续的运算操作,trident处理是通过创建Stream的各种operation并连接来进行处理的,比较常用的两种运算:filter和function,例如我们下面处理流的方式,每次返回Stream都可以继续用来创建新的数据流:
 
Stream logStream = stream.each(new Fields("bytes"), new LogExtractorFunction(), new Fields("log"))
                .each(new Fields("log"), new LogSplitterFunction(), new Fields("logObject"))
                .each(new Fields("logObject"), new LogTypeFilter("TRACE"));
 
在filter中,继承自BaseFilter,唯一的isKeep方法会根据tuple的属性进行相应过滤操作,需要指定对应输入的Field,filter没有额外输出的多余字段。注意filter中不能改变tuple,如果既想要过滤又想添加字段时必须使用function。
 
在function中,继承自BaseFunction,通过execute方法来对所有的数据增加额外的字段,并不会删除或者变更已有的字段。使用function需要指定多余输出的Fields,function中发射的字段数要与声明的fields字段数据保持一致。
 
和function比较类似,aggregator允许topology组合tuple,不同的是,它会替换tuple字段和值,有三种聚合器可以被使用:CombinerAggregator,ReducerAggregator和Aggregator。这里,我们使用的是CombinerAggregator。
 
CombinerAggregator用来将一个集合的tuple组合到一个单独到一个单独的字段中,定义如下:
 
public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}
 
 
Storm会对每个tuple调用init方法,然后重复调用combiner方法指导一个分片的数据处理完成,传递给combine方法的两个参数是局部聚合的结果,以及调用了init返回的值,如果没有聚合结果,会直接调用zero方法返回一个自定义空值。
 
聚合一般需要首先对数据流进行groupBy操作后,在GroupedStream流上进行实际操作,一般情况下,首先根据前面的流输出一个用于分组的键值用于groupBy,然后进行persistentAggregate,根据分组将数据归并计算合并结果。
 
        logStream
                .each(new Fields("logObject"), new LogGroupFunction(), new Fields("key")).groupBy(new Fields("key"))
                .persistentAggregate(MemcachedState.nonTransactional(servers), new Fields("logObject"), new LogCombinerAggregator(),
                        new Fields("statistic"))
 
 
 
注意,使用Trident时也是可以分成多个流的,只需要在特定的节点上,保存本地变量,就可以在其上执行多次each,分出多条路径流进行独立处理(也可以对多条输入流进行合并,这里没有使用到这样高级的功能)。
 
Stream logStream = stream.each(new Fields("bytes"), new LogExtractorFunction(), new Fields("log"))
                .each(new Fields("log"), new LogSplitterFunction(), new Fields("logObject"))
                .each(new Fields("logObject"), new LogTypeFilter("TRACE"));

        logStream.each(new Fields("log"), new LocalFileSaveFunction(), new Fields());

        logStream
                .each(new Fields("logObject"), new LogGroupFunction(), new Fields("key")).groupBy(new Fields("key"))
                .persistentAggregate(MemcachedState.nonTransactional(servers), new Fields("logObject"), new LogCombinerAggregator(),
                        new Fields("statistic"))
        ;
 
 
在使用任何function,aggregator时,都可以通过声明Fields的方式来设置使用到的字段名称,Combiner中可以不使用任何定义的Fields,此时传递给Trident的Tuple中将不会包含任何字段(一般代码示例中如此)。使用聚合时,还需要持续存储聚合的Trident状态,持久化操作从状态管理开始,Trident对状态有底层的操作原语,可以参考State接口的方法。
 
Storm中用State来持久化存储信息,有三种状态类型:非事务型,重复事务型以及不透明事务型,在分布式环境下,数据可能被重放,为了支持计数和状态更新,Trident将状态更新操作进行序列化,使用不同的状态更新模式对重放和错误数据进行容错。
 
我们存储中间数据状态使用了memcached作为媒介,关于trident与memcached进行事务处理相关代码,可以参考工程(storm创建者编写)
 
 
其中调用了twitter中定义的所以使用改造过的客户端:
<dependency>
            <groupId>com.twitter</groupId>
            <artifactId>finagle-memcached_2.9.2</artifactId>
            <version>6.20.0</version>
        </dependency>
 
但是将源码copy到工程中并添加对应的maven依赖(多数是twitter相关的依赖),其中的twitter依赖始终找不到:
 
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project log-storm-processor: Could not resolve dependencies for project com.zhen:log-storm-processor:jar:1.0.0-SNAPSHOT: The following artifacts could not be resolved: com.twitter.common.zookeeper:server-set:jar:1.0.83, com.twitter.common.zookeeper:client:jar:0.0.60, com.twitter.common.zookeeper:group:jar:0.0.78: Failure to find com.twitter.common.zookeeper:server-set:jar:1.0.83 in http://192.168.1.14:8081/nexus/content/repositories/releases/ was cached in the local repository, resolution will not be reattempted until the update interval of nexus-releases has elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
 
 
因此只能将其进行改造,使用com.whalin对应的memcached客户端jar包,以满足从Storm存储到memcached的需求。
 
storm trident的结构:
 


 
 
 
storm中还存在其他的工具,Storm-contribute项目地址:https://github.com/nathanmarz/storm-contrib,同样是作者所写,加入了支持Redis,Kafka,MongoDB等数据源。
  • 大小: 101.5 KB
  • 大小: 87.9 KB
  • 大小: 56.5 KB
  • 大小: 43.9 KB
  • 大小: 37.4 KB
分享到:
评论

相关推荐

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    本篇将深入探讨storm-kafka的集成与应用。 首先,storm-kafka是专门为Apache Storm设计的Kafka消费者组件,它允许Storm拓扑从Kafka主题中消费消息。在storm-kafka-0.8-plus版本中,它提供了一种高效且可靠的从Kafka...

    storm企业应用 实战 运维和调优

    - 常用的监控工具包括Storm自带的UI以及集成度量指标库如Ganglia或Prometheus。 6. Storm Trident与高级特性: - 掌握Storm Trident API使用,它为高级流处理提供了状态管理和事务处理机制。 - 了解Storm的事务...

    基于Storm流计算天猫双十一作战室项目实战.docx

    4. **Storm与Kafka集成**:Kafka作为一个分布式消息系统,常与Storm结合使用,形成实时数据管道。课程将涵盖如何配置和使用Storm消费者连接Kafka,实现数据的实时处理和传输。 5. **HighCharts图表开发与实时数据...

    Storm流计算项目:1号店电商实时数据分析系统-24.项目1-地区销售额-Trident代码开发二.pptx

    "项目1-地区销售额-基于HBase存储的State运用"详细介绍了如何将Trident与HBase集成,利用HBase的分布式存储特性来优化状态管理。 此外,项目还涵盖了其他几个关键部分,如使用HighCharts创建动态图表,实现HTTP长...

    Storm流计算项目:1号店电商实时数据分析系统-27.项目2-省份销售排行-双纵轴HighCharts图表开发二.pptx

    2. **Kafka与Storm集成**:Storm-kafka是Storm用于连接和消费Kafka数据的组件。理解如何配置和使用storm-kafka,确保数据能够正确地从Kafka流到Storm的Spout,是项目实施的关键步骤。 3. **双纵轴图表开发**:...

    Storm流计算项目:1号店电商实时数据分析系统-39.项目3-非跳出UV-升级图表增加柱图二.pptx

    在项目中,我们使用了storm-kafka模块,它允许Storm与Kafka进行无缝集成,使Spout能够消费Kafka中的消息。 3. **HighCharts图表**: - HighCharts是一个JavaScript库,用于创建高质量的图表。在项目中,我们利用...

    收集的storm的pdf版资料

    5. **Storm与其他大数据工具的集成**:Storm可以与其他大数据生态系统中的组件,如Hadoop HDFS、Cassandra、Kafka等无缝集成,形成强大的实时数据处理管道。了解这些集成可以帮助优化整体解决方案。 6. **配置与...

    Storm流计算项目:1号店电商实时数据分析系统-30.项目2-省份销售排行-Top N展示优化和项目开发思路总结.pptx

    - **Storm-kafka**:Storm与Kafka的集成,使得Storm可以从Kafka获取实时数据流。 - **Trident**:Storm的一种高级接口,用于构建强一致性的实时应用。 - **HBase**:分布式列式数据库,用于存储和检索大规模数据...

    Storm流计算项目:1号店电商实时数据分析系统-21.项目1-地区销售额-项目发布及总结.pptx

    Storm-kafka模块是连接Kafka和Storm的关键,它允许Spout从Kafka消费数据,同时确保线程安全。 在项目的核心部分,Spout设计融合了Kafka Consumer,确保数据流的持续获取。Bolt组件则承担了业务逻辑处理,包括销售额...

    从零开始学Storm+第2版(2016).pdf

    7. ** Trident**:Trident 是 Storm 的高级抽象,提供了一种声明式的数据处理模型,可以实现精确一次的状态一致性,适用于需要高度准确结果的场景。 8. **本地模式与生产模式**:Storm 提供了本地模式,方便开发者...

    从零开始学习storm最新版

    同时,书中也会介绍如何将Storm与其他大数据技术(如Hadoop、Kafka、Zookeeper等)集成,以实现更高效的数据处理流程。 最后,书中可能还涵盖了Storm的最新发展和未来趋势,例如Storm与其他实时处理框架(如Flink、...

    storm中文学习资料

    9. **进阶主题**:如多语言支持、 Trident API的使用、与其他大数据组件(如Hadoop、Kafka)的集成等。 通过这个压缩包的学习,用户可以从基础到进阶全面了解并掌握Apache Storm,从而在实际工作中有效地利用这一...

    Storm实战构建大数据实时计算

    书中会讨论如何将Storm与其他大数据技术集成,实现数据的实时摄入、处理和存储。这有助于构建完整的端到端实时数据流水线。 此外,书中可能还会涉及Storm的最新发展和相关工具,比如Trident,它提供了一种更高级的...

    storm源码包 apache-0.9.4

    Storm 可以与多种数据存储系统集成,如 HDFS、Cassandra 和 MySQL,以及消息队列如 Kafka。这种灵活性使得 Storm 能适应各种不同的应用场景。 5. ** Trident API**: 在 Storm 0.9.4 版本中,Trident 是一个高级...

    实时计算平台STORM流式数据核心技术与报文系统.pdf

    未来,随着实时处理需求的持续增长,Storm可能与其他技术如Kafka、Flink等进一步融合,以应对更复杂的实时业务挑战。 通过对Storm的深入理解和应用,开发者可以构建出高性能、高可靠性的实时数据处理系统,满足现代...

    storm blueprints

    9. **与其他系统的集成**:Storm可以与Hadoop、Cassandra、Kafka等其他大数据技术集成,实现更灵活的数据处理解决方案。 10. **最佳实践和性能调优**:最后,书中会提供一些使用Storm的最佳实践,包括如何优化性能...

    从零开始学Storm 第2版

    8. **与其他系统的集成**:理解如何将Storm与Hadoop、Cassandra、Kafka等其他大数据组件结合,实现数据的实时处理和存储。 9. **故障恢复与备份策略**:了解在出现故障时,如何恢复数据流处理,并制定备份策略。 ...

    hadoop、storm、spark的区别对比

    Storm也有自己的生态系统,包括Trident这样的实时处理库,以及Kafka、Zookeeper等组件。 在容错性方面,Hadoop通过数据冗余保证了容错性,但延迟较高。Storm通过消息的ack机制保证了消息至少处理一次,支持事务拓扑...

    大数据课程体系.docx

    - **Kafka和Storm的整合**:解释如何将Kafka作为数据源与Storm的数据处理引擎相集成。 #### 十一、Storm实时计算框架 - **Storm的基本概念**:介绍Storm的核心概念,如Topology、Spout、Bolt等。 - **Storm的应用...

    大数据课程体系

    - **Kafka和storm的整合**:将Kafka与Storm集成,实现实时流处理。 #### 十、实时计算框架Storm - **Storm的基本概念**:了解Storm的基本架构和工作原理。 - **Storm的应用场景**:探讨Storm在实时数据分析中的应用...

Global site tag (gtag.js) - Google Analytics