`

分区事务IPartitionedTransactionalSpout实例

 
阅读更多
1.分区事务spout

public class MyPtTxSpout implements IPartitionedTransactionalSpout<MyMata>{

	/**
	 * 分区事务spout
	 */
	private static final long serialVersionUID = 1L;
	public static int BATCH_NUM = 10 ;
	public Map<Integer, Map<Long,String>> PT_DATA_MP = new HashMap<Integer, Map<Long,String>>();
	public MyPtTxSpout()
	{
		Random random = new Random();
		String[] hosts = { "www.taobao.com" };
		String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
				"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
		String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
				"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
		//建立五个分区数据
		for (int j = 0; j < 5; j++) {
			HashMap<Long, String> dbMap = new HashMap<Long, String> ();
			for (long i = 0; i < 100; i++) {
				dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
			}
			PT_DATA_MP.put(j, dbMap);
		}
		
	}
	
	
	public org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Coordinator getCoordinator(
			Map conf, TopologyContext context) {
		return new MyCoordinator();
	}

	
	public org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter<MyMata> getEmitter(
			Map conf, TopologyContext context) {
		return new MyEmitter();
	}
	
	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		//发射数据格式
		declarer.declare(new Fields("tx","log"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	public class MyCoordinator implements IPartitionedTransactionalSpout.Coordinator
	{
		
		public void close() {
			// TODO Auto-generated method stub
			
		}
		
		public boolean isReady() {
			// TODO Auto-generated method stub
			Utils.sleep(1000);
			return true;
		}
		
		//声明5个分区
		public int numPartitions() {
			// TODO Auto-generated method stub
			return 5;
		}
		
	}
	public class MyEmitter implements IPartitionedTransactionalSpout.Emitter<MyMata>
	{

		
		public void close() {

		}
		
		
		public void emitPartitionBatch(TransactionAttempt tx,
				BatchOutputCollector collector, int partition,
				MyMata partitionMeta) {
			// 与普通事务不同的是,将MyCoordinator方法的发射数据方在emitPartitionBatch方法中执行
			System.err.println("emitPartitionBatch partition:"+partition);
			long beginPoint = partitionMeta.getBeginPoint() ;
			int num = partitionMeta.getNum() ;
			
			Map<Long, String> batchMap = PT_DATA_MP.get(partition);
			for (long i = beginPoint; i < num+beginPoint; i++) {
				if (batchMap.get(i)==null) {
					break;
				}
				collector.emit(new Values(tx,batchMap.get(i)));
			}
		}

		
		public MyMata emitPartitionBatchNew(TransactionAttempt tx,
				BatchOutputCollector collector, int partition,
				MyMata lastPartitionMeta) {
			// 获取批次的开始节点
			long beginPoint = 0;
			if (lastPartitionMeta == null) {
				beginPoint = 0 ;
			}else {
				beginPoint = lastPartitionMeta.getBeginPoint() + lastPartitionMeta.getNum() ;
			}
			
			MyMata mata = new MyMata() ;
			mata.setBeginPoint(beginPoint);
			mata.setNum(BATCH_NUM);
			
			//调用上面emitPartitionBatch方法接着发射数据处理
			emitPartitionBatch(tx,collector,partition,mata);
			
			System.err.println("启动一个事务:"+mata.toString());
			return mata;
		}
		
	}
	
}


----------------------------分区事务与普通事务bolt处理类相同,不作过多改变--------------------------

2、统计当天数据

/**
 * 分区事务bolt与普通事务bolt相同,不作更多改变
 */
public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> {

	
	private static final long serialVersionUID = 1L;
	
	//统计当天数据
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	BatchOutputCollector collector ;
	Integer count = null;
	String today = null;
	TransactionAttempt tx = null;
	
	
	public void execute(Tuple tuple) {
		String log = tuple.getString(1);
		tx = (TransactionAttempt)tuple.getValue(0);
		if (log != null && log.split("\\t").length >=3 ) {
			today = DateFmt.getCountDate(log.split("\\t")[2], DateFmt.date_short) ;
			count = countMap.get(today);
			if(count == null)
			{
				count = 0;
			}
			count ++ ;
			
			countMap.put(today, count);
		}
	}

	
	public void finishBatch() {
		System.err.println(tx+"--"+today+"--"+count);
		collector.emit(new Values(tx,today,count));
	}

	
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		this.collector = collector;
		
	}

	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx","date","count"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}


3、汇总批数据,更新数据库
/**
 * 分区事务CommitterBolt与普通事务CommitterBolt相同,不作更多改变
 */
public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter{

	
	private static final long serialVersionUID = 1L;
	
	//汇总批数据,更新数据库
	public static final String GLOBAL_KEY = "GLOBAL_KEY";
	public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>() ;
	
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	TransactionAttempt id ;
	BatchOutputCollector collector;
	String today = null;
	
	
	public void execute(Tuple tuple) {
		today = tuple.getString(1) ;
		Integer count = tuple.getInteger(2);
		id = (TransactionAttempt)tuple.getValue(0);
		
		if (today !=null && count != null) {
			Integer batchCount = countMap.get(today) ;
			if (batchCount == null) {
				batchCount = 0;
			}
			batchCount += count ;
			countMap.put(today, batchCount);
		}
	}

	
	public void finishBatch() {
		if (countMap.size() > 0) {
			DbValue value = dbMap.get(GLOBAL_KEY);
			DbValue newValue ;
			if (value == null || !value.txid.equals(id.getTransactionId())) {
				//更新数据库
				newValue = new DbValue();
				newValue.txid = id.getTransactionId() ;
				newValue.dateStr = today;
				if (value == null) {
					newValue.count = countMap.get(today) ;
				}else {
					newValue.count = value.count + countMap.get("2014-01-07") ;
				}
				dbMap.put(GLOBAL_KEY, newValue);
			}else
			{
				newValue = value;
			}
		}
		
		System.out.println("total==========================:"+dbMap.get(GLOBAL_KEY).count);
//		collector.emit(tuple)
	}

	
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		// TODO Auto-generated method stub
		this.id = id ;
		this.collector = collector;
	}

	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}
	public static class DbValue
	{
		BigInteger txid;
		int count = 0;
		String dateStr;
	}

}



4、topo类

public class MyDailyTopo {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyPtTxSpout(),1);
		builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid");
		builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ;
		
		Config conf = new Config() ;
		conf.setDebug(false);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			} catch (AuthorizationException e) {
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.buildTopology());
		}
		
		
		
		
	}

}




6、测试结果
通过测试结果,得出结论:系统启动五个线程分别读取五个分区的前10条数据,然后用三个线程汇总每一批次数据,最后用一个线程全部汇总数据。
引用

emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:716574575327675658--2014-01-07--13
1:716574575327675658--2014-01-07--19
1:716574575327675658--2014-01-07--18
total==========================:50
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:2288688263701331016--2014-01-07--18
2:2288688263701331016--2014-01-07--13
2:2288688263701331016--2014-01-07--19
total==========================:100

















分享到:
评论

相关推荐

    sqlserver分区表制作实例.doc

    "SQL Server 分区表制作实例" SQL Server 分区表是一种存储大量数据的技术,它可以将大型表拆分成多个小的、管理起来更加方便的分区,每个分区可以独立存储和维护。今天,我们将通过一个实例来演示如何创建分区表...

    sql server分区表实例

    总的来说,这些SQL脚本提供了关于如何设计、创建和优化SQL Server分区表的实例,包括基于范围的分区、文件组的使用、分区联接优化以及滑动窗口策略等,这些都是处理大数据和提高查询性能的关键技术。通过学习和实践...

    在MSCS环境下实现DB2分区服务器集群实例

    ### 在MSCS环境下实现DB2分区服务器集群实例 #### 一、引言 本文将详细介绍如何在Microsoft Cluster Service (MSCS)环境下实现IBM DB2分区服务器集群实例的配置过程。此配置适用于Windows 2000平台,并采用了DB2 ...

    oracle表分区实例

    以下是Oracle表分区的一些关键概念和实例: 1. **范围分区**(Range Partitioning): 范围分区是根据列值的范围进行划分,例如日期范围。在示例中,销售记录按照季度进行分区,每个季度的数据存储在不同的分区中...

    oracle数据库表分区实例

    ### Oracle数据库表分区实例 #### 一、Oracle表分区简介 在Oracle数据库中,表分区是一种将大型表物理地划分为多个较小部分的技术。通过合理地利用分区技术,可以显著提高查询性能,简化数据管理任务,并加快数据...

    PostgreSQL分区表(partitioning)应用实例详解

    项目中有需求要垂直分表,即按照时间区间将数据拆分到n个表中,PostgreSQL提供了分区表的功能。分区表实际上是把逻辑上的一个大表分割成物理上的几小块,提供了很多好处,比如: 1、查询性能大幅提升 2、删除历史...

    sql2005分区表实例

    SQL中,分区表的实例,通过此实例理解分区表,掌握分区表如何创建,如何存储数据等

    unix AIX 环境下 exp 备份 Oracle 分区表实例

    在Unix AIX环境下进行Oracle分区表的备份操作是IT领域中一项重要的技能,尤其是在处理大量数据和维护系统稳定性时。本文将深入解析如何在Unix AIX环境下使用exp工具备份Oracle分区表,包括环境配置、备份策略及恢复...

    MS SQL Server分区表、分区索引详解

    ### MS SQL Server 分区表、分区索引详解 #### 一、分区表简介 使用分区表的主要目的是为了改善大型表及具有多种访问模式的表的可伸缩性和可管理性。这里的“大型表”指的是数据量巨大的表,“访问模式”是指因不同...

    oracle分区表之hash分区表的使用及扩展

    Oracle分区表中的Hash分区是一种基于哈希算法的分区策略,适用于处理无法清晰定义分区范围的大型数据表。这种分区方式通过计算分区键的哈希值来决定数据存储在哪个分区,以此达到数据分散和负载均衡的目的。Hash分区...

    MySQL交换分区的实例详解

    MySQL交换分区的实例详解 前言 在介绍交换分区之前,我们先了解一下 mysql 分区。 数据库的分区有两种:水平分区和垂直分区。而MySQL暂时不支持垂直分区,因此接下来说的都是水平分区。水平分区即:以行为单位对表...

    Oracle分区表培训

    建立分区范例.sql"文件提供了创建分区表的实例。在Oracle中,分区可以基于多种方式,如范围、列表、哈希或复合分区。范围分区通常用于根据连续的数值(如日期)将数据分段;列表分区则适用于预定义的一组值;哈希...

    利用kettle自动创建oracle表分区

    在创建Oracle表分区之前,需要先创建一个分区表实例。创建分区表实例的SQL语句如下: ``` create table DE_TEST( name_tag varchar2(10), day_tag DATE) PARTITION BY RANGE (day_tag) ( PARTITION DE_TEST_...

    ADB读取分区 备份分区 写入分区的小工具

    开机usb调试开启状态下读取分区和对应的分区号 根据安卓版本的不同。个别机型写入分区需要root权限 然后可以备份分区 写入分区和备份全分区等等操作 会一些玩机基础常识的友友下载使用 小白谨慎 资源有复制性。下载...

    sql2005实例创建分区!初学者一看就会

    ### SQL Server 2005 分区管理及应用详解 #### 一、分区的重要性与应用场景 在处理大量数据时,数据库性能往往会受到严重的影响。为了提高查询效率并优化存储空间,SQL Server 2005 引入了分区功能。分区是一种将...

    Oracle 分区索引介绍和实例演示

    全局分区索引适用于OLTP(在线事务处理)系统。 3. 前缀索引与无前缀索引: - 有前缀索引:包含分区键的索引,无论本地还是全局,都能支持分区消除,提高查询效率。 - 无前缀索引:不包含分区键的前导列,仅适用...

Global site tag (gtag.js) - Google Analytics