`

Storm+Kafka集成

 
阅读更多

http://blog.csdn.net/ch717828/article/details/50748912

 

 

 

 

1. 机器&环境 准备

我准备了3台机器 ,分别是

 

 

 

 

 

 

   10.101.214.71 

 

   10.101.214.73

 

   10.101.214.74

且这三台机器均安装了 kafka和storm。详细参考上面两篇文章。

注意,之前的文章我安装的storm版本为0.9.1 ,该版本中缺少许多与kafka集成需要的包,因此,升级为0.9.2 。

 


 

 

 

2.Storm自定义日志 

为了清晰得打印出Storm处理 Kafka发送来的消息,此处自定义了一个日志。

 

 

[plain] view plain copy在CODE上查看代码片派生到我的代码片
  1. // 在73,74机器上 修改 /usr/share/storm/logback/cluster.xml  
  2. <appender name="mylog" class="ch.qos.logback.core.rolling.RollingFileAppender">  
  3.       <file>${storm.home}/logs/mylog.log</file><!-- log文件输出path -->  
  4.       <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">  
  5.         <fileNamePattern>${storm.home}/logs/mylog.log.%i</fileNamePattern><!-- 保留多个文件的文件命名格式 -->  
  6.         <minIndex>1</minIndex>  
  7.         <maxIndex>20</maxIndex><!-- 这两行可以共同配置保留多少个文件 -->  
  8.       </rollingPolicy>  
  9.       <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">  
  10.         <maxFileSize>100MB</maxFileSize><!-- log文件的最大大小 -->  
  11.       </triggeringPolicy>  
  12.       <encoder>  
  13.         <pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ} %c{1} [%p] %m%n</pattern> <!-- 输出的日志信息的格式 -->  
  14.       </encoder>  
  15.   </appender>  
  16.   
  17. <logger name="ch.main.MyKafkaTopology" additivity="false" >  
  18. <!-- name 可以配置哪些包下的日志信息要输出,也可以精准到一个类 -->  
  19.     <level value="INFO"/><!-- 要输出的日志信息的级别,我要输出业务日志,则配置为INFO -->  
  20.     <appender-ref ref="mylog"/><!-- 上面的appender的name -->  
  21.   </logger>  


配置好后, ch.main.MyKafkaTopology打印出的INFO日志,均会存在 /usr/share/storm/logs/mylog.log 文件下

 

 

3.代码编写

 

pom.xml

 

 

[html] view plain copy在CODE上查看代码片派生到我的代码片
  1. <dependencies>  
  2.         <dependency>  
  3.             <groupId>org.apache.storm</groupId>  
  4.             <artifactId>storm-core</artifactId>  
  5.             <version>0.9.2-incubating</version>  
  6.         </dependency>  
  7.         <dependency>  
  8.             <groupId>org.apache.storm</groupId>  
  9.             <artifactId>storm-kafka</artifactId>  
  10.             <version>0.9.2-incubating</version>  
  11.         </dependency>  
  12.         <dependency>  
  13.             <groupId>org.apache.kafka</groupId>  
  14.             <artifactId>kafka_2.11</artifactId>  
  15.             <version>0.9.0.0</version>  
  16.             <exclusions>  
  17.                 <exclusion>  
  18.                     <groupId>org.apache.zookeeper</groupId>  
  19.                     <artifactId>zookeeper</artifactId>  
  20.                 </exclusion>  
  21.                 <exclusion>  
  22.                     <groupId>log4j</groupId>  
  23.                     <artifactId>log4j</artifactId>  
  24.                 </exclusion>  
  25.                 <exclusion>  
  26.                     <groupId>org.slf4j</groupId>  
  27.                     <artifactId>slf4j-log4j12</artifactId>  
  28.                 </exclusion>  
  29.             </exclusions>  
  30.         </dependency>  
  31.     </dependencies>  


java 代码

 

[java] view plain copy在CODE上查看代码片派生到我的代码片
  1. package ch.main;  
  2.   
  3. import backtype.storm.Config;  
  4. import backtype.storm.LocalCluster;  
  5. import backtype.storm.StormSubmitter;  
  6. import backtype.storm.generated.AlreadyAliveException;  
  7. import backtype.storm.generated.InvalidTopologyException;  
  8. import backtype.storm.spout.SchemeAsMultiScheme;  
  9. import backtype.storm.task.OutputCollector;  
  10. import backtype.storm.task.TopologyContext;  
  11. import backtype.storm.topology.OutputFieldsDeclarer;  
  12. import backtype.storm.topology.TopologyBuilder;  
  13. import backtype.storm.topology.base.BaseRichBolt;  
  14. import backtype.storm.tuple.Fields;  
  15. import backtype.storm.tuple.Tuple;  
  16. import backtype.storm.tuple.Values;  
  17.   
  18.   
  19. import org.slf4j.Logger;  
  20. import org.slf4j.LoggerFactory;  
  21. import storm.kafka.*;  
  22.   
  23.   
  24. import java.util.Arrays;  
  25. import java.util.HashMap;  
  26.   
  27. import java.util.Iterator;  
  28. import java.util.Map;  
  29. import java.util.concurrent.atomic.AtomicInteger;  
  30.   
  31. /** 
  32.  * Created by chenhong on 16/2/24. 
  33.  */  
  34. public class MyKafkaTopology {  
  35.   
  36.   
  37.    public static class KafkaWordSplitter extends BaseRichBolt{  
  38.       // private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);  
  39.        private static final Logger LOG = LoggerFactory.getLogger(KafkaWordSplitter.class);  
  40.        private static final long serialVersionUID = 1L;  
  41.        private OutputCollector collector;  
  42.   
  43.   
  44.        @Override  
  45.        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {  
  46.            this.collector = collector;  
  47.        }  
  48.   
  49.        @Override  
  50.        public void execute(Tuple input) {  
  51.            String line = input.getString(0);  
  52.            LOG.info("RECE[kafka -> splitter] "+line);  
  53.            String[] words = line.split("\\s+");  
  54.            for(String word : words){  
  55.                LOG.info("EMIT[splitter -> counter] "+word);  
  56.                collector.emit(input,new Values(word,1));  
  57.            }  
  58.            collector.ack(input);  
  59.        }  
  60.   
  61.        @Override  
  62.        public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  63.             declarer.declare(new Fields("word","count"));  
  64.        }  
  65.    }  
  66.   
  67.     public static class WordCounter extends BaseRichBolt {  
  68.        // private static final Log LOG = LogFactory.getLog(WordCounter.class);  
  69.         private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);  
  70.         private static final long serialVersionUID =1L;  
  71.         private OutputCollector collector;  
  72.         private Map<String,AtomicInteger> counterMap;  
  73.   
  74.         @Override  
  75.         public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {  
  76.             this.collector=collector;  
  77.             this.counterMap = new HashMap<String,AtomicInteger>();  
  78.         }  
  79.   
  80.         @Override  
  81.         public void execute(Tuple input) {  
  82.             String word = input.getString(0);  
  83.             int count = input.getInteger(1);  
  84.             LOG.info("RECE[splitter -> counter] "+word+" : "+count);  
  85.             AtomicInteger ai = this.counterMap.get(word);  
  86.             if(ai==null){  
  87.                 ai= new AtomicInteger();  
  88.                 this.counterMap.put(word,ai);  
  89.             }  
  90.             ai.addAndGet(count);  
  91.             collector.ack(input);  
  92.             LOG.info("CHECK statistics map: "+this.counterMap);  
  93.         }  
  94.   
  95.         @Override  
  96.         public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  97.             declarer.declare(new Fields("word","count"));  
  98.         }  
  99.   
  100.         @Override  
  101.         public void cleanup() {  
  102.             LOG.info("The final result:");  
  103.             Iterator<Map.Entry<String,AtomicInteger>> iter = this.counterMap.entrySet().iterator();  
  104.             while(iter.hasNext()){  
  105.                 Map.Entry<String,AtomicInteger> entry =iter.next();  
  106.                 LOG.info(entry.getKey()+"\t:\t"+entry.getValue().get());  
  107.             }  
  108.         }  
  109.     }  
  110.   
  111.     public static void main(String[] args) throws AlreadyAliveException,InvalidTopologyException,InterruptedException{  
  112.         String zks = "10.101.214.71:2181,10.101.214.73:2181,10.101.214.74:2181";  
  113.         String topic ="my-replicated-topic5";  
  114.         String zkRoot ="/kafka" ;  
  115.         String id ="word"// 读取的status会被存在,/zkRoot/id下面,所以id类似consumer group  
  116.   
  117.         BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");  
  118.         SpoutConfig spoutConf = new SpoutConfig(brokerHosts,topic,zkRoot,id);  
  119.         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());  
  120.         spoutConf.forceFromStart = false;  
  121.         spoutConf.zkServers= Arrays.asList(new String[]{"10.101.214.71","10.101.214.73","10.101.214.74"});  
  122.         spoutConf.zkPort=2181;  
  123.   
  124.         TopologyBuilder  builder = new TopologyBuilder();  
  125.         builder.setSpout("kafka-reader"new KafkaSpout(spoutConf), 5); //// Kafka我们创建了一个5分区的Topic,这里并行度设置为5  
  126.         builder.setBolt("word-splitter",new KafkaWordSplitter(),2).shuffleGrouping("kafka-reader");  
  127.         builder.setBolt("word-counter",new WordCounter() ).fieldsGrouping("word-splitter",new Fields("word"));  
  128.   
  129.         Config config = new Config();  
  130.         String name = MyKafkaTopology.class.getSimpleName();  
  131.         if(args !=null && args.length>0 ){  
  132.           //config.put(Config.NIMBUS_HOST,args[0]);  
  133.             config.setNumWorkers(3);  
  134.             StormSubmitter.submitTopology(name,config,builder.createTopology());  
  135.         }else{  
  136.             config.setMaxTaskParallelism(3);  
  137.             LocalCluster cluster = new LocalCluster();  
  138.             cluster.submitTopology(name,config,builder.createTopology());  
  139.             Thread.sleep(60000);  
  140.             cluster.shutdown();  
  141.         }  
  142.   
  143.   
  144.   
  145.     }  
  146.   
  147. }  

 

4 提交运行

使用 mvn将项目打包 

 

[plain] view plain copy在CODE上查看代码片派生到我的代码片
  1. mvn clean install  

 

 

 

为了在storm中使用kafka,需要将 依赖jar文件到Storm集群中的lib目录下面

 

 

 

[plain] view plain copy在CODE上查看代码片派生到我的代码片
  1. cp /usr/local/kafka/libs/kafka_2.11-0.9.0.0.jar /usr/share/storm/lib/  
  2. cp /usr/local/kafka/libs/scala-library-2.11.7.jar /usr/share/storm/lib/  
  3. cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/share/storm/lib/  
  4. cp /usr/local/kafka/libs/snappy-java-1.1.1.7.jar /usr/share/storm/lib/  
  5. cp /usr/local/kafka/libs/zkclient-0.7.jar /usr/share/storm/lib/  
  6. cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/share/storm/lib/  
  7. cp /usr/local/kafka/libs/slf4j-api-1.7.6.jar /usr/share/storm/lib/  
  8. cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/share/storm/lib/  

提交

 

[plain] view plain copy在CODE上查看代码片派生到我的代码片
  1. //在 71机器上提交  
  2. storm jar StormKafka0.1-1.0-SNAPSHOT.jar ch.main.MyKafkaTopology MyKafkaTopology  
  3. //在71机器上打开 kafka启动Producer ,产生日志  
  4. /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh --broker-list 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092 --topic my-replicated-topic5  
  5.   
  6. (随便输入一些内容)  
  7. //在 73,74机器上查看日志   
  8. cat  /usr/share/storm/logs/mylog.log   
  9. (可以看到 MyKafkaTopology 打出的日志)  


下面是我查看mylog.log的部分日志

 

 

 

[plain] view plain copy在CODE上查看代码片派生到我的代码片
  1. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123  
  2. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123  
  3. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123  
  4. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123  
  5. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] aa  
  6. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] bbc  
  7. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ccc  
  8. 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ddd  
  9. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] eeee  
  10. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ffffff  
  11. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] jsdkfjasnng  
  12. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123 : 1  
  13. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=18, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}  
  14. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1  
  15. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=19, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}  
  16. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1  
  17. 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=20, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}  



 

 

其他

 

 

 

1 启动storm 发生 line 61:normclasspath = cygpath if sys.platform == 'cygwin' else identity  错误

 

 

[plain] view plain copy在CODE上查看代码片派生到我的代码片
  1. 安装python2.7  
  2. 修改/usr/bin/storm  
  3. 将首行显示的 !#/usr/bin/python 修改为 !#/home/tops/bin/python2.7  

 

 

 

 

在集成过程中可能会遇到许多奇怪的问题,一路走来也踩了许多坑,有问题的可以私信或者留言。

分享到:
评论

相关推荐

    lamp安装配置及flume+Kafka+Storm+HDFS实时系统搭分享

    《LNMP环境构建与Flume+Kafka+Storm+HDFS实时系统集成详解》 在当前的互联网时代,数据量的急剧增长使得大数据处理成为一项关键任务。本篇将深入探讨如何在Linux环境下搭建LNMP(Linux + Nginx + MySQL + PHP)...

    storm+kafka+redis+mysql

    标题中的"storm+kafka+redis+mysql"是一个典型的大数据实时处理架构,广泛应用于日志收集、数据分析等领域。下面将详细介绍这些技术组件及其在实际应用中的作用。 **Apache Storm** Apache Storm是一个开源的分布式...

    StormStorm集成Kafka 从Kafka中读取数据

    在Storm-Kafka集成中,首先需要将数据写入Kafka。这通常通过生产者(Producer)完成。生产者连接到Kafka集群,创建主题(Topic),然后将数据发布到指定的主题中。以下是一些关键步骤: 1. 创建Kafka生产者配置:...

    log4j+flume+kafka+storm

    在集成Log4j与Flume的过程中,Avro Source是一种常用的方式。Avro Source允许远程客户端通过网络接口发送数据到Flume Agent。在这个场景中,Flume版本为1.5.2。 **步骤一:配置Flume** 首先需要对Flume的配置文件`...

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    StormStorm集成Kafka 写数据到Kafka

    Storm集成Kafka 一、整合说明 二、写入数据到Kafka 三、从Kafka中读取数据 整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka ...

    storm-kafka实时趋势分析

    在大数据处理领域,Storm和Kafka是两个非常重要的组件,它们常常被联合使用来构建实时数据处理系统。本文将深入探讨"storm-kafka实时趋势分析"这一主题,讲解这两个技术如何协同工作,以及如何实现通用性强、适应多...

    flume+kafka+storm搭建

    本文将详细介绍如何利用Flume、Kafka和Storm搭建一个大数据消息平台。 首先,我们来了解Flume。Flume是Cloudera公司提供的一款分布式、可靠且高可用的海量日志采集、聚合和传输的系统。它允许你定制数据发送方,...

    Strom+kafka整合jar包汇总

    在分布式计算领域,Apache Storm和Apache Kafka是两个非常重要的组件。Storm是一个实时处理系统,而Kafka是一个高吞吐量的分布式消息系统。两者结合使用,可以构建强大的实时数据流处理解决方案。本压缩包提供了...

    storm链接kafka时需要的jar包

    基础的Storm发行版提供了核心功能,如`storm-core.jar`,但为了将Storm与Kafka集成,我们还需要包含特定的Storm库,比如`storm-kafka-*版本.jar`,这个JAR包提供了与Kafka交互的Spout实现。 3. **Storm-Kafka JAR包...

    storm与kafka整合jar包

    标题中的"storm与kafka整合jar包"指的是在Apache Storm和Apache Kafka两个开源项目之间进行数据流处理集成的Java归档(JAR)文件。Apache Storm是一个分布式实时计算系统,而Apache Kafka是一个高吞吐量的分布式发布...

    Flume+Kafka环境构建和实战.zip

    3. **Flume与Kafka集成**: - **配置Flume Source**:将Kafka作为Flume的数据源,需要配置为`kafka.Source`类型,指定Kafka的broker列表、topic等信息。 - **配置Flume Sink**:设置Flume将数据发送到Kafka,配置...

    storm+zookeeper+maven安装包

    Storm可以与这些组件无缝集成,实现数据的实时处理和分析,从而为企业决策提供实时洞见。同时,理解和掌握Zookeeper的角色和功能,有助于理解分布式系统中的协调机制,提高系统的稳定性和可靠性。 总之,"storm+...

    storm之集成kafka操作示例代码.zip

    在分布式计算领域,Apache Storm和Apache Kafka都是...通过以上步骤,我们可以理解并实现Storm与Kafka的集成,实现高效的数据流处理。这个示例代码提供了具体的实现细节,帮助开发者更好地理解和运用这两个强大的工具。

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    本篇将深入探讨storm-kafka的集成与应用。 首先,storm-kafka是专门为Apache Storm设计的Kafka消费者组件,它允许Storm拓扑从Kafka主题中消费消息。在storm-kafka-0.8-plus版本中,它提供了一种高效且可靠的从Kafka...

    Kafka分布式消息系统实战(与Java+Scala+Hadoop+Storm集成)

    Kafka的版本:kafka_2.9.2-0.8.1.1.tgz和kafka_2.11-0.10.0.0.tgz 开发工具: Linux;Eclipse;Scala IDE 2.内容简介 Kafka是分布式的消息队列,作为云计算服务的基石,它广泛的应用在实时数据流方面,是实时数据处理...

    storm与kafka整合的客户端开发实例java源码.zip

    标题中的"storm与kafka整合的客户端开发实例java源码.zip"表明这是一个关于使用Java语言在Storm和Kafka之间建立集成的案例项目。这个压缩包包含的源代码为我们提供了实现这种集成的具体步骤和方法。 首先,让我们...

    storm+mq整合完整示例

    【标题】"storm+mq整合完整示例"中涉及的知识点主要集中在分布式计算框架Apache Storm与消息队列MQ(如RabbitMQ、Kafka等)的集成应用上,旨在实现数据流的实时处理。以下是对这些知识点的详细阐述: 1. **Apache ...

    Storm整合Kafka

    - **配置Storm-Kafka连接器**:Storm提供了一个名为`storm-kafka`的连接器,简化了Storm与Kafka的集成。需要正确配置连接器的参数,如 zookeeper 的主机名和端口,以及Kafka的主题名。 - **运行拓扑**:启动Storm...

    从零开始学Storm+第2版(2016).pdf

    10. **与其他技术集成**:Storm 可以与 Hadoop、Cassandra、HBase 等大数据技术集成,构建完整的实时处理解决方案。 学习 Storm,你需要理解分布式计算的基本原理,熟悉 Java 或者 Clojure 开发,因为 Storm 的 API...

Global site tag (gtag.js) - Google Analytics