import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    /**
     * This output collector exposes the API for emitting tuples from an {@link org.apache.storm.topology.IRichSpout}.
     * The main difference between this output collector and {@link OutputCollector}
     * for {@link org.apache.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
     * acked or failed later on. This is the Spout portion of Storm's API to
     * guarantee that each message is fully processed at least once.
     */
    private SpoutOutputCollector collector;
    // Static test data; a production spout would read from an external source instead.
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"};
    private int index = 0;

    /**
     * declarer.declare(fields) is shorthand for declaring the "default" stream;
     * internally Storm keeps the field list per stream id and throws an
     * IllegalArgumentException if the same stream is declared twice.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentences"));
    }

    /**
     * open() is defined on the ISpout interface; Storm calls it once when the spout
     * component is initialized. It receives three parameters:
     * - a map holding the Storm configuration,
     * - a TopologyContext describing the components in the topology,
     * - a SpoutOutputCollector providing the methods for emitting tuples.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * nextTuple() is the heart of any spout implementation: Storm calls it repeatedly,
     * and the spout emits tuples to the output collector.
     */
    @Override
    public void nextTuple() {
        try {
            // Throttle emission so the local-mode console output stays readable.
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
    }
}
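The javadoc above notes that a spout can tag each message with an id so it can be acked or failed later; the demo emits unanchored tuples, so Storm does no such tracking. Below is a minimal sketch of the reliable variant — the pending map and UUID message ids are illustrative choices, not part of the original demo:

// Reliable-emit sketch for SentenceSpout: keep each sentence until Storm
// reports its tuple tree as acked or failed. The `pending` map and the
// UUID-based ids are assumptions for illustration only.
private java.util.Map<java.util.UUID, Values> pending = new java.util.HashMap<>();

@Override
public void nextTuple() {
    Values values = new Values(sentences[index]);
    java.util.UUID msgId = java.util.UUID.randomUUID();
    pending.put(msgId, values);
    collector.emit(values, msgId);   // emit tagged with a message id
    index = (index + 1) % sentences.length;
}

@Override
public void ack(Object msgId) {
    pending.remove(msgId);           // fully processed downstream; forget it
}

@Override
public void fail(Object msgId) {
    collector.emit(pending.get(msgId), msgId);  // replay the failed sentence
}

With anchoring in the downstream bolts (next class), a failure anywhere in the tuple tree triggers fail() here.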
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Split each incoming sentence on spaces and emit one tuple per word.
        String sentence = input.getStringByField("sentences");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("words"));
    }
}
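For the spout's message ids to mean anything, each bolt must anchor its output tuples to the incoming tuple and then ack it; the demo's unanchored emit opts out of that tracking. A sketch of an anchored execute(), using the same field names as above:

// Anchored variant of execute(): every word tuple is anchored to the incoming
// sentence tuple, and the input is acked once all words are emitted. A failure
// of any word tuple downstream then triggers the spout's fail() for the sentence.
@Override
public void execute(Tuple input) {
    for (String word : input.getStringByField("sentences").split(" ")) {
        collector.emit(input, new Values(word));  // anchor to the input tuple
    }
    collector.ack(input);
}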
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    /**
     * As a rule of thumb, assign primitive and serializable fields in the
     * constructor, and instantiate non-serializable objects in prepare():
     * the bolt instance is serialized when the topology is submitted, and
     * prepare() runs after deserialization on the worker.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("words");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        // put() overwrites the previous entry for the word, so the map always
        // holds the latest running total for each word.
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
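WordCountBolt's per-task HashMap only yields correct totals because fieldsGrouping sends every occurrence of a word to the same task. The routing behaves like a hash partition over the grouping fields; the snippet below is a simplified mental model, not Storm's actual implementation:

// Simplified model of fields grouping (illustrative only): equal field values
// always map to the same task index, so one task owns each word's total.
static int chooseTask(String word, int numTasks) {
    return Math.abs(word.hashCode() % numTasks);
}
// chooseTask("dog", 2) returns the same index on every call, so a single
// WordCountBolt task accumulates the complete count for "dog".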
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.counts.put(word, count);
        // For demo purposes the full table is reprinted on every tuple.
        System.out.println("--------FINAL COUNTS--------");
        List<String> keys = new ArrayList<String>(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + ":" + this.counts.get(key));
        }
        System.out.println("----------------------------");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits nothing.
    }
}
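Reprinting the whole table on every tuple is noisy. IBolt also defines cleanup(), which Storm calls when shutting a bolt down in local mode (it is not guaranteed to run on a real cluster), so a quieter variant of ReportBolt accumulates silently in execute() and prints once there:

// Quieter ReportBolt variant (sketch): drop the printing from execute() and
// emit the sorted table once when the local cluster shuts the bolt down.
@Override
public void cleanup() {
    System.out.println("--------FINAL COUNTS--------");
    List<String> keys = new ArrayList<String>(counts.keySet());
    Collections.sort(keys);
    for (String key : keys) {
        System.out.println(key + ":" + counts.get(key));
    }
    System.out.println("----------------------------");
}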
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws InterruptedException {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitbolt = new SplitSentenceBolt();
        WordCountBolt countbolt = new WordCountBolt();
        ReportBolt reportbolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        // Run the spout with a parallelism hint of 2 executors; by default each
        // executor runs a single task.
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // 2 executors sharing 4 tasks, i.e. each executor runs 2 tasks.
        builder.setBolt(SPLIT_BOLT_ID, splitbolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
        // Sometimes tuples carrying particular values must all reach the same bolt
        // instance; fieldsGrouping guarantees that every tuple with the same
        // "words" value is routed to the same WordCountBolt task.
        builder.setBolt(COUNT_BOLT_ID, countbolt, 2).fieldsGrouping(SPLIT_BOLT_ID, new Fields("words"));
        builder.setBolt(REPORT_BOLT_ID, reportbolt).globalGrouping(COUNT_BOLT_ID);

        /* Equivalent raw-map configuration:
        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 4);
        conf.put(Config.TOPOLOGY_DEBUG, true); */
        Config conf = new Config();
        //conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
        // Without the lines below the local cluster keeps running until the
        // process is killed; uncomment them to stop after a short run.
        // Thread.sleep(1000);
        // cluster.shutdown();
    }
}
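main() above runs everything in one JVM via LocalCluster, which is ideal for development. Submitting the same topology to a real cluster only changes the submission call; a sketch, assuming the job is packaged as a jar and launched with the storm client (the class and jar names here are illustrative):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountClusterSubmit {
    public static void main(String[] args) throws Exception {
        // Same wiring as WordCountTopology.main().
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout(), 2);
        builder.setBolt("split-bolt", new SplitSentenceBolt(), 2).setNumTasks(4)
               .shuffleGrouping("sentence-spout");
        builder.setBolt("count-bolt", new WordCountBolt(), 2)
               .fieldsGrouping("split-bolt", new Fields("words"));
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

        Config conf = new Config();
        conf.setNumWorkers(2);  // spread the executors across 2 worker JVMs
        // Launch with: storm jar word-count.jar WordCountClusterSubmit
        StormSubmitter.submitTopology("word-count-topology", conf, builder.createTopology());
    }
}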