`

storm本地执行案例1--->累加

 
阅读更多

 

 

1 使用Eclipse创建maven工程

 

2 pom.xml内增加storm依赖:

<dependency>
	<groupId>org.apache.storm</groupId>
	<artifactId>storm-core</artifactId>
	<version>0.9.3</version>
</dependency>

 

3 自定义一个 spout  一个bolt, 在spout内不断产生i 并发送到bolt内, 在bolt内实现累加并打印效果

   此案例主要是结合storm简介 中关于storm术语(车头 轨道 车次 车厢 车厢内人数tuple)

    对应代码写法的一个参考而已

 

4 代码如下:

 

package changping.houzhihoujue.storm;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import ch.qos.logback.core.util.TimeUtil;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * 本地运行:
 *   实现累加
 * @author zm
 *
 */
public class MyLocalStormTopology {

	/**
	 * 组装火车 轨道  并让火车在轨道上行驶
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {

		// 祖品列车
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout("1", new MySpout2()); // 定义1号车厢
		topologyBuilder.setBolt("2", new MyBolt1()).shuffleGrouping("1");// 定义2号车厢 并和1号车厢连接起来   
		// 造出轨道
		LocalCluster localCluster = new LocalCluster();// 造出轨道 在本地运行
		Config config = new Config();   
		// 轨道上运行列车, 三个参数分别为:定义的列车名,列车服务人员,轨道上跑的列车本身
		localCluster.submitTopology(MyLocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology());
		
		TimeUnit.SECONDS.sleep(99999);// 设置列车运行时间
		localCluster.shutdown();// 跑完后就停止下来, 否则storm是永不停止
		
	}

}

//创建火车头
class MySpout2 extends BaseRichSpout {

	private Map conf;
	private TopologyContext context;
	private SpoutOutputCollector collector;
	
	// 此方法首先被调用  打开storm系统外的数据源
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.conf = conf;
		this.context = context;
		this.collector = collector;
	}

	private int i = 0;
	// 认为是NameNode的heartbeat,永无休息的死循环的调用 并是线程安全的操作, 这里每一次调用此方法 将i++发送到bolt
	public void nextTuple() {
		System.err.println(i);
		// 将数据(i++)放在弹壳(Values)中,并发送给bolt
		this.collector.emit(new Values(i++));
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	//声明输出的字段的名称为 v1   只有在输出给别人时才会重写此方法
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("v1"));
	}
	
}

// 创建车厢
class MyBolt1 extends BaseRichBolt{

	private Map stormConf;
	private TopologyContext context;
	private OutputCollector collector;
	
	// 准备下要对接收storm spout发送来的数据
	public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
		this.stormConf = stormConf;
		this.context = context;
		this.collector = collector;
	}

	private int sum = 0;
	// 死循环,用于接收bolt送来的数据   这里storm每调用一次此方法 则获取发送来的tuple数据
	public void execute(Tuple input) {
		int i = input.getIntegerByField("v1");
		sum += i;
		System.err.println(sum);
	}

	// 只有向外发送数据时 此方法才会被调用 否则 不要实现此方法 
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
}

 

 

 

分享到:
评论

相关推荐

    Apache Storm(apache-storm-2.3.0.tar.gz)

    Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...

    Apache Storm(apache-storm-2.3.0-src.tar.gz 源码)

    Apache Storm(apache-storm-2.3.0-src.tar.gz 源码) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与...

    Storm-EPL-Example-2.0.19.zip

    下载并解压"Storm-EPL-Example-2.0.19.zip"后,通过Eclipse的"File" -&gt; "Import" -&gt; "Existing Projects into Workspace",选择解压后的目录,即可将项目导入到工作空间中。 在Eclipse中,你会看到示例项目包含了一...

    STORM-User-guide-V3.2

    对于STORM软件的安装步骤,请参考官方网站提供的“Getting Started”文档:&lt;http://storm.rts-software.org/&gt;。 #### 三、XML文件作为仿真规格说明 本节将详细介绍作为STORM输入的XML文件的结构和内容。XML文件...

    storm_r1.1-adarna 调试WebService接口的工具

    【Storm_r1.1-Adarna】是一款专用于调试Web Service接口的工具,它为开发者提供了高效、便捷的方式来测试和验证Web服务的功能和性能。在Web服务开发中,正确理解和使用此类工具对于确保服务的质量和稳定性至关重要。...

    Apache Storm(apache-storm-2.3.0-src.zip 源码)

    Apache Storm(apache-storm-2.3.0-src.zip 源码) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何...

    ZY_Storm-0.0.1-jar-0401-1

    ZY_Storm-0.0.1-jar-0401-1

    Storm-EPL-Example-2.0.19(sp2)

    标题中的"Storm-EPL-Example-2.0.19(sp2)"指的是一个基于Apache Storm的EPL(Event Processing Language)示例项目,版本号为2.0.19的第二个补丁版本。Apache Storm是一个开源的分布式实时计算系统,它能够处理数据...

    storm-word-count-demo4.zip

    为了运行这个示例项目,首先需要解压"storm-word-count-demo4.zip",然后使用Storm命令行工具提交拓扑到本地或远程的Storm集群。在本地模式下,可以使用`storm local`命令;在分布式模式下,使用`storm jar`命令。在...

    Storm.Applied.Strategies.for.real-time.event.processing

    Chapter 1 Introducing Storm Chapter 2 Core Storm concepts Chapter 3 Topology design Chapter 4 Creating robust topologies Chapter 5 Moving from local to remote topologies Chapter 6 Tuning in Storm ...

    apache-storm-2.4.0.tar.gz

    1. 下载 `apache-storm-2.4.0.tar.gz` 并解压到一个适当的目录。 2. 修改 `conf/storm.yaml` 配置文件,根据实际情况设置 ZooKeeper、nimbus、supervisor等节点的信息。 3. 设置环境变量,如 `STORM_HOME` 指向解压...

    ZY_Storm-0.0.1-0314-1.jar

    ZY_Storm-0.0.1-0314-1.jar

    Apache Storm(apache-storm-2.3.0.zip)

    Apache Storm(apache-storm-2.3.0.zip) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...

    storm-kafka-0.8-plus-test:Storm-kafka-0.8-plus的简单测试项目

    启动测试环境fig up 启动kafka外壳start-kafka-shell.sh &lt;Docker&gt; &lt;Zookeeper&gt; 在外壳中,创建一个主题$KAFKA_HOME/bin/kafka-topics.sh --create --topic storm-sentence --partitions 2 --zookeeper $ZK --...

    Java-webservice接口测试工具Storm_r1.1-Adarna

    Java WebService接口测试工具Storm_r1.1-Adarna是一款专为Java开发的Web服务测试解决方案,它旨在简化和加速对基于SOAP或RESTful的Web服务接口的验证和测试过程。在本文中,我们将深入探讨Java WebService接口测试的...

    Flume+kafka+Storm整合

    - Storm将处理Flume收集的数据,并执行相应的分析任务。 通过以上步骤,我们可以构建出一个完整的Flume+kafka+Storm数据流处理系统。这套系统能够高效地处理实时数据流,为业务决策提供强有力的支持。

    storm安装包 apache-storm-1.2.1 2

    apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm...

    storm-core源码

    storm-core-1.0.3-sources.jar 源码文件,1.0.3版本

    storm+kafka

    storm+kafka jar包 ,curator-client-2.8.0、curator-framework-2.8.0、curator-recipes-2.8.0、guava-18.0、kafka_2.9.2-0.8.2.2、metrics-core-2.2.0、scala-library-2.10.4、storm-kafka-0.9.2-incubating、...

Global site tag (gtag.js) - Google Analytics