一、认识storm trident
trident可以理解为storm批处理的高级抽象,提供了分组、分区、聚合、函数等操作,提供一致性和恰好一次处理的语义。
1)元祖被作为batch处理
2)每个batch的元祖都被指定唯一的一个事物id,如果因为处理失败导致batch重发,也和保证和重发前一样的事物id
3)数据更新操作严格有序,比如batch1必须在batch2之前被成功处理,且如果batch1失败了,后面的处理也会失败。
假如: batch1处理1--20
batch2处理21--40
batch1处理失败,那么batch2也会失败
虽然数据更新操作严格有序,但是数据处理阶段也可以并行的,只是最后的持久化操作必须有序。
1.1 trident state
trident的状态具有仅仅处理一次,持续聚合的语义,使用trident来实现恰好一次的语义不需要开发人员去处理事务相关的工作,因为trident state已经帮我们封装好了,只需要编写类似于如下的代码:
topology.newStream("sentencestream", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count")) .parallelismHint(3);
所有处理事务逻辑都在MyHbaseState.HbaseFactory中处理了(这个是我自己定义的,trident支持在内存里面处理,类似于MemachedState.opaque)。
trident提供了一个StateFactory用来创建State对象的实例,行如:
public final class XFactory implements StateFactory{ public State makeState(Map conf,int partitonIndex,int numPartitions){ return new State(); } }
1.2 persistentAggregate
persistentAggregate是trident中用来更新来源的状态,如果前面是一个分好组的流,trident希望你提供的状态实现MapState接口,其中key是分组的字段,
而聚合结果是状态的值。
1.3 实现MapStates
trident中实现MapState非常简单,只需要为这个类提供一个IBackingMap的接口实现接口。
二、实战
首先搭建好zk,storm,hadoop,hbase的分布式环境
master:
slave1:
slave2:
main方法:
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { TridentTopology topology = new TridentTopology(); FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("tanjie is a good man"), new Values( "what is your name"), new Values("how old are you"), new Values("my name is tanjie"), new Values("i am 18")); spout.setCycle(false); tridentStreamToHbase(topology,spout); Config config = new Config(); config.setDebug(false); StormSubmitter.submitTopologyWithProgressBar("word_count_trident_state_HbaseState", config, topology.build()); }
tridentStreamToHbase方法:
private static TridentState tridentStreamToHbase(TridentTopology topology, FixedBatchSpout spout) { MyHbaseState.Options options = new MyHbaseState.Options(); options.setTableName("storm_trident_state"); options.setColumFamily("colum1"); options.setQualifier("q1"); /** * 根据数据源拆分单词后,然后分区操作,在每个分区上又进行分组(hash算法),然后在每个分组上进行聚合 * 所以这里可能有多个分区,每个分区有多个分组,然后在多个分组上进行聚合 * 用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中 */ return topology.newStream("sentencestream", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count")) .parallelismHint(3); }
MyHbaseState实现:
package com.storm.trident.state.hbase; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.JSONNonTransactionalSerializer; import org.apache.storm.trident.state.JSONOpaqueSerializer; import org.apache.storm.trident.state.JSONTransactionalSerializer; import org.apache.storm.trident.state.Serializer; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; import org.apache.storm.trident.state.StateType; import org.apache.storm.trident.state.map.IBackingMap; import org.apache.storm.trident.state.map.MapState; import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.SnapshottableMap; import org.apache.storm.tuple.Values; import com.google.common.collect.Maps; @SuppressWarnings({ "unchecked", "rawtypes" }) public class MyHbaseState<T> implements IBackingMap<T> { private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps .newHashMap(); private int partitionNum; private Options<T> options; private Serializer<T> serializer; private Connection connection; private Table table; static { DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer()); DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer()); DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer()); } public MyHbaseState(final Options<T> options, Map conf, int partitionNum) { this.options = options; this.serializer = options.serializer; this.partitionNum = partitionNum; try { connection = ConnectionFactory.createConnection(HBaseConfiguration .create()); table = connection.getTable(TableName.valueOf(options.tableName)); } catch (IOException e) { e.printStackTrace(); } } public static class Options<T> implements Serializable { /** * */ private static final long serialVersionUID = 1L; public Serializer<T> serializer = null; public String globalkey = "$HBASE_STATE_GLOBAL$"; /** * 表名 */ public String tableName; /** * 列族 */ public String columFamily; /** * */ public String qualifier; public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public String getColumFamily() { return columFamily; } public void setColumFamily(String columFamily) { this.columFamily = columFamily; } public String getQualifier() { return qualifier; } public void setQualifier(String qualifier) { this.qualifier = qualifier; } } protected static class HbaseFactory<T> implements StateFactory { private static final long serialVersionUID = 1L; private Options<T> options; public HbaseFactory(Options<T> options) { this.options = options; if (this.options.serializer == null) { this.options.serializer = DEFAULT_SERIALZERS .get(StateType.OPAQUE); } } @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { System.out.println("partitionIndex:" + partitionIndex + ",numPartitions:" + numPartitions); IBackingMap state = new MyHbaseState(options, conf, partitionIndex); MapState mapState = OpaqueMap.build(state); return new SnapshottableMap(mapState, new Values(options.globalkey)); } } @Override public void multiPut(List<List<Object>> keys, List<T> values) { List<Put> puts = new ArrayList<Put>(keys.size()); for (int i = 0; i < keys.size(); i++) { Put put = new Put(toRowKey(keys.get(i))); T val = values.get(i); System.out.println("partitionIndex: " + this.partitionNum + ",key.get(i):" + keys.get(i) + "value值:" + val); put.addColumn(this.options.columFamily.getBytes(), this.options.qualifier.getBytes(), this.options.serializer.serialize(val)); puts.add(put); } try { this.table.put(puts); } catch (IOException e) { e.printStackTrace(); } } @Override public List<T> multiGet(List<List<Object>> keys) { List<Get> gets = new ArrayList<Get>(); for (final List<Object> key : keys) { // LOG.info("Partition: {}, GET: {}", this.partitionNum, key); Get get = new Get(toRowKey(key)); get.addColumn(this.options.columFamily.getBytes(), this.options.qualifier.getBytes()); gets.add(get); } List<T> retval = new ArrayList<T>(); try { // 批量获取所有rowKey的数据 Result[] results = this.table.get(gets); for (final Result result : results) { byte[] value = result.getValue( this.options.columFamily.getBytes(), this.options.qualifier.getBytes()); if (value != null) { retval.add(this.serializer.deserialize(value)); } else { retval.add(null); } } } catch (IOException e) { e.printStackTrace(); } return retval; } private byte[] toRowKey(List<Object> keys) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { for (Object key : keys) { bos.write(String.valueOf(key).getBytes()); } bos.close(); } catch (IOException e) { throw new RuntimeException("IOException creating HBase row key.", e); } return bos.toByteArray(); } }
运行结果:
查看supervisor日志:
2016-12-23 11:34:25.576 STDIO [INFO] partitionIndex: 0,key.get(i):[good]value值:org.apache.storm.trident.state.OpaqueValue@6498fd6a[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@81e227f[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[are]value值:org.apache.storm.trident.state.OpaqueValue@726ac402[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[what]value值:org.apache.storm.trident.state.OpaqueValue@2667735e[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[your]value值:org.apache.storm.trident.state.OpaqueValue@51c73404[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@6d281c8d[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[old]value值:org.apache.storm.trident.state.OpaqueValue@646aa4f7[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@157487a2[currTxid=1,prev=<null>,curr=2] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[a]value值:org.apache.storm.trident.state.OpaqueValue@1574a7af[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[how]value值:org.apache.storm.trident.state.OpaqueValue@1dacdd2a[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[you]value值:org.apache.storm.trident.state.OpaqueValue@3febff9e[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[man]value值:org.apache.storm.trident.state.OpaqueValue@1edafedb[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@38a106df[currTxid=2,prev=1,curr=2] 2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@53ca3784[currTxid=2,prev=2,curr=3] 2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[am]value值:org.apache.storm.trident.state.OpaqueValue@5261a4c8[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[my]value值:org.apache.storm.trident.state.OpaqueValue@88970b9[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.826 STDIO [INFO] partitionIndex: 1,key.get(i):[i]value值:org.apache.storm.trident.state.OpaqueValue@78b27ff6[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.827 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@eef2d62[currTxid=2,prev=1,curr=2] 2016-12-23 11:34:25.828 STDIO [INFO] partitionIndex: 1,key.get(i):[18]value值:org.apache.storm.trident.state.OpaqueValue@788c8496[currTxid=2,prev=<null>,curr=1]
查看hbase表
相关推荐
setting.xml文件,修改Maven仓库指向至阿里仓
基于java的玉安农副产品销售系统的开题报告
dev-c++ 6.3版本
基于java的项目监管系统开题报告
基于springboot多彩吉安红色旅游网站源码数据库文档.zip
毕业设计&课设_基于 AFLFast 改进能量分配策略的毕业设计项目,含 Mix Schedule策略设计及测试结果分析.zip
基于springboot办公用品管理系统源码数据库文档.zip
C++调用qml对象Demo
非常漂亮的类Web界面的Delphi设计54ed7-main.zip
VB SQL车辆管理系统是一款基于Visual Basic(VB)编程语言和SQL数据库开发的综合车辆管理工具。该系统集成了车辆信息管理、驾驶员信息管理、车辆调度、维修记录、数据存储与检索、报告生成以及安全权限管理等多个核心功能模块。 源代码部分提供了详细的开发流程和实现方法,涵盖了从数据库设计、界面设计到事件驱动编程、数据访问技术和错误处理等关键技术点。通过该系统,用户可以方便地录入、查询、修改和删除车辆及驾驶员信息,实现车辆信息的实时更新和跟踪。同时,系统还支持生成各类车辆管理相关的报告,帮助用户更好地掌握车辆运营情况。 系统部分则采用了直观易用的用户界面设计,使得用户能够轻松上手并快速完成车辆管理工作。系统还具备强大的数据处理能力和安全性,通过数据备份和系统升级优化等功能,确保数据的完整性和系统的稳定运行。 总体而言,VB SQL车辆管理系统是一款功能全面、易于操作且安全可靠的车辆管理工具,适用于企业和个人进行日常车辆运营和管理。无论是车辆信息的录入、查询还是报告生成,该系统都能够提供高效、便捷的服务,是车辆管理工作的理想选择。
AutoSAR基础学习资源
基于springboot英语学习平台源码数据库文档.zip
数据集,深度学习,密封数据集,马体态数据集
基于java的数字家庭网站开题报告
podman使用国内源镜像加速器
基于springboot+web的留守儿童网站源码数据库文档.zip
基于springboot的智能宾馆预定系统源码数据库文档.zip
GetQzonehistory-main.zip
环境说明:开发语言:Java 框架:springboot JDK版本:JDK1.8 服务器:tomcat7 数据库:mysql 5.7 数据库工具:Navicat 开发软件:eclipse/myeclipse/idea Maven包:Maven 浏览器:谷歌浏览器。 项目经过测试均可完美运行
内容概要:本文档详细介绍了QST公司生产的QMI8A01型号的6轴惯性测量单元的数据表及性能参数。主要内容包括设备特性、操作模式、接口标准(SPI、I2C与I3C),以及各种运动检测原理和技术规格。文中还提到了设备的工作温度范围宽广,内置的大容量FIFO可用于缓冲传感器数据,减少系统功耗。此外,对于器件的安装焊接指导亦有详细介绍。 适合人群:电子工程技术人员、嵌入式开发人员、硬件设计师等。 使用场景及目标:适用于需要精准测量物体空间位置变化的应用场合,如消费电子产品、智能穿戴设备、工业自动化等领域。帮助工程师快速掌握该款IMU的技术要点和应用场景。 其他说明:文档提供了详细的电气连接图表、封装尺寸图解等资料,方便用户进行电路板的设计制作。同时针对特定应用提出了一些优化建议。