`

普通事务ITransactionalSpout实例之按天统计数据

 
阅读更多
1、普通事务Spout

/**
 * 普通事务Spout
 */
public class MyTxSpout implements ITransactionalSpout<MyMata>{

	
	private static final long serialVersionUID = 1L;
	
	
	/**
	 * 数据源
	 */
	Map<Long, String> dbMap  = null;
	public MyTxSpout()
	{
		Random random = new Random();
		dbMap = new HashMap<Long, String> ();
		String[] hosts = { "www.taobao.com" };
		String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
				"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
		String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
				"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
		
		for (long i = 0; i < 100; i++) {
			dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
		}
	}
	
	

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx","log"));
	}

	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	public org.apache.storm.transactional.ITransactionalSpout.Coordinator<MyMata> getCoordinator(Map conf,
			TopologyContext context) {
		/**
		 * 发射该metadata(事务tuple)到“batch emit”流
		 */
		return new MyCoordinator();
	}

	public org.apache.storm.transactional.ITransactionalSpout.Emitter<MyMata> getEmitter(Map conf,
			TopologyContext context) {
		/**
		 * 逐个发射实际batch的tuple
		 */
		return new MyEmitter(dbMap);
	}

}


2、事务Spout创建一个新的事务(元数据)metadata
  2、1元数据定义
public class MyMata implements Serializable{

	/**
	 * metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。 
	 */
	private static final long serialVersionUID = 1L;
	
	private long beginPoint ;//事务开始位置

	private int num ;//batch 的tuple个数
	@Override
	public String toString() {
		return getBeginPoint()+"----"+getNum();
	}
	
	public long getBeginPoint() {
		return beginPoint;
	}

	public void setBeginPoint(long beginPoint) {
		this.beginPoint = beginPoint;
	}

	public int getNum() {
		return num;
	}

	public void setNum(int num) {
		this.num = num;
	}
	
	
}



2、2 获得(元数据)metadata,逐个发射实际batch的tuple

public class MyEmitter implements ITransactionalSpout.Emitter<MyMata> {

	Map<Long, String> dbMap = null;

	public MyEmitter(Map<Long, String> dbMap) {
		this.dbMap = dbMap;
	}

	//逐个发射实际batch的tuple
	public void emitBatch(TransactionAttempt tx, MyMata coordinatorMeta, BatchOutputCollector collector) {

		long beginPoint = coordinatorMeta.getBeginPoint();// 从上一个批次获得开始位置

		int num = coordinatorMeta.getNum();// 从批次中获取批次数量

		for (long i = beginPoint; i < num + beginPoint; i++) {
			if (dbMap.get(i) == null) {
				continue;
			}
			collector.emit(new Values(tx, dbMap.get(i)));
		}

	}

	public void cleanupBefore(BigInteger txid) {

	}

	public void close() {

	}

}


------------------------spout与上篇相同-----------------------------------------

3、按天统计数据事务Bolt,会从Emitter接收数据处理,处理完成,提交给finishBatch方法处理。

public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> {

	/**
	 * 按天统计数据
	 */
	private static final long serialVersionUID = 1L;
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	BatchOutputCollector collector ;
	Integer count = null;
	String today = null;
	TransactionAttempt tx = null;
	@Override
	public void execute(Tuple tuple) {
		// TODO Auto-generated method stub
		String log = tuple.getString(1);
		tx = (TransactionAttempt)tuple.getValue(0);
		if (log != null && log.split("\\t").length >=3 ) {
			today = DateFmt.getCountDate(log.split("\\t")[2], DateFmt.date_short) ;
			count = countMap.get(today);
			if(count == null)
			{
				count = 0;
			}
			count ++ ;
			
			countMap.put(today, count);
		}
	}

	@Override
	public void finishBatch() {
		collector.emit(new Values(tx,today,count));
	}

	@Override
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		// TODO Auto-generated method stub
		this.collector = collector;
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("tx","date","count"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}



4、接收统计数据,累加汇总MyDailyCommitterBolt,batch之间强制按照顺序进行提交

public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter{

	/**
	 * 接收统计数据,累加汇总
	 */
	private static final long serialVersionUID = 1L;
	public static final String GLOBAL_KEY = "GLOBAL_KEY";
	public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>() ;
	
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	TransactionAttempt id ;
	BatchOutputCollector collector;
	String today = null;
	@Override
	public void execute(Tuple tuple) {
		today = tuple.getString(1) ;
		Integer count = tuple.getInteger(2);
		id = (TransactionAttempt)tuple.getValue(0);
		
		if (today !=null && count != null) {
			Integer batchCount = countMap.get(today) ;
			if (batchCount == null) {
				batchCount = 0;
			}
			batchCount += count ;
			countMap.put(today, batchCount);
		}
	}

	@Override
	public void finishBatch() {
		// TODO Auto-generated method stub
		if (countMap.size() > 0) {
			DbValue value = dbMap.get(GLOBAL_KEY);
			DbValue newValue ;
			if (value == null || !value.txid.equals(id.getTransactionId())) {
				//更新数据库
				newValue = new DbValue();
				newValue.txid = id.getTransactionId() ;
				newValue.dateStr = today;
				if (value == null) {
					newValue.count = countMap.get(today) ;
				}else {
					newValue.count = value.count + countMap.get("2014-01-07") ;
				}
				dbMap.put(GLOBAL_KEY, newValue);
			}else
			{
				newValue = value;
			}
		}
		
		System.out.println("total==========================:"+dbMap.get(GLOBAL_KEY).count);
//		collector.emit(tuple)
	}

	@Override
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		// TODO Auto-generated method stub
		this.id = id ;
		this.collector = collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}
	public static class DbValue
	{
		BigInteger txid;
		int count = 0;
		String dateStr;
	}

}



5、topo类

public class MyDailyTopo {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyTxSpout(),1);
		builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid");
		builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ;
		
		Config conf = new Config() ;
		conf.setDebug(true);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.buildTopology());
		}
		
		
		
		
	}

}


6、测试结果
引用
启动一个事务:0----10
total==========================:10
启动一个事务:10----10
total==========================:20
启动一个事务:20----10
total==========================:30



















分享到:
评论

相关推荐

    SAS系统和数据分析(含实例数据)

    《SAS系统与数据分析:深度解析与实例应用》 SAS(Statistical Analysis System)是一种强大的统计分析软件,广泛应用于商业智能、数据挖掘、预测分析等领域。本资料集旨在为初学者提供一个全面且深入的SAS学习平台...

    Web Echarts+layui 统计图表demo实例 (统计深圳市旅游景点信息)

    在本项目中,"Web Echarts+layui 统计图表demo实例 (统计深圳市旅游景点信息)" 是一个基于Echarts和layui框架开发的数据可视化应用。Echarts是中国百度公司开源的一个JavaScript图表库,它提供了丰富的图表类型,如...

    jqGrid实例下载(数据交互,统计,时间插件都已实现)

    在本实例中,"jqGrid实例下载(数据交互,统计,时间插件都已实现)"意味着你将获得一个已经包含了数据交互、统计功能以及时间插件的完整jqGrid示例。 1. 数据交互:jqGrid能够轻松地与服务器进行数据交换。你可以...

    yolov8-seg模型源码,实例分割,带数据集,测试可执行demo

    这份资源是一份针对深度学习计算机视觉领域的实例分割源码,使用 Ultralytics YOLOv8-seg 模型和 COCO128-seg 数据集进行目标检测和实例分割任务。提供了一个亲身测试且直接可运行的实例分割解决方案。 数据集我已经...

    数据仓库Foodmart2005 实例数据库

    与事务处理系统不同,数据仓库关注的是历史数据的聚合和分析,而不是实时的事务处理。 Foodmart2005数据集包含零售行业的销售数据,如商品、商店、部门、日期等信息。这些数据经过预处理,以适应多维分析,例如按...

    PHP+Ajax网站开发典型实例-源代码

    实例31 使用文件统计在线人数 实例32 设置和获取Cookie参数 实例33 删除会话中已注册变量 实例34 数据编码 实例35 简单购物车 第5章 PEAR和正则表达式实例 实例36 PEAR管理器安装及PEAR包常见操作 实例37 ...

    十天学会单片机实例100

    《十天学会单片机实例100》一书提供了丰富的单片机实践教程,旨在通过实际操作加深学习者对单片机的理解和掌握。以下是对该书中部分关键实例的知识点详细解读: ### 函数的使用和熟悉 函数是单片机编程中的基本构建...

    王斌会的《多元统计分析及R语言建模》一书数据

    这些数据文件很可能包含了各种实例数据集,用于配合书中的案例分析,使读者能够动手实践并理解各种统计方法。通过这些数据,你可以练习读取数据、探索数据特性、建立模型、评估模型性能以及生成可视化结果等技能。...

    spss数据分析实例

    【SPSS数据分析实例】这篇教程主要介绍了如何使用SPSS进行数据处理和分析的基本步骤,适合初学者入门。SPSS(Statistical Package for the Social Sciences)是一款广泛应用的统计分析软件,以其友好的用户界面和易...

    MFC串口RS232编程简单实例(数据发送与接收)

    本实例主要探讨的是如何使用MFC进行串口RS232编程,包括串口的初始化、参数配置、数据的发送与接收等关键环节。 串口通信,即RS-232通信,是一种广泛应用于设备间短距离通信的技术。它定义了接口信号的电气特性、...

    C++Builder精彩编程实例集锦的源代码前3部分.rar

    实例203 如何在数据库更新过程中增加事务操作 实例204 如何在程序中实现查找字段 实例205 如何在程序中实现自定义字段 实例206 如何使用字段拖放功能 实例207 如何为程序添加报表打印功能 实例208 如何使用向导...

    labview55个经典实例

    13. **数据可视化**:LabVIEW强大的图表和图形功能会在实例中得到体现,如实时波形显示、数据统计图、3D图形等。 14. **并行处理与多线程**:LabVIEW支持并行编程,实例可能涵盖如何利用并行结构提高程序性能。 15...

    asp.net 图表 c#图表 chart 统计图(c#版含实例)

    "asp.net 图表 c#图表 chart 统计图(c#版含实例)"这个主题,主要涵盖了如何在ASP.NET应用中使用C#语言创建和操作图表,特别是统计图。 统计图是一种图形表示,它能够清晰地展示数据的分布、趋势和关系。在ASP.NET...

    PHP开发实战1200例(第1卷).(清华出版.潘凯华.刘中华).part2

    实例068 SESSION购物车中数据的读取 100 实例069 员工信息的管理 102 实例070 网页版九九乘法表 104 实例071 读取数组购物车中的数据 105 实例072 图像验证码的生成 106 2.6 跳转语句 107 实例073 控制页面中表情图...

    C++编程实例100篇

    《C++编程实例100篇》是一本深入浅出的C++编程教程,它以实践为主导,通过丰富的实例帮助学习者掌握C++语言的基础和核心概念。这本书的每个实例都精心设计,旨在帮助初学者和有经验的开发者巩固和提升C++编程技能。 ...

    概率论与数理统计 Matlab数理统计实验 Matlab各类方差分析实例 含数据和源代码.rar

    概率论与数理统计 Matlab数理统计实验 Matlab各类方差分析实例 含数据和源代码.rar

    java实现csv导出千万级数据实例

    在IT行业中,处理大量数据是常见的挑战之一,尤其是在数据导出方面。本实例聚焦于“java实现csv导出千万级数据实例”,旨在提供一个高效、稳定的解决方案,避免因数据量过大而导致的性能问题,如Java中的栈溢出...

    《R语言经典实例》

    综上所述,《R语言经典实例》旨在通过实例教学的方式,帮助读者掌握R语言的基础操作、数据处理和可视化技巧、统计分析方法以及机器学习的初级应用,从而使得读者能够有效地利用R语言进行数据分析。这本书是R语言学习...

Global site tag (gtag.js) - Google Analytics