`

Storm高并发运用WordSum

 
阅读更多
1、创建发射所有字符串统计总个数及去重个数处理类

public class SumBolt implements IBasicBolt {

	/**
	 * 对发射所有字符串统计总个数及去重个数
	 */
	private static final long serialVersionUID = 1L;

	Map<String, Integer> counts = new HashMap<String, Integer>();

	public void execute(Tuple input, BasicOutputCollector collector) {
		try {
			// 变量放在方法外面会进行累加,所有数据放在counts这个Map当中。
			long word_sum = 0;// 总数
			long word_count = 0;// 去重后个数

			String word = input.getString(0);
			Integer count = input.getInteger(1);
			counts.put(word, count);

			// 获取总数,遍历counts的values,进行sum
			Iterator<Integer> i = counts.values().iterator();
			while (i.hasNext()) {
				word_sum += i.next();
			}
			Iterator<String> i2 = counts.keySet().iterator();
			while (i2.hasNext()) {
				String oneWord = i2.next();
				if (oneWord != null) {
					word_count ++;
				}
			}
			System.err.println("a="+counts.get("a")+"     b="+counts.get("b")+"     c="+counts.get("c")+"     d="+counts.get("d"));
			System.err.println(
					Thread.currentThread().getName() + "word_sum=" + word_sum + ",-------word_count=" + word_count);
		} catch (Exception e) {
			throw new FailedException("split error!");
		}

	}

	public void cleanup() {
		// TODO Auto-generated method stub

	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub

	}

}


2、topology增加字符统计

public class WordCountTopology {

	/**
	 * 提交topology的main函数及字符统计处理类
	 */
	public static class SplitSentence extends ShellBolt implements IRichBolt {
		
		private static final long serialVersionUID = 1L;
		
		
		
		/**
		 * 字符统计处理bolt类
		 */
		public static class WordCount extends BaseBasicBolt {
			private static final long serialVersionUID = 1L;
			// 多线程下不能统计Map的key个数,和value表示的字符总数。因为多线下表示只是一部分
			Map<String, Integer> counts = new HashMap<String, Integer>();

			public void execute(Tuple tuple, BasicOutputCollector collector) {
				String word = tuple.getString(0);
				Integer count = counts.get(word);
				if (count == null) {
					count = 0;
				}
				count++;
				counts.put(word, count);
				//System.err.println(Thread.currentThread().getName() + "---word:" + word + "   count:" + count);
				collector.emit(new Values(word, count));
			}

			public void declareOutputFields(OutputFieldsDeclarer declarer) {
				declarer.declare(new Fields("word", "count"));
			}
		}

		//提交topology的main函数
		public static void main(String[] args) throws Exception {
			TopologyBuilder builder = new TopologyBuilder();
			//读取数据用1个线程,防止数据重复读取
			builder.setSpout("spout", new RandomSentenceSpout(), 1);
			//从spout源读取数据,设置2个线程处理字符分割
			builder.setBolt("split", new MysplitBolt(" "), 2).shuffleGrouping("spout");
			/**
			 * 上个bolt接收数据,设置3个线程处理数据统计。
			 * Fields Grouping:按Field分组,相同的tuple会分发给同一个线程(Executer或task)处理。所以不担心多线程问题
			 * 比如按singleWord来分组, 具有同样singleWord的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。
			 */
			//builder.setBolt("count", new WordCount(), 3).fieldsGrouping("split", new Fields("singleWord"));
			//平均分配tuple数据至每一个线程处理,统计会有线程安全问题
			builder.setBolt("count", new WordCount(), 3).shuffleGrouping("split");
			builder.setBolt("sum", new SumBolt(),1).shuffleGrouping("count");

			Config conf = new Config();
			conf.setDebug(true);

			if (args != null && args.length > 0) {
				conf.setNumWorkers(3);

				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
			} else {
				conf.setMaxTaskParallelism(3);

				LocalCluster cluster = new LocalCluster();
				cluster.submitTopology("word-count", conf, builder.createTopology());
			}
		}

		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			// TODO Auto-generated method stub
		}

		public Map<String, Object> getComponentConfiguration() {
			// TODO Auto-generated method stub
			return null;
		}
	}

}


-----------------------其它类--------------------------------------
3.字符发射spout类
/**
 * 字符发射spout类
 */
public class RandomSentenceSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1L;

	SpoutOutputCollector _collector;
	Random _rand;

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		_collector = collector;
		_rand = new Random();
	}

	public void nextTuple() {
		//a:2 , b:2 , c:1 , d:3
		String[] sentences = new String[] { sentence("a b c d "), sentence("b d"), sentence("a d") };
		for (String sentence : sentences) {// 发射三行数据致bolt处理
			_collector.emit(new Values(sentence));
		}
		Utils.sleep(1000 * 1000);
	}

	protected String sentence(String input) {
		return input;
	}

	@Override
	public void ack(Object id) {
	}

	@Override
	public void fail(Object id) {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("firstSpout"));
	}

	// Add unique identifier to each tuple, which is helpful for debugging
	public static class TimeStamped extends RandomSentenceSpout {
		private final String prefix;

		public TimeStamped() {
			this("");
		}

		public TimeStamped(String prefix) {
			this.prefix = prefix;
		}

		protected String sentence(String input) {
			return prefix + currentDate() + " " + input;
		}

		private String currentDate() {
			return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date());
		}
	}
}

4.字符切割处理bolt类
/**
 * 字符切割处理bolt类
 */
public class MysplitBolt implements IBasicBolt {

	private static final long serialVersionUID = 1L;

	String patton;

	public MysplitBolt(String patton) {
		this.patton = patton;
	}

	/**
	 * 接收处理每一行数据
	 */
	public void execute(Tuple input, BasicOutputCollector collector) {
		try {
			String sen = input.getStringByField("firstSpout");
			if (sen != null) {
				for (String word : sen.split(patton)) {// 发射多个字符数据,让下一级bolt处理
					collector.emit(new Values(word));
				}

			}

		} catch (Exception e) {
			throw new FailedException("split error!");
		}

	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("singleWord"));
	}

	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void prepare(Map stormConf, TopologyContext context) {
		// TODO Auto-generated method stub

	}

	public void cleanup() {
		// TODO Auto-generated method stub

	}

}


5.pom文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>StormMavenProject</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormMavenProject</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   
   <dependency>
    <groupId>org.ow2.asm</groupId>
    <artifactId>asm</artifactId>
    <version>5.0.3</version>
   </dependency>
<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>minlog</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.1.0</version>
</dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

<dependency>
    <groupId>ring-cors</groupId>
    <artifactId>ring-cors</artifactId>
    <version>0.1.5</version>
</dependency>

  </dependencies>
  <build>
    <finalName>StormMavenProject</finalName>
  </build>
</project>

分享到:
评论

相关推荐

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    storm入门.pdf

    在实践中,开发者应重视对Storm各种术语的准确理解和运用,例如spout、bolt、topology、nimbus和事务性拓扑等。 Storm作为一个开源的实时计算系统,在大数据和云计算领域具有广泛的应用前景。通过阅读和学习Storm...

    storm-ui:Apache Storm 的用户界面

    主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...

    第一个Storm应用

    写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地...

    从零开始学Storm.pdf

    4. 鲁棒性:Storm设计目标之一是提供健壮、容易管理的集群,即使在高并发的情况下也能保证系统的稳定运行。 5. 容错性:Storm具备容错能力,当计算出现错误时,系统能够重新分配任务,保证计算的持续进行。 6. 编程...

    Storm入门到精通

    * fault-tolerant:Storm 可以自动恢复故障节点,保证系统的高可用性。 * scalable:Storm 可以根据需要水平扩展,提高系统的处理能力。 * flexible:Storm 支持多种数据源和处理方式,可以满足不同的业务需求。 ...

    webservice测试工具storm

    在Storm中,你可以模拟多个并发用户,进行负载和压力测试,评估Web服务在高并发情况下的稳定性和性能。通过分析响应时间、错误率等指标,可以找出系统的瓶颈和优化方向。 6. 自动化测试: Storm支持脚本化的测试...

    storm原理分析

    ### Storm原理分析 #### 一、Storm基本结构 Apache Storm 是一个开源的分布式实时计算系统,主要用于处理流式数据。Storm 提供了一种简单而强大的模型来定义并行计算过程,使得用户能够轻松地处理无限的数据流。...

    StormStorm集成Kafka 从Kafka中读取数据

    而Apache Kafka则是一个高吞吐量的分布式发布订阅消息系统,常用于构建实时数据管道和流处理应用。将两者结合,可以构建出强大的实时数据处理平台。 **二、写入数据到Kafka** 在Storm-Kafka集成中,首先需要将数据...

    apache-storm-2.4.0.tar.gz

    Apache Storm 是一个分布式实时计算系统,它被设计用于处理大规模数据流,提供高吞吐量、低延迟的数据处理能力。在大数据领域,Storm 被广泛应用于实时分析、在线机器学习、持续计算、数据集成以及任何需要实时处理...

    细细品味Storm_Storm简介及安装

    - **低延迟和高性能**:Storm设计用于处理高频率、低延迟的数据流,可以处理每秒数百万条消息。 - **分布式和可扩展**:Storm可以在多个服务器上分布运行,通过添加更多的服务器轻松扩展处理能力。 - **容错**:...

    基于Storm流计算天猫双十一作战室项目实战

    Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 **2. Storm的全面讲解** - **深度解析**:课程不仅覆盖了Storm的基础概念和架构,还深入探讨了其...

    Storm API实现词频统计

    在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...

    storm调试webservice

    使用Storm可以实现这些功能的自动化和实时化,对于大规模或高并发的Web Service环境尤其有价值。 然而,提供的压缩包文件名称“Storm_r1.1-Adarna”并没有给出足够的信息来详细解释具体如何使用Storm调试Web ...

    storm-hbase集成

    它的设计目标是处理PB级别的数据,适用于需要低延迟、高并发读写的场景。 三、Storm 与 HBase 集成的原因 1. 实时性:Storm 的实时处理能力与 HBase 的高速写入和查询性能相结合,可以实现实时数据的快速处理和存储...

    Storm 源码分析

    Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...

    Apache Storm(apache-storm-2.3.0.tar.gz)

    Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...

    storm开发jar包以及storm例子源码

    标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无限的数据流,常用于大数据实时处理...

    storm程序代码示例

    Apache Storm是一个开源的分布式实时计算系统,它能够处理无界数据流,确保每个事件都得到正确的处理,即使在高并发和大规模数据输入的情况下也能保持高效。 **一、Storm简介** Apache Storm的核心概念包括:拓扑...

Global site tag (gtag.js) - Google Analytics