strom程序开发需要几个组件
1 Topology 即程序的主要入口 main,配置文件要通过主函数加载后放到conf中,然后bolt才能拿到,./conf可以到jar包中得到配置文件。
2 Sport 程序的数据来源
3 bolt程序处理节点 ,一个程序可能n个bolt节点 。
一 Topology
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; public class MyTopology1 { public static void main(String[] args) { try{ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomWordSpout(), 1).setNumTasks(8); builder.setBolt("longBolt", new longBolt(),1).shuffleGrouping("spout"); //初始化配置文件,保存配置文件到全局config ConfigFactory.init("./conf/config-kafka.xml"); Config config = new Config(); config.put("KafkaConfig", KafkaConfig.getProducerPro() ) ; config.put("redis.url", ConfigFactory.getString("redis.url") ) ; config.setDebug(true); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("MyTopology1", config, builder.createTopology()); if(args!=null && args.length > 0) { config.setNumWorkers(2); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } else { config.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology1", config, builder.createTopology()); Thread.sleep(1000); //cluster.shutdown(); } }catch (Exception e) { e.printStackTrace(); } } }
二、 Spout 数据来源
import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomWordSpout extends BaseRichSpout{ /** * */ private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; //模拟一些数据 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; //不断地往下一个组件发送tuple消息 //这里面是该spout组件的核心逻辑 @Override public void nextTuple() { //可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去 Random random = new Random(); int index = random.nextInt(words.length); //通过随机数拿到一个商品名 String godName = words[index]; //将商品名封装成tuple,发送消息给下一个组件 collector.emit(new Values(godName)); //每发送一个消息,休眠500ms, Utils.sleep(10000); } //初始化方法,在spout组件实例化时调用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //声明本spout组件发送出去的tuple中的数据的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } }
三、bolt业务处理代码
import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class longBolt2 extends BaseBasicBolt { private static final long serialVersionUID = -4561045897494106937L; @Override public void prepare(Map stormConf, TopologyContext context) { } public void execute(Tuple tuple, BasicOutputCollector collector) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
启动一个storm程序:
storm jar storm-kafka-0.0.2_wzt.jar com.jusfoun.testStrom.MyTopology1 MyTopology1
杀死之前的程序:
storm kill MyTopology1
相关推荐
- 文件`storm_android_player_01_27`可能是一个关于如何使用Storm处理Android播放器应用的实时数据分析的示例。可能涉及到收集用户行为数据、分析播放模式、推荐算法等,通过实时处理为用户提供个性化体验。 5. **...
在这个示例中,我们将探讨如何使用Java编写Storm拓扑结构,以及如何在本地或集群环境中运行它们。 首先,理解Storm的基本概念至关重要。一个Storm拓扑是数据流处理任务的逻辑表示,由Spouts(数据源)和Bolts(数据...
本示例项目适用于Java开发者,特别是初学者,帮助他们快速上手Storm。 【描述】中的信息表明,此压缩包包含了一个可以直接在Eclipse集成开发环境中运行的Storm项目。这意味着项目已经配置好了所有必要的依赖和设置...
标题中的“storm”的安装使用,指的是Apache Storm,一个开源的分布式实时计算系统。Apache Storm能够处理数据流,就像Hadoop处理批处理数据一样。它在实时大数据处理领域扮演着重要角色,尤其适用于需要低延迟、高...
要测试集群,可以运行 Storm 提供的示例 Topology,例如 WordCount,通过 `storm jar` 命令提交 Topology,然后使用 `storm kill` 命令可以终止运行的 Topology。 总的来说,Strom 的强大在于其灵活的实时处理能力...
storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行.
标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...
【压缩包子文件的文件名称列表】:"workshop-tinkerforge-strom-master"可能包含了该研讨会的源代码、教程文档、示例项目和其他相关资源。这个目录名暗示着它是项目的主要分支或原始版本,通常在GitHub等版本控制...
使用 PHP 生产、消费 Kafka 消息的例子: 1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够...
在LTspice中,我们可以使用Strom-Messungen功能来进行电流测量。在信号分部窗口中,我们可以选择要测量的信号,并点击测量按钮。 Änderung von Bauteilwerten 在LTspice中,我们可以使用Änderung von ...
以下是一个简单的PHP示例,演示如何使用`php://input`接收POST请求数据: ```php $data = file_get_contents("php://input"); // 解析并处理接收到的数据 POSTData: ($data); ?> ?> ``` 此外,对于POST请求,通过...
课程中提供了读写配置信息的代码示例,帮助理解Zookeeper的API使用。 【配置服务】在分布式环境中,配置服务的重要性不言而喻,它使得所有节点能够同步共享关键配置。通过Zookeeper,开发者可以实现一个高可用的...
描述中的"new book for strom work in sofware"可能指的是一个关于在软件中处理电流(strom)工作的新教材,这可能涵盖了如何使用STM32F103与ePHASORsim进行电力系统仿真和控制的教程。 标签 "STM32F103" 明确了...
以下是一份基于模板的示例简历: 个人信息: - 姓名:Adam Miller - 电子邮件:adam.miller@mail.com - 地址:3099 Julia Street, Rome - Italy - 电话:+1325 7894 5612 - 社交媒体链接:Facebook、Twitter、...
除了上述内容,搭建Nexus服务器的目的还包括提高软件构建的效率,由于每个开发机不必都从远程中央仓库下载依赖,可以显著减少网络带宽的使用和下载时间,从而加快构建过程,提升开发效率。此外,私有仓库还能有效...
个别例子会使用python以演示storm的多语言特性。这个教程使用storm-starter项目里面的例子。我推荐你们下载这个项目的代码并且跟着教程一起做。先读一下:配置storm开发环境和新建一个strom项目这两篇文章把你的机器...
- **使用Strom开发一个WordCount例子**:通过一个简单的WordCount示例学习Storm的使用。 - **Storm程序本地模式debug、Storm程序远程debug**:调试Storm程序的技巧。 - **Storm事物处理**:实现事务性的数据处理流程...
如何制作一本书 这个 repo 包含我用来编写的构建框架。... 您需要这些程序,如果您使用的是 Ubuntu,则可以使用apt-get install 。 asciidoc fop ebook-convert (Calibre 的一部分) make python和python-pygm
2. **readme.txt**:通常包含项目的基本信息、安装指南、快速入门示例等,是开发者开始使用Twitter4j的第一步。 3. **pom.xml**:这是一个Maven项目的配置文件,包含了项目的依赖、构建指令等信息,方便用户在Maven...