- 浏览: 56649 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、普通事务Spout
2、事务Spout创建一个新的事务(元数据)metadata
2、1元数据定义
2、2 获得(元数据)metadata,逐个发射实际batch的tuple
3、事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
4、Icommitter,batch之间强制按照顺序进行提交
5、topo类
6、测试结果
启动一个事务:0----10
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 3
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 4
finishBatch 3
total==========================:10
启动一个事务:10----10
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 3
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 4
finishBatch 3
total==========================:20
/** * 普通事务Spout */ public class MyTxSpout implements ITransactionalSpout<MyMata>{ private static final long serialVersionUID = 1L; /** * 数据源 */ Map<Long, String> dbMap = null; public MyTxSpout() { Random random = new Random(); dbMap = new HashMap<Long, String> (); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (long i = 0; i < 100; i++) { dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx","log")); } public Map<String, Object> getComponentConfiguration() { return null; } public org.apache.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(Map conf, TopologyContext context) { /** * 发射该metadata(事务tuple)到“batch emit”流 */ return new MyCoordinator(); } public org.apache.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(Map conf, TopologyContext context) { /** * 逐个发射实际batch的tuple */ return new MyEmitter(dbMap); } }
2、事务Spout创建一个新的事务(元数据)metadata
2、1元数据定义
public class MyMata implements Serializable{ /** * metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。 */ private static final long serialVersionUID = 1L; private long beginPoint ;//事务开始位置 private int num ;//batch 的tuple个数 @Override public String toString() { return getBeginPoint()+"----"+getNum(); } public long getBeginPoint() { return beginPoint; } public void setBeginPoint(long beginPoint) { this.beginPoint = beginPoint; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } }
2、2 获得(元数据)metadata,逐个发射实际batch的tuple
public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> { Map<Long, String> dbMap = null; public MyEmitter(Map<Long, String> dbMap) { this.dbMap = dbMap; } //逐个发射实际batch的tuple public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) { long beginPoint = coordinatorMeta.getBeginPoint();// 从上一个批次获得开始位置 int num = coordinatorMeta.getNum();// 从批次中获取批次数量 for (long i = beginPoint; i < num + beginPoint; i++) { if (dbMap.get(i) == null) { continue; } collector.emit(new Values(tx, dbMap.get(i))); } } public void cleanupBefore(BigInteger txid) { } public void close() { } }
3、事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
/** * 事务Bolt */ public class MyTransactionBolt extends BaseTransactionalBolt { private static final long serialVersionUID = 1L; public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.collector = collector; System.err.println("MyTransactionBolt prepare " + id.getTransactionId() + " attemptid" + id.getAttemptId()); } Integer count = 0; BatchOutputCollector collector; TransactionAttempt tx; // 会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。 public void execute(Tuple tuple) { tx = (TransactionAttempt) tuple.getValue(0); System.err.println( "MyTransactionBolt TransactionAttempt " + tx.getTransactionId() + " attemptid" + tx.getAttemptId()); String log = tuple.getString(1); if (log != null && log.length() > 0) { count++; } } // 批处理提交 public void finishBatch() { System.err.println("finishBatch " + count); collector.emit(new Values(tx, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "count")); } }
4、Icommitter,batch之间强制按照顺序进行提交
public class MyCommitter extends BaseTransactionalBolt implements ICommitter { /** * 接口Icommitter:标识IBatchBolt 或BaseTransactionalBolt是否是一个committer */ private static final long serialVersionUID = 1L; public static final String GLOBAL_KEY = "GLOBAL_KEY"; public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>(); int sum = 0; TransactionAttempt id; BatchOutputCollector collector; public void execute(Tuple tuple) { sum += tuple.getInteger(1); } public void finishBatch() { DbValue value = dbMap.get(GLOBAL_KEY); DbValue newValue; if (value == null || !value.txid.equals(id.getTransactionId())) { // 更新数据库 newValue = new DbValue(); newValue.txid = id.getTransactionId(); if (value == null) { newValue.count = sum; } else { newValue.count = value.count + sum; } dbMap.put(GLOBAL_KEY, newValue); } else { newValue = value; } System.err.println("total==========================:" + dbMap.get(GLOBAL_KEY).count); // collector.emit(tuple) } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.id = id; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { } public static class DbValue { BigInteger txid; int count = 0; } }
5、topo类
public class MyTopo { /** * 事务topo */ public static void main(String[] args) { TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "spoutid", new MyTxSpout(), 1); builder.setBolt("bolt1", new MyTransactionBolt(), 3).shuffleGrouping("spoutid"); builder.setBolt("committer", new MyCommitter(), 1).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(true); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.buildTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } } else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.buildTopology()); } } }
6、测试结果
引用
启动一个事务:0----10
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt prepare 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 3
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1 attemptid3005464965348344518
finishBatch 4
finishBatch 3
total==========================:10
启动一个事务:10----10
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt prepare 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 3
MyTransactionBolt TransactionAttempt 2 attemptid4420908201582652570
finishBatch 4
finishBatch 3
total==========================:20
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1032一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6481、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 750一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 516英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 416一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6791、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5821.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4861、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 613Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2102事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4481、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1130统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 895汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 686一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10681、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 702一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 590并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5331、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 395本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 672一、安装Storm wget ...
相关推荐
在Spring中,我们可以配置事务的传播行为,比如REQUIRED(默认,如果当前存在事务,则加入当前事务,否则新建一个事务)、PROPAGATION_SUPPORTS(如果当前存在事务,则加入,否则不开启事务)、PROPAGATION_REQUIRES...
S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与...
其次,优化事务的提交和回滚机制,比如引入事务序列号,确保每个TM实例的事务请求按照预设的顺序进行,避免TC接收无序的事务操作。再者,可以对TM实例进行适当的隔离,例如设置事务处理的独占锁或者使用分布式锁来...
本实例将深入探讨如何在Spring中使用AOP来实现声明式事务配置。 一、Spring AOP基础 AOP允许我们在程序运行时动态地将代码插入到其他对象中,它主要用来处理那些具有横切关注点的问题,如日志、异常处理和事务管理...
通过上述实例代码,我们不仅理解了C#中如何使用SQL事务处理来确保数据操作的原子性和一致性,还学习了如何优雅地处理异常情况,包括事务的回滚和资源的释放。这对于开发高质量、健壮的应用程序至关重要。在实际项目...
在5.0.2版本中,LCN遇到了一个特定的问题,即当同一个微服务模块运行多个实例时,事务管理器(TM)在通知事务协调器(TC)时可能出现混乱的情况。 首先,我们要理解LCN的基本工作原理。LCN通过在应用程序中嵌入事务...
本实例将深入探讨如何在这样的环境中实现事务管理。 Spring框架以其强大的依赖注入和AOP(面向切面编程)功能,为事务管理提供了便捷的方式。Spring支持编程式和声明式事务管理。编程式事务管理允许我们在代码中...
在本实例中,我们将重点探讨WCF服务的基础知识以及如何实现WCF事务编程。** ### 1. WCF基础知识 - **服务导向架构**: WCF基于服务导向架构(SOA),允许服务和客户端之间通过标准协议进行通信。 - **绑定**: 绑定定义...
本实例主要探讨了如何在.NET环境中,结合SQL语句实现事务的应用。以下将详细介绍相关知识点。 首先,`.NET事务应用`指的是使用.NET Framework的类库进行事务管理。在.NET中,我们可以使用System.Transactions命名...
资源名:西门子PLC工程实例源码第184期:S7300与wincc由普通网卡通讯实例.rar 资源类型:西门子PLC工程实例源码 源码说明: 全部项目源码都是经过测试校正后百分百成功运行的,如果您下载后不能运行可联系我进行指导...
本文将深入解析`@Transactional`的事务回滚机制,并通过实例来详细讲解其工作原理,帮助读者理解和应用这一核心功能。 一、`@Transactional`注解介绍 `@Transactional`是Spring提供的一个注解,用于在方法级别或类...
本文将深入探讨如何利用STM32的普通定时器实现每1秒钟使LED闪烁一次的应用实例。 首先,我们要了解STM32中的普通定时器(TIM)。STM32提供了多种类型的定时器,包括高级定时器、通用定时器和基本定时器。普通定时器...
257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例...
spring事务管理几种方式代码实例:涉及编程式事务,声明式事务之拦截器代理方式、AOP切面通知方式、AspectJ注解方式,通过不同方式实例代码展现,总结spring事务管理的一般规律,从宏观上加深理解spring事务管理特性...
西门子PLC例程-S7300与wincc由普通网卡通讯实例
141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51...
J2EE(Java 2 Platform, Enterprise Edition)是Oracle公司推出的用于构建企业级分布式应用程序的框架,它提供了服务器端的编程模型和运行环境,支持多种服务,如事务处理、安全、集群、数据库连接等。本实例集合将...
介绍了事务的四大原则,并通过实例介绍数据库实现事务的方法,以及使用JDBC实现事务的方法。 2-1 事务原则与实现:事务 2-2 事务原则与实现:SQL事务 2-3 事务原则与实现:JDBC事务(上) 2-4 事务原则与实现:JDBC...
此外,Java实现工作流实例还可能涉及到异常处理、事务管理、定制化行为扩展等功能。例如,如果在流程中遇到问题,可以设置错误处理策略,如回退到上一步或终止流程。开发者还可以通过扩展引擎提供的钩子函数,实现...
本文实例讲述了Python sqlite3事务处理方法。分享给大家供大家参考,具体如下: sqlite3事务总结: 在connect()中不传入 isolation_level 事务处理: 使用connection.commit() #!/usr/bin/env python # -*- coding:...