`

不透明分区事务IOpaquePartitionedTransactional实例

 
阅读更多
1、spout

public class MyOpaquePtTxSpout implements IOpaquePartitionedTransactionalSpout<MyMata> {

	/**
	 * 将Coordinator发射数据动作下放到emitPartitionBatch类执行。
	 */
	private static final long serialVersionUID = 1L;
	public static int BATCH_NUM = 10;
	public Map<Integer, Map<Long, String>> PT_DATA_MP = new HashMap<Integer, Map<Long, String>>();

	public MyOpaquePtTxSpout() {
		Random random = new Random();
		String[] hosts = { "www.taobao.com" };
		String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",
				"BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
		String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
				"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

		for (int j = 0; j < 5; j++) {
			HashMap<Long, String> dbMap = new HashMap<Long, String>();
			for (long i = 0; i < 100; i++) {
				dbMap.put(i, hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]);
			}
			PT_DATA_MP.put(j, dbMap);
		}
	}

	public org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(
			Map conf, TopologyContext context) {
		return new MyCoordinator();
	}

	public org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout.Emitter<MyMata> getEmitter(
			Map conf, TopologyContext context) {
		return new MyEmitter();
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx", "log"));
	}

	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	public class MyCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {

		public void close() {

		}

		public boolean isReady() {
			Utils.sleep(1000);
			return true;
		}

	}

	public class MyEmitter implements IOpaquePartitionedTransactionalSpout.Emitter<MyMata> {

		public void close() {

		}

		public MyMata emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition,
				MyMata lastPartitionMeta) {
			System.err.println("emitPartitionBatch partition:" + partition);
			long beginPoint = 0;
			if (lastPartitionMeta == null) {
				beginPoint = 0;
			} else {
				beginPoint = lastPartitionMeta.getBeginPoint() + lastPartitionMeta.getNum();
			}

			MyMata mata = new MyMata();
			mata.setBeginPoint(beginPoint);
			mata.setNum(BATCH_NUM);
			System.err.println("启动一个事务:" + mata.toString());
			//获取某一个分区数据
			Map<Long, String> batchMap = PT_DATA_MP.get(partition);
			for (Long i = mata.getBeginPoint(); i < mata.getBeginPoint() + mata.getNum(); i++) {
				if (batchMap.size() <= i) {
					break;
				}
				collector.emit(new Values(tx, batchMap.get(i)));
			}
			//返回一个mata给当前方法,当作下次循环的参数
			return mata;
		}

		public int numPartitions() {
			return 5;
		}

	}
}



2、处理类不作改变

public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> {

	/**
	 * 不作改变
	 */
	private static final long serialVersionUID = 1L;
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	BatchOutputCollector collector ;
	Integer count = null;
	String today = null;
	TransactionAttempt tx = null;
	
	public void execute(Tuple tuple) {
		// TODO Auto-generated method stub
		String log = tuple.getString(1);
		tx = (TransactionAttempt)tuple.getValue(0);
		if (log != null && log.split("\\t").length >=3 ) {
			today = DateFmt.getCountDate(log.split("\\t")[2], DateFmt.date_short) ;
			count = countMap.get(today);
			if(count == null)
			{
				count = 0;
			}
			count ++ ;
			
			countMap.put(today, count);
		}
	}

	
	public void finishBatch() {
		System.err.println(tx+"--"+today+"--"+count);
		collector.emit(new Values(tx,today,count));
	}

	
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		this.collector = collector;
		
	}

	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx","date","count"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}




3、汇总及批量提交
public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter {

	/**
	 * finishBatch增加上个事务批次处理后的结果
	 */
	private static final long serialVersionUID = 1L;
	public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>();

	Map<String, Integer> countMap = new HashMap<String, Integer>();
	TransactionAttempt id;
	BatchOutputCollector collector;
	String today = null;

	public void execute(Tuple tuple) {
		today = tuple.getString(1);
		Integer count = tuple.getInteger(2);
		id = (TransactionAttempt) tuple.getValue(0);

		if (today != null && count != null) {
			Integer batchCount = countMap.get(today);
			if (batchCount == null) {
				batchCount = 0;
			}
			batchCount += count;
			countMap.put(today, batchCount);
		}
	}

	public void finishBatch() {
		// TODO Auto-generated method stub
		if (countMap.size() > 0) {
			DbValue value = dbMap.get(today);
			DbValue newValue;
			if (value == null || !value.txid.equals(id.getTransactionId())) {
				// 更新数据库
				newValue = new DbValue();
				newValue.txid = id.getTransactionId();
				newValue.dateStr = today;
				if (value == null) {
					newValue.count = countMap.get(today);
					newValue.pre_count = 0;
				} else {
					newValue.pre_count = value.count;// 上个事务批次处理后的结果
					newValue.count = value.count + countMap.get("2014-01-07");
				}
				dbMap.put(today, newValue);
			} else {
				newValue = value;
			}
			System.out.println("total==========================:" + dbMap.get(today).count);
		}
	}

	public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
		this.id = id;
		this.collector = collector;
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

	// 模拟一个数据表
	public static class  DbValue {
		String dateStr; // 按日期汇总
		int count = 0; // 汇总数
		BigInteger txid; // 事务ID
		int pre_count; // 上个事务批次处理后的结果
	}

}


4、topo类


public class MyDailyTopo {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyOpaquePtTxSpout(),1);
		builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid");
		builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ;
		
		Config conf = new Config() ;
		conf.setDebug(false);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			} catch (AuthorizationException e) {
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.buildTopology());
		}
		
		
		
		
	}

}




6、测试结果

引用

emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:-1542691397832167833--2014-01-07--16
1:-1542691397832167833--2014-01-07--22
1:-1542691397832167833--2014-01-07--12
total==========================:50
null--null--null
null--null--null
null--null--null
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:9191261929288144194--2014-01-07--22
2:9191261929288144194--2014-01-07--16
2:9191261929288144194--2014-01-07--12
total==========================:100




















分享到:
评论

相关推荐

    sqlserver分区表制作实例.doc

    "SQL Server 分区表制作实例" SQL Server 分区表是一种存储大量数据的技术,它可以将大型表拆分成多个小的、管理起来更加方便的分区,每个分区可以独立存储和维护。今天,我们将通过一个实例来演示如何创建分区表...

    sql server分区表实例

    总的来说,这些SQL脚本提供了关于如何设计、创建和优化SQL Server分区表的实例,包括基于范围的分区、文件组的使用、分区联接优化以及滑动窗口策略等,这些都是处理大数据和提高查询性能的关键技术。通过学习和实践...

    在MSCS环境下实现DB2分区服务器集群实例

    ### 在MSCS环境下实现DB2分区服务器集群实例 #### 一、引言 本文将详细介绍如何在Microsoft Cluster Service (MSCS)环境下实现IBM DB2分区服务器集群实例的配置过程。此配置适用于Windows 2000平台,并采用了DB2 ...

    oracle表分区实例

    以下是Oracle表分区的一些关键概念和实例: 1. **范围分区**(Range Partitioning): 范围分区是根据列值的范围进行划分,例如日期范围。在示例中,销售记录按照季度进行分区,每个季度的数据存储在不同的分区中...

    oracle数据库表分区实例

    ### Oracle数据库表分区实例 #### 一、Oracle表分区简介 在Oracle数据库中,表分区是一种将大型表物理地划分为多个较小部分的技术。通过合理地利用分区技术,可以显著提高查询性能,简化数据管理任务,并加快数据...

    PostgreSQL分区表(partitioning)应用实例详解

    项目中有需求要垂直分表,即按照时间区间将数据拆分到n个表中,PostgreSQL提供了分区表的功能。分区表实际上是把逻辑上的一个大表分割成物理上的几小块,提供了很多好处,比如: 1、查询性能大幅提升 2、删除历史...

    sql2005分区表实例

    SQL中,分区表的实例,通过此实例理解分区表,掌握分区表如何创建,如何存储数据等

    unix AIX 环境下 exp 备份 Oracle 分区表实例

    在Unix AIX环境下进行Oracle分区表的备份操作是IT领域中一项重要的技能,尤其是在处理大量数据和维护系统稳定性时。本文将深入解析如何在Unix AIX环境下使用exp工具备份Oracle分区表,包括环境配置、备份策略及恢复...

    MS SQL Server分区表、分区索引详解

    ### MS SQL Server 分区表、分区索引详解 #### 一、分区表简介 使用分区表的主要目的是为了改善大型表及具有多种访问模式的表的可伸缩性和可管理性。这里的“大型表”指的是数据量巨大的表,“访问模式”是指因不同...

    transactional_topo_opaque_partition

    "transactional_topo_opaque_partition"这个主题聚焦于Storm中的事务处理和不透明分区特性,这在构建高可用、准确无误的数据处理系统时非常重要。本文将深入探讨这两个概念及其在Storm 0.9.0.1版本中的实现,同时也...

    利用kettle自动创建oracle表分区

    在创建Oracle表分区之前,需要先创建一个分区表实例。创建分区表实例的SQL语句如下: ``` create table DE_TEST( name_tag varchar2(10), day_tag DATE) PARTITION BY RANGE (day_tag) ( PARTITION DE_TEST_...

    MySQL交换分区的实例详解

    MySQL交换分区的实例详解 前言 在介绍交换分区之前,我们先了解一下 mysql 分区。 数据库的分区有两种:水平分区和垂直分区。而MySQL暂时不支持垂直分区,因此接下来说的都是水平分区。水平分区即:以行为单位对表...

    ADB读取分区 备份分区 写入分区的小工具

    开机usb调试开启状态下读取分区和对应的分区号 根据安卓版本的不同。个别机型写入分区需要root权限 然后可以备份分区 写入分区和备份全分区等等操作 会一些玩机基础常识的友友下载使用 小白谨慎 资源有复制性。下载...

    Oracle分区表培训

    建立分区范例.sql"文件提供了创建分区表的实例。在Oracle中,分区可以基于多种方式,如范围、列表、哈希或复合分区。范围分区通常用于根据连续的数值(如日期)将数据分段;列表分区则适用于预定义的一组值;哈希...

    DB2 数据库分区特性(DPF)

    8. 实例配置文件:DB2实例配置文件(如db2nodes.cfg)定义了数据库实例及其分区信息,这些配置被所有数据库共享。配置文件位于DB2实例目录,通过NFS网络文件系统被其他节点共享。 9. 实例、分区和数据库的关系:在...

    sql2005实例创建分区!初学者一看就会

    ### SQL Server 2005 分区管理及应用详解 #### 一、分区的重要性与应用场景 在处理大量数据时,数据库性能往往会受到严重的影响。为了提高查询效率并优化存储空间,SQL Server 2005 引入了分区功能。分区是一种将...

    Linux主分区,扩展分区,逻辑分区的联系和区别

    在Linux系统中,磁盘分区是一项基础且重要的任务,它涉及到如何有效地管理和组织存储空间。本文将详细解析Linux中的主分区、扩展分区和逻辑分区之间的联系和差异。 首先,主分区是硬盘上直接分配的独立区域,每个...

Global site tag (gtag.js) - Google Analytics