Transactions: implemented with IPartitionedTridentSpout
DRPC: the web tier fetches result data through DRPC
topN: ranking by sales amount
Trident state backed by HBase: results are not lost when the topology restarts, and the same state can serve reads from the web tier
Data acquisition
Provided by the open-source storm-kafka project
Uses the TransactionalTridentKafkaSpout supplied by the storm-kafka project
Development approach
1. Business logic is implemented in the bolts;
2. The format of the persisted data can be tailored to what the front-end HighCharts needs; sometimes the data is stored specifically in the format HighCharts expects, as in project three.
A memory-plus-disk approach is used to avoid losing data on power failure, restart, and the like.
3. Front end and back end must be separated, which helps stability;
4. Web-side development is relatively simple: read the data over a long-lived connection and push it to HighCharts (a formatting sketch follows the DRPC client at the end of this section);
5. HighCharts development: there are plenty of examples and sample code, so it can be learned as it is used.
Advantages: 1. Front end and back end are separated, so restarting either side for maintenance does not affect the other;
2. Restarting Storm does not affect the result data or the HighCharts charts;
3. Restarting Tomcat does not affect data processing;
4. Data moves between the front end and the back end through HBase or DRPC; note that DRPC is accessed as a service, so it is less stable than a DB (a direct HBase read sketch follows the HBase-state topology below).
package cn.wh.trident;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FirstN;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import cn.wh.storm.KafkaProperties;
public class TridentTopo {
/**
* @param args
*/
public static void main(String[] args) {
BrokerHosts zkHosts = new ZkHosts(KafkaProperties.zkConnect);
String topic = "track";
TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic);
config.forceFromStart = false; // use true for testing; must be set back to false in production
// input format
config.scheme = new SchemeAsMultiScheme(new StringScheme());
config.fetchSizeBytes = 100 ; //batch size
LocalDRPC drpc = new LocalDRPC();
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config);
TridentTopology topology = new TridentTopology() ;
// sales amount
TridentState amtState = topology.newStream("spout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id"))
.shuffle()
.groupBy(new Fields("create_date","province_id"))
.persistentAggregate(new MemoryMapState.Factory(), new Fields("order_amt"), new Sum(), new Fields("sum_amt"));
topology.newDRPCStream("getOrderAmt", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","province_id"))
.groupBy(new Fields("create_date","province_id"))
.stateQuery(amtState, new Fields("create_date","province_id"), new MapGet(), new Fields("sum_amt"))
// .applyAssembly(new FirstN(5, "sum_amt", true))
;
// order count
TridentState orderState = topology.newStream("orderSpout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id"))
.shuffle()
.groupBy(new Fields("create_date","province_id"))
.persistentAggregate(new MemoryMapState.Factory(), new Fields("order_id"), new Count(), new Fields("order_num"));
topology.newDRPCStream("getOrderNum", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","province_id"))
.groupBy(new Fields("create_date","province_id"))
.stateQuery(orderState, new Fields("create_date","province_id"), new MapGet(), new Fields("order_num"))
// .applyAssembly(new FirstN(5, "order_num", true))
;
Config conf = new Config() ;
conf.setDebug(false);
LocalCluster cluster = new LocalCluster() ;
cluster.submitTopology("myTopo", conf, topology.build());
while(true){
// System.err.println("销售额:"+drpc.execute("getOrderAmt", "2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5")) ;
System.err.println("订单数:"+drpc.execute("getOrderNum", "2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5")) ;
Utils.sleep(5000);
}
}
}
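The topology above references three helper functions whose sources are not listed here: OrderSplit (parses a raw Kafka line), Split (splits the DRPC argument string into individual keys), and SplitBy (splits one key into its component fields). The following is only a minimal sketch of what they might look like, inferred from the field declarations in the topology; the parsing details are assumptions rather than the original implementations, and each class would live in its own file.
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
// Parses one raw Kafka line into order_id, order_amt, create_date, province_id.
public class OrderSplit extends BaseFunction {
    private final String separator;
    public OrderSplit(String separator) { this.separator = separator; }
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String line = tuple.getString(0);
        if (line == null) return;
        String[] parts = line.split(separator);
        if (parts.length >= 4) {
            // order_amt is emitted as a Double so that Sum() can aggregate it
            collector.emit(new Values(parts[0], Double.parseDouble(parts[1]), parts[2], parts[3]));
        }
    }
}
// Splits the DRPC args string (e.g. "2014-08-19:1 2014-08-19:2") into one tuple per key.
public class Split extends BaseFunction {
    private final String separator;
    public Split(String separator) { this.separator = separator; }
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for (String part : tuple.getString(0).split(separator)) {
            collector.emit(new Values(part));
        }
    }
}
// Splits one key (e.g. "2014-08-19:1") into its component fields within a single tuple.
public class SplitBy extends BaseFunction {
    private final String separator;
    public SplitBy(String separator) { this.separator = separator; }
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values((Object[]) tuple.getString(0).split(separator)));
    }
}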
DRPC approach backed by HBase state
package cn.wh.stormtest;
import hbase.state.HBaseAggregateState;
import hbase.state.TridentConfig;
import kafka.productor.KafkaProperties;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.FirstN;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.state.StateFactory;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class TridentTopo {
public static StormTopology builder(LocalDRPC drpc)
{
TridentConfig tridentConfig = new TridentConfig("state");
StateFactory state = HBaseAggregateState.transactional(tridentConfig);
BrokerHosts zkHosts = new ZkHosts(KafkaProperties.zkConnect);
String topic = "track";
TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic);
config.forceFromStart = false; // use true for testing; must be set back to false in production
config.scheme = new SchemeAsMultiScheme(new StringScheme());
config.fetchSizeBytes = 100 ;//batch size
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config) ;
TridentTopology topology = new TridentTopology() ;
// sales amount
TridentState amtState = topology.newStream("spout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderAmtSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id","cf"))
.shuffle()
.groupBy(new Fields("create_date","cf","province_id"))
.persistentAggregate(state, new Fields("order_amt"), new Sum(), new Fields("sum_amt"));
// .persistentAggregate(new MemoryMapState.Factory(), new Fields("order_amt"), new Sum(), new Fields("sum_amt"));
topology.newDRPCStream("getOrderAmt", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","cf","province_id"))
.groupBy(new Fields("create_date","cf","province_id"))
.stateQuery(amtState, new Fields("create_date","cf","province_id"), new MapGet(), new Fields("sum_amt"))
.each(new Fields("sum_amt"),new FilterNull())
.applyAssembly(new FirstN(5, "sum_amt", true))
;
// order count
TridentState orderState = topology.newStream("orderSpout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderNumSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id","cf"))
.shuffle()
.groupBy(new Fields("create_date","cf","province_id"))
.persistentAggregate(state, new Fields("order_id"), new Count(), new Fields("order_num"));
// .persistentAggregate(new MemoryMapState.Factory(), new Fields("order_id"), new Count(), new Fields("order_num"));
topology.newDRPCStream("getOrderNum", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","cf","province_id"))
.groupBy(new Fields("create_date","cf","province_id"))
.stateQuery(orderState, new Fields("create_date","cf","province_id"), new MapGet(), new Fields("order_num"))
.each(new Fields("order_num"),new FilterNull())
// .applyAssembly(new FirstN(5, "order_num", true))
;
return topology.build() ;
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
LocalDRPC drpc = new LocalDRPC();
Config conf = new Config() ;
conf.setNumWorkers(5);
conf.setDebug(false);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], conf, builder(null));
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}else{
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder(drpc));
}
// Utils.sleep(60000);
// while (true) {
// System.err.println("sales amount: " + drpc.execute("getOrderAmt", "2014-09-13:cf:amt_3 2014-09-13:cf:amt_2 2014-09-13:cf:amt_1 2014-09-13:cf:amt_7 2014-09-13:cf:amt_6 2014-09-13:cf:amt_5 2014-09-13:cf:amt_4 2014-09-13:cf:amt_8"));
// Utils.sleep(5000);
// }
/**
* [["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:1","2014-08-19","1",821.9000000000001],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:2","2014-08-19","2",631.3000000000001],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:3","2014-08-19","3",240.7],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:4","2014-08-19","4",340.4],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:5","2014-08-19","5",460.8]]
*/
}
}
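Since the HBase-backed state above persists the aggregates in the "state" table, the web tier can also read the results directly from HBase instead of calling DRPC (the DB option mentioned in the advantages list). The sketch below is only illustrative: the row-key/column layout (row key = create_date, column family = cf, qualifier = amt_<province_id>) is an assumption inferred from the DRPC argument format, and how the stored bytes decode depends on the serializer that HBaseAggregateState uses.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseStateReader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.107"); // hypothetical ZooKeeper quorum
        HTable table = new HTable(conf, "state");            // table name taken from TridentConfig("state")
        // Assumed layout: row key = create_date, family = cf, qualifier = amt_<province_id>
        Get get = new Get(Bytes.toBytes("2014-09-13"));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("amt_1"));
        if (value != null) {
            // Decoding (String, long, serialized structure, ...) depends on the state's serializer.
            System.out.println("raw value length = " + value.length);
        }
        table.close();
    }
}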
----------------------------DRPC Client-------------
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.utils.Utils;
public class TridentDRPCclient {
public static void main(String[] args) {
DRPCClient client = new DRPCClient("192.168.1.107", 3772);
// LocalDRPC client = new LocalDRPC();
try {
while (true)
{
System.err.println("销售额:"+client.execute("getOrderAmt", "2014-09-13:cf:amt_5 2014-09-13:cf:amt_8")) ;
System.err.println("订单数:"+client.execute("getOrderNum", "2014-09-13:cf:amt_1 2014-09-13:cf:amt_2")) ;
Utils.sleep(5000);
}
} catch (Exception e) {
e.printStackTrace() ;
}
}
}
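As noted in the development approach, the web tier still has to reshape what DRPC returns before handing it to HighCharts. The DRPC result is a JSON string of rows like the sample output shown earlier, where the last element of each row is the aggregate and the fourth element is the province id. The following is a minimal, purely illustrative sketch; the class name and the use of org.json are assumptions, not part of the original project.
import org.json.JSONArray;
public class HighChartsFormatter {
    // Turns a DRPC result such as
    // [["<args>","2014-08-19:1","2014-08-19","1",821.9], ...]
    // into the categories/data arrays a HighCharts column chart expects.
    public static String toSeries(String drpcResult) throws Exception {
        JSONArray rows = new JSONArray(drpcResult);
        JSONArray categories = new JSONArray();
        JSONArray data = new JSONArray();
        for (int i = 0; i < rows.length(); i++) {
            JSONArray row = rows.getJSONArray(i);
            categories.put(row.getString(3));          // province_id
            data.put(row.getDouble(row.length() - 1)); // sum_amt or order_num
        }
        return "{\"categories\":" + categories + ",\"data\":" + data + "}";
    }
}
The resulting string can then be written straight into the HTTP response that the long-polling page consumes and passed to the chart on the browser side.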