`
qingshizhi
  • 浏览: 4628 次
社区版块
存档分类
最新评论

spring中实现远程监听

阅读更多
   近期项目需要集群,缓存集群是自己实现的,需要在缓存发生变动后,需要发生消息给各个节点更新缓存。所以就做了个远程监听功能。远程监听用rmi协议,事件发布前都动态查询出活动的节点,事件发布后会被活动节点上的listener监听到。上代码
1.定义event和listener
public class  BaseEvent  extends EventObject {


	private static final long serialVersionUID = 1L;
	
	
	/** System time when the event happened */
	private final long timestamp;
	

	public BaseEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis();
		
	}
	
	/**
	 * Return the system time in milliseconds when the event happened.
	 */
	public final long getTimestamp() {
		return this.timestamp;
	}
}

public interface EventLisenter<T extends BaseEvent>{

	/**
	 * 事件处理
	 * @param baseEvent
	 */
	void onEvent(T t);
}

2、定义远程监听配置
public  class RemoteLisenter{
	
	private  Class  eventClass;
	
	private Class serviceInterface;
	
	private String serviceName;
	
	private String  registryPort;

	public Class getServiceInterface() {
		return serviceInterface;
	}

	public void setServiceInterface(Class serviceInterface) {
		this.serviceInterface = serviceInterface;
	}

	public String getServiceName() {
		return serviceName;
	}

	public void setServiceName(String serviceName) {
		this.serviceName = serviceName;
	}


监听管理类,用于事件注册,更新远程监听,发布事件
public class RemoteLisenterConfig {

	private List<RemoteLisenter> remoteLisenters =new ArrayList<RemoteLisenter>();
	
	
	public List<RemoteLisenter> getRemoteLisenters() {
		return remoteLisenters;
	}


	public void setRemoteLisenters(List<RemoteLisenter> remoteLisenters) {
		this.remoteLisenters = remoteLisenters;
	}

}


@Service
public class ListennerManagement {

	
	protected Logger logger = Logger.getLogger(getClass());

	
	@Autowired
	private HeartbeatService heartbeatService;
	
	
	@Autowired
	RemoteLisenterConfig remoteLisenterConfig;
	
	
	
	

	/**
	 * 本地监听
	 */
	private  Map<String, List<EventLisenter>> localListeners = new LinkedHashMap<String, List<EventLisenter>>();
	
	/**
	 * 远程监听
	 */
	private  Map<String, List<EventLisenter>> remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();

	

	/**
	 * 扫瞄所有bean,进行队本地事件进行事件监听
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings("rawtypes")
	public  void registryListener(ApplicationContext  ctx) throws Exception {
		// 取得容器中所有监听
		Map<String, EventLisenter> beans = ctx
				.getBeansOfType(EventLisenter.class);
		if (beans == null || beans.size() == 0) {
			return;
		}
		Collection<EventLisenter> list = beans.values();
		for (EventLisenter listener : list) {
			Class listenercls = AopTargetUtils.getTarget(listener).getClass();
			Class eventCls = GenericTypeResolver.resolveTypeArgument(
					listenercls, EventLisenter.class);

			try {
				if (localListeners.containsKey(eventCls.getName())) {
					localListeners.get(eventCls.getName()).add(listener);
				} else {
					List<EventLisenter> l = new ArrayList<EventLisenter>();
					l.add(listener);
					localListeners.put(eventCls.getName(), l);
				}
			} catch (Exception e) {
				throw new Exception("初始化事件监听器时出错:", e);
			}
		}

	}

	
	 private void refreshRemoteListeners(){
		//查询出集群服务器的IP(此处从数据库配置中查询)
		List<String> ipList=heartbeatService.getAliveHostsExcludeSelf();
		List<RemoteLisenter>  RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
		remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
		if(RemoteLisenterList!=null){
			for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
				
				String eventClsName=remoteLisenter.getEventClass().getName();
				Class listenerCls=remoteLisenter.getServiceInterface();
				String port=remoteLisenter.getRegistryPort();
				String serviceName=remoteLisenter.getServiceName();
				if(ipList!=null){
					for (String ip : ipList) {
						EhCacheService ehCacheService=null;
						EventLisenter listener = buildRemotListener(listenerCls, port,serviceName, ip);
						
						if(listener!=null){
							if (remoteListeners.containsKey(eventClsName)) {
								remoteListeners.get(eventClsName).add(listener);
							} else {
								List<EventLisenter> l = new ArrayList<EventLisenter>();
								l.add(listener);
								remoteListeners.put(eventClsName, l);
							}
						}
					}
				}
			
		}
			
			
			
		}
	}


	private EventLisenter buildRemotListener(Class listenerCls, String port,
			String serviceName, String ip) {
		
		try {
			RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
			rmiProxyFactoryBean.setServiceInterface(listenerCls); 
			rmiProxyFactoryBean.setServiceUrl("rmi://"+ip+":"+port+"/"+serviceName);
			rmiProxyFactoryBean.afterPropertiesSet();
			
			if (rmiProxyFactoryBean.getObject() instanceof EventLisenter) { 
				EventLisenter  listener=(EventLisenter)rmiProxyFactoryBean.getObject();
				return listener;
			}else{
				return null;
			}
		} catch (Exception e) {
			logger.error("获取远程监听bean错误[listenerClass="+listenerCls+";port="+port+";ip="+ip+";serviceName="+serviceName+"]", e);
			return null;
		}
		
		
		

		
	}
	
	
	/**
	 * 发布事件
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings("rawtypes")
	public  void publishEvent(BaseEvent event)  {
		//本地监控
		List<EventLisenter> localList = localListeners.get(event.getClass().getName());
		if (localList != null) {
			for (EventLisenter listener : localList) {
				try {
					listener.onEvent(event);
				} catch (Exception e) {
					logger.error(e.getMessage());
				}
			}
		}
		
		//远程监控
		Class eventClass=event.getClass();
		if(needRemoteListenre(eventClass)){
			//刷新远程监听者
			refreshRemoteListeners();
			
			List<EventLisenter> remoteList = remoteListeners.get(event.getClass().getName());
			if (remoteList != null) {
				for (EventLisenter listener : remoteList) {
					try {
						listener.onEvent(event);
					} catch (Exception e) {
						logger.error(e.getMessage());
					}
				}
			}
		}
	}


    /**
     * 判断本事件是否需要远程监听
     * @param eventClass
     * @return
     */
	private boolean needRemoteListenre(Class eventClass) {
		
		List<RemoteLisenter>  RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
		if(RemoteLisenterList!=null){
			for (RemoteLisenter remoteLisenter : RemoteLisenterList) {			
			     Class eventCls=remoteLisenter.getEventClass();
			     if(eventCls.equals(eventCls))
			    	 return true;
			}
		}
		return false;	
			
	}


	public Map<String, List<EventLisenter>> getLocalListeners() {
		return localListeners;
	}


	public Map<String, List<EventLisenter>> getRemoteListeners() {
		return remoteListeners;
	}
	


配置文件
<!-- 配置远程监听配置对象 -->
	<bean id="remoteLisenterConfig"  class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenterConfig">
	  <property name="remoteLisenters">
	     <list>
	     <bean id="remoteLisenter1"  class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenter">
	          <property name="eventClass"      value="com.ejintai.cbs_policy_registry.base.event.biz.EhCacheUpdateEvent" />  
	          <property name="serviceName"         value="ehCacheUpdateEventListener" />  
              <property name="serviceInterface"    value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>  
      		  <property name="registryPort"        value="${rmi.port}"/>  
	      </bean>
	     </list>
	  </property>
	</bean>

 <bean id="remoteEhCacheUpdateEventListener" class="org.springframework.remoting.rmi.RmiServiceExporter" >  
        <property name="serviceName"         value="ehCacheUpdateEventListener" />  
        <property name="service"             ref="ehCacheUpdateEventListener"/>  
        <property name="serviceInterface"    value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>  
        <property name="registryPort"        value="${rmi.port}"/>  
    </bean>

分享到:
评论

相关推荐

    Java实现调用远程桌面示例

    本篇将深入探讨如何利用`properJavaRDP`这个开源项目来在Java中实现调用远程桌面。 `properJavaRDP`是一个用Java编写的远程桌面协议(RDP)客户端,它提供了对Windows远程桌面服务的访问支持。此项目旨在提供一个...

    spring boot+mybatis 项目 调用网络摄像头,进行录像(以华为摄像头为例子,可以使用多种网络摄像头)

    在本项目中,我们主要探讨如何使用Spring Boot和MyBatis框架来调用网络摄像头进行录像,以华为摄像头为例,并且实现将录制的视频通过SFTP协议上传到远程服务器进行存储。以下是对该项目涉及的技术点的详细说明: 1....

    Spring-Reference_zh_CN(Spring中文参考手册)

    12.2.2. 在Spring的application context中创建 SessionFactory 12.2.3. HibernateTemplate 12.2.4. 不使用回调的基于Spring的DAO实现 12.2.5. 基于Hibernate3的原生API实现DAO 12.2.6. 编程式的事务划分 12.2.7. ...

    使用dwr+spring实现消息推送

    在本教程中,我们将探讨如何利用Direct Web Remoting (DWR) 和Spring框架来实现这样的功能。 DWR是一个开源Java库,它允许Web应用程序在浏览器和服务器之间进行实时的、异步的通信,类似于Ajax的功能,但更加强大。...

    Spring5中文文档

    资源管理章节涵盖了资源的获取和抽象,包括Resource接口的介绍、内置的Resource实现、资源加载器以及如何在应用上下文中使用资源路径。 验证、数据绑定和类型转换部分讲解了如何使用Spring的验证器接口进行验证,...

    如何在Spring框架中使用RMI技术.zip

    在Spring框架中,远程方法调用(Remote Method Invocation, RMI)是一种强大的工具,它允许在不同的Java虚拟机(JVM)之间透明地调用对象的方法。RMI结合了Spring的强大功能,可以构建分布式系统,使得组件之间的...

    spring集成rabbitmq实现rpc

    本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...

    spring+rmi非本地实现

    在这个“spring+rmi非本地实现”的项目中,我们有两个主要部分:客户端(client)和服务端(rmiserver)。让我们深入探讨一下这两个部分以及它们如何协同工作。 1. **服务端(rmiserver)**: - **创建RMI接口**:...

    基于netty + SpringBoot仿照dubbo手动实现RPC远程本地无感知调用项目源码

    ​ 基于netty的网络传输实现远程本地调用,无感知远程调用。 该版本涉及: netty JDK动态代理。 Spring 的扫描原理(@CompanScan,@MapperScan 同理) Spring的事件监听与处理, 反射,SpringListener 配置,...

    JAVA本地监听与远程端口扫描-springboot

    本文将深入探讨如何在Spring Boot项目中实现本地监听以及进行远程端口扫描的相关知识点。 首先,我们来理解本地监听。在Java中,监听本地端口通常涉及到网络编程,特别是服务器端的开发。使用Java的Socket或...

    微服务框架 springcloud config 自动刷新远程svn配置代码示例

    本示例主要关注如何利用Spring Cloud Config实现远程SVN配置的自动化刷新,这有助于提升开发效率并确保配置的一致性。 首先,`spring-cloud-config-server`是Spring Cloud Config的核心组件,它作为一个配置中心,...

    spring cloud 中文文档

    - **Spring Cloud Config客户端**:说明了如何配置客户端以监听来自Spring Cloud Bus的通知,并自动刷新其配置。 #### Spring Cloud Netflix - **服务发现:Eureka客户端**:介绍了如何集成Netflix Eureka客户端...

    简易实现Spring流程代码

    开发者可以通过实现`ApplicationListener`接口监听这些事件,执行相应的逻辑。 4. 整合其他技术:Spring框架设计为模块化,可以轻松集成其他技术,如数据访问(JDBC、Hibernate、MyBatis)、事务管理、Web应用(MVC...

    Spring中文帮助文档

    6.8.1. 在Spring中使用AspectJ进行domain object的依赖注入 6.8.2. Spring中其他的AspectJ切面 6.8.3. 使用Spring IoC来配置AspectJ的切面 6.8.4. 在Spring应用中使用AspectJ加载时织入(LTW) 6.9. 更多资源 7...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    spring-framework-5.0 中文文档PDF

    验证、数据绑定和类型转换章节可能详细说明了如何在Spring框架中实现验证,例如使用Spring的Validator接口。此外,还可能涵盖了Spring类型转换和字段格式化的实现,包括如何配置全局的日期和时间格式。 Spring...

    spring3+dwr3实现聊天功能

    Spring框架和Direct Web Remoting (DWR) 是两个强大的工具,可以协同工作来实现这一目标。本篇将详细介绍如何利用Spring3和DWR3创建聊天功能,以及DWR的Server Push技术在其中的作用。 首先,Spring3是一个全面的...

    spring-cloud-config + spring-cloud-bus-amqp实现分布式集群配置动态更新

    Spring Cloud Bus监听到配置仓库的变更后,会触发一个事件并通过RabbitMQ广播这个事件到所有订阅的客户端。每个客户端接收到事件后,会从Config Server拉取最新的配置并刷新本地的配置信息,从而实现配置的动态更新...

    JMX与Spring 结合

    配置这些功能通常涉及到设置MBean的`NotificationBroadcaster`属性,并在监听器中实现回调方法。 在实际应用中,JMX与Spring的结合可以通过Spring Boot进一步简化,Spring Boot自动配置了JMX支持,只需要简单的配置...

    详解SpringCloud eureka服务状态监听

    Spring Cloud Eureka 通过事件机制来通知状态的变化,这些事件被 Spring 容器中的监听器捕获,并进行相应的处理。这种机制使得我们可以在服务状态变化时作出反应,执行比如将服务地址更新到缓存中等操作。 ### 事件...

Global site tag (gtag.js) - Google Analytics