- 浏览: 57357 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、普通事务Spout
2、事务Spout创建一个新的事务(元数据)metadata
2、1元数据定义
2、2 获得(元数据)metadata,逐个发射实际batch的tuple
3、事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
4、Icommitter,batch之间强制按照顺序进行提交
5、topo类
6、测试结果
启动一个事务:0----10
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 3
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 4
finishBatch 3
total==========================:10
启动一个事务:10----10
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 3
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 4
finishBatch 3
total==========================:20
/** * 普通事务Spout */ public class MyTxSpout implements ITransactionalSpout<MyMata>{ private static final long serialVersionUID = 1L; /** * 数据源 */ Map<Long, String> dbMap = null; public MyTxSpout() { Random random = new Random(); dbMap = new HashMap<Long, String> (); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (long i = 0; i < 100; i++) { dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx","log")); } public Map<String, Object> getComponentConfiguration() { return null; } public org.apache.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(Map conf, TopologyContext context) { /** * 发射该metadata(事务tuple)到“batch emit”流 */ return new MyCoordinator(); } public org.apache.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(Map conf, TopologyContext context) { /** * 逐个发射实际batch的tuple */ return new MyEmitter(dbMap); } }
2、事务Spout创建一个新的事务(元数据)metadata
2、1元数据定义
public class MyMata implements Serializable{ /** * metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。 */ private static final long serialVersionUID = 1L; private long beginPoint ;//事务开始位置 private int num ;//batch 的tuple个数 @Override public String toString() { return getBeginPoint()+"----"+getNum(); } public long getBeginPoint() { return beginPoint; } public void setBeginPoint(long beginPoint) { this.beginPoint = beginPoint; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } }
2、2 获得(元数据)metadata,逐个发射实际batch的tuple
public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> { Map<Long, String> dbMap = null; public MyEmitter(Map<Long, String> dbMap) { this.dbMap = dbMap; } //逐个发射实际batch的tuple public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) { long beginPoint = coordinatorMeta.getBeginPoint();// 从上一个批次获得开始位置 int num = coordinatorMeta.getNum();// 从批次中获取批次数量 for (long i = beginPoint; i < num + beginPoint; i++) { if (dbMap.get(i) == null) { continue; } collector.emit(new Values(tx, dbMap.get(i))); } } public void cleanupBefore(BigInteger txid) { } public void close() { } }
3、事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
/** * 事务Bolt */ public class MyTransactionBolt extends BaseTransactionalBolt { private static final long serialVersionUID = 1L; public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.collector = collector; System.err.println("MyTransactionBolt prepare " + id.getTransactionId() + " attemptid" + id.getAttemptId()); } Integer count = 0; BatchOutputCollector collector; TransactionAttempt tx; // 会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。 public void execute(Tuple tuple) { tx = (TransactionAttempt) tuple.getValue(0); System.err.println( "MyTransactionBolt TransactionAttempt " + tx.getTransactionId() + " attemptid" + tx.getAttemptId()); String log = tuple.getString(1); if (log != null && log.length() > 0) { count++; } } // 批处理提交 public void finishBatch() { System.err.println("finishBatch " + count); collector.emit(new Values(tx, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "count")); } }
4、Icommitter,batch之间强制按照顺序进行提交
public class MyCommitter extends BaseTransactionalBolt implements ICommitter { /** * 接口Icommitter:标识IBatchBolt 或BaseTransactionalBolt是否是一个committer */ private static final long serialVersionUID = 1L; public static final String GLOBAL_KEY = "GLOBAL_KEY"; public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>(); int sum = 0; TransactionAttempt id; BatchOutputCollector collector; public void execute(Tuple tuple) { sum += tuple.getInteger(1); } public void finishBatch() { DbValue value = dbMap.get(GLOBAL_KEY); DbValue newValue; if (value == null || !value.txid.equals(id.getTransactionId())) { // 更新数据库 newValue = new DbValue(); newValue.txid = id.getTransactionId(); if (value == null) { newValue.count = sum; } else { newValue.count = value.count + sum; } dbMap.put(GLOBAL_KEY, newValue); } else { newValue = value; } System.err.println("total==========================:" + dbMap.get(GLOBAL_KEY).count); // collector.emit(tuple) } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.id = id; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { } public static class DbValue { BigInteger txid; int count = 0; } }
5、topo类
public class MyTopo { /** * 事务topo */ public static void main(String[] args) { TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "spoutid", new MyTxSpout(), 1); builder.setBolt("bolt1", new MyTransactionBolt(), 3).shuffleGrouping("spoutid"); builder.setBolt("committer", new MyCommitter(), 1).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.buildTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } } else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.buildTopology()); } } }
6、测试结果
引用
启动一个事务:0----10
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 3
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 4
finishBatch 3
total==========================:10
启动一个事务:10----10
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 3
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 4
finishBatch 3
total==========================:20
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1040一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6561、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 759一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 524英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 424一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6821、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5871.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4901、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 616Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2117事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4581、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1143统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 903汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 698一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10751、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 707一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 601并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5381、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 400本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 675一、安装Storm wget ...
相关推荐
在这个“spring简单实例 事务回滚”的案例中,我们将深入探讨Spring如何处理事务回滚,以及它是如何在Java源码层面实现这一功能的。 首先,让我们理解什么是事务。在数据库操作中,事务是确保数据一致性的重要机制...
在Spring中,我们可以配置事务的传播行为,比如REQUIRED(默认,如果当前存在事务,则加入当前事务,否则新建一个事务)、PROPAGATION_SUPPORTS(如果当前存在事务,则加入,否则不开启事务)、PROPAGATION_REQUIRES...
S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与...
在5.0.2版本中,LCN遇到了一个特定的问题,即当同一个微服务模块运行多个实例时,事务管理器(TM)在通知事务协调器(TC)时可能出现混乱的情况。 首先,我们要理解LCN的基本工作原理。LCN通过在应用程序中嵌入事务...
本实例将深入探讨如何在这样的环境中实现事务管理。 Spring框架以其强大的依赖注入和AOP(面向切面编程)功能,为事务管理提供了便捷的方式。Spring支持编程式和声明式事务管理。编程式事务管理允许我们在代码中...
本实例主要探讨了如何在.NET环境中,结合SQL语句实现事务的应用。以下将详细介绍相关知识点。 首先,`.NET事务应用`指的是使用.NET Framework的类库进行事务管理。在.NET中,我们可以使用System.Transactions命名...
资源名:西门子PLC工程实例源码第184期:S7300与wincc由普通网卡通讯实例.rar 资源类型:西门子PLC工程实例源码 源码说明: 全部项目源码都是经过测试校正后百分百成功运行的,如果您下载后不能运行可联系我进行指导...
本文将深入探讨如何利用STM32的普通定时器实现每1秒钟使LED闪烁一次的应用实例。 首先,我们要了解STM32中的普通定时器(TIM)。STM32提供了多种类型的定时器,包括高级定时器、通用定时器和基本定时器。普通定时器...
203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)203-普通定时器时钟(51单片机C语言实例...
257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例...
西门子PLC例程-S7300与wincc由普通网卡通讯实例
141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51...
J2EE(Java 2 Platform, Enterprise Edition)是Oracle公司推出的用于构建企业级分布式应用程序的框架,它提供了服务器端的编程模型和运行环境,支持多种服务,如事务处理、安全、集群、数据库连接等。本实例集合将...
介绍了事务的四大原则,并通过实例介绍数据库实现事务的方法,以及使用JDBC实现事务的方法。 2-1 事务原则与实现:事务 2-2 事务原则与实现:SQL事务 2-3 事务原则与实现:JDBC事务(上) 2-4 事务原则与实现:JDBC...
本文实例讲述了Python sqlite3事务处理方法。分享给大家供大家参考,具体如下: sqlite3事务总结: 在connect()中不传入 isolation_level 事务处理: 使用connection.commit() #!/usr/bin/env python # -*- coding:...
其次,优化事务的提交和回滚机制,比如引入事务序列号,确保每个TM实例的事务请求按照预设的顺序进行,避免TC接收无序的事务操作。再者,可以对TM实例进行适当的隔离,例如设置事务处理的独占锁或者使用分布式锁来...
实例1 如何使用错误提醒控件 实例2 如何使用信息提示控件 实例3 如何使用菜单控件 实例4 如何使用工具栏控件 实例5 如何使用状态栏控件 实例6 如何使用托盘控件 实例7 如何使用标签页控件 实例8 如何使用进度条控件 ...
实例45 客户端解析普通字符串 实例46 客户端解析XML文件响应 实例47 客户端发送POST无参数请求 实例48 客户端发送带有参数请求 实例49 客户端以表格形式显示数据 实例50 服务器端自动生成XML文件 实例51 客户端以...
flex组件之格式化组件实例源码,包括37个实例,一些常用的普通组件