- 浏览: 56651 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、普通事务Spout
2、事务Spout创建一个新的事务(元数据)metadata
2、1元数据定义
2、2 获得(元数据)metadata,逐个发射实际batch的tuple
------------------------spout与上篇相同-----------------------------------------
3、按天统计数据事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
4、接收统计数据,累加汇总MyDailyCommitterBolt,batch之间强制按照顺序进行提交
5、topo类
6、测试结果
/** * 普通事务Spout */ public class MyTxSpout implements ITransactionalSpout<MyMata>{ private static final long serialVersionUID = 1L; /** * 数据源 */ Map<Long, String> dbMap = null; public MyTxSpout() { Random random = new Random(); dbMap = new HashMap<Long, String> (); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (long i = 0; i < 100; i++) { dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx","log")); } public Map<String, Object> getComponentConfiguration() { return null; } public org.apache.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(Map conf, TopologyContext context) { /** * 发射该metadata(事务tuple)到“batch emit”流 */ return new MyCoordinator(); } public org.apache.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(Map conf, TopologyContext context) { /** * 逐个发射实际batch的tuple */ return new MyEmitter(dbMap); } }
2、事务Spout创建一个新的事务(元数据)metadata
2、1元数据定义
public class MyMata implements Serializable{ /** * metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。 */ private static final long serialVersionUID = 1L; private long beginPoint ;//事务开始位置 private int num ;//batch 的tuple个数 @Override public String toString() { return getBeginPoint()+"----"+getNum(); } public long getBeginPoint() { return beginPoint; } public void setBeginPoint(long beginPoint) { this.beginPoint = beginPoint; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } }
2、2 获得(元数据)metadata,逐个发射实际batch的tuple
public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> { Map<Long, String> dbMap = null; public MyEmitter(Map<Long, String> dbMap) { this.dbMap = dbMap; } //逐个发射实际batch的tuple public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) { long beginPoint = coordinatorMeta.getBeginPoint();// 从上一个批次获得开始位置 int num = coordinatorMeta.getNum();// 从批次中获取批次数量 for (long i = beginPoint; i < num + beginPoint; i++) { if (dbMap.get(i) == null) { continue; } collector.emit(new Values(tx, dbMap.get(i))); } } public void cleanupBefore(BigInteger txid) { } public void close() { } }
------------------------spout与上篇相同-----------------------------------------
3、按天统计数据事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> { /** * 按天统计数据 */ private static final long serialVersionUID = 1L; Map<String, Integer> countMap = new HashMap<String, Integer>(); BatchOutputCollector collector ; Integer count = null; String today = null; TransactionAttempt tx = null; @Override public void execute(Tuple tuple) { // TODO Auto-generated method stub String log = tuple.getString(1); tx = (TransactionAttempt)tuple.getValue(0); if (log != null && log.split("\\t").length >=3 ) { today = DateFmt.getCountDate(log.split("\\t")[2], DateFmt.date_short) ; count = countMap.get(today); if(count == null) { count = 0; } count ++ ; countMap.put(today, count); } } @Override public void finishBatch() { collector.emit(new Values(tx,today,count)); } @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { // TODO Auto-generated method stub this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("tx","date","count")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
4、接收统计数据,累加汇总MyDailyCommitterBolt,batch之间强制按照顺序进行提交
public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter{ /** * 接收统计数据,累加汇总 */ private static final long serialVersionUID = 1L; public static final String GLOBAL_KEY = "GLOBAL_KEY"; public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>() ; Map<String, Integer> countMap = new HashMap<String, Integer>(); TransactionAttempt id ; BatchOutputCollector collector; String today = null; @Override public void execute(Tuple tuple) { today = tuple.getString(1) ; Integer count = tuple.getInteger(2); id = (TransactionAttempt)tuple.getValue(0); if (today !=null && count != null) { Integer batchCount = countMap.get(today) ; if (batchCount == null) { batchCount = 0; } batchCount += count ; countMap.put(today, batchCount); } } @Override public void finishBatch() { // TODO Auto-generated method stub if (countMap.size() > 0) { DbValue value = dbMap.get(GLOBAL_KEY); DbValue newValue ; if (value == null || !value.txid.equals(id.getTransactionId())) { //更新数据库 newValue = new DbValue(); newValue.txid = id.getTransactionId() ; newValue.dateStr = today; if (value == null) { newValue.count = countMap.get(today) ; }else { newValue.count = value.count + countMap.get("2014-01-07") ; } dbMap.put(GLOBAL_KEY, newValue); }else { newValue = value; } } System.out.println("total==========================:"+dbMap.get(GLOBAL_KEY).count); // collector.emit(tuple) } @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { // TODO Auto-generated method stub this.id = id ; this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public static class DbValue { BigInteger txid; int count = 0; String dateStr; } }
5、topo类
public class MyDailyTopo { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyTxSpout(),1); builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid"); builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ; Config conf = new Config() ; conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.buildTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.buildTopology()); } } }
6、测试结果
引用
启动一个事务:0----10
total==========================:10
启动一个事务:10----10
total==========================:20
启动一个事务:20----10
total==========================:30
total==========================:10
启动一个事务:10----10
total==========================:20
启动一个事务:20----10
total==========================:30
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1032一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6481、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 750一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 516英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 416一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6791、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5821.分区事务spout public class My ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8201、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 613Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2102事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4481、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1130统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 896汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 686一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10681、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 702一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 590并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5331、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 395本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 672一、安装Storm wget ...
相关推荐
python数据分析实例 python数据分析实例(源码) # python数据分析 #### 介绍 python数据可视化例子 ##### 1.SARIMAX模型对公路车流量预测 ##### 2.古诗词云统计 ##### 3.对大数据岗位可视化分析
《SAS系统与数据分析:深度解析与实例应用》 SAS(Statistical Analysis System)是一种强大的统计分析软件,广泛应用于商业智能、数据挖掘、预测分析等领域。本资料集旨在为初学者提供一个全面且深入的SAS学习平台...
4-9 JTA多数据源事务实例 第5章 分布式系统 介绍了分布式系统的定义、实现原则和几种形式,详细介绍了微服务架构的分布式系统,并使用Spring Cloud框架演示了一个完整的微服务系统的实现过程。 5-1 CAP原则和BASE...
首先,`基于echarts的中国地图省份数据统计显示`的标题表明我们将使用Echarts的特定地图组件来呈现中国各个省份的数据统计结果。Echarts提供了预置的地图数据,包括中国的省级和市级地图,可以直接调用,无需手动...
这份资源是一份针对深度学习计算机视觉领域的实例分割源码,使用 Ultralytics YOLOv8-seg 模型和 COCO128-seg 数据集进行目标检测和实例分割任务。提供了一个亲身测试且直接可运行的实例分割解决方案。 数据集我已经...
持续更新数据分析实例,包括但不限于数据清洗,统计检验,数据挖掘等内容,实现细节描述请参考博客~_python-data-analysis
实例31 使用文件统计在线人数 实例32 设置和获取Cookie参数 实例33 删除会话中已注册变量 实例34 数据编码 实例35 简单购物车 第5章 PEAR和正则表达式实例 实例36 PEAR管理器安装及PEAR包常见操作 实例37 ...
《十天学会单片机实例100》一书提供了丰富的单片机实践教程,旨在通过实际操作加深学习者对单片机的理解和掌握。以下是对该书中部分关键实例的知识点详细解读: ### 函数的使用和熟悉 函数是单片机编程中的基本构建...
【SPSS数据分析实例】这篇教程主要介绍了如何使用SPSS进行数据处理和分析的基本步骤,适合初学者入门。SPSS(Statistical Package for the Social Sciences)是一款广泛应用的统计分析软件,以其友好的用户界面和易...
13. **数据可视化**:LabVIEW强大的图表和图形功能会在实例中得到体现,如实时波形显示、数据统计图、3D图形等。 14. **并行处理与多线程**:LabVIEW支持并行编程,实例可能涵盖如何利用并行结构提高程序性能。 15...
J2EE(Java 2 Platform, Enterprise Edition)是Oracle公司推出的用于构建企业级分布式应用程序的框架,它提供了服务器端的编程模型和运行环境,支持多种服务,如事务处理、安全、集群、数据库连接等。本实例集合将...
《C++编程实例100篇》是一本深入浅出的C++编程教程,它以实践为主导,通过丰富的实例帮助学习者掌握C++语言的基础和核心概念。这本书的每个实例都精心设计,旨在帮助初学者和有经验的开发者巩固和提升C++编程技能。 ...
在IT行业中,处理大量数据是常见的挑战之一,尤其是在数据导出方面。本实例聚焦于“java实现csv导出千万级数据实例”,旨在提供一个高效、稳定的解决方案,避免因数据量过大而导致的性能问题,如Java中的栈溢出...
spring事务管理几种方式代码实例:涉及编程式事务,声明式事务之拦截器代理方式、AOP切面通知方式、AspectJ注解方式,通过不同方式实例代码展现,总结spring事务管理的一般规律,从宏观上加深理解spring事务管理特性...
实例245 取出数据统计结果前3名数据 9.4 周期、日期查询 实例246 查询指定日期的数据 实例247 查询指定时间段的数据 实例248 按月查询数据 9.5 大小比较、逻辑查询、重复记录查询 实例249 查询数据大于指定...
### CANopen实例分析之PDO #### 一、深入解析PDO收发过程 在现代工业自动化领域,CANopen作为一项广泛采用的现场总线协议,其PDO(Process Data Object,过程数据对象)机制扮演着关键角色,负责实时数据的高效...
在本资源中,"ArcGIS+Engine+C#实例开发源代码以及图表统计图源代码" 提供了基于Esri的ArcGIS Engine和C#编程语言的GIS应用开发实例,同时结合了Zegraph这一开源图表库来实现数据的可视化。下面将详细探讨这些知识点...
实例203 如何在数据库更新过程中增加事务操作 实例204 如何在程序中实现查找字段 实例205 如何在程序中实现自定义字段 实例206 如何使用字段拖放功能 实例207 如何为程序添加报表打印功能 实例208 如何使用向导...