public class JavaGroupBroadcastingManager implements NotificationBus.Consumer {
/**
* 共享数据:未被激活的时间片信息集合(离线通话信息)
*/
private static Map<String,TimeSlice> voiceMessageMap = new HashMap<String, TimeSlice>();
/**
* 共享数据:短信信息
*/
private static Map<String,SmsCcrParameter> smsMessageIdMap = new HashMap<String,SmsCcrParameter>();
private static final Log log = LogFactory
.getLog(JavaGroupBroadcastingManager.class);
private static final String BUS_NAME = "bus.name";
private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
private NotificationBus bus;
private static JavaGroupBroadcastingManager manager = null;
public static JavaGroupBroadcastingManager getInstance(){
if(manager == null){
manager = new JavaGroupBroadcastingManager();
try {
manager.initialize();
} catch (Exception e) {
e.printStackTrace();
}
}
return manager;
}
/**
* 初始化成员
*/
public synchronized void initialize()
throws Exception {
Properties properties = new Properties();
String filePath = System.getProperty("com.sntele.surfing.conf") + File.separator + "ocsgroup.properties";
log.info(filePath);
properties.load(new FileInputStream(filePath));
String channelProperties = properties.getProperty(CHANNEL_PROPERTIES);
String busName = properties.getProperty(BUS_NAME);
try {
bus = new NotificationBus(busName, channelProperties);
bus.start();
bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
bus.setConsumer(this);
log.info("JavaGroups clustering support started successfully");
} catch (Exception e) {
throw new Exception("Initialization failed: " + e);
}
}
/**
* 关闭成员
*/
public synchronized void finialize() throws Exception {
bus.stop();
bus = null;
}
/**
* 发送信息
*/
public void sendNotification(Serializable serializable) {
bus.sendNotification(serializable);
if(serializable instanceof SmsCcrParameter){
SmsCcrParameter smsCcrParameter = (SmsCcrParameter)serializable;
if(smsCcrParameter.getMemoryAction() == 0){
smsMessageIdMap.put(smsCcrParameter.getMessageId(), smsCcrParameter);
}else{
smsMessageIdMap.remove(smsCcrParameter.getMessageId());
}
}else if(serializable instanceof TimeSlice){
TimeSlice timeSlice = (TimeSlice)serializable;
if(timeSlice.getMemoryAction() == 0){
voiceMessageMap.put(timeSlice.getInitCcrParameter().getCallerNumber(),timeSlice);
}else{
TimeSliceControlPool.getInstince().getTimeSliceControl(timeSlice.getInitCcrParameter().getProvinceCode()).removeData(timeSlice.getInitCcrParameter().getCallerNumber());
voiceMessageMap.remove(timeSlice.getInitCcrParameter().getCallerNumber());
}
}
}
/**
* 成员发送信息
*/
public void handleNotification(Serializable serializable) {
if(serializable instanceof SmsCcrParameter){
SmsCcrParameter smsCcrParameter = (SmsCcrParameter)serializable;
if(smsCcrParameter.getMemoryAction() == 0){
smsMessageIdMap.put(smsCcrParameter.getMessageId(), smsCcrParameter);
}else{
smsMessageIdMap.remove(smsCcrParameter.getMessageId());
}
}else if(serializable instanceof TimeSlice){
TimeSlice timeSlice = (TimeSlice)serializable;
if(timeSlice.getMemoryAction() == 0){
voiceMessageMap.put(timeSlice.getInitCcrParameter().getCallerNumber(),timeSlice);
}else{
TimeSliceControlPool.getInstince().getTimeSliceControl(timeSlice.getInitCcrParameter().getProvinceCode()).removeData(timeSlice.getInitCcrParameter().getCallerNumber());
voiceMessageMap.remove(timeSlice.getInitCcrParameter().getCallerNumber());
}
}
}
/**
* 成员地址
*/
public Serializable getCache() {
if (log.isInfoEnabled()) {
log.info("成员本地地址: " + bus.getLocalAddress().toString());
}
return bus.getLocalAddress();
}
/**
* 新成员加入
*/
public void memberJoined(Address address) {
if (log.isInfoEnabled()) {
log.info("新成员加入:" + address);
}
Iterator<?> smsIt = smsMessageIdMap.entrySet().iterator();
log.info("发送缓存中已经存在的短信信息给新成员:" + smsMessageIdMap.size() + "......start ");
while (smsIt.hasNext()) {
Map.Entry entry = (Map.Entry) smsIt.next();
SmsCcrParameter value = (SmsCcrParameter)entry.getValue();
sendNotification(value);
}
Iterator<?> voiceIt = voiceMessageMap.entrySet().iterator();
log.info("发送缓存中已经存在的语音信息给新成员:" + voiceMessageMap.size() + "......start ");
while (voiceIt.hasNext()) {
Map.Entry entry = (Map.Entry) voiceIt.next();
TimeSlice value = (TimeSlice)entry.getValue();
sendNotification(value);
}
}
/**
* 成员离开
*/
public void memberLeft(Address address) {
if (log.isInfoEnabled()) {
log.info("成员离开:" + address);
}
//将检测语音的是否到时的信息放到其他服务器上
Iterator<?> voiceIt = voiceMessageMap.entrySet().iterator();
log.info("添加离开成员的语音到时检索信息到通话时间片控制中:" + voiceMessageMap.size() + "......start ");
while (voiceIt.hasNext()) {
Map.Entry entry = (Map.Entry) voiceIt.next();
TimeSlice value = (TimeSlice)entry.getValue();
TimeSliceControlPool.getInstince().getTimeSliceControl(value.getInitCcrParameter().getProvinceCode()).addTimeSlice(value.getInitCcrParameter().getCallerNumber(),value);
}
}
public Map<String, TimeSlice> getVoiceMessageMap() {
return voiceMessageMap;
}
public Map<String, SmsCcrParameter> getSmsMessageIdMap() {
return smsMessageIdMap;
}
}
测试方法:
JavaGroupBroadcastingManager manager = JavaGroupBroadcastingManager.getInstance();
manager.sendNotification(requestSession);
分享到:
相关推荐
**JGroup学习总结** JGroup是一个开源的Java框架,专门用于构建高可用、容错的分布式系统。它提供了一整套服务,包括组成员管理、消息传递、故障检测和恢复等,是许多分布式应用和中间件的基础。这篇博客将深入探讨...
JGroup是Java编程语言中的一款强大且灵活的集群通信库,专为构建高可用性、高性能的分布式系统而设计。它的核心目标是提供可靠的消息传递,确保数据在多个节点之间的一致性和完整性。本实例将深入讲解如何使用JGroup...
Reliable group communication with JGroups 3.x Preface This is the JGroups manual. It provides information about: 1. Installation and configuration 2. Using JGroups (the API) 3. Configuration of the ...
### JGroups:可靠的组通信 #### 概览与核心概念 **JGroups**是一个用于创建分布式应用程序的Java库,它提供了可靠、高效的组...通过深入了解其核心概念和API,开发人员能够充分利用JGroups来解决各种分布式的挑战。
根据提供的文档内容,本文将对“jgroup代码”的安装步骤及如何编写一个简单的应用进行详细的阐述与解析。 ### 一、jgroup代码简介 JGroups是一个高性能、可扩展且易于使用的分布式通信库,用于实现集群中的节点...
JGroup 是一个强大的开源库,专门用于构建集群通信系统。它的主要目标是在集群内部实现可靠的消息传递,确保数据的一致性和高可用性。与 Java Message Service (JMS) 不同,JGroup 更专注于消息传递,而不是队列和...
EHCAHCE基于JGROUP的集群配置方案,内含相关配置文件,及配置说明
### Java SWT 编写 JGroup 局域网聊天程序知识点详解 #### 一、概述 在本篇文章中,我们将深入探讨如何使用Java Swing Toolkits (SWT) 和 JGroups 库来开发一个局域网内的聊天应用程序。文章的标题提到了“Java ...
根据提供的信息,《jgroup in action》是一本关于JGroups工具包的书籍,它详细介绍了如何使用JGroups进行可靠的多播通信。JGroups是强大的UUP(User-level UDP)开源组件,已被JBoss采用,用于底层通信。下面我们将...
EnCache通过将数据分布在整个集群中,实现了负载均衡,使得多个服务器可以并行处理请求,提升了系统的整体处理能力。 JGroups则是一个强大的工具,专为构建高可靠的分布式系统而设计。它提供了群组通信功能,允许...
在IT行业中,JBoss、JDBC、JSON和JGroup是四个关键的概念,它们在不同的领域发挥着重要作用。这里,我们将深入探讨这些技术及其在实际应用中的相关知识点。 首先,JBoss是一个开源的应用服务器,它是Java EE(企业...
《JGroup-3.0.1:构建高效集群通信的核心技术》 JGroup是一个开源的Java框架,专门用于构建高可用、高性能的集群系统。它提供了健壮的组通信服务,包括成员资格管理、消息传递、故障检测以及一致性算法等。在版本...
《JGroup配置详解》 JGroup是一个开源的Java框架,专门用于构建可靠的消息传递系统,尤其是在分布式计算环境中。本文将深入解析JGroup的协议栈配置,重点探讨传输协议和可靠消息传递机制。 **4.1 传输协议** 传输...
同时,配置PING的`gossip_host`和`gossip_port`属性,利用GossipRouter来发现其他成员。不过,依赖GossipRouter可能引入单点故障,并降低系统可扩展性。 **4.1.2 TCP** 当集群成员分布在WAN(广域网)中,由于...
- **消息构造**:首先,你需要创建一个`Message`对象,包含目标地址(如果有的话)和消息负载。 - **路由选择**:JGROUPS使用组协议栈来决定消息的路由。组协议栈是由多个协议层组成的,如UDP、TCP、FRAG等,每...
使用JGroup实现分布式数据结构(堆栈和集合) 介绍 [什么是JGroups?]( ) [JGroup入门]( ) JGroups是完全用Java编写的可靠的组通信工具包。 它基于IP多播(也支持TCP),但是有一些特殊功能,例如可靠性和组...
JGroup功能十分强大,通过配置各种参数就可以充分利用它所提供的各项功能。JGroup最大的特点就是支持协议栈的可配置性,它本是实现了基本的Java的协议栈实现,也就是最基本的消息广播的基础,同时支持附加协议栈的...
其实回顾一下集中式的构架,无非两种情况:一是节点均衡的网状(JBoss Tree Cache),利用JGroup的多播通信机制来同步数据;二是Master-Slaves模式(分布式文件系统),由Master来管理Slave,比如如何选择Slave,...
在压缩包文件“ehcache”中,可能包含了Ehcache的相关文档、示例代码或者配置文件,这些资源可以帮助开发者更深入地理解Ehcache的使用和配置,以及如何利用JGROUPS实现高效的分布式缓存复制。 总的来说,基于...