为了提高应用的性能,我们准备实现分布式cache,所以我特别研究了oscache关于分布式实现的部分.
我们知道为了实现分布式环境下消息的通知,目前两种比较流行的做法是使用JavaGroups[http://www.jgroups.org]和JMS。这两种方式都在底层实现了广播发布消息。
由于JGroups可以提供可靠的广播通信.所以我们准备采用JGroups.
我自己写了一个JavaGroupBroadcastingManager.java类实现消息的管理(包括发送和接收),代码参考了oscache的相关代码,在其基础上进行了改进.
代码如下:
1、JavaGroupBroadcastingManager.java
package com.yz;
import com.opensymphony.oscache.base.FinalizationException;
import com.opensymphony.oscache.base.InitializationException;
import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.blocks.NotificationBus;
import java.io.Serializable;
import java.util.Properties;
/**
* @author yangzheng
* @version $Revision$
* @since 2005-7-14
*/
public class JavaGroupBroadcastingManager
implements NotificationBus.Consumer {
private static final Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
private static final String BUS_NAME = "OSCacheBus";
private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
private NotificationBus bus;
/**
* Initializes the broadcasting listener by starting up a JavaGroups notification
* bus instance to handle incoming and outgoing messages.
*
*/
public synchronized void initialize(Properties config) throws InitializationException {
String properties = config.getProperty(CHANNEL_PROPERTIES);
String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
if (log.isInfoEnabled()) {
log.info("Starting a new JavaGroups broadcasting listener with properties="
+ properties);
}
try {
bus = new NotificationBus(BUS_NAME, properties);
bus.start();
bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
bus.setConsumer(this);
log.info("JavaGroups clustering support started successfully");
} catch (Exception e) {
throw new InitializationException("Initialization failed: " + e);
}
}
/**
* Shuts down the JavaGroups being managed
*/
public synchronized void finialize() throws FinalizationException {
if (log.isInfoEnabled()) {
log.info("JavaGroups shutting down...");
}
bus.stop();
bus = null;
if (log.isInfoEnabled()) {
log.info("JavaGroups shutdown complete.");
}
}
/**
* Uses JavaGroups to broadcast the supplied notification message across the cluster.
*
*/
protected void sendNotification(Serializable message) {
bus.sendNotification(message);
}
/**
* Handles incoming notification messages from JavaGroups. This method should
* never be called directly.
*
*/
public void handleNotification(Serializable serializable) {
log.info("An cluster notification message received message " + serializable.toString()
+ "). Notification ignored.");
}
/**
* We are not using the caching, so we just return something that identifies
* us. This method should never be called directly.
*/
public Serializable getCache() {
return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
}
/**
* A callback that is fired when a new member joins the cluster. This
* method should never be called directly.
*
* @param address The address of the member who just joined.
*/
public void memberJoined(Address address) {
if (log.isInfoEnabled()) {
log.info("A new member at address '" + address + "' has joined the cluster");
}
}
/**
* A callback that is fired when an existing member leaves the cluster.
* This method should never be called directly.
*
* @param address The address of the member who left.
*/
public void memberLeft(Address address) {
if (log.isInfoEnabled()) {
log.info("Member at address '" + address + "' left the cluster");
}
}
}
2、发送消息的程序:
package com.yz;
import java.io.FileInputStream;
import java.util.Properties;
/**
* @author yangzheng
* @version $Revision$
* @since 2005-7-14
*/
public class TestJavaGroupBroadcastSend {
public static void main(String[] args) throws Exception {
JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
Properties properties = new Properties();
properties.load(new FileInputStream("javagroup.properties"));
javaGroupBroadcastingManager.initialize(properties);
String message = "hello world!";
while (true) {
Thread.sleep(1000);
javaGroupBroadcastingManager.sendNotification(message);
}
}
}
3、接受消息的程序:
package com.yz;
import java.io.FileInputStream;
import java.util.Properties;
/**
* @author yangzheng
* @version $Revision$
* @since 2005-7-14
*/
public class TestJavaGroupBroadcastReceive {
public static void main(String[] args) throws Exception {
JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
Properties properties = new Properties();
properties.load(new FileInputStream("javagroup.properties"));
javaGroupBroadcastingManager.initialize(properties);
Thread.sleep(100000000);
}
}
4、配置文件:(基本上不用改动)
javagroup.properties
cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;\
mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
PING(timeout=2000;num_initial_members=3):\
MERGE2(min_interval=5000;max_interval=10000):\
FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
UNICAST(timeout=300,600,1200,2400):\
pbcast.STABLE(desired_avg_gossip=20000):\
FRAG(frag_size=8096;down_thread=false;up_thread=false):\
pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
cache.cluster.multicast.ip=231.12.21.132
5、所需要的jar包
commons-logging-1.0.4.jar
jgroups-2.2.8.jar concurrent.jar 属于jgroups的包
6、说明:
1、发送消息和接受消息的程序都需要调用JavaGroupBroadcastingManager.initialize()方法初始化jgroup。
2、运行环境的多台服务器要在同一个局域网内,同时hosts中不要将127.0.0.1写入,以便jgroup获得本机的ip,而不是获得127.0.0.1
7、程序运行的结果:
接受端:
Jul 14, 2005 1:29:09 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: Starting a new JavaGroups broadcasting listener with properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
Jul 14, 2005 1:29:12 PM org.jgroups.protocols.UDP createSockets
INFO: sockets will use interface 10.0.99.99
Jul 14, 2005 1:29:12 PM org.jgroups.protocols.UDP createSockets
INFO: socket information:
local_addr=10.0.99.99:33637, mcast_addr=231.12.21.132:45566, bind_addr=/10.0.99.99, ttl=32
sock: bound to 10.0.99.99:33637, receive buffer size=64000, send buffer size=32000
mcast_recv_sock: bound to 10.0.99.99:45566, send buffer size=131071, receive buffer size=80000
mcast_send_sock: bound to 10.0.99.99:33638, send buffer size=131071, receive buffer size=80000
-------------------------------------------------------
GMS: address is 10.0.99.99:33637
-------------------------------------------------------
Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: JavaGroups clustering support started successfully
Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33617' has joined the cluster
Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33637' has joined the cluster
Jul 14, 2005 1:30:24 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.98:33648' has joined the cluster // 监控到发送端服务器加入cluster
Jul 14, 2005 1:30:25 PM com.yz.JavaGroupBroadcastingManager handleNotification //接受到消息
INFO: An cluster notification message received message hello world!). Notification ignored.
发送端
Jul 14, 2005 1:20:15 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: Starting a new JavaGroups broadcasting listener with properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
Jul 14, 2005 1:20:16 PM org.jgroups.protocols.UDP createSockets
INFO: sockets will use interface 10.0.99.98
Jul 14, 2005 1:20:16 PM org.jgroups.protocols.UDP createSockets
INFO: socket information:
local_addr=10.0.99.98:33648, mcast_addr=231.12.21.132:45566, bind_addr=/10.0.99.98, ttl=32
sock: bound to 10.0.99.98:33648, receive buffer size=64000, send buffer size=32000
mcast_recv_sock: bound to 10.0.99.98:45566, send buffer size=131071, receive buffer size=80000
mcast_send_sock: bound to 10.0.99.98:33649, send buffer size=131071, receive buffer size=80000
-------------------------------------------------------
GMS: address is 10.0.99.98:33648
-------------------------------------------------------
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: JavaGroups clustering support started successfully
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33617' has joined the cluster
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33637' has joined the cluster
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.98:33648' has joined the cluster // 监控到接受端服务器加入cluster
Jul 14, 2005 1:20:27 PM com.yz.JavaGroupBroadcastingManager memberLeft
INFO: Member at address '10.0.99.99:33637' left the cluster // 监控到接受端服务器的程序退出
现在程序已经可以正常运行,有了这个基础,分布式cache的实现指日可待.
分享到:
相关推荐
通过消息传递和消息排队模型,消息中间件可以在分布式环境下扩展进程间的通信,支持应用程序或组件之间进行可靠的异步通信,从而降低系统间的耦合度,提高系统的可扩展性和可用性。 在分布式系统中,消息中间件的...
- **队列模式(点对点)**:这种模式类似于Redis中的List结构,通过“rpush”和“lpop”操作来实现消息的发送和接收。但是,它无法满足某些场景下消息需要被多个消费者同时消费的需求。 - **发布/订阅模式(PUB/SUB...
在这个"ActiveMQ实例---分布式发送邮件"的案例中,我们将探讨如何利用ActiveMQ实现分布式环境下的邮件发送功能。 首先,让我们了解一下ActiveMQ的基本概念。ActiveMQ是一个实现了多种消息协议(如OpenWire、STOMP、...
5. 编写消息发送接口:为了将消息发送逻辑封装起来,可以创建一个服务类,提供发送消息的方法,这些方法可以根据需要接受不同的参数,比如消息主题和消息内容。 6. 发送消息业务类:在业务层,调用上述接口发送消息...
在实现分布式爬虫的过程中,需要考虑如何高效地利用多台服务器的资源。这时,可以采用Scrapyd这样的工具来部署和管理分布式爬虫,它允许在多台服务器上并发执行爬虫任务,显著提高了数据抓取的效率。 Python是实现...
发布订阅消息通信技术结合公共信息模型,可以实现分布式监控系统的主动消息交互。文中提出的这种方法通过设计实时交互接口,使用JMS(Java Message Service)消息服务器的主题和发布订阅传输交互模式,有效地提高了...
本篇文章将重点讲解如何使用ZookeeperNet库在C#环境下实现分布式锁。 **Zookeeper基础知识** Zookeeper是由Apache Hadoop项目孵化出来的,它基于观察者模式设计,以树形结构存储数据。其主要特性包括: 1. **强...
XIFF(eXtensible Internet Framework)是Flex中一个用于处理XMPP协议的库,它允许开发者在Flex应用程序中实现XMPP功能,比如创建用户会话、发送和接收消息、管理用户状态等。XIFF为Flex提供了与XMPP服务器交互的...
理解如何在服务器端创建WebSocket端点,以及在客户端建立连接和发送/接收消息是核心技能。此外,考虑到分布式环境,可能还需要了解如何在多个服务器之间共享会话状态,以确保用户连接的平滑迁移。 最后,地图找房...
由此,基于软件化的手段实现分布式环境下多点间的语音通信成为研究和开发的热点。 2. 分布式语音通信系统的架构设计 为了克服上述问题,采用了客户机/服务器(Client/Server,简称C/S)架构来构建语音通信系统。在...
该中间件的核心功能是消息传输,包括消息的发送、接收和处理。它还具备动态负载伸缩能力,允许在系统运行时增加或减少业务计算节点,以应对不同的业务需求。 3. 分布式中间件系统的设计理念 分布式中间件系统的设计...
分布式消息机制是消息通信在分布式环境中的延伸,它在多台机器之间传递消息,以实现任务的分布式处理。这样的机制通常包括以下几个关键组件: 1. **消息队列**:存储待处理消息的地方,起到缓冲作用,防止生产者和...
6. **分布式处理**:在分布式环境下,SQL Server可以通过分布式查询、分布式事务处理等特性,跨越多个服务器或数据库执行操作,实现数据的整合和处理。 分布式环境下的SQL Server编程涉及网络通信、并发控制、数据...
在Linux操作系统中,设计和开发一个声音播放控制原型系统(DSPC)在分布式环境下尤其重要,这主要针对需要精确控制声音播放的科学实验,如医学、声学和心理学实验。Fedora Core 3作为Linux的一个发行版,被选为开发...
综上所述,J2EE在分布式环境下的底层结构是一个复杂但协调的体系,结合了多种技术和服务,以实现高效、可靠的分布式应用开发。通过理解这些核心概念,开发者能够构建出适应大规模、高并发场景的企业级解决方案。
《RabbitMQ高效部署分布式消息队列实战篇》是一份深度解析RabbitMQ技术的教程,结合实际应用案例,旨在帮助读者深入理解并熟练掌握如何在分布式环境中高效部署和运用消息队列。RabbitMQ作为业界广泛采用的消息中间件...
传统的ACID(原子性、一致性、隔离性和持久性)事务在分布式环境中难以实现,因为它们可能导致性能下降或者锁竞争问题。为了解决这一问题,我们可以采用“最终一致性”策略,即允许在一段时间内数据存在短暂不一致,...
1. **脑裂问题**:在分布式环境中,可能会遇到所谓的“脑裂”问题,即系统状态分裂,导致服务不可用。需要加强监控和预警机制。 2. **消费线程数问题**:合理设置消费线程的数量是非常重要的。过多可能导致无法达到...
综上所述,这个项目展示了如何利用JavaScript和消息队列来实现分布式环境下的服务器热切换,提供了一种高可用和可扩展的解决方案。通过学习和实践这个demo,开发者可以更好地理解和掌握这些关键技术和概念。
总的来说,理解和掌握SQL Server在分布式环境下的编程技巧对于构建健壮、高效的数据驱动应用至关重要,特别是在需要处理大量数据、高并发和地理分布式需求的场景下。通过充分利用SQL Server提供的工具和技术,可以...