- 浏览: 57360 次
- 性别:
- 来自: 北京
文章分类
最新评论
汇总型方案:
1、shuffleGrouping下,pv(单线程结果) * Executer并发数
一个Executer默认一个task,如果设置Task数大于1,公式应该是:
pv(单线程结果) * Task 数 ,
同一个Executer下task的线程ID相同,taskId不同
优点:简单、计算量小
缺点:稍有误差,但绝大多数场景能接受
优化:
案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值,
利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次
1、pom.xml增加zk
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
2、创建队列数据源
3、编写bolt
4、编写topoloy类
5、启动zookeeper、storm,master(superviser)主机上创建zookeeper临时路径
进入zk目录
zkCli.sh -server localhost:2181
创建文件夹
create /lock "“
create /lock/storm “”
ls /lock
get /lock/storm/pv
6、运行PvTopo结果
1、shuffleGrouping下,pv(单线程结果) * Executer并发数
一个Executer默认一个task,如果设置Task数大于1,公式应该是:
pv(单线程结果) * Task 数 ,
同一个Executer下task的线程ID相同,taskId不同
优点:简单、计算量小
缺点:稍有误差,但绝大多数场景能接受
优化:
案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值,
利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次
1、pom.xml增加zk
引用
pom.xml中增加ZK:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
2、创建队列数据源
public class SourceSpout implements IRichSpout{ /** * 数据源Spout */ private static final long serialVersionUID = 1L; Queue<String> queue = new ConcurrentLinkedQueue<String>(); SpoutOutputCollector collector = null; String str = null; public void nextTuple() { if (queue.size() >= 0) { collector.emit(new Values(queue.poll())); } try { Thread.sleep(500) ; } catch (InterruptedException e) { e.printStackTrace(); } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; for (int i = 0; i < 20; i++) { queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } } catch (Exception e) { e.printStackTrace(); } } public void close() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void ack(Object msgId) { // TODO Auto-generated method stub System.out.println("spout ack:"+msgId.toString()); } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub System.out.println("spout fail:"+msgId.toString()); } }
3、编写bolt
public class PVBolt implements IRichBolt{ /** * zookeeper写入某个线程的id到zookeeper目录,bolt在输出pv时,通过对比线程id确定是否输出。 */ private static final long serialVersionUID = 1L; public static final String zk_path = "/lock/storm/pv"; public void cleanup() { try { zKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } } String logString = null; String lockData = null; String session_id = null; ZooKeeper zKeeper = null; long Pv = 0; long beginTime = System.currentTimeMillis() ; long endTime = 0; public void execute(Tuple input) { try { endTime = System.currentTimeMillis() ; logString = input.getString(0); if (logString != null) { session_id = logString.split("\t")[1]; if (session_id != null) { Pv ++ ; } } //5秒钟输出一次 if (endTime - beginTime >= 5 * 1000) { System.err.println(lockData+" ======================== "); if (lockData.equals(new String(zKeeper.getData(zk_path, false, null)))) { System.err.println("pv ======================== "+ Pv * 4); } beginTime = System.currentTimeMillis() ; } } catch (Exception e) { e.printStackTrace(); } } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { try { //创建zookeeper对象,三个参数:zookeper节点,超时,监听 zKeeper = new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181",3000,new Watcher(){ public void process(WatchedEvent event) { System.out.println("event:"+event.getType()); } }); //判断zookeeper是否连上,如果没连上,进入sleep状态 while (zKeeper.getState() != ZooKeeper.States.CONNECTED) { Thread.sleep(1000); } InetAddress address = InetAddress.getLocalHost(); lockData = address.getHostAddress() + ":" +context.getThisTaskId() ; System.err.println(lockData+"++++++++++++++++++++++++++++"); //将线程ID写入zookeeper临时目录。 if(zKeeper.exists(zk_path, false) == null) { zKeeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } catch (Exception e) { try { zKeeper.close(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
4、编写topoloy类
public class PvTopo { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SourceSpout(), 1); builder.setBolt("bolt", new PVBolt(),4).shuffleGrouping("spout"); Map conf = new HashMap(); conf.put(Config.TOPOLOGY_WORKERS, 4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); }catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
5、启动zookeeper、storm,master(superviser)主机上创建zookeeper临时路径
引用
进入zk目录
zkCli.sh -server localhost:2181
创建文件夹
create /lock "“
create /lock/storm “”
ls /lock
get /lock/storm/pv
6、运行PvTopo结果
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1040一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6561、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 759一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 524英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 424一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6821、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5871.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4901、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8251、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 616Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2117事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4581、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1143统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计
2017-04-16 17:54 698一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10761、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 707一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 601并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5391、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 400本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 675一、安装Storm wget ...
相关推荐
Storm项目实战 之案例优化引入Zookeeper锁控制线程操作,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例 学习此课程需要...
标题“Storm1.2.2+Zookeeper3.4.14.zip”指的是一个包含Apache Storm 1.2.2版本和Zookeeper 3.4.14版本的压缩文件,这两个组件是大数据处理领域的重要工具。Apache Storm是一个实时计算系统,而Zookeeper则是一个...
在学习"Netty-Redis-Zookeeper高并发实战"的过程中,你可能会涉及到以下知识点: 1. Netty的基本概念和原理,包括其事件循环模型、ByteBuf的使用以及自定义编解码器的实现。 2. Redis的数据结构及其应用场景,如...
【书籍学习】Netty、Redis、Zookeeper高并发实战-netty-redis-zookeeper # netty-redis-zookeeper 【书籍学习】Netty、Redis、Zookeeper高并发实战
本文将深入探讨如何利用ZooKeeper来构建分布式锁,并讨论其背后的关键概念和技术细节。 **1. ZooKeeper概述** ZooKeeper是一个高可用、高性能的分布式协调服务,它提供了诸如命名服务、配置管理、分布式同步、组...
这里,我们将深入探讨如何利用ZooKeeper这一强大的分布式协调服务来实现分布式锁,以解决订单编号的唯一性问题。 ZooKeeper是由Apache Hadoop项目孵化的开源项目,它提供了一个高可用、高性能的分布式协调服务。其...
基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统 基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统 基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统 基于SpringBoot+...
SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zipSpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zipSpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zipSpringBoot+Zookeeper+Dubbo...
秒杀系统企业级实战应用(真实工业界案例)87 秒杀系统高并发之zookeeper分布式锁
Zookeeper具有高可用性、高性能等特点,它能够帮助开发者管理分布式环境下的共享资源。本文介绍了如何利用Zookeeper来统计和管理配置文件。 首先,配置文件的管理是现代应用中非常重要的一个方面。应用需要根据不同...
这时,Zookeeper,一个高可用的分布式协调服务,常被用来实现分布式锁。 Zookeeper由Apache基金会开发,它提供了一种可靠的分布式一致性服务,包括命名服务、配置管理、集群同步、领导者选举等功能。Zookeeper基于...
在实际使用中,Storm通常与Zookeeper配合,利用Zookeeper进行任务调度和故障恢复。 在安装这两个组件时,首先确保你有一个兼容的JDK环境,这里指定的是1.8.0_171版本。JDK是Java Development Kit的缩写,它是开发和...
Storm作为Apache软件基金会的顶级项目,是一个分布式、容错的实时计算系统,能够处理无界数据流,确保每个消息至少被处理一次,从而提供高可靠性的保障。 **Zookeeper在Storm中的角色** Zookeeper是一个分布式的,...
Zookeeper是Apache的一个开源项目,它为分布式应用提供了高可用性、顺序保证以及集群间同步等特性,它能够很好地帮助实现分布式锁。 **Zookeeper分布式锁的关键特性包括:** 1. **顺序一致性:** Zookeeper中的节点...
在这个场景下,我们将关注ZooKeeper如何实现分布式锁,特别是不可重入锁、可重入锁以及可重入读写锁的概念与实践。 首先,我们要理解什么是分布式锁。在多节点并发访问共享资源时,分布式锁能确保同一时刻只有一个...
- 如何利用 Storm 和 Kafka 的 API 实现数据的实时摄入和处理,以及 ZooKeeper 在这个过程中的协调作用。 理解并掌握这些技术,对于构建高效、可靠的实时大数据处理系统至关重要。这些 jar 包的提供,为开发人员...
总的来说,这个"zookeeper的分布式全局锁纯代码解决方案"是一个很好的学习和实践分布式锁的起点,它帮助我们理解Zookeeper在分布式系统中的作用,以及如何利用它来解决并发访问的问题。在深入研究和理解这些代码后,...
本文将深入探讨基于Zookeeper实现的分布式读写锁,并利用Zkclient客户端进行操作。Zookeeper是一个分布式服务协调框架,它提供了一种简单且高效的方式来实现分布式锁。 **一、Zookeeper简介** Zookeeper是由Apache...