- 浏览: 56438 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、spout
2、处理类不作改变
3、汇总及批量提交
4、topo类
6、测试结果
emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:-1542691397832167833--2014-01-07--16
1:-1542691397832167833--2014-01-07--22
1:-1542691397832167833--2014-01-07--12
total==========================:50
null--null--null
null--null--null
null--null--null
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:9191261929288144194--2014-01-07--22
2:9191261929288144194--2014-01-07--16
2:9191261929288144194--2014-01-07--12
total==========================:100
public class MyOpaquePtTxSpout implements IOpaquePartitionedTransactionalSpout<MyMata> { /** * 将Coordinator发射数据动作下放到emitPartitionBatch类执行。 */ private static final long serialVersionUID = 1L; public static int BATCH_NUM = 10; public Map<Integer, Map<Long, String>> PT_DATA_MP = new HashMap<Integer, Map<Long, String>>(); public MyOpaquePtTxSpout() { Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int j = 0; j < 5; j++) { HashMap<Long, String> dbMap = new HashMap<Long, String>(); for (long i = 0; i < 100; i++) { dbMap.put(i, hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]); } PT_DATA_MP.put(j, dbMap); } } public org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator( Map conf, TopologyContext context) { return new MyCoordinator(); } public org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout.Emitter<MyMata> getEmitter( Map conf, TopologyContext context) { return new MyEmitter(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx", "log")); } public Map<String, Object> getComponentConfiguration() { return null; } public class MyCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator { public void close() { } public boolean isReady() { Utils.sleep(1000); return true; } } public class MyEmitter implements IOpaquePartitionedTransactionalSpout.Emitter<MyMata> { public void close() { } public MyMata emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MyMata lastPartitionMeta) { System.err.println("emitPartitionBatch partition:" + partition); long beginPoint = 0; if (lastPartitionMeta == null) { beginPoint = 0; } else { beginPoint = lastPartitionMeta.getBeginPoint() + lastPartitionMeta.getNum(); } MyMata mata = new MyMata(); mata.setBeginPoint(beginPoint); mata.setNum(BATCH_NUM); System.err.println("启动一个事务:" + mata.toString()); //获取某一个分区数据 Map<Long, String> batchMap = PT_DATA_MP.get(partition); for (Long i = mata.getBeginPoint(); i < mata.getBeginPoint() + mata.getNum(); i++) { if (batchMap.size() <= i) { break; } collector.emit(new Values(tx, batchMap.get(i))); } //返回一个mata给当前方法,当作下次循环的参数 return mata; } public int numPartitions() { return 5; } } }
2、处理类不作改变
public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> { /** * 不作改变 */ private static final long serialVersionUID = 1L; Map<String, Integer> countMap = new HashMap<String, Integer>(); BatchOutputCollector collector ; Integer count = null; String today = null; TransactionAttempt tx = null; public void execute(Tuple tuple) { // TODO Auto-generated method stub String log = tuple.getString(1); tx = (TransactionAttempt)tuple.getValue(0); if (log != null && log.split("\\t").length >=3 ) { today = DateFmt.getCountDate(log.split("\\t")[2], DateFmt.date_short) ; count = countMap.get(today); if(count == null) { count = 0; } count ++ ; countMap.put(today, count); } } public void finishBatch() { System.err.println(tx+"--"+today+"--"+count); collector.emit(new Values(tx,today,count)); } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx","date","count")); } public Map<String, Object> getComponentConfiguration() { return null; } }
3、汇总及批量提交
public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter { /** * finishBatch增加上个事务批次处理后的结果 */ private static final long serialVersionUID = 1L; public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>(); Map<String, Integer> countMap = new HashMap<String, Integer>(); TransactionAttempt id; BatchOutputCollector collector; String today = null; public void execute(Tuple tuple) { today = tuple.getString(1); Integer count = tuple.getInteger(2); id = (TransactionAttempt) tuple.getValue(0); if (today != null && count != null) { Integer batchCount = countMap.get(today); if (batchCount == null) { batchCount = 0; } batchCount += count; countMap.put(today, batchCount); } } public void finishBatch() { // TODO Auto-generated method stub if (countMap.size() > 0) { DbValue value = dbMap.get(today); DbValue newValue; if (value == null || !value.txid.equals(id.getTransactionId())) { // 更新数据库 newValue = new DbValue(); newValue.txid = id.getTransactionId(); newValue.dateStr = today; if (value == null) { newValue.count = countMap.get(today); newValue.pre_count = 0; } else { newValue.pre_count = value.count;// 上个事务批次处理后的结果 newValue.count = value.count + countMap.get("2014-01-07"); } dbMap.put(today, newValue); } else { newValue = value; } System.out.println("total==========================:" + dbMap.get(today).count); } } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.id = id; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { } // 模拟一个数据表 public static class DbValue { String dateStr; // 按日期汇总 int count = 0; // 汇总数 BigInteger txid; // 事务ID int pre_count; // 上个事务批次处理后的结果 } }
4、topo类
public class MyDailyTopo { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyOpaquePtTxSpout(),1); builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid"); builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ; Config conf = new Config() ; conf.setDebug(false); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.buildTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.buildTopology()); } } }
6、测试结果
引用
emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:-1542691397832167833--2014-01-07--16
1:-1542691397832167833--2014-01-07--22
1:-1542691397832167833--2014-01-07--12
total==========================:50
null--null--null
null--null--null
null--null--null
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:9191261929288144194--2014-01-07--22
2:9191261929288144194--2014-01-07--16
2:9191261929288144194--2014-01-07--12
total==========================:100
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1029一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6461、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 747一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 512英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 415一、DRPC定义 分布式dRPC(distributed RP ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5791.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4841、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8191、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 611Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2095事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4461、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1127统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 892汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 682一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10671、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 700一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 586并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5321、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 392本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 667一、安装Storm wget ...
相关推荐
"SQL Server 分区表制作实例" SQL Server 分区表是一种存储大量数据的技术,它可以将大型表拆分成多个小的、管理起来更加方便的分区,每个分区可以独立存储和维护。今天,我们将通过一个实例来演示如何创建分区表...
总的来说,这些SQL脚本提供了关于如何设计、创建和优化SQL Server分区表的实例,包括基于范围的分区、文件组的使用、分区联接优化以及滑动窗口策略等,这些都是处理大数据和提高查询性能的关键技术。通过学习和实践...
### 在MSCS环境下实现DB2分区服务器集群实例 #### 一、引言 本文将详细介绍如何在Microsoft Cluster Service (MSCS)环境下实现IBM DB2分区服务器集群实例的配置过程。此配置适用于Windows 2000平台,并采用了DB2 ...
以下是Oracle表分区的一些关键概念和实例: 1. **范围分区**(Range Partitioning): 范围分区是根据列值的范围进行划分,例如日期范围。在示例中,销售记录按照季度进行分区,每个季度的数据存储在不同的分区中...
### Oracle数据库表分区实例 #### 一、Oracle表分区简介 在Oracle数据库中,表分区是一种将大型表物理地划分为多个较小部分的技术。通过合理地利用分区技术,可以显著提高查询性能,简化数据管理任务,并加快数据...
项目中有需求要垂直分表,即按照时间区间将数据拆分到n个表中,PostgreSQL提供了分区表的功能。分区表实际上是把逻辑上的一个大表分割成物理上的几小块,提供了很多好处,比如: 1、查询性能大幅提升 2、删除历史...
SQL中,分区表的实例,通过此实例理解分区表,掌握分区表如何创建,如何存储数据等
在Unix AIX环境下进行Oracle分区表的备份操作是IT领域中一项重要的技能,尤其是在处理大量数据和维护系统稳定性时。本文将深入解析如何在Unix AIX环境下使用exp工具备份Oracle分区表,包括环境配置、备份策略及恢复...
### MS SQL Server 分区表、分区索引详解 #### 一、分区表简介 使用分区表的主要目的是为了改善大型表及具有多种访问模式的表的可伸缩性和可管理性。这里的“大型表”指的是数据量巨大的表,“访问模式”是指因不同...
"transactional_topo_opaque_partition"这个主题聚焦于Storm中的事务处理和不透明分区特性,这在构建高可用、准确无误的数据处理系统时非常重要。本文将深入探讨这两个概念及其在Storm 0.9.0.1版本中的实现,同时也...
在创建Oracle表分区之前,需要先创建一个分区表实例。创建分区表实例的SQL语句如下: ``` create table DE_TEST( name_tag varchar2(10), day_tag DATE) PARTITION BY RANGE (day_tag) ( PARTITION DE_TEST_...
MySQL交换分区的实例详解 前言 在介绍交换分区之前,我们先了解一下 mysql 分区。 数据库的分区有两种:水平分区和垂直分区。而MySQL暂时不支持垂直分区,因此接下来说的都是水平分区。水平分区即:以行为单位对表...
开机usb调试开启状态下读取分区和对应的分区号 根据安卓版本的不同。个别机型写入分区需要root权限 然后可以备份分区 写入分区和备份全分区等等操作 会一些玩机基础常识的友友下载使用 小白谨慎 资源有复制性。下载...
建立分区范例.sql"文件提供了创建分区表的实例。在Oracle中,分区可以基于多种方式,如范围、列表、哈希或复合分区。范围分区通常用于根据连续的数值(如日期)将数据分段;列表分区则适用于预定义的一组值;哈希...
8. 实例配置文件:DB2实例配置文件(如db2nodes.cfg)定义了数据库实例及其分区信息,这些配置被所有数据库共享。配置文件位于DB2实例目录,通过NFS网络文件系统被其他节点共享。 9. 实例、分区和数据库的关系:在...
### SQL Server 2005 分区管理及应用详解 #### 一、分区的重要性与应用场景 在处理大量数据时,数据库性能往往会受到严重的影响。为了提高查询效率并优化存储空间,SQL Server 2005 引入了分区功能。分区是一种将...
在Linux系统中,磁盘分区是一项基础且重要的任务,它涉及到如何有效地管理和组织存储空间。本文将详细解析Linux中的主分区、扩展分区和逻辑分区之间的联系和差异。 首先,主分区是硬盘上直接分配的独立区域,每个...