`

strom使用示例

 
阅读更多

 

strom程序开发需要几个组件

1 Topology 即程序的主要入口 main,配置文件要通过主函数加载后放到conf中,然后bolt才能拿到,./conf可以到jar包中得到配置文件。

2 Sport 程序的数据来源 

3 bolt程序处理节点 ,一个程序可能n个bolt节点 。 

 

一 Topology  

 

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class MyTopology1 {
public static void main(String[] args) {

	try{
		
		
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1).setNumTasks(8);
        
        builder.setBolt("longBolt", new longBolt(),1).shuffleGrouping("spout");
        
        //初始化配置文件,保存配置文件到全局config
        ConfigFactory.init("./conf/config-kafka.xml");
        Config config = new Config();
        
        config.put("KafkaConfig", KafkaConfig.getProducerPro() ) ; 
        config.put("redis.url", ConfigFactory.getString("redis.url") ) ; 
  
        
        config.setDebug(true);
        
         
        LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("MyTopology1", config, builder.createTopology());
        if(args!=null && args.length > 0) {
            config.setNumWorkers(2);
            
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {        
            config.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyTopology1", config, builder.createTopology());
        
            Thread.sleep(1000);

            //cluster.shutdown();
        }
	}catch (Exception e) {
		e.printStackTrace();
	}

}
}

二、 Spout 数据来源

  

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private SpoutOutputCollector collector;
	
	//模拟一些数据
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
	
	//不断地往下一个组件发送tuple消息
	//这里面是该spout组件的核心逻辑
	@Override
	public void nextTuple() {

		//可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去
		Random random = new Random();
		int index = random.nextInt(words.length);
		
		//通过随机数拿到一个商品名
		String godName = words[index];
		
		
		//将商品名封装成tuple,发送消息给下一个组件
		collector.emit(new Values(godName));
		
		//每发送一个消息,休眠500ms,
		Utils.sleep(10000);
		
		
	}

	//初始化方法,在spout组件实例化时调用一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

		this.collector = collector;
		
	}

	//声明本spout组件发送出去的tuple中的数据的字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("orignname"));
		
	}

}

 三、bolt业务处理代码

 

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class longBolt2 extends BaseBasicBolt {
	

	 
	private static final long serialVersionUID = -4561045897494106937L;

	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		 
		
	}
	
	public void execute(Tuple tuple, BasicOutputCollector collector) {
 
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}

 

启动一个storm程序: 

 storm jar storm-kafka-0.0.2_wzt.jar   com.jusfoun.testStrom.MyTopology1 MyTopology1

杀死之前的程序:

storm kill MyTopology1

 

 

 

分享到:
评论

相关推荐

    Strom优化

    - 文件`storm_android_player_01_27`可能是一个关于如何使用Storm处理Android播放器应用的实时数据分析的示例。可能涉及到收集用户行为数据、分析播放模式、推荐算法等,通过实时处理为用户提供个性化体验。 5. **...

    【Storm入门级JAVA示例演示】

    在这个示例中,我们将探讨如何使用Java编写Storm拓扑结构,以及如何在本地或集群环境中运行它们。 首先,理解Storm的基本概念至关重要。一个Storm拓扑是数据流处理任务的逻辑表示,由Spouts(数据源)和Bolts(数据...

    超级简单入门的strom的java代码demo

    本示例项目适用于Java开发者,特别是初学者,帮助他们快速上手Storm。 【描述】中的信息表明,此压缩包包含了一个可以直接在Eclipse集成开发环境中运行的Storm项目。这意味着项目已经配置好了所有必要的依赖和设置...

    storm 的安装使用

    标题中的“storm”的安装使用,指的是Apache Storm,一个开源的分布式实时计算系统。Apache Storm能够处理数据流,就像Hadoop处理批处理数据一样。它在实时大数据处理领域扮演着重要角色,尤其适用于需要低延迟、高...

    strom的安装

    要测试集群,可以运行 Storm 提供的示例 Topology,例如 WordCount,通过 `storm jar` 命令提交 Topology,然后使用 `storm kill` 命令可以终止运行的 Topology。 总的来说,Strom 的强大在于其灵活的实时处理能力...

    storm学习入门的例子,100%可运行

    storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行.

    storm开发jar包以及storm例子源码

    标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...

    workshop-tinkerforge-strom:通过JavaFX可视化电流和电压的研讨会

    【压缩包子文件的文件名称列表】:"workshop-tinkerforge-strom-master"可能包含了该研讨会的源代码、教程文档、示例项目和其他相关资源。这个目录名暗示着它是项目的主要分支或原始版本,通常在GitHub等版本控制...

    Kafka简介及使用PHP处理Kafka消息

    使用 PHP 生产、消费 Kafka 消息的例子: 1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够...

    LTspice电子线路模拟教程.doc

    在LTspice中,我们可以使用Strom-Messungen功能来进行电流测量。在信号分部窗口中,我们可以选择要测量的信号,并点击测量按钮。 Änderung von Bauteilwerten 在LTspice中,我们可以使用Änderung von ...

    php 输入输出流详解及示例代码

    以下是一个简单的PHP示例,演示如何使用`php://input`接收POST请求数据: ```php $data = file_get_contents("php://input"); // 解析并处理接收到的数据 POSTData: ($data); ?> ?> ``` 此外,对于POST请求,通过...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 24.使用Zookeeper构建应用(共34页).pptx

    课程中提供了读写配置信息的代码示例,帮助理解Zookeeper的API使用。 【配置服务】在分布式环境中,配置服务的重要性不言而喻,它使得所有节点能够同步共享关键配置。通过Zookeeper,开发者可以实现一个高可用的...

    ePHASORsim_STM32F103_

    描述中的"new book for strom work in sofware"可能指的是一个关于在软件中处理电流(strom)工作的新教材,这可能涵盖了如何使用STM32F103与ePHASORsim进行电力系统仿真和控制的教程。 标签 "STM32F103" 明确了...

    85精美英文简历模板.docx

    以下是一份基于模板的示例简历: 个人信息: - 姓名:Adam Miller - 电子邮件:adam.miller@mail.com - 地址:3099 Julia Street, Rome - Italy - 电话:+1325 7894 5612 - 社交媒体链接:Facebook、Twitter、...

    07_Nexus的介绍和安装

    除了上述内容,搭建Nexus服务器的目的还包括提高软件构建的效率,由于每个开发机不必都从远程中央仓库下载依赖,可以显著减少网络带宽的使用和下载时间,从而加快构建过程,提升开发效率。此外,私有仓库还能有效...

    Storm教程

    个别例子会使用python以演示storm的多语言特性。这个教程使用storm-starter项目里面的例子。我推荐你们下载这个项目的代码并且跟着教程一起做。先读一下:配置storm开发环境和新建一个strom项目这两篇文章把你的机器...

    大数据课程体系

    - **使用Strom开发一个WordCount例子**:通过一个简单的WordCount示例学习Storm的使用。 - **Storm程序本地模式debug、Storm程序远程debug**:调试Storm程序的技巧。 - **Storm事物处理**:实现事务性的数据处理流程...

    bake:如何制作一本书

    如何制作一本书 这个 repo 包含我用来编写的构建框架。... 您需要这些程序,如果您使用的是 Ubuntu,则可以使用apt-get install 。 asciidoc fop ebook-convert (Calibre 的一部分) make python和python-pygm

    twitter4j-2.2.6.zip

    2. **readme.txt**:通常包含项目的基本信息、安装指南、快速入门示例等,是开发者开始使用Twitter4j的第一步。 3. **pom.xml**:这是一个Maven项目的配置文件,包含了项目的依赖、构建指令等信息,方便用户在Maven...

Global site tag (gtag.js) - Google Analytics