近期项目需要集群,缓存集群是自己实现的,需要在缓存发生变动后,需要发生消息给各个节点更新缓存。所以就做了个远程监听功能。远程监听用rmi协议,事件发布前都动态查询出活动的节点,事件发布后会被活动节点上的listener监听到。上代码
1.定义event和listener
public class BaseEvent extends EventObject {
private static final long serialVersionUID = 1L;
/** System time when the event happened */
private final long timestamp;
public BaseEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
/**
* Return the system time in milliseconds when the event happened.
*/
public final long getTimestamp() {
return this.timestamp;
}
}
public interface EventLisenter<T extends BaseEvent>{
/**
* 事件处理
* @param baseEvent
*/
void onEvent(T t);
}
2、定义远程监听配置
public class RemoteLisenter{
private Class eventClass;
private Class serviceInterface;
private String serviceName;
private String registryPort;
public Class getServiceInterface() {
return serviceInterface;
}
public void setServiceInterface(Class serviceInterface) {
this.serviceInterface = serviceInterface;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
监听管理类,用于事件注册,更新远程监听,发布事件
public class RemoteLisenterConfig {
private List<RemoteLisenter> remoteLisenters =new ArrayList<RemoteLisenter>();
public List<RemoteLisenter> getRemoteLisenters() {
return remoteLisenters;
}
public void setRemoteLisenters(List<RemoteLisenter> remoteLisenters) {
this.remoteLisenters = remoteLisenters;
}
}
@Service
public class ListennerManagement {
protected Logger logger = Logger.getLogger(getClass());
@Autowired
private HeartbeatService heartbeatService;
@Autowired
RemoteLisenterConfig remoteLisenterConfig;
/**
* 本地监听
*/
private Map<String, List<EventLisenter>> localListeners = new LinkedHashMap<String, List<EventLisenter>>();
/**
* 远程监听
*/
private Map<String, List<EventLisenter>> remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
/**
* 扫瞄所有bean,进行队本地事件进行事件监听
*
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public void registryListener(ApplicationContext ctx) throws Exception {
// 取得容器中所有监听
Map<String, EventLisenter> beans = ctx
.getBeansOfType(EventLisenter.class);
if (beans == null || beans.size() == 0) {
return;
}
Collection<EventLisenter> list = beans.values();
for (EventLisenter listener : list) {
Class listenercls = AopTargetUtils.getTarget(listener).getClass();
Class eventCls = GenericTypeResolver.resolveTypeArgument(
listenercls, EventLisenter.class);
try {
if (localListeners.containsKey(eventCls.getName())) {
localListeners.get(eventCls.getName()).add(listener);
} else {
List<EventLisenter> l = new ArrayList<EventLisenter>();
l.add(listener);
localListeners.put(eventCls.getName(), l);
}
} catch (Exception e) {
throw new Exception("初始化事件监听器时出错:", e);
}
}
}
private void refreshRemoteListeners(){
//查询出集群服务器的IP(此处从数据库配置中查询)
List<String> ipList=heartbeatService.getAliveHostsExcludeSelf();
List<RemoteLisenter> RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
if(RemoteLisenterList!=null){
for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
String eventClsName=remoteLisenter.getEventClass().getName();
Class listenerCls=remoteLisenter.getServiceInterface();
String port=remoteLisenter.getRegistryPort();
String serviceName=remoteLisenter.getServiceName();
if(ipList!=null){
for (String ip : ipList) {
EhCacheService ehCacheService=null;
EventLisenter listener = buildRemotListener(listenerCls, port,serviceName, ip);
if(listener!=null){
if (remoteListeners.containsKey(eventClsName)) {
remoteListeners.get(eventClsName).add(listener);
} else {
List<EventLisenter> l = new ArrayList<EventLisenter>();
l.add(listener);
remoteListeners.put(eventClsName, l);
}
}
}
}
}
}
}
private EventLisenter buildRemotListener(Class listenerCls, String port,
String serviceName, String ip) {
try {
RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
rmiProxyFactoryBean.setServiceInterface(listenerCls);
rmiProxyFactoryBean.setServiceUrl("rmi://"+ip+":"+port+"/"+serviceName);
rmiProxyFactoryBean.afterPropertiesSet();
if (rmiProxyFactoryBean.getObject() instanceof EventLisenter) {
EventLisenter listener=(EventLisenter)rmiProxyFactoryBean.getObject();
return listener;
}else{
return null;
}
} catch (Exception e) {
logger.error("获取远程监听bean错误[listenerClass="+listenerCls+";port="+port+";ip="+ip+";serviceName="+serviceName+"]", e);
return null;
}
}
/**
* 发布事件
*
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public void publishEvent(BaseEvent event) {
//本地监控
List<EventLisenter> localList = localListeners.get(event.getClass().getName());
if (localList != null) {
for (EventLisenter listener : localList) {
try {
listener.onEvent(event);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
//远程监控
Class eventClass=event.getClass();
if(needRemoteListenre(eventClass)){
//刷新远程监听者
refreshRemoteListeners();
List<EventLisenter> remoteList = remoteListeners.get(event.getClass().getName());
if (remoteList != null) {
for (EventLisenter listener : remoteList) {
try {
listener.onEvent(event);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
}
}
/**
* 判断本事件是否需要远程监听
* @param eventClass
* @return
*/
private boolean needRemoteListenre(Class eventClass) {
List<RemoteLisenter> RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
if(RemoteLisenterList!=null){
for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
Class eventCls=remoteLisenter.getEventClass();
if(eventCls.equals(eventCls))
return true;
}
}
return false;
}
public Map<String, List<EventLisenter>> getLocalListeners() {
return localListeners;
}
public Map<String, List<EventLisenter>> getRemoteListeners() {
return remoteListeners;
}
配置文件
<!-- 配置远程监听配置对象 -->
<bean id="remoteLisenterConfig" class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenterConfig">
<property name="remoteLisenters">
<list>
<bean id="remoteLisenter1" class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenter">
<property name="eventClass" value="com.ejintai.cbs_policy_registry.base.event.biz.EhCacheUpdateEvent" />
<property name="serviceName" value="ehCacheUpdateEventListener" />
<property name="serviceInterface" value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>
<property name="registryPort" value="${rmi.port}"/>
</bean>
</list>
</property>
</bean>
<bean id="remoteEhCacheUpdateEventListener" class="org.springframework.remoting.rmi.RmiServiceExporter" >
<property name="serviceName" value="ehCacheUpdateEventListener" />
<property name="service" ref="ehCacheUpdateEventListener"/>
<property name="serviceInterface" value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>
<property name="registryPort" value="${rmi.port}"/>
</bean>
分享到:
相关推荐
本篇将深入探讨如何利用`properJavaRDP`这个开源项目来在Java中实现调用远程桌面。 `properJavaRDP`是一个用Java编写的远程桌面协议(RDP)客户端,它提供了对Windows远程桌面服务的访问支持。此项目旨在提供一个...
在本项目中,我们主要探讨如何使用Spring Boot和MyBatis框架来调用网络摄像头进行录像,以华为摄像头为例,并且实现将录制的视频通过SFTP协议上传到远程服务器进行存储。以下是对该项目涉及的技术点的详细说明: 1....
12.2.2. 在Spring的application context中创建 SessionFactory 12.2.3. HibernateTemplate 12.2.4. 不使用回调的基于Spring的DAO实现 12.2.5. 基于Hibernate3的原生API实现DAO 12.2.6. 编程式的事务划分 12.2.7. ...
在本教程中,我们将探讨如何利用Direct Web Remoting (DWR) 和Spring框架来实现这样的功能。 DWR是一个开源Java库,它允许Web应用程序在浏览器和服务器之间进行实时的、异步的通信,类似于Ajax的功能,但更加强大。...
资源管理章节涵盖了资源的获取和抽象,包括Resource接口的介绍、内置的Resource实现、资源加载器以及如何在应用上下文中使用资源路径。 验证、数据绑定和类型转换部分讲解了如何使用Spring的验证器接口进行验证,...
在Spring框架中,远程方法调用(Remote Method Invocation, RMI)是一种强大的工具,它允许在不同的Java虚拟机(JVM)之间透明地调用对象的方法。RMI结合了Spring的强大功能,可以构建分布式系统,使得组件之间的...
本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...
在这个“spring+rmi非本地实现”的项目中,我们有两个主要部分:客户端(client)和服务端(rmiserver)。让我们深入探讨一下这两个部分以及它们如何协同工作。 1. **服务端(rmiserver)**: - **创建RMI接口**:...
基于netty的网络传输实现远程本地调用,无感知远程调用。 该版本涉及: netty JDK动态代理。 Spring 的扫描原理(@CompanScan,@MapperScan 同理) Spring的事件监听与处理, 反射,SpringListener 配置,...
本文将深入探讨如何在Spring Boot项目中实现本地监听以及进行远程端口扫描的相关知识点。 首先,我们来理解本地监听。在Java中,监听本地端口通常涉及到网络编程,特别是服务器端的开发。使用Java的Socket或...
本示例主要关注如何利用Spring Cloud Config实现远程SVN配置的自动化刷新,这有助于提升开发效率并确保配置的一致性。 首先,`spring-cloud-config-server`是Spring Cloud Config的核心组件,它作为一个配置中心,...
- **Spring Cloud Config客户端**:说明了如何配置客户端以监听来自Spring Cloud Bus的通知,并自动刷新其配置。 #### Spring Cloud Netflix - **服务发现:Eureka客户端**:介绍了如何集成Netflix Eureka客户端...
开发者可以通过实现`ApplicationListener`接口监听这些事件,执行相应的逻辑。 4. 整合其他技术:Spring框架设计为模块化,可以轻松集成其他技术,如数据访问(JDBC、Hibernate、MyBatis)、事务管理、Web应用(MVC...
6.8.1. 在Spring中使用AspectJ进行domain object的依赖注入 6.8.2. Spring中其他的AspectJ切面 6.8.3. 使用Spring IoC来配置AspectJ的切面 6.8.4. 在Spring应用中使用AspectJ加载时织入(LTW) 6.9. 更多资源 7...
本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...
验证、数据绑定和类型转换章节可能详细说明了如何在Spring框架中实现验证,例如使用Spring的Validator接口。此外,还可能涵盖了Spring类型转换和字段格式化的实现,包括如何配置全局的日期和时间格式。 Spring...
Spring框架和Direct Web Remoting (DWR) 是两个强大的工具,可以协同工作来实现这一目标。本篇将详细介绍如何利用Spring3和DWR3创建聊天功能,以及DWR的Server Push技术在其中的作用。 首先,Spring3是一个全面的...
Spring Cloud Bus监听到配置仓库的变更后,会触发一个事件并通过RabbitMQ广播这个事件到所有订阅的客户端。每个客户端接收到事件后,会从Config Server拉取最新的配置并刷新本地的配置信息,从而实现配置的动态更新...
配置这些功能通常涉及到设置MBean的`NotificationBroadcaster`属性,并在监听器中实现回调方法。 在实际应用中,JMX与Spring的结合可以通过Spring Boot进一步简化,Spring Boot自动配置了JMX支持,只需要简单的配置...
Spring Cloud Eureka 通过事件机制来通知状态的变化,这些事件被 Spring 容器中的监听器捕获,并进行相应的处理。这种机制使得我们可以在服务状态变化时作出反应,执行比如将服务地址更新到缓存中等操作。 ### 事件...