`
农村外出务工男JAVA
  • 浏览: 105738 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

storm trident实战 分区聚合

阅读更多

一、前言

      先有batch,因为trident内部是基于batch来实现的,然后有partition,分区后再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现。

二、实战

   main方法

public static void main(String[] args) throws AlreadyAliveException,
			InvalidTopologyException, AuthorizationException, IOException {
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
				new Values("a"), new Values("b"), new Values("a"), new Values(
						"c"));
		//设置为true,数据源会源源不断发送
                spout.setCycle(true);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.shuffle()
				.partitionAggregate(new Fields("sentence"), new SumWord(),
						new Fields("sum"))
						/**
						 * 设置3个并发度,可以理解为3个分区操作
						 */
						.parallelismHint(3)
				.each(new Fields("sum"), new PrintFilter_partition());
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident__partition_aggregate", config,
				topology.build());
	}

   SumWord:

package com.storm.trident.partitionAggregate.分区聚合;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SumWord extends BaseAggregator<Map<String,Integer>> {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	

	/**
	 * 属于哪个batch
	 */
	private Object batchId;
	
	/**
	 * 属于哪个分区
	 */
	private int partitionId;
	
	/**
	 * 分区数量
	 */
	private int numPartitions;
	
	/**
	 * 用来统计
	 */
	private Map<String,Integer> state;
	
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		state = new HashMap<String,Integer>();
		partitionId = context.getPartitionIndex();
		numPartitions = context.numPartitions();
	}
	@Override
	public Map<String, Integer> init(Object batchId, TridentCollector collector) {
		this.batchId = batchId;
		return state;
	}
	@Override
	public void aggregate(Map<String, Integer> val, TridentTuple tuple,
			TridentCollector collector) {
		System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions
				+",batchId:" + batchId);
		String word = tuple.getString(0);
		val.put(word, MapUtils.getInteger(val, word, 0)+1);
		System.out.println("sumWord:" + val);
	}
	@Override
	public void complete(Map<String, Integer> val, TridentCollector collector) {
		collector.emit(new Values(val));
	}
}

   打印方法

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintFilter_partition extends BaseFilter {
	
	 private static final Logger LOGGER = 

			 LoggerFactory.getLogger(PrintFilter_partition.class);

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		LOGGER.info("打印出来的tuple:" + tuple);
		return true;
	}
}

   测试效果:

 

2016-12-22 18:39:26.060 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0
2016-12-22 18:39:26.062 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}]
2016-12-22 18:39:26.066 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257308:0
2016-12-22 18:39:26.115 STDIO [INFO] sumWord:{a=1}
2016-12-22 18:39:26.116 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0
2016-12-22 18:39:26.117 STDIO [INFO] sumWord:{a=2}
2016-12-22 18:39:26.120 STDIO [INFO] sumWord:{b=1}
2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=2}]
2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}]
2016-12-22 18:39:26.196 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257309:0
2016-12-22 18:39:26.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}]
2016-12-22 18:39:26.198 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}]
2016-12-22 18:39:26.197 STDIO [INFO] sumWord:{c=1, a=2}
2016-12-22 18:39:26.205 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=2}]
2016-12-22 18:39:26.683 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257310:0
2016-12-22 18:39:26.684 STDIO [INFO] sumWord:{c=1, a=3}
2016-12-22 18:39:26.685 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257310:0
2016-12-22 18:39:26.687 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257310:0
2016-12-22 18:39:26.689 STDIO [INFO] sumWord:{a=1}
2016-12-22 18:39:26.691 STDIO [INFO] sumWord:{b=2}
2016-12-22 18:39:26.692 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2}]
2016-12-22 18:39:26.693 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}]
2016-12-22 18:39:26.690 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}]
2016-12-22 18:39:27.188 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}]
2016-12-22 18:39:27.190 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257311:0
2016-12-22 18:39:27.192 STDIO [INFO] sumWord:{b=2, c=1}
2016-12-22 18:39:27.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}]
2016-12-22 18:39:27.203 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}]
2016-12-22 18:39:27.673 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0
2016-12-22 18:39:27.675 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}]
2016-12-22 18:39:27.674 STDIO [INFO] sumWord:{c=1, a=4}
2016-12-22 18:39:27.677 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257312:0
2016-12-22 18:39:27.678 STDIO [INFO] sumWord:{b=1, a=1}
2016-12-22 18:39:27.680 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0
2016-12-22 18:39:27.680 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:27.681 STDIO [INFO] sumWord:{c=1, a=5}
2016-12-22 18:39:27.683 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=5}]
2016-12-22 18:39:28.227 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257313:0
2016-12-22 18:39:28.232 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:28.236 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}]
2016-12-22 18:39:28.253 STDIO [INFO] sumWord:{c=2, a=5}
2016-12-22 18:39:28.256 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=2, a=5}]
2016-12-22 18:39:28.741 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257314:0
2016-12-22 18:39:28.744 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:28.743 STDIO [INFO] sumWord:{c=2, a=6}
2016-12-22 18:39:28.748 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257314:0
2016-12-22 18:39:28.749 STDIO [INFO] sumWord:{b=2, c=1, a=1}
2016-12-22 18:39:28.756 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1, a=1}]
2016-12-22 18:39:28.755 STDIO [INFO] [b];partitionId=4;partitions=5,batchId:257314:0
2016-12-22 18:39:28.763 STDIO [INFO] sumWord:{b=1, c=2, a=6}
2016-12-22 18:39:28.769 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:29.218 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:29.219 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}]
2016-12-22 18:39:29.221 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257315:0
2016-12-22 18:39:29.228 STDIO [INFO] sumWord:{b=2, c=2, a=1}
2016-12-22 18:39:29.229 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=1}]
2016-12-22 18:39:29.689 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257316:0
2016-12-22 18:39:29.693 STDIO [INFO] sumWord:{b=2, a=1}
2016-12-22 18:39:29.694 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257316:0
2016-12-22 18:39:29.697 STDIO [INFO] sumWord:{b=2, c=2, a=2}
2016-12-22 18:39:29.704 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:29.723 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257316:0
2016-12-22 18:39:29.706 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=2}]
2016-12-22 18:39:29.740 STDIO [INFO] sumWord:{b=2, a=2}
2016-12-22 18:39:29.746 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}]
2016-12-22 18:39:30.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}]
2016-12-22 18:39:30.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}]

 

1
1
分享到:
评论

相关推荐

    Storm Trident实战之计算网站PV.rar

    本实战案例将重点介绍如何使用Storm Trident来计算网站的页面浏览量(PV,Page View)。 页面浏览量是衡量一个网站受欢迎程度的重要指标,通常通过记录用户对各个页面的访问次数来计算。在传统的批处理场景下,这...

    Storm Trident API 使用详解.docx

    《Storm Trident API 使用详解》 Storm Trident API 是 Apache Storm 框架中用于构建实时大数据处理应用程序的关键组件。它的核心概念是"Stream",一种无界的数据序列,它被分割成一系列批次(Batch),以便在...

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    Storm_Trident

    **Storm Trident:分布式流处理框架详解** Storm Trident是Twitter开源的、基于Apache Storm的一个高级抽象,它提供了一种更强大且高效的方式来处理实时数据流。Trident的核心理念是将数据流划分为一系列的小批量...

    Storm流计算之项目开发视频教程

    Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...

    基于Storm流计算天猫双十一作战室项目实战

    ### 基于Storm流计算天猫双十一作战室项目实战 #### 一、课程亮点与核心知识点 本课程针对Apache Storm这一强大的分布式实时计算系统进行了全方位的解析与实战演练。通过本课程的学习,不仅可以掌握Storm的基本...

    大数据分析架构师顶级培训课程storm课程 Trident理论与应用 Trident基础理论与实战 共35页.pptx

    ### 大数据开发高级就业指导课程——Storm及Trident理论与实战 #### 一、Storm并发机制 在Storm中,为了提高数据处理的性能和效率,设计了一套完整的并发机制。这一机制涉及到Topology的组件配置、并发度设置等多...

    Storm实战:构建大数据实时计算

     《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,不仅包括对基本概念、特性的介绍,也涵盖了一些原理说明。  实战性很强,各章节...

    storm企业应用 实战 运维和调优

    但是根据标题和描述中的信息,我能够为你生成关于Storm企业应用实战运维和调优的知识点。 Apache Storm是一个实时计算的分布式计算框架,它类似于Hadoop,但它是为了实时处理而不是批处理设计的。Storm可以处理大量...

    trident-tutorial:实用的Storm Trident教程

    三叉戟教程实用的Storm Trident教程本教程以的的出色为基础。 流浪者的设置基于Taylor Goetz的。 Hazelcast状态代码基于wurstmeister的。 看看随附的。本教程的结构浏览Part * .java,了解Trident的基础知识使用...

    Storm 实战:构建大数据实时计算 PDF带书签完整版

    《Storm实战:构建大数据实时计算》是一本深入探讨Apache Storm技术的专业书籍,旨在帮助读者理解和掌握如何使用Storm进行大数据实时处理。Apache Storm是一个开源的分布式实时计算系统,它能够处理无限的数据流,...

    trident-examples:用 Storm Trident 编写的一组应用程序

    三叉戟的例子一组用 Storm Trident 编写的应用程序。应用用法建造$ git clone git@github.com:mayconbordin/trident-examples.git$ cd trident-examples/$ mvn -P &lt; profile&gt; package 使用local配置文件以本地模式...

    Storm流计算项目:1号店电商实时数据分析系统-01.Storm项目实战课程大纲.pptx

    【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...

    storm_trident_state

    storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。

    Storm实战构建大数据实时计算

    《Storm实战构建大数据实时计算》是一本专注于大数据领域实时处理技术的专著,主要围绕Apache Storm这一开源流处理系统展开。Storm被广泛应用于实时数据分析、在线机器学习、持续集成、实时网站仪表板等多个场景,其...

    Storm实战构建大数据实时计算( 带书签目录 高清完整版)

    《Storm实战构建大数据实时计算》是一本专注于大数据处理领域的专著,主要围绕开源分布式实时计算系统Apache Storm展开。Apache Storm是一个强大的工具,用于处理大规模的数据流处理,它以高吞吐量、容错性以及实时...

    trident-elasticsearch:ElasticSearch 的 Storm Trident 集成层

    "trident-elasticsearch"项目是将这两者结合的产物,它提供了一个Storm Trident的集成层,使得在Storm中处理的数据能够无缝地流入和流出Elasticsearch。 首先,让我们深入了解一下Elasticsearch。Elasticsearch基于...

    storm-trident:《风暴蓝图》

    三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)

    trident-gcd:Storm Trident API 的 Google Cloud Datastore 状态实现

    Trident-GCD是一个开源项目,它为Apache Storm的Trident API提供了一种集成Google Cloud Datastore的状态管理实现。Trident是Storm的一个高级接口,用于构建复杂的数据处理管道,而Google Cloud Datastore则是一个...

Global site tag (gtag.js) - Google Analytics