`

Kafka 与 Storm 整合

 
阅读更多

Kafka 与 Storm 整合

原创编写: 王宇 
2016-10-27


 

 


整合Storm的概念

  • 整合场景

    A spout is a source of streams. For example, a spout may read tuples off a Kafka Topic and emit them as a stream. A bolt consumes input streams, process and possibly emits new streams. Bolts can do anything from running functions, filtering tuples, do streaming aggregations, streaming joins, talk to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if the machines go down and messages are dropped.

  • BrokerHosts - ZkHosts & StaticHosts 
    BrokerHosts is an interface and ZkHosts and StaticHosts are its two main implementations. ZkHosts is used to track the Kafka brokers dynamically by maintaining the details in ZooKeeper, while StaticHosts is used to manually / statically set the Kafka brokers and its details. ZkHosts is the simple and fast way to access the Kafka broker

  • KafkaConfig API 
    This API is used to define configuration settings for the Kafka cluster. The signature of Kafka Con-fig is defined as follows 
    public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts − The BrokerHosts can be ZkHosts / StaticHosts. 
    Topic − topic name

  • SpoutConfig API 
    Spoutconfig is an extension of KafkaConfig that supports additional ZooKeeper information. 
    public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)

    • Hosts − The BrokerHosts can be any implementation of BrokerHosts interface
    • Topic − topic name.
    • zkRoot − ZooKeeper root path.
    • id − The spout stores the state of the offsets its consumed in Zookeeper. The id should uniquely identify your spout.
  • SchemeAsMultiScheme (调度)????

  • KafkaSpout API 
    KafkaSpout is our spout implementation, which will integrate with Storm. It fetches the messages from kafka topic and emits it into Storm ecosystem as tuples. KafkaSpout get its config-uration details from SpoutConfig.
  1. // ZooKeeper connection string
  2. BrokerHosts hosts =newZkHosts(zkConnString);
  3. //Creating SpoutConfig Object
  4. SpoutConfig spoutConfig =newSpoutConfig(hosts,
  5. topicName,"/"+ topicName UUID.randomUUID().toString());
  6. //convert the ByteBuffer to String.
  7. spoutConfig.scheme =newSchemeAsMultiScheme(newStringScheme());
  8. //Assign SpoutConfig to KafkaSpout.
  9. KafkaSpout kafkaSpout =newKafkaSpout(spoutConfig);

创建 Bolt

  • Bolt 接口定义参考《Storm学习笔记》
  • 分割统计单词个数的Bolt例子
    • SplitBolt.java
  1. import java.util.Map;
  2. import org.apache.storm.tuple.Tuple;
  3. import org.apache.storm.tuple.Fields;
  4. import org.apache.storm.tuple.Values;
  5. import org.apache.storm.task.OutputCollector;
  6. import org.apache.storm.topology.OutputFieldsDeclarer;
  7. import org.apache.storm.topology.IRichBolt;
  8. import org.apache.storm.task.TopologyContext;
  9. publicclassSplitBoltimplementsIRichBolt{
  10. privateOutputCollector collector;
  11. @Override
  12. publicvoid prepare(Map stormConf,TopologyContext context,
  13. OutputCollector collector){
  14. this.collector = collector;
  15. }
  16. @Override
  17. publicvoid execute(Tuple input){
  18. String sentence = input.getString(0);
  19. String[] words = sentence.split(" ");
  20. for(String word: words){
  21. word = word.trim();
  22. if(!word.isEmpty()){
  23. word = word.toLowerCase();
  24. collector.emit(newValues(word));
  25. }
  26. }
  27. collector.ack(input);
  28. }
  29. @Override
  30. publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
  31. declarer.declare(newFields("word"));
  32. }
  33. @Override
  34. publicvoid cleanup(){}
  35. @Override
  36. publicMap<String,Object> getComponentConfiguration(){
  37. returnnull;
  38. }
  39. }
  • CountBolt.java
  1. import java.util.Map;
  2. import java.util.HashMap;
  3. import org.apache.storm.tuple.Tuple;
  4. import org.apache.storm.task.OutputCollector;
  5. import org.apache.storm.topology.OutputFieldsDeclarer;
  6. import org.apache.storm.topology.IRichBolt;
  7. import org.apache.storm.task.TopologyContext;
  8. publicclassCountBoltimplementsIRichBolt{
  9. Map<String,Integer> counters;
  10. privateOutputCollector collector;
  11. @Override
  12. publicvoid prepare(Map stormConf,TopologyContext context,
  13. OutputCollector collector){
  14. this.counters =newHashMap<String,Integer>();
  15. this.collector = collector;
  16. }
  17. @Override
  18. publicvoid execute(Tuple input){
  19. String str = input.getString(0);
  20. if(!counters.containsKey(str)){
  21. counters.put(str,1);
  22. }else{
  23. Integer c = counters.get(str)+1;
  24. counters.put(str, c);
  25. }
  26. collector.ack(input);
  27. }
  28. @Override
  29. publicvoid cleanup(){
  30. for(Map.Entry<String,Integer> entry:counters.entrySet()){
  31. System.out.println(entry.getKey()+" : "+ entry.getValue());
  32. }
  33. }
  34. @Override
  35. publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
  36. }
  37. @Override
  38. publicMap<String,Object> getComponentConfiguration(){
  39. returnnull;
  40. }
  41. }

提交到 Topology

  • Topology概念参考《Storm学习笔记》
  • KafkaStormSample.java
  1. import org.apache.storm.Config;
  2. import org.apache.storm.LocalCluster;
  3. import org.apache.storm.topology.TopologyBuilder;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import java.util.UUID;
  7. import org.apache.storm.spout.SchemeAsMultiScheme;
  8. import org.apache.storm.kafka.trident.GlobalPartitionInformation;
  9. import org.apache.storm.kafka.ZkHosts;
  10. import org.apache.storm.kafka.Broker;
  11. import org.apache.storm.kafka.StaticHosts;
  12. import org.apache.storm.kafka.BrokerHosts;
  13. import org.apache.storm.kafka.SpoutConfig;
  14. import org.apache.storm.kafka.KafkaConfig;
  15. import org.apache.storm.kafka.KafkaSpout;
  16. import org.apache.storm.kafka.StringScheme;
  17. publicclassKafkaStormSample{
  18. publicstaticvoid main(String[] args)throwsException{
  19. Config config =newConfig();
  20. config.setDebug(true);
  21. config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
  22. String zkConnString ="localhost:2181";
  23. String topic ="my-first-topic";
  24. BrokerHosts hosts =newZkHosts(zkConnString);
  25. SpoutConfig kafkaSpoutConfig =newSpoutConfig(hosts, topic,"/"+ topic, UUID.randomUUID().toString());
  26. kafkaSpoutConfig.bufferSizeBytes =1024*1024*4;
  27. kafkaSpoutConfig.fetchSizeBytes =1024*1024*4;
  28. // kafkaSpoutConfig.forceFromStart = true;
  29. kafkaSpoutConfig.scheme =newSchemeAsMultiScheme(newStringScheme());
  30. TopologyBuilder builder =newTopologyBuilder();
  31. builder.setSpout("kafka-spout",newKafkaSpout(kafkaSpoutConfig));
  32. builder.setBolt("word-spitter",newSplitBolt()).shuffleGrouping("kafka-spout");
  33. builder.setBolt("word-counter",newCountBolt()).shuffleGrouping("word-spitter");
  34. LocalCluster cluster =newLocalCluster();
  35. cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
  36. Thread.sleep(10000);
  37. cluster.shutdown();
  38. }
  39. }

版本

  • Zookeeper: zookeeper-3.5.2-alpha.tar.gz
  • Curator: 2.9.1
  • SLF4j: slf4j-1.7.21.tar.gz
  • Kafka: kafka_2.11-0.10.1.0
  • Storm : apache-storm-1.0.2.tar.gz
  • JSON: json-simple-1.1.1.jar
  • JDK: 1.8.0

编译

  • 依赖包

    • Curator 
      Before moving compilation, Kakfa-Storm integration needs curator ZooKeeper client java library. 这个包的路径,要加到CLASSPATH中 

      curator-client-2.9.1.jar 
      curator-framework-2.9.1.jar 
    • JSON 
      json-simple-1.1.1.jar
    • Storm-Kafka 
      json-simple-1.1.1.jar
    • Kafka Lib
    • Storm Lib
    • SLF4J 
      当CLASSPATH中,包含了Kafka 和Storm 的lib 后, SLF4j会产生冲突,将Kafka中的SLF4j 移除CLASSPATH即可
  • 编译命令

    1. $ javac *.java

执行

  • 开启服务: Zookeeper Kafka Storm
  1. $cd /opt/zookeeper
  2. $.bin/zkServer.sh start
  3. $cd /opt/kafka
  4. $./bin/kafaka-server-start.sh config/server.properties
  5. $ cd /opt/storm
  6. $./bin/storm nimbus
  7. $./bin/storm supervisor
  • 创建Topic: “my-first-topic”
  1. $ ./bin/kafktopics.sh --create --zookeeper localhost:2181--replication-factor 1--partitions 1--topic my-first-topic
  2. $ ./bin/kafktopics.sh --list --zookeeper localhost:2181
  • 在Kafka Producer CLI 输入message
  1. $ ./bin/kafka-console-producer.sh --broker-list localhost:9092--topic my-first-topic
  2. hello
  3. kafka
  4. storm
  5. spark
  6. test message
  7. anther test message
  • 执行例子
  1. $ java KafkaStormSample
  • 输出结果

 

  1. storm :1
  2. test :2
  3. spark :1
  4. another :1
  5. kafka :1
  6. hello :1
  7. message :2
分享到:
评论

相关推荐

    Kafka与Storm整合后java客户端使用实例代码.zip

    当我们将Kafka与Storm整合时,可以构建强大的实时数据流处理平台。以下是一个基于Java的客户端使用实例,详细解释了如何将这两个工具结合在一起。 首先,理解Kafka的基本概念至关重要。Kafka是由LinkedIn开发并贡献...

    Flume+kafka+Storm整合

    ### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...

    kafka和storm整合

    这张图片详细的描述了kafka、和storm的过程。。。。。

    flume,kafka,storm整合

    本文将详细介绍如何整合Flume、Kafka和Storm,以实现大数据的高效收集、传输和处理。在大数据运维解决方案中,这三个组件扮演着关键角色。Flume用于数据采集,Kafka作为中间件提供消息队列服务,而Storm则用于实时...

    kafka-storm.7z

    《Kafka与Storm整合:构建大数据实时处理流水线》 在大数据处理领域,Apache Kafka和Apache Storm都是非常重要的组件。Kafka作为一个高吞吐量、分布式的消息发布订阅系统,常用于实时数据管道,而Storm则是一个...

    kafkastorm:kafka整合storm案例

    将Kafka与Storm整合,可以构建出强大的实时数据处理管道,这在大数据实时分析、监控、报警等领域有着广泛应用。 Kafka作为消息中间件,它主要负责数据的生产与消费,支持多个消费者组同时消费数据,确保数据的高...

    kafka+storm+hbase整合案例

    8. **监控与优化**:设置监控系统,监控Kafka、Storm和HBase的运行状态,根据性能指标进行调优。 《中国移动项目架构图.jpg》应该详细描绘了整个系统的架构,包括各个组件之间的交互关系。《cmcc02_hbase.rar》和...

    StormStorm集成Kafka 从Kafka中读取数据

    在大数据实时处理领域,Apache Storm与Apache Kafka经常被结合使用,形成高效的数据流处理系统。本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一...

    netty+kafka+storm

    在Netty、Kafka和Storm的整合过程中,JDBC扮演了数据存储的角色。经过Storm实时处理后的数据,会被写入到MySQL数据库中,以便后续的查询和分析。 整合这些技术的具体步骤可能包括以下部分: 1. 使用Netty创建一个...

    妳那伊抹微笑_Flume+Kafka+Storm分布式环境整合Eclipse工程_20140714.rar

    该文档与教程http://blog.csdn.net/u012185296/article/details/37762793配套使用的,主要讲的是Flume+Kafka+Storm的环境整合,并且全部都是最新版本 、、、你也可以到博客地址http://blog.csdn.net/u012185296中去...

    storm+kafka+jdbc整合实例

    在大数据处理领域,Storm、Kafka以及JDBC的整合是一个常见的需求,用于实现实时数据流处理、消息队列和数据库交互。以下是对这个整合实例的详细解释。 首先,让我们来了解一下这三个组件: 1. **Apache Storm**:...

    storm-kafka整合代码

    当我们谈论"storm-kafka整合代码"时,这意味着我们要将这两者结合,使得 Storm 可以从 Kafka 中消费数据并进行实时处理。下面将详细介绍 Storm 和 Kafka 的核心概念以及它们整合的关键步骤。 **Apache Storm** ...

    log4j+flume+kafka+storm

    本文详细介绍了如何将Log4j、Flume与Kafka进行整合,使得日志数据能够从Log4j经由Flume传输至Kafka。这一过程不仅涉及具体的配置细节,还包括了环境准备、测试验证等多个方面,确保了整个系统能够稳定高效地运行。...

    storm与kafka整合jar包

    标题中的"storm与kafka整合jar包"指的是在Apache Storm和Apache Kafka两个开源项目之间进行数据流处理集成的Java归档(JAR)文件。Apache Storm是一个分布式实时计算系统,而Apache Kafka是一个高吞吐量的分布式发布...

    03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar

    【标题】"03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar" 提供了一个关于实时大数据处理的实战教程,主要聚焦于Apache Storm和Apache Kafka的整合应用。Apache Storm是一个开源的分布式实时计算系统,而Kafka则...

    StormStorm集成Kafka 写数据到Kafka

    Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。 这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文档...

    Spring boot集成Kafka+Storm的示例代码

    为了解决这些问题,我们需要了解Spring Boot的启动方式及配置,并了解如何将Storm整合到Spring Boot中。在整合之前,我们需要了解Spring Boot的相关知识,并且需要了解Storm和Kafka的相关知识。 解决方法 为了解决...

    zk-kafka-redis-storm安装

    与Hadoop处理批量数据不同,Storm专注于实时数据流的处理,能够快速地处理和响应新产生的数据。 在"zk-kafka-redis-storm安装"过程中,首先需要安装Zookeeper,确保集群的协调和数据一致性。接着是Kafka的部署,...

    storm与kafka整合的客户端开发实例java源码.zip

    标题中的"storm与kafka整合的客户端开发实例java源码.zip"表明这是一个关于使用Java语言在Storm和Kafka之间建立集成的案例项目。这个压缩包包含的源代码为我们提供了实现这种集成的具体步骤和方法。 首先,让我们...

Global site tag (gtag.js) - Google Analytics