`

普通事务ITransactionalSpout实例

 
阅读更多
1、普通事务Spout

/**
 * 普通事务Spout
 */
public class MyTxSpout implements ITransactionalSpout<MyMata>{

	
	private static final long serialVersionUID = 1L;
	
	
	/**
	 * 数据源
	 */
	Map<Long, String> dbMap  = null;
	public MyTxSpout()
	{
		Random random = new Random();
		dbMap = new HashMap<Long, String> ();
		String[] hosts = { "www.taobao.com" };
		String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
				"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
		String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
				"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
		
		for (long i = 0; i < 100; i++) {
			dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
		}
	}
	
	

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx","log"));
	}

	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	public org.apache.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(Map conf,
			TopologyContext context) {
		/**
		 * 发射该metadata(事务tuple)到“batch emit”流
		 */
		return new MyCoordinator();
	}

	public org.apache.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(Map conf,
			TopologyContext context) {
		/**
		 * 逐个发射实际batch的tuple
		 */
		return new MyEmitter(dbMap);
	}

}


2、事务Spout创建一个新的事务(元数据)metadata
  2、1元数据定义
public class MyMata implements Serializable{

	/**
	 * metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。 
	 */
	private static final long serialVersionUID = 1L;
	
	private long beginPoint ;//事务开始位置

	private int num ;//batch 的tuple个数
	@Override
	public String toString() {
		return getBeginPoint()+"----"+getNum();
	}
	
	public long getBeginPoint() {
		return beginPoint;
	}

	public void setBeginPoint(long beginPoint) {
		this.beginPoint = beginPoint;
	}

	public int getNum() {
		return num;
	}

	public void setNum(int num) {
		this.num = num;
	}
	
	
}



2、2 获得(元数据)metadata,逐个发射实际batch的tuple

public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> {

	Map<Long, String> dbMap = null;

	public MyEmitter(Map<Long, String> dbMap) {
		this.dbMap = dbMap;
	}

	//逐个发射实际batch的tuple
	public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) {

		long beginPoint = coordinatorMeta.getBeginPoint();// 从上一个批次获得开始位置

		int num = coordinatorMeta.getNum();// 从批次中获取批次数量

		for (long i = beginPoint; i < num + beginPoint; i++) {
			if (dbMap.get(i) == null) {
				continue;
			}
			collector.emit(new Values(tx, dbMap.get(i)));
		}

	}

	public void cleanupBefore(BigInteger txid) {

	}

	public void close() {

	}

}


3、事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。

/**
 * 事务Bolt
 */
public class MyTransactionBolt extends BaseTransactionalBolt {

	private static final long serialVersionUID = 1L;

	public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
		this.collector = collector;
		System.err.println("MyTransactionBolt prepare " + id.getTransactionId() + "  attemptid" + id.getAttemptId());
	}

	Integer count = 0;
	BatchOutputCollector collector;
	TransactionAttempt tx;

	// 会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。
	public void execute(Tuple tuple) {
		tx = (TransactionAttempt) tuple.getValue(0);
		System.err.println(
				"MyTransactionBolt TransactionAttempt " + tx.getTransactionId() + "  attemptid" + tx.getAttemptId());
		String log = tuple.getString(1);
		if (log != null && log.length() > 0) {
			count++;
		}
	}

	// 批处理提交
	public void finishBatch() {
		System.err.println("finishBatch " + count);
		collector.emit(new Values(tx, count));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
		declarer.declare(new Fields("tx", "count"));

	}

}



4、Icommitter,batch之间强制按照顺序进行提交

public class MyCommitter extends BaseTransactionalBolt implements ICommitter {

	/**
	 * 接口Icommitter:标识IBatchBolt 或BaseTransactionalBolt是否是一个committer
	 */
	private static final long serialVersionUID = 1L;
	public static final String GLOBAL_KEY = "GLOBAL_KEY";
	public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>();
	int sum = 0;
	TransactionAttempt id;
	BatchOutputCollector collector;

	public void execute(Tuple tuple) {
		sum += tuple.getInteger(1);
	}

	public void finishBatch() {

		DbValue value = dbMap.get(GLOBAL_KEY);
		DbValue newValue;
		if (value == null || !value.txid.equals(id.getTransactionId())) {
			// 更新数据库
			newValue = new DbValue();
			newValue.txid = id.getTransactionId();
			if (value == null) {
				newValue.count = sum;
			} else {
				newValue.count = value.count + sum;
			}
			dbMap.put(GLOBAL_KEY, newValue);
		} else {
			newValue = value;
		}
		System.err.println("total==========================:" + dbMap.get(GLOBAL_KEY).count);
		// collector.emit(tuple)
	}

	public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
		this.id = id;
		this.collector = collector;
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

	public static class DbValue {
		BigInteger txid;
		int count = 0;
	}

}


5、topo类

public class MyTopo {

	/**
	 * 事务topo
	 */
	public static void main(String[] args) {

		TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId", "spoutid", new MyTxSpout(), 1);
		builder.setBolt("bolt1", new MyTransactionBolt(), 3).shuffleGrouping("spoutid");
		builder.setBolt("committer", new MyCommitter(), 1).shuffleGrouping("bolt1");

		Config conf = new Config();
		conf.setDebug(true);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			} catch (AuthorizationException e) {
				e.printStackTrace();
			}
		} else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.buildTopology());
		}

	}

}



6、测试结果
引用

启动一个事务:0----10
MyTransactionBolt prepare 1  attemptid3005464965348344518
MyTransactionBolt prepare 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt prepare 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
finishBatch 3
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
MyTransactionBolt TransactionAttempt 1  attemptid3005464965348344518
finishBatch 4
finishBatch 3
total==========================:10
启动一个事务:10----10
MyTransactionBolt prepare 2  attemptid4420908201582652570
MyTransactionBolt prepare 2  attemptid4420908201582652570
MyTransactionBolt prepare 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
finishBatch 3
MyTransactionBolt TransactionAttempt 2  attemptid4420908201582652570
finishBatch 4
finishBatch 3
total==========================:20



















分享到:
评论

相关推荐

    spring_事务管理(实例代码)

    在Spring中,我们可以配置事务的传播行为,比如REQUIRED(默认,如果当前存在事务,则加入当前事务,否则新建一个事务)、PROPAGATION_SUPPORTS(如果当前存在事务,则加入,否则不开启事务)、PROPAGATION_REQUIRES...

    S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载

    S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与wincc由普通网卡通讯实例.zip西门子PLC编程实例程序源码下载S7300与...

    LCN5.0.2同一个微服务模块多实例,TM会发生事务通知TC错乱的问题

    在5.0.2版本中,LCN遇到了一个特定的问题,即当同一个微服务模块运行多个实例时,事务管理器(TM)在通知事务协调器(TC)时可能出现混乱的情况。 首先,我们要理解LCN的基本工作原理。LCN通过在应用程序中嵌入事务...

    tx-lcn5.0.2.解决微服务模块多实例事务通知TC错乱的问题--源码修改

    在TX-LCN框架下,TM(Transaction Manager)负责发起全局事务,而TC(Transaction Coordinator)则协调各个服务实例的局部事务,确保全局事务的一致性。然而,在5.0.2版本中,当一个微服务模块有多个实例时,可能...

    spring hibernate mysql 事务实例

    本实例将深入探讨如何在这样的环境中实现事务管理。 Spring框架以其强大的依赖注入和AOP(面向切面编程)功能,为事务管理提供了便捷的方式。Spring支持编程式和声明式事务管理。编程式事务管理允许我们在代码中...

    西门子PLC工程实例源码第184期:S7300与wincc由普通网卡通讯实例.rar

    资源名:西门子PLC工程实例源码第184期:S7300与wincc由普通网卡通讯实例.rar 资源类型:西门子PLC工程实例源码 源码说明: 全部项目源码都是经过测试校正后百分百成功运行的,如果您下载后不能运行可联系我进行指导...

    stm32 普通定时器应用实例

    本文将深入探讨如何利用STM32的普通定时器实现每1秒钟使LED闪烁一次的应用实例。 首先,我们要了解STM32中的普通定时器(TIM)。STM32提供了多种类型的定时器,包括高级定时器、通用定时器和基本定时器。普通定时器...

    203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)

    203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)203-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)203-普通定时器时钟(51单片机C语言实例...

    257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)

    257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例Proteus仿真和代码)257-普通定时器时钟(51单片机C语言实例...

    西门子PLC例程-S7300与wincc由普通网卡通讯实例.zip

    西门子PLC例程-S7300与wincc由普通网卡通讯实例

    141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)

    141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51单片机C语言实例Proteus仿真和代码)141-定时做普通时钟可调(51...

    j2ee实例 j2ee实例j2ee实例

    J2EE(Java 2 Platform, Enterprise Edition)是Oracle公司推出的用于构建企业级分布式应用程序的框架,它提供了服务器端的编程模型和运行环境,支持多种服务,如事务处理、安全、集群、数据库连接等。本实例集合将...

    分布式事务实践 解决数据一致性

    介绍了事务的四大原则,并通过实例介绍数据库实现事务的方法,以及使用JDBC实现事务的方法。 2-1 事务原则与实现:事务 2-2 事务原则与实现:SQL事务 2-3 事务原则与实现:JDBC事务(上) 2-4 事务原则与实现:JDBC...

    Python sqlite3事务处理方法实例分析

    本文实例讲述了Python sqlite3事务处理方法。分享给大家供大家参考,具体如下: sqlite3事务总结: 在connect()中不传入 isolation_level 事务处理: 使用connection.commit() #!/usr/bin/env python # -*- coding:...

    TX_LCN5.0.2解决微服务模块多实例TM会发生事务通知TC错乱的问题

    其次,优化事务的提交和回滚机制,比如引入事务序列号,确保每个TM实例的事务请求按照预设的顺序进行,避免TC接收无序的事务操作。再者,可以对TM实例进行适当的隔离,例如设置事务处理的独占锁或者使用分布式锁来...

    Visual C# .NET精彩编程实例集锦

    实例1 如何使用错误提醒控件 实例2 如何使用信息提示控件 实例3 如何使用菜单控件 实例4 如何使用工具栏控件 实例5 如何使用状态栏控件 实例6 如何使用托盘控件 实例7 如何使用标签页控件 实例8 如何使用进度条控件 ...

    PHP+Ajax网站开发典型实例-源代码

    实例45 客户端解析普通字符串 实例46 客户端解析XML文件响应 实例47 客户端发送POST无参数请求 实例48 客户端发送带有参数请求 实例49 客户端以表格形式显示数据 实例50 服务器端自动生成XML文件 实例51 客户端以...

    flex组件之普通组件实例源码

    flex组件之格式化组件实例源码,包括37个实例,一些常用的普通组件

    ASP网站实例开发源码——新疆赛德律师事务所.zip

    【ASP网站实例开发源码——新疆赛德律师事务所.zip】是一个包含了使用ASP(Active Server Pages)技术开发的网站实例源代码。ASP是微软推出的一种服务器端脚本环境,主要用于创建动态交互式网页。这个实例是为新疆...

Global site tag (gtag.js) - Google Analytics