- 浏览: 58244 次
- 性别:
- 来自: 北京
-
文章分类
最新评论
1、数据源读取,字符发射spout类
2、第一次对字符串加工处下,切割处理bolt类
3、提交topology的main函数及字符统计处理类
4、处理结果
Thread-22-count-executor[3 3]---word:c count:1
Thread-18-count-executor[2 2]---word:b count:1
Thread-32-count-executor[4 4]---word:a count:1
Thread-32-count-executor[4 4]---word:d count:1
Thread-18-count-executor[2 2]---word:b count:2
Thread-32-count-executor[4 4]---word:d count:2
Thread-32-count-executor[4 4]---word:a count:2
Thread-32-count-executor[4 4]---word:d count:3
5、相关总结
1、每一个线程bolt获取处理数据与上一个bolt或spout输出的数据方式一致。
declarer.declare(new Fields("firstSpout"));
2、每一个线程bolt在topology运行中,一直处理运行状态。而声明的全局变量是针对每个线程的全局变量,每一个线程输出统计数据是当前线程的变量数据。
3、每个spout或bolt处理数据时,都可以设置对应的线程数。但spout读取数据时,会重复读取数据。
4、bolt与bolt数据传递,bolt数据输出格式与下一个bolt数据接收格式扭转,都是通过对应的”相同字符”扭转。
6、相关pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
/** * 字符发射spout类 */ public class RandomSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { String[] sentences = new String[] { sentence("a b c d "), sentence("b d"), sentence("a d") }; for (String sentence : sentences) {// 发射三行数据致bolt处理 _collector.emit(new Values(sentence)); } Utils.sleep(1000 * 1000); } protected String sentence(String input) { return input; } @Override public void ack(Object id) { } @Override public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("firstSpout")); } // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } } }
2、第一次对字符串加工处下,切割处理bolt类
/** * 字符切割处理bolt类 */ public class MysplitBolt implements IBasicBolt { private static final long serialVersionUID = 1L; String patton; public MysplitBolt(String patton) { this.patton = patton; } /** * 接收处理每一行数据 */ public void execute(Tuple input, BasicOutputCollector collector) { try { String sen = input.getStringByField("firstSpout"); if (sen != null) { for (String word : sen.split(patton)) {// 发射多个字符数据,让下一级bolt处理 collector.emit(new Values(word)); } } } catch (FailedException e) { e.printStackTrace();// TODO: handle exception } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("singleWord")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } public void cleanup() { // TODO Auto-generated method stub } }
3、提交topology的main函数及字符统计处理类
public class WordCountTopology { /** * 提交topology的main函数及字符统计处理类 */ public static class SplitSentence extends ShellBolt implements IRichBolt { private static final long serialVersionUID = 1L; /** * 字符统计处理bolt类 */ public static class WordCount extends BaseBasicBolt { private static final long serialVersionUID = 1L; // 声明当前线程全局变量,统计字母个数,线程一直处于运行状态 Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.err.println(Thread.currentThread().getName() + "---word:" + word + " count:" + count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } //提交topology的main函数 public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //读取数据用1个线程,防止数据重复读取 builder.setSpout("spout", new RandomSentenceSpout(), 1); //从spout源读取数据,设置2个线程处理字符分割 builder.setBolt("split", new MysplitBolt(" "), 2).shuffleGrouping("spout"); /** * 上个bolt接收数据,设置3个线程处理数据统计。 * Fields Grouping:按Field分组,相同的tuple会分发给同一个线程(Executer或task)处理。 * 比如按singleWord来分组, 具有同样singleWord的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 */ builder.setBolt("count", new WordCount(), 3).fieldsGrouping("split", new Fields("singleWord")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } } }
4、处理结果
引用
Thread-22-count-executor[3 3]---word:c count:1
Thread-18-count-executor[2 2]---word:b count:1
Thread-32-count-executor[4 4]---word:a count:1
Thread-32-count-executor[4 4]---word:d count:1
Thread-18-count-executor[2 2]---word:b count:2
Thread-32-count-executor[4 4]---word:d count:2
Thread-32-count-executor[4 4]---word:a count:2
Thread-32-count-executor[4 4]---word:d count:3
5、相关总结
引用
1、每一个线程bolt获取处理数据与上一个bolt或spout输出的数据方式一致。
declarer.declare(new Fields("firstSpout"));
2、每一个线程bolt在topology运行中,一直处理运行状态。而声明的全局变量是针对每个线程的全局变量,每一个线程输出统计数据是当前线程的变量数据。
3、每个spout或bolt处理数据时,都可以设置对应的线程数。但spout读取数据时,会重复读取数据。
4、bolt与bolt数据传递,bolt数据输出格式与下一个bolt数据接收格式扭转,都是通过对应的”相同字符”扭转。
6、相关pom.xml文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1052一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6611、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 773一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 534英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 430一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6901、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5951.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4991、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8341、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 624Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2137事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4701、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1158统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 917汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 709一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10841、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 712一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 619并发度: worker:指的是component (spo ... -
Storm 本地模式
2017-04-09 22:25 407本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 686一、安装Storm wget ...
相关推荐
public class Demo { public static void main(String[] args) { String msg = "天气阴了,我犯困了"; RSAPrivateKey privateKey = RSAUtils.privateKey("F:\\RSAkeys\\pri"); RSAPublicKey publicKey = ...
1. **完全分布式部署**:在5台虚拟机上设置Kafka集群,我们需要配置每台机器上的`server.properties`文件,确保正确的broker ID(节点标识)和zookeeper连接字符串。同时,为了实现高可用性,需要配置副本因子和分区...
轴类零件加工工艺设计.zip
资源内项目源码是来自个人的毕业设计,代码都测试ok,包含源码、数据集、可视化页面和部署说明,可产生核心指标曲线图、混淆矩阵、F1分数曲线、精确率-召回率曲线、验证集预测结果、标签分布图。都是运行成功后才上传资源,毕设答辩评审绝对信服的保底85分以上,放心下载使用,拿来就能用。包含源码、数据集、可视化页面和部署说明一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.txt文件,仅供学习参考, 切勿用于商业用途。
seaborn基本绘图人力资源数据集
移动机器人(sw三维)
自制html网页源代码查看器
3吨叉车的液压系统设计().zip
1_实验三 扰码、卷积编码及交织.ppt
北京交通大学软件学院自命题科目考试大纲.pdf
雅鲁藏布江流域 shp矢量数据 (范围+DEM).zip
基于RUST的数据结构代码示例,栈、队列、图等
NIFD:2024Q1房地产金融报告
详细介绍及样例数据:https://blog.csdn.net/li514006030/article/details/146916652
【工业机器视觉定位软件Vision-Detect】基于C#的WPF与Halcon开发的工业机器视觉定位软件(整套源码),开箱即用 有用户登录,图片加载,模板创建,通讯工具,抓边抓圆,良率统计,LOG日志,异常管理,九点标定和流程加载保存等模块,功能不是很完善,适合初学者参考学习。 资源介绍请查阅:https://blog.csdn.net/m0_37302966/article/details/146912206 更多视觉框架资源:https://blog.csdn.net/m0_37302966/article/details/146583453
内容概要:本文档详细介绍了Java虚拟机(JVM)的相关知识点,涵盖Java内存模型、垃圾回收机制及算法、垃圾收集器、内存分配策略、虚拟机类加载机制和JVM调优等内容。首先阐述了Java代码的编译和运行过程,以及JVM的基本组成部分及其运行流程。接着深入探讨了JVM的各个运行时数据区,如程序计数器、Java虚拟机栈、本地方法栈、Java堆、方法区等的作用和特点。随后,文档详细解析了垃圾回收机制,包括GC的概念、工作原理、优点和缺点,并介绍了几种常见的垃圾回收算法。此外,文档还讲解了JVM的分代收集策略,新生代和老年代的区别,以及不同垃圾收集器的工作方式。最后,文档介绍了类加载机制、JVM调优的方法和工具,以及常用的JVM调优参数。 适合人群:具备一定Java编程基础的研发人员,尤其是希望深入了解JVM内部机制、优化程序性能的技术人员。 使用场景及目标:①帮助开发人员理解Java代码的编译和执行过程;②掌握JVM内存管理机制,包括内存分配、垃圾回收等;③熟悉类加载机制,了解类加载器的工作原理;④学会使用JVM调优工具,掌握常用调优参数,提升应用程序性能。 其他说明:本文档内容详尽,适合用作面试准备材料和技术学习资料,有助于提高开发人员对JVM的理解和应用能力。
Android项目原生java语言课程设计,包含LW+ppt
戴德梁行&中国房地产协会:2021亚洲房地产投资信托基金研究报告
Android项目原生java语言课程设计,包含LW+ppt
Thinkphp6.0+vue个人虚拟物品发卡网站源码 支持码支付对接 扫码自动发货 源码一共包含两个部分thinkphp6.0后端文件,以及vue前端文件.zip