`

storm-wordCount

阅读更多
单词统计


一、拓扑结构

1.数据来源

2.单词拆分

3.单词计数

4.统计结果

5.拓扑构建

二、代码实现

1.单词来源
package com.study.storm.test.wordcount;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout : 数据源
 * 数据的生产者,将数据发送到一个或多个Bolt上进行数据处理
 */
public class SentenceSpout extends BaseRichSpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = -5569170406079406193L;

	private SpoutOutputCollector collector = null ;

	// 数据
	private static String [] sentences = {
		"i am a boy ","i like eat","i do not like ","what"
	};
	
	// 数组的下标,数据发送的 msgId 
	private int index = 0 ;

	/**
	 * 来自 ISpout
	 * 不停的发送数据
	 * 无数据发送,可以睡眠一段时间,避免CPU频繁调用
	 */
	@Override
	public void nextTuple() {
		// 只发送一次
		if(index >= sentences.length){
			return ;
		}
		// 发送的数据内容,数据的msgId 编号(不传,默认为 null )
		collector.emit(new Values(sentences[index]),index);
		// 循环发送,避免数组越界
//		index = index > sentences.length ? 0 : index++ ;
		index ++ ;
	}

	/**
	 * 慎用静态成员变量,线程安全问题
	 * 因为	SpoutOutputCollector 是线程安全的,所以此处的 全局的 collector 可以设置为 static 
	 */
	
	/**
	 * 来自于 ISpout 接口
	 * 组件初始化时调用
	 * @param arg0  配置信息
	 * @param arg1 任务信息
	 * @param arg2 发射数据用的组件,线程安全 
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
		this.collector = arg2 ; 
	}

	/**
	 * 来自IComponent 
	 * 声明输出用的字段
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		arg0.declare(new Fields("sentence"));
	}

	@Override
	public void ack(Object msgId) {
		// 发送成功应答
		System.out.println("ack : "+msgId);
	}
	
	@Override
	public void fail(Object msgId) {
		System.out.println("fail : "+msgId);
		// 发送失败:重新发送
		this.collector.emit(new Values(sentences[(Integer)msgId]),msgId);
	}
}




2.单词拆分
package com.study.storm.test.wordcount;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 单词拆分
 */
public class SentenceBolt extends BaseRichBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = -5420313164197072550L;

	private OutputCollector collector ;
	/**
	 * 继承自 IBolt
	 * 初始化的方法,在组件初始化时调用
	 * @param stormConf 当前Bolt的配置信息对象
	 * @param context 当前环境信息对象
	 * @param collector 对外输出 tuple 用的对象
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		try {
            // 通过 field 获取数据,sentence 为 spout 中定义的			
			String sentences = input.getStringByField("sentence");
			String [] words = sentences.split(" ");
			for(String word : words){
				// 锚定:绑定 bolt 与上一级,数据发送状况传递,如果出现问题,方便查找上一级数据的来源
				this.collector.emit(input,new Values(word));
			}
			// 确认发送成功
			collector.ack(input);
		} catch (Exception e) {
			collector.fail(input);
		}
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}



3.单词计数
package com.study.storm.test.wordcount;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 单词数量统计
 */
public class WordCountBolt extends BaseRichBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = -4811405807833269368L;

	private OutputCollector collector = null ;

	/**
	 * 线程安全
	 */
	private Map<String,Integer> countMap = new ConcurrentHashMap<String,Integer>();
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector ;
	}

	@Override
	public void execute(Tuple input) {
		try {
			
			String word = input.getStringByField("word");
			countMap.put(word, countMap.containsKey(word) ? countMap.get(word)+1 : 1);
			// 按照 declarer 的顺序发送数据,先单词,再单词数量
			this.collector.emit(input,new Values(word,countMap.get(word)));
			
			collector.ack(input);
		} catch (Exception e) {
			collector.fail(input);
		}
		
		
	
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// 数据的发送数据
		declarer.declare(new Fields("word","count"));
	}

}



4.统计结果
package com.study.storm.test.wordcount;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ResultBolt extends BaseRichBolt {

	/**
	 * 
	 */
	private static final long serialVersionUID = 7436620687730623420L;

	private Map<String,Integer> map = new ConcurrentHashMap<String,Integer>();
	
	private OutputCollector collector = null ;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector ;
	}

	@Override
	public void execute(Tuple input) {
		try {
			// 通过field 获取数据
			String word = input.getStringByField("word");
			// 不可通过 getStringByField 获取,会报转换异常
			Integer count = (Integer) input.getValueByField("count");
			
			map.put(word, count);
			
			System.out.println(word +" : " + count);
			
			collector.ack(input);
			
		} catch (Exception e) {
			collector.fail(input);
		}
		
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

	/**
	 * 程序正常运行结束后运行
	 * 
	 */
	@Override
	public void cleanup() {

		System.out.println("统计结果:");
		for(String key : map.keySet()){
			System.out.println(key + " : " + map.get(key));
		}
	}
}



5.拓扑构建
package com.study.storm.test.wordcount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {

	public static void main(String args[]) throws AlreadyAliveException, InvalidTopologyException, InterruptedException{
		
		// 实例化 Spout Bolt 
		SentenceSpout sentenceSpout = new SentenceSpout();
		SentenceBolt sentenceBolt = new SentenceBolt();
		WordCountBolt wordCountBolt = new WordCountBolt();
		ResultBolt resultBolt = new ResultBolt();
		
		// Topology 
		TopologyBuilder builder = new TopologyBuilder();
		/**
		 * 并发情况下考虑如何分发数据
		 * 一、并发级别
		 * node : 对应于 storm 集群中的服务器
		 * worker : 线程级别
		 * executor : 线程级别
		 * task : 
		 */
		builder.setSpout("sentenceSpout", sentenceSpout);
		// 随机分发:即数据可以发送到任意的下一级的处理机器上,而不会对统计结果产生影响
		builder.setBolt("sentenceBolt", sentenceBolt).shuffleGrouping("sentenceSpout");
		// 按照 field = word 分发,相同的 word 传递的数据发送到同一台机器上,避免统计遗漏
		// hash % taskNum 
		// 若此处为随机分发且并行级别大于等于2,单词 a 发送 1机器,统计 a : 1 ,第二次单词发送到 2 机器,a 的数量就会失真
		builder.setBolt("wordCountBolt", wordCountBolt).fieldsGrouping("sentenceBolt", new Fields("word"));
		// 若上一级并行数量多,则无论哪一个处理完毕都发送到同一台处理机器上,此种方式,对并发数量设置无效
		builder.setBolt("resultBolt", resultBolt).globalGrouping("wordCountBolt");
		
		// 生产拓扑
		StormTopology stormTopology = builder.createTopology();
		
		Config config = new Config();
		// 集群运行
//		StormSubmitter.submitTopology("wordCountTopology", config, stormTopology);
	
		LocalCluster local = new LocalCluster();
		local.submitTopology("wordCountTopology", config, stormTopology);
		// 断点调试,调整时间
		Thread.sleep(10*1000);
		local.killTopology("wordCountTopology");
		local.shutdown();
	}
}




分享到:
评论

相关推荐

    storm-wordcount例子

    《深入理解Storm-Wordcount实例》 Storm是一个分布式实时计算系统,它被广泛应用于大数据处理领域,尤其是在实时数据流分析方面。"storm-wordcount"是Storm中的一个经典示例,用于演示如何处理实时数据流并进行简单...

    storm-starter-master

    在"storm-starter-master"这个项目中,通常会包含一系列的示例,如简单的单词计数(WordCount)、日志分析等,这些示例有助于初学者理解如何在Storm中创建拓扑结构(Topology)并部署运行。每个示例都会展示如何定义...

    Storm的WordCount实例

    在这个“Storm的WordCount实例”中,我们将深入探讨如何利用Storm来实现经典的WordCount程序,这是一个在大数据处理中常见的示例,用于统计文本中的单词出现频率。 首先,理解Storm的基本架构是至关重要的。Storm由...

    apache-storm-0.9.6

    4. `examples`:可能包含一些示例拓扑,如WordCount,帮助用户快速了解如何使用Storm编写和部署拓扑。 5. `docs`:文档和API参考,帮助开发者理解和使用Storm API。 6. `storm-client`:包含客户端库,开发者在本地...

    storm_wordcount.zip

    【标题】"storm_wordcount.zip" 是一个基于Java开发的Apache Storm项目,主要实现的功能是对英语单词进行实时统计。Storm是一个分布式实时计算系统,能够处理海量数据流,并保证每个事件只被处理一次(Exactly-once...

    storm之WordCount示例Java代码.zip

    本压缩包提供的"storm之WordCount示例Java代码"是针对Storm的一个经典入门教程,展示了如何使用Java语言实现一个简单的WordCount程序。这个程序的主要目标是统计文本数据流中的单词出现次数。 首先,我们需要理解...

    storm-starter:来自 github 的 Storm starter,添加我的新类

    例如,可能有一个名为`wordcount`的示例,展示了如何创建一个简单的单词计数应用。在这个例子中,Spout可能会读取文本输入,然后Bolt会处理这些数据,统计每个单词出现的次数。 学习Storm Starter时,你需要理解...

    STORM - Student Online Record Management-开源

    STORM,全称为Student Online Record Management,是一款专为教育领域设计的开源软件,它提供了一个高效、便捷的方式来管理和存储大规模课程的学生记录。该系统利用MySQL作为其核心数据库接口,确保了数据的安全性和...

    storm-archetype:尝试使用Storm的项目的非常基本的原型

    风暴原型 这是一个简单的项目,可... mvn原型:generate -DarchetypeGroupId = org.apache.storm -DarchetypeArtifactId = storm-archetype -DarchetypeVersion = 1.0-SNAPSHOT -DgroupId =(your-group)-DartifactI

    Storm本地模式WordCount亲测可用

    **Storm本地模式WordCount亲测可用** 在大数据处理领域,Apache Storm是一个实时计算框架,它被广泛用于处理无界数据流。"Storm本地模式"是Storm提供的一种在单机环境中进行开发和测试的机制,无需分布式环境即可...

    storm-cli:Apache Storm 的 cli

    主分支: 风暴-cli Apache Storm 的 cli 字数示例 set topology.name=test_topology; set storm.jar=./jstorm-example-0.9.0.jar;...REGISTER count=BOLT(1, "storm.starter.WordCountTopology$WordCount").FIE

    jstorm2.2.1执行wordcount

    本项目主要实现的功能是:统计单词的个数 jdk1.8 jstorm2.2.1 执行步骤: 1. 本地正确安装maven 2. 本地正确安装zookeeper,并启动 3. Idea导入项目源码,以...4. 可分别运行random或wordcount下topology下的main类

    storm-starter

    7. **示例代码**:压缩包内的示例代码可能包括简单的 WordCount 示例,展示如何统计文本流中的单词出现次数,以及其他复杂示例,如 Twitter 示例,展示如何处理实时社交媒体数据。 8. **配置和监控**:通过"storm-...

    storm开发jar包以及storm例子源码

    例如,你可以创建一个简单的WordCount拓扑,从spout读取文本,然后通过一系列bolt进行单词拆分和计数。 4. **打包为jar**:使用Maven的`mvn package`命令,将项目打包成可执行的jar包。这个jar包包含了所有依赖,...

    test_Storm_wordCount

    【标题】"test_Storm_wordCount" 是一个基于Java实现的Apache Storm项目,它主要用于演示分布式实时计算系统如何处理文本数据并进行词频统计。在大数据处理领域,Apache Storm是一个实时计算框架,能够处理无界数据...

    Storm编程实践(安装Maven-使用Maven编译运行代码-mvn 打包)

    然后,我们可以使用 Maven 运行 Storm-starter 中的 WordCount 例子。我们需要在 storm-starter 目录中运行 mvn 命令,以便 Maven 可以查找当前目录下的 pom.xml 文件,下载所需要的依赖包。 打包代码 在编译和...

    wordcount:WordCount, Storm入门实例

    wordcount 项目说明 WordCount, 一个Storm入门实例。 实现了如下的流程: 抓取ChinaDaily的网页内容作为数据源;对数据进行分词处理,按词频排序并打印排序结果。 相关信息 作者:robin 博客地址:

    storm 的安装使用

    6. **运行示例**:Apache Storm附带了一些示例项目,如WordCount,可以用来验证安装是否成功。在`examples/storm-starter`目录下,你可以找到这些示例的源代码。按照官方文档的指示运行这些示例,观察日志输出以确认...

    Storm API实现词频统计

    生成的JAR文件(如`storm_demo.jar`)可以用Storm的命令行工具提交到本地或远程的Storm集群上运行,例如`storm jar storm_demo.jar WordCountTopology wordcount`。 5. **监控与调试** Storm提供了Web UI,可以...

    Flink入门及实战V1.6.1-2018最新

    Flink入门及实战最新内容分享,包含Flink基本原理及应用场景、Flink vs storm vs sparkStreaming、Flink入门案例-wordCount、Flink集群安装部署standalone+yarn、Flink-HA高可用、Flink scala shell代码调试

Global site tag (gtag.js) - Google Analytics