一、认识storm trident
trident可以理解为storm批处理的高级抽象,提供了分组、分区、聚合、函数等操作,提供一致性和恰好一次处理的语义。
1)元祖被作为batch处理
2)每个batch的元祖都被指定唯一的一个事物id,如果因为处理失败导致batch重发,也和保证和重发前一样的事物id
3)数据更新操作严格有序,比如batch1必须在batch2之前被成功处理,且如果batch1失败了,后面的处理也会失败。
假如: batch1处理1--20
batch2处理21--40
batch1处理失败,那么batch2也会失败
虽然数据更新操作严格有序,但是数据处理阶段也可以并行的,只是最后的持久化操作必须有序。
1.1 trident state
trident的状态具有仅仅处理一次,持续聚合的语义,使用trident来实现恰好一次的语义不需要开发人员去处理事务相关的工作,因为trident state已经帮我们封装好了,只需要编写类似于如下的代码:
topology.newStream("sentencestream", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count")) .parallelismHint(3);
所有处理事务逻辑都在MyHbaseState.HbaseFactory中处理了(这个是我自己定义的,trident支持在内存里面处理,类似于MemachedState.opaque)。
trident提供了一个StateFactory用来创建State对象的实例,行如:
public final class XFactory implements StateFactory{ public State makeState(Map conf,int partitonIndex,int numPartitions){ return new State(); } }
1.2 persistentAggregate
persistentAggregate是trident中用来更新来源的状态,如果前面是一个分好组的流,trident希望你提供的状态实现MapState接口,其中key是分组的字段,
而聚合结果是状态的值。
1.3 实现MapStates
trident中实现MapState非常简单,只需要为这个类提供一个IBackingMap的接口实现接口。
二、实战
首先搭建好zk,storm,hadoop,hbase的分布式环境
master:
slave1:
slave2:
main方法:
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { TridentTopology topology = new TridentTopology(); FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("tanjie is a good man"), new Values( "what is your name"), new Values("how old are you"), new Values("my name is tanjie"), new Values("i am 18")); spout.setCycle(false); tridentStreamToHbase(topology,spout); Config config = new Config(); config.setDebug(false); StormSubmitter.submitTopologyWithProgressBar("word_count_trident_state_HbaseState", config, topology.build()); }
tridentStreamToHbase方法:
private static TridentState tridentStreamToHbase(TridentTopology topology, FixedBatchSpout spout) { MyHbaseState.Options options = new MyHbaseState.Options(); options.setTableName("storm_trident_state"); options.setColumFamily("colum1"); options.setQualifier("q1"); /** * 根据数据源拆分单词后,然后分区操作,在每个分区上又进行分组(hash算法),然后在每个分组上进行聚合 * 所以这里可能有多个分区,每个分区有多个分组,然后在多个分组上进行聚合 * 用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中 */ return topology.newStream("sentencestream", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count")) .parallelismHint(3); }
MyHbaseState实现:
package com.storm.trident.state.hbase; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.JSONNonTransactionalSerializer; import org.apache.storm.trident.state.JSONOpaqueSerializer; import org.apache.storm.trident.state.JSONTransactionalSerializer; import org.apache.storm.trident.state.Serializer; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; import org.apache.storm.trident.state.StateType; import org.apache.storm.trident.state.map.IBackingMap; import org.apache.storm.trident.state.map.MapState; import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.SnapshottableMap; import org.apache.storm.tuple.Values; import com.google.common.collect.Maps; @SuppressWarnings({ "unchecked", "rawtypes" }) public class MyHbaseState<T> implements IBackingMap<T> { private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps .newHashMap(); private int partitionNum; private Options<T> options; private Serializer<T> serializer; private Connection connection; private Table table; static { DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer()); DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer()); DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer()); } public MyHbaseState(final Options<T> options, Map conf, int partitionNum) { this.options = options; this.serializer = options.serializer; this.partitionNum = partitionNum; try { connection = ConnectionFactory.createConnection(HBaseConfiguration .create()); table = connection.getTable(TableName.valueOf(options.tableName)); } catch (IOException e) { e.printStackTrace(); } } public static class Options<T> implements Serializable { /** * */ private static final long serialVersionUID = 1L; public Serializer<T> serializer = null; public String globalkey = "$HBASE_STATE_GLOBAL$"; /** * 表名 */ public String tableName; /** * 列族 */ public String columFamily; /** * */ public String qualifier; public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public String getColumFamily() { return columFamily; } public void setColumFamily(String columFamily) { this.columFamily = columFamily; } public String getQualifier() { return qualifier; } public void setQualifier(String qualifier) { this.qualifier = qualifier; } } protected static class HbaseFactory<T> implements StateFactory { private static final long serialVersionUID = 1L; private Options<T> options; public HbaseFactory(Options<T> options) { this.options = options; if (this.options.serializer == null) { this.options.serializer = DEFAULT_SERIALZERS .get(StateType.OPAQUE); } } @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { System.out.println("partitionIndex:" + partitionIndex + ",numPartitions:" + numPartitions); IBackingMap state = new MyHbaseState(options, conf, partitionIndex); MapState mapState = OpaqueMap.build(state); return new SnapshottableMap(mapState, new Values(options.globalkey)); } } @Override public void multiPut(List<List<Object>> keys, List<T> values) { List<Put> puts = new ArrayList<Put>(keys.size()); for (int i = 0; i < keys.size(); i++) { Put put = new Put(toRowKey(keys.get(i))); T val = values.get(i); System.out.println("partitionIndex: " + this.partitionNum + ",key.get(i):" + keys.get(i) + "value值:" + val); put.addColumn(this.options.columFamily.getBytes(), this.options.qualifier.getBytes(), this.options.serializer.serialize(val)); puts.add(put); } try { this.table.put(puts); } catch (IOException e) { e.printStackTrace(); } } @Override public List<T> multiGet(List<List<Object>> keys) { List<Get> gets = new ArrayList<Get>(); for (final List<Object> key : keys) { // LOG.info("Partition: {}, GET: {}", this.partitionNum, key); Get get = new Get(toRowKey(key)); get.addColumn(this.options.columFamily.getBytes(), this.options.qualifier.getBytes()); gets.add(get); } List<T> retval = new ArrayList<T>(); try { // 批量获取所有rowKey的数据 Result[] results = this.table.get(gets); for (final Result result : results) { byte[] value = result.getValue( this.options.columFamily.getBytes(), this.options.qualifier.getBytes()); if (value != null) { retval.add(this.serializer.deserialize(value)); } else { retval.add(null); } } } catch (IOException e) { e.printStackTrace(); } return retval; } private byte[] toRowKey(List<Object> keys) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { for (Object key : keys) { bos.write(String.valueOf(key).getBytes()); } bos.close(); } catch (IOException e) { throw new RuntimeException("IOException creating HBase row key.", e); } return bos.toByteArray(); } }
运行结果:
查看supervisor日志:
2016-12-23 11:34:25.576 STDIO [INFO] partitionIndex: 0,key.get(i):[good]value值:org.apache.storm.trident.state.OpaqueValue@6498fd6a[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@81e227f[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[are]value值:org.apache.storm.trident.state.OpaqueValue@726ac402[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[what]value值:org.apache.storm.trident.state.OpaqueValue@2667735e[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[your]value值:org.apache.storm.trident.state.OpaqueValue@51c73404[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@6d281c8d[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[old]value值:org.apache.storm.trident.state.OpaqueValue@646aa4f7[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@157487a2[currTxid=1,prev=<null>,curr=2] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[a]value值:org.apache.storm.trident.state.OpaqueValue@1574a7af[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[how]value值:org.apache.storm.trident.state.OpaqueValue@1dacdd2a[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[you]value值:org.apache.storm.trident.state.OpaqueValue@3febff9e[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[man]value值:org.apache.storm.trident.state.OpaqueValue@1edafedb[currTxid=1,prev=<null>,curr=1] 2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@38a106df[currTxid=2,prev=1,curr=2] 2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@53ca3784[currTxid=2,prev=2,curr=3] 2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[am]value值:org.apache.storm.trident.state.OpaqueValue@5261a4c8[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[my]value值:org.apache.storm.trident.state.OpaqueValue@88970b9[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.826 STDIO [INFO] partitionIndex: 1,key.get(i):[i]value值:org.apache.storm.trident.state.OpaqueValue@78b27ff6[currTxid=2,prev=<null>,curr=1] 2016-12-23 11:34:25.827 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@eef2d62[currTxid=2,prev=1,curr=2] 2016-12-23 11:34:25.828 STDIO [INFO] partitionIndex: 1,key.get(i):[18]value值:org.apache.storm.trident.state.OpaqueValue@788c8496[currTxid=2,prev=<null>,curr=1]
查看hbase表
相关推荐
storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。
**Storm Trident:分布式流处理框架详解** Storm Trident是Twitter开源的、基于Apache Storm的一个高级抽象,它提供了一种更强大且高效的方式来处理实时数据流。Trident的核心理念是将数据流划分为一系列的小批量...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
Trident-GCD是一个开源项目,它为Apache Storm的Trident API提供了一种集成Google Cloud Datastore的状态管理实现。Trident是Storm的一个高级接口,用于构建复杂的数据处理管道,而Google Cloud Datastore则是一个...
该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 < groupId>com.github.fhuss</ groupId> < artifactId>storm-elasticsearch ...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
"项目1-地区销售额-基于HBase存储的State运用"详细介绍了如何将Trident与HBase集成,利用HBase的分布式存储特性来优化状态管理。 此外,项目还涵盖了其他几个关键部分,如使用HighCharts创建动态图表,实现HTTP长...
###必读把大数进行分片,根据数据中某个字段分组Origin...//Test只是一个本地聚合,减少数据emit发射数量//更新操作在LocationUpdater类中的updateState方法并回调DB类的setBulk 来添加每批次的数据到自定义数据结构中。
例如在项目1中,通过Trident API实现的Spout可以提高数据处理的效率和一致性,同时利用HBase的State功能存储中间状态,保证数据的准确无误。项目2和项目3进一步展示了如何利用storm-kafka处理更复杂的数据分析任务,...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
- **Trident State**:Trident状态管理机制,支持将计算结果持久化到HBase中,即使拓扑重启,也能恢复之前的状态,避免数据丢失。 3. **技术栈**: - **HBase**:作为大数据存储系统,用于存储Trident计算后的...
本文件主要分析了几个主流的大数据流式处理框架,包括Apache Spark Streaming、Apache Flink、Apache Storm、Apache Storm Trident、Apache Gearpump以及Twitter Heron,对它们的功能对比、性能评估进行了深入探讨。...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
原生状态支持是指框架自身内置的对处理状态的管理,如Flink的 keyed state 或者 Storm Trident的transactional state。这使得开发者可以直接在框架内部处理复杂的状态逻辑,而无需额外的存储层。 5. **选择合适的...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...