`
qindongliang1922
  • 浏览: 2183633 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117522
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125920
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59881
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71297
社区版块
存档分类
最新评论

Storm的wordcount实战示例

阅读更多
有关strom的具体介绍,本文不再过多叙述,不了解的朋友可参考之前的文章
http://qindongliang.iteye.com/category/361820
本文主要以一个简单的wordcount例子,来了解下storm应用程序的开发,虽然只是一个简单的例子
但麻雀虽小,五脏俱全,主要涉及的内容:

(1)wordcount的拓扑定义
(2)spout的使用
(3)bolt的使用
(4)tick定时器的使用
(5) bolt之间数据传输的坑
简单的数据流程图如下:





提交到storm集群上的拓扑图:






maven项目的pom依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jstrom.demo</groupId>
    <artifactId>jstrom-test</artifactId>
    <version>1.0-SNAPSHOT</version>



    <properties>

        <jstorm.version>2.1.1</jstorm.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>1.7.12</slf4j.version>
        <joad-time.version>2.9.4</joad-time.version>
        <storm-kafka.version>0.9.4</storm-kafka.version>
        <kafka.version>0.9.0.0</kafka.version>
        <esper.version>5.4.0</esper.version>



     </properties>






    <dependencies>


        <!-- https://mvnrepository.com/artifact/com.espertech/esper -->



        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joad-time.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>${jstorm.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm-kafka.version}</version>
            <scope>provided</scope>
        </dependency>





        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>${slf4j.version}</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>换成自己的主类</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-my-jar-with-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>


(1)Topology主拓扑类:

package com.jstorm.wd;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Created by QinDongLiang on 2016/9/12.
 */
public class TopologyWordCount {

    public static void main(String[] args) throws  Exception {
        TopologyBuilder builder=new TopologyBuilder();
        //设置数据源
        builder.setSpout("spout",new CreateSentenceSpout(),1);
        //读取spout数据源的数据,进行split业务逻辑
        builder.setBolt("split",new SplitWordBolt(),1).shuffleGrouping("spout");
        //读取split后的数据,进行count (tick周期10秒)
        builder.setBolt("count",new SumWordBolt(),1).fieldsGrouping("split",new Fields("word"));
        //读取count后的数据,进行缓冲打印 (tick周期3秒,仅仅为测试tick使用,所以多加了这个bolt)
        builder.setBolt("show",new ShowBolt(),1).shuffleGrouping("count");
        //读取show后缓冲后的数据,进行最终的打印 (实际应用中,最后一个阶段应该为持久层)
        builder.setBolt("final",new FinalBolt(),1).allGrouping("show");

        Config config=new Config();
        config.setDebug(false);
        //集群模式
        if(args!=null&&args.length>0){
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0],config,builder.createTopology());
        //单机模式
        }else{
            config.setMaxTaskParallelism(1);;
            LocalCluster cluster=new LocalCluster();
            cluster.submitTopology("word-count",config,builder.createTopology());
            Thread.sleep(3000000);
            cluster.shutdown();
        }
    }

}



(2)Spout数据源类

package com.jstorm.wd;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.joda.time.DateTime;

import java.util.Map;
import java.util.Random;

/**
 * Created by QinDongLiang on 2016/8/31.
 * 创建数据源
 */
public class CreateSentenceSpout extends BaseRichSpout {
    //
    SpoutOutputCollector collector;
    Random random;
    String [] sentences=null;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector=spoutOutputCollector;//spout_collector
        random=new Random();//
        sentences=new String[]{"hadoop hadoop hadoop java java "};

    }

    @Override
    public void nextTuple() {
        Utils.sleep(10000);
        //获取数据
        String sentence=sentences[random.nextInt(sentences.length)];
        System.out.println("线程名:"+Thread.currentThread().getName()+"  "+new DateTime().toString("yyyy-MM-dd HH:mm:ss  ")+"10s发射一次数据:"+sentence);
        //向下游发射数据
        this.collector.emit(new Values(sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}



(3)Split的bolt类

package com.jstorm.wd;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * 简单的按照空格进行切分后,发射到下一阶段bolt
 * Created by QinDongLiang on 2016/8/31.
 */
public class SplitWordBolt extends BaseRichBolt {

    Map<String,Integer> counts=new HashMap<>();

    private OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector=outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence=tuple.getString(0);
//        System.out.println("线程"+Thread.currentThread().getName());
//        简单的按照空格进行切分后,发射到下一阶段bolt
       for(String word:sentence.split(" ") ){
           outputCollector.emit(new Values(word));//发送split
       }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //声明输出的filed
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}



(4)Sum的bolt类


package com.jstorm.wd;

import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TupleHelpers;
import backtype.storm.utils.Utils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by QinDongLiang on 2016/8/31.
 */
public class SumWordBolt extends BaseRichBolt {

    Map<String,Integer> counts=new HashMap<>();

    private OutputCollector outputCollector;
    final static Logger logger= LoggerFactory.getLogger(SumWordBolt.class);
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector=outputCollector;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);//加入Tick时间窗口,进行统计
        return conf;
    }

    public static Object deepCopy(Object srcObj) {
        Object cloneObj = null;
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            ObjectOutputStream oo = new ObjectOutputStream(out);
            oo.writeObject(srcObj);

            ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
            ObjectInputStream oi = new ObjectInputStream(in);
            cloneObj = oi.readObject();
        } catch(IOException e) {
            e.printStackTrace();
        } catch(ClassNotFoundException e) {
            e.printStackTrace();
        }
        return cloneObj;
    }

    @Override
    public void execute(Tuple tuple) {
        //时间窗口定义为10s内的统计数据,统计完毕后,发射到下一阶段的bolt进行处理
        //发射完成后retun结束,开始新一轮的时间窗口计数操作
        if(TupleHelpers.isTickTuple(tuple)){
            System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+" 每隔10s发射一次map 大小:"+counts.size());
//            Map<String,Integer> copyMap= (Map<String, Integer>) deepCopy(counts);
            outputCollector.emit(new Values(counts));//10S发射一次
//            counts.clear();
           counts=new HashMap<>();//这个地方,不能执行clear方法,可以再new一个对象,否则下游接受的数据,有可能为空 或者深度copy也行,推荐new
            return;
        }

        //如果没到发射时间,就继续统计wordcount
        System.out.println("线程"+Thread.currentThread().getName()+"  map 缓冲统计中......  map size:"+counts.size());
        //String word=tuple.getString(0);//如果有多tick,就不用使用这种方式获取tuple里面的数据
        String word=tuple.getStringByField("word");
        Integer count=counts.get(word);
        if(count==null){
            count=0;
        }
         count++;
         counts.put(word,count);


//        System.out.println(word+" =====>  "+count);



    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("word_map"));
    }
}




(5)Show的bolt类

/**
 * Created by QinDongLiang on 2016/9/12.
 */
public class ShowBolt extends BaseRichBolt {


    private  OutputCollector outputCollector;

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);//tick时间窗口3秒后,发射到下一阶段的bolt,仅为测试用
        return conf;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector=outputCollector;
    }

    Map<String,Integer> counts=new HashMap<>();

    @Override
    public void execute(Tuple tuple) {
 //tick时间窗口3秒后,发射到下一阶段的bolt,仅为测试用,故多加了这个bolt逻辑
        if(TupleHelpers.isTickTuple(tuple)){
            System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+"  showbolt间隔  应该是 3 秒后 ");
//        System.out.println("what: "+tuple.getValue(0)+"  "+tuple.getFields().toList());
            outputCollector.emit(new Values(counts));
        return;
        }

        counts= (Map<String, Integer>) tuple.getValueByField("word_map");




    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

         outputFieldsDeclarer.declare(new Fields("final_result"));
    }
}


(6)Final的bolt类

package com.jstorm.wd;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.joda.time.DateTime;

import java.util.Map;

/**
 * Created by QinDongLiang on 2016/9/12.
 * 最终的结果打印bolt
 */
public class FinalBolt extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

    }

    @Override
    public void execute(Tuple tuple) {
//        最终的结果打印bolt
        System.out.println(new DateTime().toString("yyyy-MM-dd HH:mm:ss")+"  final bolt ");
        Map<String,Integer> counts= (Map<String, Integer>) tuple.getValue(0);
        for(Map.Entry<String,Integer> kv:counts.entrySet()){
            System.out.println(kv.getKey()+"  "+kv.getValue());
        }
        //实际应用中,最后一个阶段,大部分应该是持久化到mysql,redis,es,solr或mongodb中
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}



有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。

  • 大小: 116.3 KB
  • 大小: 97.2 KB
0
0
分享到:
评论

相关推荐

    storm之WordCount示例Java代码.zip

    本压缩包提供的"storm之WordCount示例Java代码"是针对Storm的一个经典入门教程,展示了如何使用Java语言实现一个简单的WordCount程序。这个程序的主要目标是统计文本数据流中的单词出现次数。 首先,我们需要理解...

    Storm的WordCount实例

    在这个“Storm的WordCount实例”中,我们将深入探讨如何利用Storm来实现经典的WordCount程序,这是一个在大数据处理中常见的示例,用于统计文本中的单词出现频率。 首先,理解Storm的基本架构是至关重要的。Storm由...

    storm-wordcount例子

    "storm-wordcount"是Storm中的一个经典示例,用于演示如何处理实时数据流并进行简单的统计计算,类似于Hadoop的WordCount程序。这个例子的核心目标是统计输入文本中每个单词出现的次数。 在Storm中,数据流被抽象为...

    Hadoop安装教程和Wordcount示例

    文档详细的描述了Hadoop在Linux上的安装过程,并且附带了Wordcount程序示例

    spark local下 WordCount运行示例

    在"Spark local下 WordCount运行示例"中,我们将探讨如何在本地模式(local mode)下使用Spark执行一个简单的WordCount程序。WordCount是大数据处理领域的一个经典例子,用于统计文本中各个单词出现的次数。 首先,...

    Hadoop集群中WordCount示例

    ### Hadoop集群中WordCount示例详解 #### Hadoop简介 Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它通过提供一个高效、可靠且可扩展的环境来支持大数据处理应用,使得开发者可以在商品硬件上...

    jstorm2.2.1执行wordcount

    本项目主要实现的功能是:统计单词的个数 jdk1.8 jstorm2.2.1 执行步骤: 1. 本地正确安装maven 2. 本地正确安装zookeeper,并启动 3. Idea导入项目源码,以...4. 可分别运行random或wordcount下topology下的main类

    storm_wordcount.zip

    【标题】"storm_wordcount.zip" 是一个基于Java开发的Apache Storm项目,主要实现的功能是对英语单词进行实时统计。Storm是一个分布式实时计算系统,能够处理海量数据流,并保证每个事件只被处理一次(Exactly-once...

    Storm本地模式WordCount亲测可用

    **Storm本地模式WordCount亲测可用** 在大数据处理领域,Apache Storm是一个实时计算框架,它被广泛用于处理无界数据流。"Storm本地模式"是Storm提供的一种在单机环境中进行开发和测试的机制,无需分布式环境即可...

    Hadoop示例程序WordCount运行及详解

    Hadoop示例程序WordCount运行及详解 Hadoop平台上进行WordCount是非常重要的,掌握了WordCount可以更好地理解Hadoop的map-reduce编程模型。本文将详细讲解Hadoop平台上WordCount的运行和实现。 基于Hadoop的map-...

    storm开发jar包以及storm例子源码

    标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...

    Hadoop mapreduce实现wordcount

    WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的次数。在这个案例中,我们将深入探讨如何在 Hadoop 环境中使用 MapReduce 实现 WordCount。 【描述】在 Hadoop 环境中,WordCount 的...

    linux环境下运行Eclipse用来测试WordCount示例

    ### Linux环境下运行Eclipse以测试WordCount示例 #### Eclipse简介 Eclipse 是一款非常流行的开源集成开发环境(IDE)。其强大的功能不仅限于Java应用的开发,还通过丰富的插件支持多种编程语言如C/C++、Python、...

    Storm的集群搭建实战课程代码和PPT.rar

    在Storm中,WordCount示例会演示如何实时地统计文本中的单词出现次数,这将帮助初学者快速上手Storm的编程模型。 通过深入学习这些资料,你不仅能掌握Storm的集群搭建,还能了解到其核心概念、编程模型以及实际应用...

    wordcount:大数据组件的单词计数示例

    而"WordCount"是Hadoop入门的经典示例,用于演示如何处理大规模数据。这个例子简单直观,帮助初学者理解Hadoop MapReduce的工作原理。下面我们将详细讨论这个"wordcount"示例及其相关知识点。 一、Hadoop简介 ...

    mapreduce:用于罗马尼亚大数据研讨会的 Wordcount MapReduce 示例

    在“mapreduce:用于罗马尼亚大数据研讨会的 Wordcount MapReduce 示例”中,我们将探讨如何使用Java实现这一经典案例,以及它在大数据处理中的应用。 MapReduce的工作流程如下: 1. **映射(Map)阶段**: 在这个...

    Storm API实现词频统计

    生成的JAR文件(如`storm_demo.jar`)可以用Storm的命令行工具提交到本地或远程的Storm集群上运行,例如`storm jar storm_demo.jar WordCountTopology wordcount`。 5. **监控与调试** Storm提供了Web UI,可以...

    wordcount:WordCount, Storm入门实例

    wordcount 项目说明 WordCount, 一个Storm入门实例。 实现了如下的流程: 抓取ChinaDaily的网页内容作为数据源;对数据进行分词处理,按词频排序并打印排序结果。 相关信息 作者:robin 博客地址:

    最简单MR WordCount

    【标题】"最简单MR WordCount" 涉及到的是MapReduce编程模型中的一个经典示例,WordCount。在Hadoop生态系统中,WordCount是一个基础但非常重要的应用,用于统计文本文件中每个单词出现的次数。这个程序展示了...

Global site tag (gtag.js) - Google Analytics