`

Storm demo

阅读更多
public class SentenceSpout extends BaseRichSpout{ 
     
    private static final long serialVersionUID = 1L; 
 
    /**
     * This output collector exposes the API for emitting tuples from an {@link org.apache.storm.topology.IRichSpout}.
     * The main difference between this output collector and {@link OutputCollector}
     * for {@link org.apache.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
     * acked or failed later on. This is the Spout portion of Storm's API to
     * guarantee that each message is fully processed at least once.
     */ 
    private SpoutOutputCollector collector; 
    //private OutputCollector collector; 
     
    //准备测试数据 
    private String[] sentences={ 
            "my dog has fleas", 
            "i like cold beverages", 
            "the dog ate my homework", 
            "don't have a cow man", 
            "i don't think i like fleas"}; 
     
    private int index=0; 
     
    /**
     * private Map<String, StreamInfo> _fields = new HashMap<>();
     * public void declareStream(String streamId, boolean direct, Fields fields) {
     *   if(_fields.containsKey(streamId)) {
     *       throw new IllegalArgumentException("Fields for " + streamId + " already set");
     *    }
     *   _fields.put(streamId, new StreamInfo(fields.toList(), direct));
     * }
     */ 
     
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
        declarer.declare(new Fields("sentences")); 
    } 
     
     
    /**
     * open方法在ISpout接口中定义,所有Spout组件在初始化时调用这个方法,open()方法接收三个参数
     * 一个包含了Storm配置信息的map
     * TopologyContext对象提供了topology中组件的信息
     * SpoutOutputCollector对象提供了发射tuple的方法
     */ 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
        this.collector=collector; 
    } 
     
    /**
     * 是所有spout实现的核心所在,Storm通过调用这个方法向输出的collector发射tuple
     */ 
    public void nextTuple() { 
        try { 
            Thread.sleep(100); 
        } catch (InterruptedException e) { 
            // TODO Auto-generated catch block 
            e.printStackTrace(); 
        } 
        this.collector.emit(new Values(sentences[index])); 
        //System.out.println("==============="); 
        index++; 
        if(index>=sentences.length){ 
            index=0; 
        } 
    } 





public class SplitSentenceBolt extends BaseRichBolt{ 
 
    private static final long serialVersionUID = 1L; 
    private OutputCollector collector; 
     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
        this.collector=collector; 
    } 
 
    public void execute(Tuple input) { 
        String sentence=input.getStringByField("sentences"); 
        String[] words=sentence.split(" "); 
        for(String word :words){ 
            this.collector.emit(new Values(word)); 
        } 
    } 
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
        declarer.declare(new Fields("words")); 
    } 




public class WordCountBolt extends BaseRichBolt{ 
 
    private static final long serialVersionUID = 1L; 
     
    private OutputCollector collector; 
    private HashMap<String,Long> counts=null; 
     
    /**
     * 通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化
     * 在prepare()方法中对不可序列化的对象进行实例化
     */ 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
        this.collector=collector; 
        this.counts=new HashMap<String,Long>(); 
    } 
    public void execute(Tuple input) { 
        String word=input.getStringByField("words"); 
        Long count=this.counts.get(word); 
        if(count==null){ 
            count=0L; 
        } 
        count++; 
        //出现就添加到map中,word相同的,会覆盖掉 所以最后的word就是准确的数据 
        this.counts.put(word,count); 
        this.collector.emit(new Values(word,count)); 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
        declarer.declare(new Fields("word","count")); 
    }    



public class ReportBolt extends BaseRichBolt{ 
 
    private static final long serialVersionUID = 1L; 
     
    private HashMap<String,Long> counts=null; 
 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
        this.counts=new HashMap<String,Long>(); 
    } 
 
    public void execute(Tuple input) { 
        String word=input.getStringByField("word"); 
        Long count=input.getLongByField("count"); 
        this.counts.put(word, count); 
         
         
        System.out.println("--------FINAL COUNTS--------"); 
        List<String> keys=new ArrayList<String>(); 
        keys.addAll(this.counts.keySet()); 
        Collections.sort(keys); 
        for(String key:keys){ 
            System.out.println(key+":"+this.counts.get(key)); 
        } 
        System.out.println("----------------------------"); 
         
    } 
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
        // this bolt does not emit anything 
    } 



public class WordCountTopology{ 
     
    private static final String SENTENCE_SPOUT_ID="sentence-sput"; 
    private static final String SPLIT_BOLT_ID="split-bolt"; 
    private static final String COUNT_BOLT_ID="count-bolt"; 
    private static final String REPORT_BOLT_ID="report-bolt"; 
    private static final String TOPOLOGY_NAME="word-count-topology"; 
     
    public static void main(String[] args) throws InterruptedException { 
        SentenceSpout spout=new SentenceSpout(); 
        SplitSentenceBolt splitbolt=new SplitSentenceBolt(); 
        WordCountBolt countbolt=new WordCountBolt(); 
        ReportBolt reportbolt=new ReportBolt(); 
         
        TopologyBuilder builder=new TopologyBuilder(); 
        // 设置并发为2个executor,每个Task指派各自的executor线程 
        builder.setSpout(SENTENCE_SPOUT_ID,spout,2); 
        // 设置并发为2个executor,每个executor执行2个task 
        builder.setBolt(SPLIT_BOLT_ID,splitbolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID); 
        // 有时候我们需要将特定数据的tuple路由到特殊的bolt实例中,在此我们使用fieldsGrouping 
        // 来保证所有"word"字段值相同的tuple会被路由到同一个WordCountBolt实例中 
        builder.setBolt(COUNT_BOLT_ID,countbolt,2).fieldsGrouping(SPLIT_BOLT_ID,new Fields("words")); 
        builder.setBolt(REPORT_BOLT_ID,reportbolt).globalGrouping(COUNT_BOLT_ID); 
         
        /*Map conf=new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS,4);
        conf.put(Config.TOPOLOGY_DEBUG,true);*/ 
         
        Config conf = new Config(); 
        //conf.setDebug(true); 
        LocalCluster cluster=new LocalCluster(); 
        cluster.submitTopology(TOPOLOGY_NAME,conf,builder.createTopology()); 
         
//      Thread.sleep(1000); 
//      cluster.shutdown(); 
         
    } 

分享到:
评论

相关推荐

    storm demo

    【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...

    Storm 上手 demo 例子 演示

    【标题】:“Storm 上手 demo 例子演示” 【描述】:“Storm demo例子案例” 这篇文章将深入探讨Apache Storm,一个开源的分布式实时计算系统,通过实际的demo案例来帮助你快速上手。Apache Storm的设计目标是处理...

    StormDemo.tar.gz

    【标题】"StormDemo.tar.gz" 是一个与Apache Storm相关的压缩包文件,它提供了一个入门级别的示例,帮助用户理解并开始使用这个分布式实时计算系统。Apache Storm是一个开源的流处理框架,它允许开发者处理和分析...

    stormdemo.zip

    本资料“stormdemo.zip”提供了一个关于Storm的实战示例,名为“stormdemo”,旨在帮助用户深入理解并掌握Storm的核心概念和操作流程。 Apache Storm是一个开源的分布式实时计算系统,它允许开发者连续处理数据流,...

    storm demo 单机版 maven

    描述提到"stormdemo实现不安装storm系列软件的基础上,运行调试storm拓扑",这暗示了该项目包含了一种方法,可以在没有预先安装Apache Storm的情况下运行和调试Storm拓扑。通常,这可以通过使用Maven的`storm-...

    simple storm demo

    simple storm demosimple storm demosimple storm demosimple storm demosimple storm demosimple storm demo

    storm练习代码

    通过"StormDemo"这个练习,你可以了解如何构建和运行一个简单的Storm应用,包括定义Spout和Bolt,配置拓扑,设置分组策略,以及如何在本地和集群环境中部署。实践中,你可以逐步增加复杂性,如实现更复杂的业务逻辑...

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    storm_Kafka_demo

    标题“storm_Kafka_demo”指的是一个使用Apache Storm处理Kafka数据流的示例项目。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,而Kafka是高吞吐量的分布式发布订阅消息系统。在这个...

    storm DRPC简单例程

    7. **stormdemo**:这个文件可能是storm项目的示例代码,包含了一个使用DRPC的简单示例,可能包括了拓扑定义、DRPC服务端实现、客户端调用的代码示例。 综上所述,"storm DRPC简单例程"是一个关于如何在Apache ...

    storm自学文档

    编写StormDemo时,需要搭建开发环境,安装Eclipse、Maven等开发工具,并配置相关开发插件。编写程序时,可从一个简单的wordcounter单词计数器开始,其程序结构包括拓扑驱动类、WordReader读取数据源、WordNormalizer...

    storm 的安装使用

    解压并研究这个stormdemo,可以帮助理解Storm的工作原理和如何编写实时数据处理的Topologies。 总结来说,Apache Storm是一个强大的实时数据处理工具,通过安装、配置和运行示例,可以深入理解其工作机制,并掌握...

    Storm第02天

    4. **StormDemo** - 这可能是一个Storm项目的实例,包含了一个完整的拓扑定义,用于演示如何创建、部署和运行Storm应用。通过这个例子,我们可以看到实际操作中的代码结构和逻辑。 在接下来的内容中,我们可能会...

    jstorm storm入门demo

    "jstorm storm入门demo" 这个标题表明了这是一个关于JStorm和Storm框架的基础教学示例。JStorm是阿里巴巴开源的一个分布式实时计算系统,它基于Apache Storm,但提供了更稳定、高性能以及易用的特性。这个demo可能是...

    storm统计单词数的demo

    【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...

    stormdemo:风暴你好世界,参考自风暴蓝图第1章

    stormdemostorm hello world, 参考自 storm blueprint chapter1最新说明请见:本示例使用storm运行经典的wordcount程序,拓扑如下:sentence-spout—&gt;split-bolt—&gt;count-bolt—&gt;report-bolt分别完成句子的产生、...

    storm 示例demo

    简单的storm入门示例,从0到1让你清楚的理解storm.下面是代码示例: import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology...

    Storm:使用 Apache Storm 的示例

    此存储库专用于 Apache Storm 项目和代码示例。 关于风暴 要了解有关 Storm 的更多信息,最好阅读 Storm 官方网页上的。 这是一个很好的指南,并且有一些非常好的链接。 它也不长且易于理解。 运行示例 在做任何事情...

    storm-kafka:风暴演示,卡夫卡演示,风暴卡夫卡演示

    该项目包含3个演示: storm demo kafka demo storm-kafka-demo 你可以很容易地测试这个。 storm-kafka-demo主类是my.storm.kafka.demo.MyKafkaTopology storm demo主类是word.count.topology.WordCountTopology 包...

    storm之drpc操作demo示例.zip

    总结来说,"storm之drpc操作demo示例"是一个很好的学习资源,它涵盖了Storm DRPC的核心概念和实践操作,对于想要在实时计算项目中运用Storm DRPC功能的开发者来说,极具参考价值。通过实际操作这个示例,你将能够...

Global site tag (gtag.js) - Google Analytics