`

Storm 字符统计Demo

 
阅读更多
1、数据源读取,字符发射spout类
/**
 * 字符发射spout类
 */
public class RandomSentenceSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1L;

	SpoutOutputCollector _collector;
	Random _rand;

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		_collector = collector;
		_rand = new Random();
	}

	public void nextTuple() {
		String[] sentences = new String[] { sentence("a b c d "), sentence("b d"), sentence("a d") };
		for (String sentence : sentences) {// 发射三行数据致bolt处理
			_collector.emit(new Values(sentence));
		}
		Utils.sleep(1000 * 1000);
	}

	protected String sentence(String input) {
		return input;
	}

	@Override
	public void ack(Object id) {
	}

	@Override
	public void fail(Object id) {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("firstSpout"));
	}

	// Add unique identifier to each tuple, which is helpful for debugging
	public static class TimeStamped extends RandomSentenceSpout {
		private final String prefix;

		public TimeStamped() {
			this("");
		}

		public TimeStamped(String prefix) {
			this.prefix = prefix;
		}

		protected String sentence(String input) {
			return prefix + currentDate() + " " + input;
		}

		private String currentDate() {
			return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date());
		}
	}
}


2、第一次对字符串加工处下,切割处理bolt类
/**
 * 字符切割处理bolt类
 */
public class MysplitBolt implements IBasicBolt {

	private static final long serialVersionUID = 1L;

	String patton;

	public MysplitBolt(String patton) {
		this.patton = patton;
	}

	/**
	 * 接收处理每一行数据
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {
		try {
			String sen = input.getStringByField("firstSpout");
			if (sen != null) {
				for (String word : sen.split(patton)) {// 发射多个字符数据,让下一级bolt处理
					collector.emit(new Values(word));
				}

			}

		} catch (FailedException e) {
			e.printStackTrace();// TODO: handle exception
		}

	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("singleWord"));
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub

	}

	public void cleanup() {
		// TODO Auto-generated method stub

	}

}


3、提交topology的main函数及字符统计处理类
public class WordCountTopology {

	/**
	 * 提交topology的main函数及字符统计处理类
	 */
	public static class SplitSentence extends ShellBolt implements IRichBolt {
		
		private static final long serialVersionUID = 1L;
		
		
		
		/**
		 * 字符统计处理bolt类
		 */
		public static class WordCount extends BaseBasicBolt {
			private static final long serialVersionUID = 1L;
			// 声明当前线程全局变量,统计字母个数,线程一直处于运行状态
			Map<String, Integer> counts = new HashMap<String, Integer>();

			public void execute(Tuple tuple, BasicOutputCollector collector) {
				String word = tuple.getString(0);
				Integer count = counts.get(word);
				if (count == null) {
					count = 0;
				}
				count++;
				counts.put(word, count);
				System.err.println(Thread.currentThread().getName() + "---word:" + word + "   count:" + count);
				collector.emit(new Values(word, count));
			}

			public void declareOutputFields(OutputFieldsDeclarer declarer) {
				declarer.declare(new Fields("word", "count"));
			}
		}

		//提交topology的main函数
		public static void main(String[] args) throws Exception {
			TopologyBuilder builder = new TopologyBuilder();
			//读取数据用1个线程,防止数据重复读取
			builder.setSpout("spout", new RandomSentenceSpout(), 1);
			//从spout源读取数据,设置2个线程处理字符分割
			builder.setBolt("split", new MysplitBolt(" "), 2).shuffleGrouping("spout");
			/**
			 * 上个bolt接收数据,设置3个线程处理数据统计。
			 * Fields Grouping:按Field分组,相同的tuple会分发给同一个线程(Executer或task)处理。
			 * 比如按singleWord来分组, 具有同样singleWord的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。
			 */
			builder.setBolt("count", new WordCount(), 3).fieldsGrouping("split", new Fields("singleWord"));

			Config conf = new Config();
			conf.setDebug(true);

			if (args != null && args.length > 0) {
				conf.setNumWorkers(3);

				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} else {
				conf.setMaxTaskParallelism(3);

				LocalCluster cluster = new LocalCluster();
				cluster.submitTopology("word-count", conf, builder.createTopology());
			}
		}

		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			// TODO Auto-generated method stub
		}

		public Map<String, Object> getComponentConfiguration() {
			// TODO Auto-generated method stub
			return null;
		}
	}

}


4、处理结果
引用

Thread-22-count-executor[3 3]---word:c   count:1
Thread-18-count-executor[2 2]---word:b   count:1
Thread-32-count-executor[4 4]---word:a   count:1
Thread-32-count-executor[4 4]---word:d   count:1

Thread-18-count-executor[2 2]---word:b   count:2
Thread-32-count-executor[4 4]---word:d   count:2
Thread-32-count-executor[4 4]---word:a   count:2

Thread-32-count-executor[4 4]---word:d   count:3


5、相关总结
引用

1、每一个线程bolt获取处理数据与上一个bolt或spout输出的数据方式一致。
   declarer.declare(new Fields("firstSpout"));

2、每一个线程bolt在topology运行中,一直处理运行状态。而声明的全局变量是针对每个线程的全局变量,每一个线程输出统计数据是当前线程的变量数据。

3、每个spout或bolt处理数据时,都可以设置对应的线程数。但spout读取数据时,会重复读取数据。

4、bolt与bolt数据传递,bolt数据输出格式与下一个bolt数据接收格式扭转,都是通过对应的”相同字符”扭转。


6、相关pom.xml文件
引用

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>StormMavenProject</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormMavenProject</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   
   <dependency>
    <groupId>org.ow2.asm</groupId>
    <artifactId>asm</artifactId>
    <version>5.0.3</version>
   </dependency>
<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>minlog</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.1.0</version>
</dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

<dependency>
    <groupId>ring-cors</groupId>
    <artifactId>ring-cors</artifactId>
    <version>0.1.5</version>
</dependency>

  </dependencies>
  <build>
    <finalName>StormMavenProject</finalName>
  </build>
</project>
分享到:
评论

相关推荐

    Storm 上手 demo 例子 演示

    【标题】:“Storm 上手 demo 例子演示” 【描述】:“Storm demo例子案例” 这篇文章将深入探讨Apache Storm,一个开源的分布式实时计算系统,通过实际的demo案例来帮助你快速上手。Apache Storm的设计目标是处理...

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    storm_Kafka_demo

    标题“storm_Kafka_demo”指的是一个使用Apache Storm处理Kafka数据流的示例项目。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,而Kafka是高吞吐量的分布式发布订阅消息系统。在这个...

    storm统计单词数的demo

    【storm统计单词数的demo】是一个基于Apache Storm的入门级示例,旨在帮助初学者理解这个分布式实时计算系统的运作机制。Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的...

    Storm API实现词频统计

    生成的JAR文件(如`storm_demo.jar`)可以用Storm的命令行工具提交到本地或远程的Storm集群上运行,例如`storm jar storm_demo.jar WordCountTopology wordcount`。 5. **监控与调试** Storm提供了Web UI,可以...

    StormDemo.tar.gz

    【标题】"StormDemo.tar.gz" 是一个与Apache Storm相关的压缩包文件,它提供了一个入门级别的示例,帮助用户理解并开始使用这个分布式实时计算系统。Apache Storm是一个开源的流处理框架,它允许开发者处理和分析...

    storm demo

    【storm demo】是一个基于Apache Storm的实践项目,旨在教授如何运用Storm的核心组件——Spout、Bolt和Topology来构建实时数据处理系统。Apache Storm是一个开源的分布式实时计算系统,能够处理大量的流数据,并保证...

    simple storm demo

    simple storm demosimple storm demosimple storm demosimple storm demosimple storm demosimple storm demo

    jstorm storm入门demo

    "jstorm storm入门demo" 这个标题表明了这是一个关于JStorm和Storm框架的基础教学示例。JStorm是阿里巴巴开源的一个分布式实时计算系统,它基于Apache Storm,但提供了更稳定、高性能以及易用的特性。这个demo可能是...

    storm demo 单机版 maven

    描述提到"stormdemo实现不安装storm系列软件的基础上,运行调试storm拓扑",这暗示了该项目包含了一种方法,可以在没有预先安装Apache Storm的情况下运行和调试Storm拓扑。通常,这可以通过使用Maven的`storm-...

    stormdemo.zip

    本资料“stormdemo.zip”提供了一个关于Storm的实战示例,名为“stormdemo”,旨在帮助用户深入理解并掌握Storm的核心概念和操作流程。 Apache Storm是一个开源的分布式实时计算系统,它允许开发者连续处理数据流,...

    storm之drpc操作demo示例.zip

    总结来说,"storm之drpc操作demo示例"是一个很好的学习资源,它涵盖了Storm DRPC的核心概念和实践操作,对于想要在实时计算项目中运用Storm DRPC功能的开发者来说,极具参考价值。通过实际操作这个示例,你将能够...

    storm 示例demo

    简单的storm入门示例,从0到1让你清楚的理解storm.下面是代码示例: import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology...

    地区销售实时统计 kakfa+storm+hbase+servlet+highcharts

    标题中的“地区销售实时统计”指的是一个实时数据分析系统,它能够快速收集、处理并展示不同地区的销售数据。这个系统利用了Kafka、Storm、HBase、Servlet和Highcharts等技术来实现这一目标。 Kafka是一个分布式流...

    Storm JAVA版上手demo下载地址

    Storm JAVA版上手demo下载地址 整理学习Storm过程中的代码和文档 更新中......

    storm-demo:风暴演示应用程序

    "storm-demo" 项目是针对 Storm 的一个入门级示例,旨在帮助初学者快速理解 Storm 的工作原理和编程模型。 在 Java 开发环境中,Storm 可以通过编写 Bolt 和 Spout 组件来构建实时处理管道。Bolt 负责执行业务逻辑...

    storm-kafka-demo

    我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用...

    storm利用ack保证数据的可靠性源码

    在分布式计算领域,Apache Storm是一个实时处理系统,它允许开发者处理和分析连续的数据流。Storm的设计目标是确保数据处理的高可靠性和低延迟。在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个...

    storm实时单词计数

    在"storm实时单词计数"这个场景中,我们主要探讨的是如何使用Storm来实现一个实时分析应用,该应用可以统计输入文本中的单词数量。 在Storm中,数据流是通过拓扑结构(Topology)进行组织的,由多个 bolts(处理...

    storm-word-count-demo4.zip

    本篇将详细探讨"storm-word-count-demo4.zip"这个项目,这是一个基于Storm的Java实现的Word Count示例,旨在帮助初学者理解如何在Storm框架下进行实时数据处理。 一、Spout组件 在Storm中,Spout是数据流的源头,...

Global site tag (gtag.js) - Google Analytics