`

storm高并发PV统计,利用zookeeper锁输出汇总值

 
阅读更多
汇总型方案:
1、shuffleGrouping下,pv(单线程结果) * Executer并发数
一个Executer默认一个task,如果设置Task数大于1,公式应该是:
pv(单线程结果) * Task 数 ,
同一个Executer下task的线程ID相同,taskId不同

优点:简单、计算量小
缺点:稍有误差,但绝大多数场景能接受

优化:
案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值,
利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次


1、pom.xml增加zk
引用
pom.xml中增加ZK:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>

引用

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>StormMavenProject</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormMavenProject</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   
   <dependency>
    <groupId>org.ow2.asm</groupId>
    <artifactId>asm</artifactId>
    <version>5.0.3</version>
   </dependency>
<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>minlog</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.1.0</version>
</dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

<dependency>
    <groupId>ring-cors</groupId>
    <artifactId>ring-cors</artifactId>
    <version>0.1.5</version>
</dependency>


<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>



  </dependencies>
  <build>
    <finalName>StormMavenProject</finalName>
  </build>
</project>


2、创建队列数据源

public class SourceSpout implements IRichSpout{

	/**
	 * 数据源Spout
	 */
	private static final long serialVersionUID = 1L;
	
	Queue<String> queue = new ConcurrentLinkedQueue<String>();
	
	SpoutOutputCollector collector = null;
	
	String str = null;

	public void nextTuple() {
		if (queue.size() >= 0) {
			collector.emit(new Values(queue.poll()));
		}
		try {
			Thread.sleep(500) ;
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	
	}
	
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		try {
			this.collector = collector;
			
			Random random = new Random();
			String[] hosts = { "www.taobao.com" };
			String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
					"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
			String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
					"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
			
			for (int i = 0; i < 20; i++) {
				queue.add(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]);
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void close() {
		// TODO Auto-generated method stub
	}
	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("log"));
	}

	
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}
	
	public void ack(Object msgId) {
		// TODO Auto-generated method stub
		System.out.println("spout ack:"+msgId.toString());
	}

	
	public void activate() {
		// TODO Auto-generated method stub
		
	}



	
	public void deactivate() {
		// TODO Auto-generated method stub
		
	}

	
	public void fail(Object msgId) {
		// TODO Auto-generated method stub
		System.out.println("spout fail:"+msgId.toString());
	}

}


3、编写bolt

public class PVBolt implements IRichBolt{

	/**
	 * zookeeper写入某个线程的id到zookeeper目录,bolt在输出pv时,通过对比线程id确定是否输出。
	 */
	private static final long serialVersionUID = 1L;

	public static final String zk_path = "/lock/storm/pv";
	
	
	public void cleanup() {
		try {
			zKeeper.close();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	String logString = null;
	String lockData = null;
	String session_id = null;
	ZooKeeper zKeeper = null;
	
	long Pv = 0;
	long beginTime = System.currentTimeMillis() ;
	long endTime = 0;
	
	
	public void execute(Tuple input) {
		try {
			endTime = System.currentTimeMillis() ;
			logString = input.getString(0);
			if (logString != null) {
				session_id = logString.split("\t")[1];
				if (session_id != null) {
					Pv ++ ;
				}
			}
			//5秒钟输出一次
			if (endTime - beginTime >= 5 * 1000) {
				System.err.println(lockData+" ======================== ");
				if (lockData.equals(new String(zKeeper.getData(zk_path, false, null)))) {
					System.err.println("pv ======================== "+ Pv * 4);
				}
				beginTime = System.currentTimeMillis() ;
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}

	
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		try {
			//创建zookeeper对象,三个参数:zookeper节点,超时,监听
			zKeeper = new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181",3000,new Watcher(){
				
				public void process(WatchedEvent event) {
					System.out.println("event:"+event.getType());
				}
			});
			
			//判断zookeeper是否连上,如果没连上,进入sleep状态
			while (zKeeper.getState() != ZooKeeper.States.CONNECTED) {
				Thread.sleep(1000);
			}
			
			InetAddress address = InetAddress.getLocalHost();
			lockData = address.getHostAddress() + ":" +context.getThisTaskId() ;
			System.err.println(lockData+"++++++++++++++++++++++++++++");
			//将线程ID写入zookeeper临时目录。
			if(zKeeper.exists(zk_path, false) == null)
			{
				zKeeper.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			}
			
		} catch (Exception e) {
			try {
				zKeeper.close();
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
		}
		
	}

	
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		
	}

	
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

}



4、编写topoloy类


public class PvTopo {

	public static void main(String[] args) {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new SourceSpout(), 1);
		
		builder.setBolt("bolt", new PVBolt(),4).shuffleGrouping("spout");
		
		Map conf = new HashMap();
		conf.put(Config.TOPOLOGY_WORKERS, 4);

		if (args.length > 0) {
			try {
					StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			}catch (AuthorizationException e) {
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}
		
	}

}


5、启动zookeeper、storm,master(superviser)主机上创建zookeeper临时路径
引用

进入zk目录
zkCli.sh -server localhost:2181
创建文件夹
create /lock "“
create /lock/storm “”
ls  /lock
get /lock/storm/pv



6、运行PvTopo结果





  • 大小: 44.3 KB
  • 大小: 194 KB
分享到:
评论

相关推荐

    Storm项目实战 之案例优化引入Zookeeper锁控制线程操作

    Storm项目实战 之案例优化引入Zookeeper锁控制线程操作,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例 学习此课程需要...

    netty-redis-zookeeper高并发实战学习-netty-redis-zookeeper.zip

    在学习"Netty-Redis-Zookeeper高并发实战"的过程中,你可能会涉及到以下知识点: 1. Netty的基本概念和原理,包括其事件循环模型、ByteBuf的使用以及自定义编解码器的实现。 2. Redis的数据结构及其应用场景,如...

    Storm1.2.2+Zookeeper3.4.14.zip

    标题“Storm1.2.2+Zookeeper3.4.14.zip”指的是一个包含Apache Storm 1.2.2版本和Zookeeper 3.4.14版本的压缩文件,这两个组件是大数据处理领域的重要工具。Apache Storm是一个实时计算系统,而Zookeeper则是一个...

    【书籍学习】Netty、Redis、Zookeeper高并发实战-netty-redis-zookeeper.zip

    【书籍学习】Netty、Redis、Zookeeper高并发实战-netty-redis-zookeeper # netty-redis-zookeeper 【书籍学习】Netty、Redis、Zookeeper高并发实战

    zookeeper做分布式锁

    本文将深入探讨如何利用ZooKeeper来构建分布式锁,并讨论其背后的关键概念和技术细节。 **1. ZooKeeper概述** ZooKeeper是一个高可用、高性能的分布式协调服务,它提供了诸如命名服务、配置管理、分布式同步、组...

    使用ZooKeeper实现分布式锁

    这里,我们将深入探讨如何利用ZooKeeper这一强大的分布式协调服务来实现分布式锁,以解决订单编号的唯一性问题。 ZooKeeper是由Apache Hadoop项目孵化的开源项目,它提供了一个高可用、高性能的分布式协调服务。其...

    基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zip

    基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统 基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统 基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统 基于SpringBoot+...

    SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zip

    SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zipSpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zipSpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zipSpringBoot+Zookeeper+Dubbo...

    秒杀系统企业级实战应用(真实工业界案例)87 秒杀系统高并发之zookeeper分布式锁

    秒杀系统企业级实战应用(真实工业界案例)87 秒杀系统高并发之zookeeper分布式锁

    利用zookeeper统计管理配置文件

    Zookeeper具有高可用性、高性能等特点,它能够帮助开发者管理分布式环境下的共享资源。本文介绍了如何利用Zookeeper来统计和管理配置文件。 首先,配置文件的管理是现代应用中非常重要的一个方面。应用需要根据不同...

    基于zookeeper的分布式锁简单实现

    这时,Zookeeper,一个高可用的分布式协调服务,常被用来实现分布式锁。 Zookeeper由Apache基金会开发,它提供了一种可靠的分布式一致性服务,包括命名服务、配置管理、集群同步、领导者选举等功能。Zookeeper基于...

    zookeeper+storm安装包

    在实际使用中,Storm通常与Zookeeper配合,利用Zookeeper进行任务调度和故障恢复。 在安装这两个组件时,首先确保你有一个兼容的JDK环境,这里指定的是1.8.0_171版本。JDK是Java Development Kit的缩写,它是开发和...

    storm 流式处理 安装软件(包括zookeeper,jzmq,zeroMQ,storm)

    Storm作为Apache软件基金会的顶级项目,是一个分布式、容错的实时计算系统,能够处理无界数据流,确保每个消息至少被处理一次,从而提供高可靠性的保障。 **Zookeeper在Storm中的角色** Zookeeper是一个分布式的,...

    基于zookeeper的分布式锁实现demo

    Zookeeper是Apache的一个开源项目,它为分布式应用提供了高可用性、顺序保证以及集群间同步等特性,它能够很好地帮助实现分布式锁。 **Zookeeper分布式锁的关键特性包括:** 1. **顺序一致性:** Zookeeper中的节点...

    zookeeper分布式锁实例源码

    在这个场景下,我们将关注ZooKeeper如何实现分布式锁,特别是不可重入锁、可重入锁以及可重入读写锁的概念与实践。 首先,我们要理解什么是分布式锁。在多节点并发访问共享资源时,分布式锁能确保同一时刻只有一个...

    storm,kafka,zookeeper jar包

    - 如何利用 Storm 和 Kafka 的 API 实现数据的实时摄入和处理,以及 ZooKeeper 在这个过程中的协调作用。 理解并掌握这些技术,对于构建高效、可靠的实时大数据处理系统至关重要。这些 jar 包的提供,为开发人员...

    zookeeper的分布式全局锁纯代码解决方案

    总的来说,这个"zookeeper的分布式全局锁纯代码解决方案"是一个很好的学习和实践分布式锁的起点,它帮助我们理解Zookeeper在分布式系统中的作用,以及如何利用它来解决并发访问的问题。在深入研究和理解这些代码后,...

    基于zookeeper实现的分布式读写锁

    本文将深入探讨基于Zookeeper实现的分布式读写锁,并利用Zkclient客户端进行操作。Zookeeper是一个分布式服务协调框架,它提供了一种简单且高效的方式来实现分布式锁。 **一、Zookeeper简介** Zookeeper是由Apache...

Global site tag (gtag.js) - Google Analytics