- 浏览: 56958 次
- 性别:
- 来自: 北京
文章分类
最新评论
1、Trident实战之计算网站PV
2、自定义分割数据
3、日期处理类
4、pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
5、测试结果
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
/** * Trident实战之计算网站PV */ public class TridentPVTopo { public static StormTopology buildTopology(LocalDRPC drpc) { Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; FixedBatchSpout spout = new FixedBatchSpout(new Fields("eachLog"), 3, // 第一个参数表示输出类型,与topo的输入类型对应,第二个参数表示以三行作为一个批次 new Values(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]), new Values(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]), new Values(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)])); spout.setCycle(false); /** * topo处理数据,存入中间存储 */ TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout)// 获取数据源 .each(new Fields("eachLog"), new Mysplit("\t"), new Fields("date", "session_id"))// 第一参数是输入数据类型,第二参数是实现“分割”功能,第三个参数是输出数据类型 .groupBy(new Fields("date"))// 按日期分组 // 持久化到内存,传入session_id分组,输出pv数据类型 .persistentAggregate(new MemoryMapState.Factory(), new Fields("session_id"), new Count(), new Fields("pv")); // .parallelismHint(16); /** * 读取中间存储数据 */ topology.newDRPCStream("GetPV", drpc)// 输入函数名称 .each(new Fields("args"), new Split(" "), new Fields("date"))// 对传入参数进行“分割”处理, .groupBy(new Fields("date"))// 用日期进行查询 .stateQuery(wordCounts, new Fields("date"), new MapGet(), new Fields("PV"))// 第一个参数是中间存储。第二个参数是查询参数,可以不输入默认是传入流的值。 .each(new Fields("PV"), new FilterNull());// 查询结果过滤 return topology.build(); } public static void main(String[] args) throws Exception { // 客户端调用topo Config conf = new Config(); conf.setMaxSpoutPending(20); if (args.length == 0) { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordCounter", conf, buildTopology(drpc)); for (int i = 0; i < 100; i++) { System.err.println("DRPC RESULT: " + drpc.execute("GetPV", "2014-01-07 2014-01-08")); Thread.sleep(1000); } } else { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null)); } } }
2、自定义分割数据
public class Mysplit extends BaseFunction { /** * 自定义分割数据 */ private static final long serialVersionUID = 1L; String patton = null; public Mysplit(String patton) { this.patton = patton; } public void execute(TridentTuple tuple, TridentCollector collector) { String log = tuple.getString(0); String logArr[] = log.split(patton); if (logArr.length == 3) { collector.emit(new Values(DateFmt.getCountDate(logArr[2], DateFmt.date_short), logArr[1])); } } }
public class Split extends BaseFunction { /** * 分割数据 */ private static final long serialVersionUID = 1L; String patton = null; public Split(String patton) { this.patton = patton; } public void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for (String word : sentence.split(patton)) { collector.emit(new Values(word)); } } }
3、日期处理类
public class DateFmt { /* * 日期处理类 */ public static final String date_long = "yyyy-MM-dd HH:mm:ss" ; public static final String date_short = "yyyy-MM-dd" ; public static SimpleDateFormat sdf = new SimpleDateFormat(date_short); public static String getCountDate(String date,String patton) { SimpleDateFormat sdf = new SimpleDateFormat(patton); Calendar cal = Calendar.getInstance(); if (date != null) { try { cal.setTime(sdf.parse(date)) ; } catch (ParseException e) { e.printStackTrace(); } } return sdf.format(cal.getTime()); } public static Date parseDate(String dateStr) throws Exception { return sdf.parse(dateStr); } public static void main(String[] args) throws Exception{ // System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short)); System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01"))); } }
4、pom文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
5、测试结果
引用
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1036一、ITridentSpout 基于事务 static int ... -
Trident API和概念
2017-05-23 10:57 752一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 517英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 420一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6821、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5841.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4901、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8211、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 614Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2109事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4531、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1138统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 899汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 689一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10721、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 705一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 594并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5351、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 398本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 673一、安装Storm wget ...
相关推荐
总的来说,Storm Trident实战之计算网站PV是一个典型的实时数据处理应用场景,它展示了如何利用Trident的灵活性和强大功能,实时处理大量数据,实时计算出网站的页面浏览量,为运营决策提供实时数据支持。...
《Storm实战:构建大数据实时计算 》一共分为10章:第1章全面介绍了Storm的特性、能解决什么问题,以及和其他流计算系统的对比;第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对...
### 基于Storm流计算天猫双十一作战室项目实战 #### 一、课程亮点与核心知识点 本课程针对Apache Storm这一强大的分布式实时计算系统进行了全方位的解析与实战演练。通过本课程的学习,不仅可以掌握Storm的基本...
Trident是一种构建在Twitter的开源分布式实时数据处理框架Storm之上的抽象层,它提供了高级的数据处理功能,特别适合大规模实时流数据的处理。在大数据领域,传统的Hadoop框架擅长批量数据处理,但不适用于实时需求...
Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例; Storm视频教程亮点: 1、Storm...
**Storm Trident:分布式流处理框架详解** Storm Trident是Twitter开源的、基于Apache Storm的一个高级抽象,它提供了一种更强大且高效的方式来处理实时数据流。Trident的核心理念是将数据流划分为一系列的小批量...
### 大数据开发高级就业指导课程——Storm及Trident理论与实战 #### 一、Storm并发机制 在Storm中,为了提高数据处理的性能和效率,设计了一套完整的并发机制。这一机制涉及到Topology的组件配置、并发度设置等多...
"Substance"和"Trident"是两个专门用于美化Java Swing应用程序的库,它们为开发者提供了丰富的主题和动画效果,使得Java GUI应用程序能够拥有更加现代、吸引人的外观。 Substance库: 1. Substance是一个开源项目,...
《Storm实战:构建大数据实时计算》是一本深入探讨Apache Storm技术的专业书籍,旨在帮助读者理解和掌握如何使用Storm进行大数据实时处理。Apache Storm是一个开源的分布式实时计算系统,它能够处理无限的数据流,...
为了改善这一状况,开发者可以利用第三方库,如Substance和Trident,这两个库主要用于实现Java界面的皮肤更换功能,让GUI看起来更加现代和吸引人。在本文中,我们将深入探讨Substance和Trident这两个库,并学习如何...
Trident数据手册pdf,主要介绍Trident特点及技术规格,Trident 择是一款 3U 机架抽取式 KVM 切换器,配有一体化抽取式键盘和 3X17 英寸 LCD 显示屏,使用了高对比度的显示器(50:1),可以折叠放入 3U 机架内。
《Storm实战构建大数据实时计算》是一本专注于大数据领域实时处理技术的专著,主要围绕Apache Storm这一开源流处理系统展开。Storm被广泛应用于实时数据分析、在线机器学习、持续集成、实时网站仪表板等多个场景,其...
在"项目1-地区销售额-Trident代码开发一"中,初步建立了Trident拓扑结构,而在"项目1-地区销售额-Trident代码开发二"中,进一步完善了Trident的实现,包括计算订单数的TopN,这可以通过`stream.applyAssembly(new ...
建议选择4G或8G的小U盘,U盘的格式为FAT32,在U盘中新建一个T16的文件夹,将附件Trident8493_NVR.tar直接拷贝到T16文件夹中升级,插入U盘后点击系统维护升级。附件不要解压作直接拷贝。不要使用制作过U盘启动的U盘,...
java swing用户交互界面的美观开发工具包,便于界面开发。
《Storm实战构建大数据实时计算》是一本专注于大数据处理领域的专著,主要围绕开源分布式实时计算系统Apache Storm展开。Apache Storm是一个强大的工具,用于处理大规模的数据流处理,它以高吞吐量、容错性以及实时...
Trident API 提供了丰富的操作类型,以支持高效且可靠的实时计算。 一、Trident API 的核心概念 1. Stream:Stream 是 Trident 的基本数据结构,代表了一连串不断流入系统的事件。它们可以源自外部数据源,也可以...
《基于Storm流计算天猫双十一作战室项目实战》的课程涵盖了多个关键技术和实战环节,旨在让学员深入了解并熟练应用Storm流计算框架,同时强化其在大数据处理领域的综合能力。以下是课程内容的详细阐述: 1. **Storm...
【课程大纲】01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java ...
三叉戟 Trident项目是新一代多线程,高性能和无尘Minecraft服务器的实现。最新发布的获得JAR方法一:自己构建如果您确定我们的分发形式有问题,或者您想要在获取JAR文件之前进行一些修改,则希望直接从源代码进行...