`

storm两个案例(1单词计数本地执行 2累加集群执行 3集群关闭storm任务写法)

 
阅读更多

 

1 如何监听到文件夹内新增了文件

2 如何监听到原文件内容数据做了变更

3 为了有两个bolt 一个用于切分  一个用于统计单词个数 为何不写在一起呢??

    每一个组件完成单独功能  执行速度非常快  并且提高每个组件的并行度已达到单位时间内处理数据更大

4 参考flume ng 对处理过的文件做修改, 这里是将处理过的文件后缀更改达到目的

 

 操作图如下:



 

 

 2 单词计数代码:

package changping.houzhihoujue.storm;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.FileUtils;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 作业:实现单词计数。
 *     (1)要求从一个文件夹中把所有文件都读取,计算所有文件中的单词出现次数。
 *     (2)当文件夹中的文件数量增加是,实时计算所有文件中的单词出现次数。
 */
public class MyWordCountTopology {

	 // 祖品火车 创建轨道 发车流程
	 public static void main( String[] args ) {
		 	String DATASOURCE_SPOUT = DataSourceSpout.class.getSimpleName();
	    	String SPLIT_BOLD = SplitBolt.class.getSimpleName();
	    	String COUNT_BOLT = CountBolt.class.getSimpleName();
	    	
	        final TopologyBuilder builder = new TopologyBuilder();
	        builder.setSpout(DATASOURCE_SPOUT, new DataSourceSpout());
	        builder.setBolt(SPLIT_BOLD, new SplitBolt()).shuffleGrouping(DATASOURCE_SPOUT);
	        builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLD);
	        
	        final LocalCluster localCluster = new LocalCluster();
	        final Config config = new Config();
			localCluster.submitTopology(MyWordCountTopology.class.getSimpleName(), config, builder.createTopology());
			Utils.sleep(9999999);
			localCluster.shutdown();
	 }
	
}
// 数据源
class DataSourceSpout extends BaseRichSpout{

	private Map conf;
	private TopologyContext context;
	private SpoutOutputCollector collector;
	
	public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
		this.conf = conf;
		this.context = context;
		this.collector = collector;
	}

	public void nextTuple() {
		// 过滤文件夹D:/father 得到以txt结尾的文件
		Collection<File> files = FileUtils.listFiles(new File("D:/father"), new String[]{"txt"}, true);
		if(files != null && files.size() > 0){
			for(File file : files){
				try {// 将文件每一行都发射到 bolt内
					List<String> lines = FileUtils.readLines(file);
					for(String line : lines){
						collector.emit(new Values(line));
					}
					//修改操作完的文件(这里是修改后缀) 这样nextTuple方法就不会再重新处理该文件
					FileUtils.moveFile(file, new File(file.getAbsolutePath() + "." + System.currentTimeMillis()));
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
	
}
// 切分单词逻辑
class SplitBolt extends BaseRichBolt{

	private Map conf;
	private TopologyContext context;
	private OutputCollector collector;
	
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.conf = stormConf;
		this.context = context;
		this.collector = collector;
	}

	public void execute(Tuple input) {
		String line = input.getStringByField("line");
		String[] words = line.split("\\s");
		for(String word : words){ // 发送每一个单词
			System.err.println(word);
			collector.emit(new Values(word));
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
	
}

//统计单词逻辑
class CountBolt extends BaseRichBolt{

	private Map stormConf;
	private TopologyContext context;
	private OutputCollector collector;
	public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
		this.stormConf = stormConf;
		this.context = context;
		this.collector = collector;
	}

	private HashMap<String, Integer> map = new  HashMap<String, Integer>();
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		System.err.println(word);
		Integer value = map.get(word);
		if(value==null){
			value = 0;
		}
		value++;
		map.put(word, value);
		//把结果写出去
		System.err.println("============================================");
		Utils.sleep(2000);
		for (Entry<String, Integer> entry : map.entrySet()) {
			System.out.println(entry);
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
}
/*class CountBolt extends BaseRichBolt{
	private Map conf;
	private TopologyContext context;
	private OutputCollector collector;
	
	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.conf = conf;
		this.context = context;
		this.collector = collector;
	}
	
	*//**
	 * 对单词进行计数
	 *//*
	
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	public void execute(Tuple tuple) {
		//读取tuple
		String word = tuple.getStringByField("word");
		//保存每个单词
		Integer value = countMap.get(word);
		if(value==null){
			value = 0;
		}
		value++;
		countMap.put(word, value);
		//把结果写出去
		System.err.println("============================================");
		Utils.sleep(2000);
		for (Entry<String, Integer> entry : countMap.entrySet()) {
			System.out.println(entry);
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer arg0) {
		
	}

}*/

 

 

 

3 集群运行累加写法:

 

package changping.houzhihoujue.storm;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;

import ch.qos.logback.core.util.TimeUtil;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 本地运行:
 *   实现累加
 * @author zm
 *
 */
public class MyLocalStormTopology {

	/**
	 * 组装火车 轨道  并让火车在轨道上行驶
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws Exception {

		// 祖品列车
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout("1", new MySpout2()); // 定义1号车厢
		topologyBuilder.setBolt("2", new MyBolt1()).shuffleGrouping("1");// 定义2号车厢 并和1号车厢连接起来   
		// 造出轨道
		/*LocalCluster localCluster = new LocalCluster();// 造出轨道 在本地运行
		Config config = new Config();   
		// 轨道上运行列车, 三个参数分别为:定义的列车名,列车服务人员,轨道上跑的列车本身
		localCluster.submitTopology(MyLocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology());
		
		TimeUnit.SECONDS.sleep(99999);// 设置列车运行时间
		localCluster.shutdown();// 跑完后就停止下来, 否则storm是永不停止
*/		
		// 造出轨道 在集群中运行
		StormSubmitter stormSubmitter = new StormSubmitter();// storm集群执行
		HashMap conf = new HashMap();
		stormSubmitter.submitTopology(MyLocalStormTopology.class.getSimpleName(), conf, topologyBuilder.createTopology());
		
	}

}

//创建火车头
class MySpout2 extends BaseRichSpout {

	private Map conf;
	private TopologyContext context;
	private SpoutOutputCollector collector;
	
	// 此方法首先被调用  打开storm系统外的数据源
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.conf = conf;
		this.context = context;
		this.collector = collector;
	}

	private int i = 0;
	// 认为是NameNode的heartbeat,永无休息的死循环的调用 并是线程安全的操作, 这里每一次调用此方法 将i++发送到bolt
	public void nextTuple() {
		System.err.println(i);
		// 将数据(i++)放在弹壳(Values)中,并发送给bolt
		this.collector.emit(new Values(i++));
		try {
			List linesNum = FileUtils.readLines(new File("D:/father"));
			System.err.println("文件行数为: " + linesNum.size());
		} catch (IOException e1) {
			System.err.println("文件行数为: 0 读取失败" );
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	//声明输出的字段的名称为 v1   只有在输出给别人时才会重写此方法
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("v1"));
	}
	
}

// 创建车厢
class MyBolt1 extends BaseRichBolt{

	private Map stormConf;
	private TopologyContext context;
	private OutputCollector collector;
	
	// 准备下要对接收storm spout发送来的数据
	public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
		this.stormConf = stormConf;
		this.context = context;
		this.collector = collector;
	}

	private int sum = 0;
	// 死循环,用于接收bolt送来的数据   这里storm每调用一次此方法 则获取发送来的tuple数据
	public void execute(Tuple input) {
		int i = input.getIntegerByField("v1");
		sum += i;
		System.err.println(sum);
	}

	// 只有向外发送数据时 此方法才会被调用 否则 不要实现此方法 
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
}

 

 
3.1) 将代码通过eclipse export 导出包为: stormApp.jar, 上传到storm集群中,执行命令如下:
[root@h2master local]# storm/bin/storm jar stormApp.jar changping.houzhihoujue.storm.MyLocalStormTopology

可以看到提交成功:
Successfully uploaded topology jar to assigned location: /usr/local/storm/tmp/nimbus/inbox/stormjar-ffe04877-a31b-426e-bc21-02f201e8cdc2.jar

 
 
4 在UI上可以看到提交的 累加Topology summary 任务如下:


 
 
停止此任务命令写法为:  kill 后面的名称为 上面截图中 红圈的名称
 这样就关闭了累加的storm任务
 
# storm/bin/storm kill MyLocalStormTopology
 
 
  • 大小: 21.6 KB
  • 大小: 429.3 KB
  • 大小: 29.7 KB
分享到:
评论

相关推荐

    storm实时单词计数

    在"storm实时单词计数"这个场景中,我们主要探讨的是如何使用Storm来实现一个实时分析应用,该应用可以统计输入文本中的单词数量。 在Storm中,数据流是通过拓扑结构(Topology)进行组织的,由多个 bolts(处理...

    storm统计单词数的demo

    在这个例子中,可能存在两个Bolt:一个用于分割单词,另一个用于统计每个单词出现的次数。前者将接收到的句子拆分成单个单词,后者则对每个单词进行计数。 4. **Topology构建**:用户需要定义一个Topology,将Spout...

    02_流式计算基础_第1天(Storm集群部署、单词计数、Stream Grouping).docx

    02_流式计算基础_第1天 (Storm集群部署、单词计数、Stream Grouping).docx

    storm1.2.1-wangzs-可靠单词计数

    《storm1.2.1-wangzs-可靠单词计数》是基于Apache Storm的一个实践项目,专注于演示如何在分布式环境中实现可靠且精确的单词计数。Apache Storm是一个开源的流处理系统,它允许实时处理数据流,确保每个事件都能得到...

    storm自定义计数小案例

    标题中的“storm自定义计数小案例”指的是利用Apache Storm框架构建的一个小型应用程序,这个程序主要是为了演示如何在实时流处理环境中对数据进行计数。Apache Storm是一个开源的分布式实时计算系统,它允许开发者...

    Storm综合案例二Storm集群向Kafka集群读取数据并写入MySQL远程模式

    今天接上文,来实现一个Storm数据流处理综合案例的第二部分,Storm集群向Kafka集群源源不断读取数据,通过MyBatis写入到MySQL数据库,并部署为远程模式 准备工作 参考上文准备工作 代码编写 思路:Storm集群从...

    Storm集群搭建

    【Storm集群搭建】是关于构建分布式实时计算系统Storm的集群过程。Storm是一个开源的、用于处理实时数据流的计算框架,常被用于大数据处理、实时分析等场景。在Storm集群中,主要有两种类型的节点——主控节点...

    Storm集群环境搭建

    Storm的运行依赖于Java 7+和Python 2.6.6+,因此需要预先安装这两个软件。此外,还需要安装ZooKeeper集群,以便于Storm集群的协调管理。 三、集群搭建 1. 下载并解压 Storm安装包 首先,需要下载Storm的安装包...

    第一个Storm应用

    1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地机器的单一JVM上,这个模式主要用来开发、调试。 2.远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个...

    基于Storm本地集群搭建实时统计CallLog实现可运行

    Storm是用于处理无界数据流的平台,它允许多个任务并行执行,保证每个消息只被处理一次(Exactly Once语义)。它的工作原理是通过拓扑结构(Topology)来定义数据流的处理逻辑,由Spout(数据源)生成数据流,然后...

    storm部署(包括所有依赖rpm包、集群搭建详解)

    Nimbus是Storm集群的主节点,负责任务分配和监控。Supervisor则是工作节点,每个节点上都运行一个Supervisor实例,管理worker进程。UI则提供了可视化界面,帮助监控和管理集群状态。 以下是详细的部署步骤: 1. **...

    storm集群部署和配置过程详解

    Storm集群的部署和配置是构建实时大数据处理系统的关键步骤,本文将详细讲解这一过程。Storm是一个分布式实时计算系统,能够处理大规模数据流,确保每个事件都得到正确的处理。以下是搭建Storm集群的详细步骤: 1. ...

    基于Storm的分布式流计算集群详细配置

    4. **创建myid文件**:每个ZooKeeper节点的`data`目录下创建`myid`文件,写入对应节点的ID(1、2、3),表示节点在ZooKeeper集群中的角色。 ### 三、Storm集群配置 1. **Storm安装**:下载Storm的tar包,解压并...

    Storm综合案例一Storm集群向Kafka集群写入数据远程模式

    今天来实现一个Storm数据流处理综合案例的第一部分,Storm集群向Kafka集群源源不断写入数据,并部署为远程模式 准备工作 搭建三台Kafka集群服务器 参考文档:Linux部署Kafka集群 搭建三台Storm集群服务器 参考...

    storma集群安装手册(包含zookeeper集群安装和storm集群安装)

    ### Storma集群安装手册知识点详解 ...接下来可以进一步配置 Storm 的任务拓扑结构,并开始执行实时数据处理任务。请注意,在实际部署过程中可能还会遇到各种细节问题,建议仔细阅读官方文档以获取更多帮助和支持。

    storm集群启动与停止脚本共2页.pdf.zip

    【标题】"storm集群启动与停止脚本共2页.pdf.zip" 提供的是一份关于storm集群管理的文档,主要涵盖了如何启动和停止storm集群的关键步骤。Storm是Apache开源的一个分布式实时计算系统,它能够处理大规模的数据流并...

    storm集群部署文档

    - **启动Supervisor服务**:在每个Supervisor节点(如`work1`、`work2`)上执行: ```bash nohup storm supervisor & ``` #### 四、Storm集群组件介绍 - **Nimbus**:集群中的主节点,负责接收用户提交的任务,...

    storm-wordcount例子

    3. **Count Bolt**:最后,Count Bolt接收到Split Bolt发送的单词Tuples,它会维护一个单词计数表,每当接收到一个单词,就在表中增加相应的计数。当计数达到一定阈值或满足特定条件时,可以将结果写入持久化存储...

    storm一个简单实例

    3. **Storm集群部署**:实例在集群中运行,说明会涉及Zookeeper协调、Nimbus节点分配任务、Supervisor管理worker进程等集群管理内容。 4. **Java编程**:作为主要开发语言,Java提供了丰富的类库和工具,使得开发、...

    Storm API实现词频统计

    生成的JAR文件(如`storm_demo.jar`)可以用Storm的命令行工具提交到本地或远程的Storm集群上运行,例如`storm jar storm_demo.jar WordCountTopology wordcount`。 5. **监控与调试** Storm提供了Web UI,可以...

Global site tag (gtag.js) - Google Analytics