1.发布订阅的基本概念
1.发布订阅模式可以看成一对多的关系:多个订阅者对象同时监听一个主题对象,这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使他们能够自动的更新自己的状态。
2.发布订阅模式,可以让发布方和订阅方,独立封装,独立改变,当一个对象的改变,需要同时改变其他的对象,而且它不知道有多少个对象需要改变时,可以使用发布订阅模式
3.发布订阅模式在分布式系统的典型应用有, 配置管理和服务发现。
配置管理:是指如果集群中机器拥有某些相同的配置,并且这些配置信息需要动态的改变,我们可以使用发布订阅模式,对配置文件做统一的管理,让这些机器各 自订阅配置文件的改变,当配置文件发生改变的时候这些机器就会得到通知,把自己的配置文件更新为最新的配置
服务发现:是指对集群中的服务上下线做统一的管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让模型机器作为订阅方,订阅工 作服务器的基本信息,当工作服务器的基本信息发生改变时如上下线,服务器的角色和服务范围变更,监控服务器就会得到通知,并响应这些变化。
package com.zk.subscribe; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; public class SubscribeZkClient { //需要多少个workserver private static final int CLIENT_QTY = 5; private static final String ZOOKEEPER_SERVER = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181"; //节点的路径 private static final String CONFIG_PATH = "/config";//配置节点 private static final String COMMAND_PATH = "/command";//命令节点 private static final String SERVERS_PATH = "/servers";//服务器列表节点 public static void main(String[] args) throws Exception { //用来存储所有的clients List<ZkClient> clients = new ArrayList<ZkClient>(); //用来存储所有的workservers List<WorkServer> workServers = new ArrayList<WorkServer>(); ManagerServer manageServer = null; try { ServerConfig initConfig = new ServerConfig(); initConfig.setDbPwd("123456"); initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb"); initConfig.setDbUser("root"); ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); manageServer = new ManagerServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig); manageServer.start(); //根据定义的work服务个数,创建服务器后注册,然后启动 for ( int i = 0; i < CLIENT_QTY; ++i ) { ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); clients.add(client); ServerData serverData = new ServerData(); serverData.setId(i); serverData.setName("WorkServer#"+i); serverData.setAddress("192.168.1."+i); WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig); workServers.add(workServer); workServer.start(); } System.out.println("敲回车键退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); }finally{ //将workserver和client给关闭 System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.zk.subscribe; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import com.alibaba.fastjson.JSON; public class ManagerServer { private String serversPath; private String commandPath; private String configPath; private ZkClient zkClient; private ServerConfig config; //用于监听zookeeper中servers节点的子节点列表变化 private IZkChildListener childListener; //用于监听zookeeper中command节点的数据变化 private IZkDataListener dataListener; //工作服务器的列表 private List<String> workServerList; /** * * @param serversPath * @param commandPath Zookeeper中存放命令的节点路径 * @param configPath * @param zkClient * @param config */ public ManagerServer(String serversPath, String commandPath,String configPath, ZkClient zkClient, ServerConfig config) { this.serversPath = serversPath; this.commandPath = commandPath; this.zkClient = zkClient; this.config = config; this.configPath = configPath; this.childListener = new IZkChildListener() { //用于监听zookeeper中servers节点的子节点列表变化 public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception { //更新服务器列表 workServerList = currentChilds; System.out.println("work server list changed, new list is "); execList(); } }; //用于监听zookeeper中command节点的数据变化 this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { } public void handleDataChange(String dataPath, Object data) throws Exception { String cmd = new String((byte[]) data); System.out.println("cmd:"+cmd); exeCmd(cmd); } }; } public void start() { initRunning(); } public void stop() { //取消订阅command节点数据变化和servers节点的列表变化 zkClient.unsubscribeChildChanges(serversPath, childListener); zkClient.unsubscribeDataChanges(commandPath, dataListener); } /** * 初始化 */ private void initRunning() { //执行订阅command节点数据变化和servers节点的列表变化 zkClient.subscribeDataChanges(commandPath, dataListener); zkClient.subscribeChildChanges(serversPath, childListener); } /* * 执行控制命令的函数 * 1: list 2: create 3: modify */ private void exeCmd(String cmdType) { if ("list".equals(cmdType)) { execList(); } else if ("create".equals(cmdType)) { execCreate(); } else if ("modify".equals(cmdType)) { execModify(); } else { System.out.println("error command!" + cmdType); } } private void execList() { System.out.println(workServerList.toString()); } private void execCreate() { if (!zkClient.exists(configPath)) { try { zkClient.createPersistent(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNodeExistsException e) { //节点已经存在异常,直接写入数据 zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNoNodeException e) { //表示其中的一个节点的父节点还没有被创建 String parentDir = configPath.substring(0,configPath.lastIndexOf('/')); zkClient.createPersistent(parentDir, true); execCreate(); } } } private void execModify() { config.setDbUser(config.getDbUser() + "_modify"); try { //回写到zookeeper中 zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNoNodeException e) { execCreate(); } } }
package com.zk.subscribe; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; /** * 代表工作服务器 * workServer服务器的信息 * */ public class WorkServer{ private String serversPath; private String configPath; private ZkClient zkClient; private ServerConfig config; private ServerData serverData; private IZkDataListener dataListener;//数据监听器 /** * * @param configPath 代表config节点的路径 * @param serversPath 代表servers节点的路径 * @param serverData 代表当前服务器的基本信息 * @param zkClient 底层与zookeeper集群通信的组件 * @param initconfig 当前服务器的初始配置 */ public WorkServer(String configPath,String serversPath,ServerData serverData,ZkClient zkClient, ServerConfig initconfig){ this.configPath = configPath; this.serversPath = serversPath; this.serverData = serverData; this.zkClient = zkClient; this.config = initconfig; /** * dataListener 用于监听config节点的数据改变 */ this.dataListener = new IZkDataListener() { public void handleDataDeleted(String arg0) throws Exception { } /** * 当数据的值改变时处理的 * Object data,这个data是将ServerConfig对象转成json字符串存入 * 可以通过参数中的Object data 拿到当前数据节点最新的配置信息 * 拿到这个data信息后将它反序列化成ServerConfig对象,然后更新到自己的serverconfig属性中 */ public void handleDataChange(String dataPath, Object data) throws Exception { String retJson = new String((byte[])data); ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class); //更新配置 updateConfig(serverConfigLocal); System.out.println("new work server config is:"+serverConfigLocal.toString()); } }; } /** * 服务的启动 */ public void start(){ System.out.println("work server start..."); initRunning(); } /** * 服务的停止 */ public void stop(){ System.out.println("work server stop..."); //取消监听 zkClient.unsubscribeDataChanges(configPath, dataListener); } /** * 服务器的初始化 */ private void initRunning(){ registMeToZookeeper(); //订阅config节点的改变 zkClient.subscribeDataChanges(configPath, dataListener); } /** * 启动时向zookeeper注册自己 */ private void registMeToZookeeper(){ //向zookeeper中注册自己的过程其实就是向servers节点下注册一个临时节点 //构造临时节点 String mePath = serversPath.concat("/").concat(serverData.getAddress()); try{ //存入是将json序列化 zkClient.createEphemeral(mePath, JSON.toJSONString(serverData).getBytes()); } catch (ZkNoNodeException e) { //父节点不存在 zkClient.createPersistent(serversPath, true); registMeToZookeeper(); } } /** * 当监听到zookeeper中config节点的配置信息改变时,要读取配置信息来更新自己的配置信息 */ private void updateConfig(ServerConfig serverConfig){ this.config = serverConfig; } }
package com.zk.subscribe; /** * 用于记录WorkServer(工作服务器)的配置信息 */ public class ServerConfig { private String dbUrl; private String dbPwd; private String dbUser; public String getDbUrl() { return dbUrl; } public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; } public String getDbPwd() { return dbPwd; } public void setDbPwd(String dbPwd) { this.dbPwd = dbPwd; } public String getDbUser() { return dbUser; } public void setDbUser(String dbUser) { this.dbUser = dbUser; } @Override public String toString() { return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]"; } }
package com.zk.subscribe; /** * 用于记录WorkServer(工作服务器)的基本信息 */ public class ServerData { private String address; private Integer id; private String name; public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]"; } }
相关推荐
在这个基于Zookeeper集群环境的数据发布订阅代码中,我们将探讨Zookeeper如何实现发布订阅模式,以及与Linux和VMware的相关性。 首先,Zookeeper的发布订阅(Pub/Sub)模型允许应用程序创建主题(topics),并让...
1. Java API:Apache Zookeeper提供了丰富的Java API,使得开发者可以方便地在Java程序中与Zookeeper交互。例如,`org.apache.zookeeper.ZooKeeper` 类用于创建连接、读写数据以及设置watcher。 2. 客户端库:除了...
它的客户端API是开发者与ZooKeeper交互的主要接口,提供了丰富的功能,包括数据节点的创建、删除、更新、读取,以及监控事件的订阅等。本文将深入探讨ZooKeeper客户端API的使用方法和核心概念。 ### 1. 连接...
这些客户端允许开发者创建、删除和更新ZNode(ZooKeeper中的数据节点),并订阅ZNode变化事件。 **ZooKeeper的常用命令:** - `ls`:列出指定路径下的子节点。 - `get`:获取指定ZNode的数据。 - `set`:更新ZNode...
数据发布与订阅是一种常见的配置管理方式,即将数据发布到ZooKeeper的节点上,让订阅者能够动态地获取这些数据,实现配置信息的集中管理和动态更新。 **使用方法**: - **索引信息和集群状态**:将索引信息或集群中...
这些API使得开发人员能够方便地与Zookeeper交互,实现各种分布式协调功能。 七、Zookeeper应用 Zookeeper在分布式系统中的应用广泛,例如作为Hadoop的NameNode高可用解决方案,提供服务发现和配置管理,实现分布式...
在Zookeeper 3.4.10这个版本中,Java API提供了丰富的类和接口,使得开发者能够方便地实现诸如数据发布/订阅、命名服务、配置管理、分布式锁和分布式队列等功能。 首先,我们来看一下核心的`org.apache.zookeeper`...
6. **分布式同步**:通过Zookeeper的原子操作,可以实现跨机器的同步,例如发布/订阅模型。 7. **数据分片**:在大数据场景下,Zookeeper可以协助对数据进行分片,使得多台服务器能并行处理,提高处理速度。 学习...
数据发布与订阅(配置中心) Zookeeper可以用于发布和订阅配置信息,实现配置信息的集中管理和动态更新。具体应用场景包括: - **全局配置管理**:将全局配置信息存储在Zookeeper中,应用程序启动时主动获取,并...
6. **分布式事件通知**:发布订阅模式,用于广播事件或消息。 **ZooKeeper数据结构** ZooKeeper的数据结构类似于文件系统,由一系列的节点(称为ZNode)构成,形成一个层次化的命名空间。每个ZNode都可以存储数据,...
这个错误通常与动态链接库(DLL)的导入问题有关,尤其是在尝试调用ZooKeeper API函数时。ZooKeeper是一个分布式协调服务,广泛应用于分布式系统中,而`zookeeper_init`是其客户端的主要初始化函数。 要解决`__imp_...
5. **API使用**:Zookeeper提供了丰富的Java和C语言的API,开发者可以利用这些API实现分布式锁、发布订阅、队列等功能。 6. **安全性**:Zookeeper支持简单的认证和授权机制,如SASL(Simple Authentication and ...
5. **数据模型**:Zookeeper的数据模型是树形结构,每个节点(称为znode)可以存储数据,并且可以有子节点。每个znode都有唯一的路径表示,类似于文件系统的路径,方便管理和操作。 6. **复制与容错**:Zookeeper...
在具体使用zookeeper-3.4.5.jar时,开发者需要了解其提供的核心类和接口,如`org.apache.zookeeper.ZooKeeper`,它是与Zookeeper服务器进行交互的主要接口,可以用于读写ZNode、监听事件等。还有`org.apache....
Redis还提供了发布/订阅模式,可以实现简单的消息传递,进一步增强了其在微服务架构中的价值。 ZooKeeper的主要功能包括配置管理、命名服务、分布式同步和组服务。在分布式系统中,ZooKeeper可以用来存储和更新配置...
2. 观察者模式:Zookeeper支持观察者模式,客户端可以订阅感兴趣的节点变更,当节点数据变化时,服务器会主动推送通知。 3. 一致性保证:Zookeeper通过Paxos算法的变种ZAB协议实现分布式一致性,确保所有节点看到的...
Zookeeper允许发布/订阅模式的数据存储,可以用于配置管理。例如,应用的配置信息可以集中存储在Zookeeper上,各个服务节点通过订阅配置节点,实时获取配置更新。 2.2 名称服务 Zookeeper可以作为分布式环境中的...
Zookeeper的基础API包括创建、删除、更新和读取Znode(Zookeeper中的数据节点)。Znodes有临时和永久两种模式,分别对应于会话生命周期内的存在和持久化存在。此外,Zookeeper还支持监视器和通知机制,允许客户端...
- **Zab协议**:一种基于主备模式的分布式原子广播协议,确保了集群中所有服务器的数据一致性。 4. **ZooKeeper的应用场景** - **HBase**:使用ZooKeeper进行元数据管理和协调。 - **Kafka**:ZooKeeper负责管理...
5. 分布式同步:通过Watch机制,Zookeeper可以在节点数据变更时通知订阅者,实现数据的同步。 二、Zookeeper 3.4.14版本特性 1. 性能优化:3.4.14版本对Zookeeper的性能进行了优化,包括读写速度、网络通信效率等,...