`

spring+guava事件异步分发处理

阅读更多
Guava是Google开源的一个Java基础类库,它在Google内部被广泛使用。Guava提供了很多功能模块比如:集合、并发库、缓存等,EventBus是Guava框架对观察者模式的一种实现,使用EventBus可以很简洁的实现事件注册监听和消费。Guava框架里面提供了两种相关的实现,一种是单线程同步事件消费,另外一直是多线程异步事件消费。后者在对象设计上是前者的子类,EventBus 异步事件订阅处理通过post发送消息对需要的事件进行注册,在通过事件处理,根据类名,参数去处理注册的事件。

导入Guava 包:

      
 <dependency>
	    <groupId>com.google.guava</groupId>
	    <artifactId>guava</artifactId>
	    <version>19.0</version>
	</dependency>


spring 整合Guagva EventBus事件配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/context
          http://www.springframework.org/schema/context/spring-context-3.0.xsd
          http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
          http://www.springframework.org/schema/tx
          http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
		<context:component-scan base-package="demo.dcn"><!-- 扫描一下的包,完成注册bean 并过滤掉控制器扫描 -->
		<context:exclude-filter type="annotation"
			expression="org.springframework.stereotype.Controller" />
		</context:component-scan>          
          <!-- 线程池 -->
	<bean id="taskExecutor"
		class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<property name="corePoolSize" value="20" />
		<property name="maxPoolSize" value="200" />
		<property name="queueCapacity" value="1000000" />
		<property name="keepAliveSeconds" value="600" />  
		<property name="rejectedExecutionHandler">
			<!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->          
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->          
			<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
		</property>   
	</bean>
	<bean id="userFeedRedisSubscriber" class="demo.dcn.guava.eventBus.subscriber.UserFeedRedisSubscriber" />
         <!-- 事件总线 -->
	<bean id="eventBus" class="com.google.common.eventbus.AsyncEventBus"  >
		<constructor-arg ref="taskExecutor"  />
	</bean>
	<!-- 主册事件 -->
	<bean id="eventBuilder" class="demo.dcn.guava.eventBus.AsynEventBusBuilder">
		<property name="eventBus" ref="eventBus"/>
		<property name="handlers"> 
			<set>
				<!-- <ref bean="msgNoticeSubscriber"/>
				<ref bean="userActivitySubcriber"/> -->
				<ref bean="userFeedRedisSubscriber"/>
			</set>
		</property>
	</bean> 
</beans>



event 事件:
package demo.dcn.guava.eventBus.events;

import java.io.Serializable;

/**
 * 用户喜欢关注,移除关注,触发的交互事件
 * @author kun.zhang@downjoy.com
 *
 *
 */
public class FollowEvent implements Serializable{

	private static final long serialVersionUID = 1L;
	// 用户ID
	private Long lookerId;
	// 类型
	private FriendEventType type;
	
	private Long friendId;
	
	public FollowEvent(Long lookerId, FriendEventType type, Long friendId) {
		super();
		this.lookerId = lookerId;
		this.type = type;
		this.friendId = friendId;
	}

	public FollowEvent() {
		super();
	}

	public enum FriendEventType{
		 ADD_FOLLOW_FRIEND(1L, "增加关注好友"),ADD_FANS_FRIEND(2L, "增加粉丝好友"),DEL_FOLLOW_FRIEND(3L, "删除关注好友"),DEL_FANS_FRIEND(4L, "删除粉丝好友");
		private Long id;
	    private String desc;

		private FriendEventType(Long id, String desc) {
			this.id = id;
			this.desc = desc;
		}

		public Long getId() {
			return id;
		}

		public void setId(Long id) {
			this.id = id;
		}

		public String getDesc() {
			return desc;
		}

		public void setDesc(String desc) {
			this.desc = desc;
		}
	
	}
}


事件注入中心:
package demo.dcn.guava.eventBus;

import java.util.Set;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import com.google.common.eventbus.AsyncEventBus;

/**
 * 事件注入中心
 * @author kun.zhang@downjoy.com
 *
 *
 */
public class AsynEventBusBuilder implements InitializingBean,DisposableBean {
	
	private AsyncEventBus eventBus;
	
	private Set<Object> handlers;
	
	
	@Override
	public void destroy() throws Exception {
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		for (Object handler : handlers) {
			eventBus.register(handler);
		}
		
	}

	/**
	 * @return the eventBus
	 */
	public AsyncEventBus getEventBus() {
		return eventBus;
	}

	/**
	 * @param eventBus the eventBus to set
	 */
	public void setEventBus(AsyncEventBus eventBus) {
		this.eventBus = eventBus;
	}

	/**
	 * @return the handlers
	 */
	public Set<Object> getHandlers() {
		return handlers;
	}

	/**
	 * @param handlers the handlers to set
	 */
	public void setHandlers(Set<Object> handlers) {
		this.handlers = handlers;
	}
}

异步处理事件,通过 @Subscribe 注解,
package demo.dcn.guava.eventBus.subscriber;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;

import demo.dcn.guava.eventBus.events.FollowEvent;

/**
 * 用户活动订阅事件处理
 * @author kun.zhang@downjoy.com
 *
 *
 */
public class UserFeedRedisSubscriber {

	/**
	 *  处理喜欢关注,移除关注
	 * @param event
	 */
	@Subscribe
        @AllowConcurrentEvents//线程安全
	public void followInteractionHandel(FollowEvent event){
			if(event!=null){
				System.out.println("用户关注了你");//接送消息系统
			}	
	}	
}

异步使用订阅事件:
@Override
	public ResultMapper addUserFollow(Long lookerId, Long hosterId) {
		ResultMapper result = new ResultMapper();
		if(lookerId==null||hosterId==null||hosterId==0l||lookerId==0l){
			result.setCode(ResultMap.FAILURE.getCode());
			result.setMsg(ResultMap.FAILURE.getDesc());
		}if(lookerId != null && hosterId != null&&lookerId.equals(hosterId)){
            result.setCode(ResultMap.REPETOPER.getCode());
            result.setMsg(ResultMap.REPETOPER.getDesc());
            return result;
        }
		try{
			boolean flag = userFollowDAO.addFollow(lookerId, hosterId);
			if(!flag){
				 result.setCode(ResultMap.SUCCESS.getCode());
			     result.setMsg(ResultMap.SUCCESS.getDesc());
			  }
			FollowEvent event = new FollowEvent(lookerId,FriendEventType.ADD_FOLLOW_FRIEND,hosterId);
			eventBus.post(event);//异步处理关联订阅事件
		}catch(Exception e){
			 logger.error("userfeed addFollower error,Long userId={}, Long follerId={}", lookerId, hosterId, e);
			 result.setCode(ResultMap.FAILURE.getCode());
			 result.setMsg(ResultMap.FAILURE.getDesc());
		}
		return result;
	}




分享到:
评论

相关推荐

    spring-5.3.14-schema.zip

    Spring的缓存抽象层提供了多种缓存机制的统一接口,如 EhCache、Guava Cache 或 Redis。通过在方法上添加注解,可以轻松启用和管理缓存,提高应用性能。 6. **lang** 这个模块包含了一些辅助工具类,如运行时异常...

    利用Java开发高性能、高并发Web应用

    2. **非阻塞I/O与异步处理**:Java NIO(非阻塞I/O)和Java 7引入的CompletableFuture类,以及Netty这样的高性能异步事件驱动框架,能够处理大量并发连接,提高系统吞吐量。 3. **线程池管理**:使用...

    SSM实战项目——Java高并发秒杀API,详细流程+学习笔记

    SSM实战项目——Java高并发秒杀API是一个深入学习Java后端开发的重要实践,它涵盖了Spring、SpringMVC和MyBatis三大框架的整合应用,以及如何处理高并发下的秒杀场景。在这个项目中,我们将深入理解如何设计并实现一...

    Java项目开发实用案例_第4章代码_高效海量访问系统

    在Java中,可以利用Spring Cache或者Guava Cache实现本地缓存,也可以使用RedisTemplate进行分布式缓存操作。 4. **数据库优化**:包括索引优化、查询优化、读写分离、分库分表等。例如,使用MyBatis或JPA进行数据...

    Java实现秒杀系统.rar

    - **异步处理**:使用CompletableFuture或者Reactor框架进行异步编程,使得处理请求时可以并行执行,提升效率。 2. **分布式锁**: - **Redis分布式锁**:利用Redis的setnx命令实现乐观锁,保证同一时间只有一个...

    JAVA缓存与大型网站架

    在构建大型网站时,缓存技术是至关重要的性能优化手段,尤其是在使用JAVA作为主要开发语言的环境中。JAVA缓存能够显著提升系统响应...在实践中,结合其他技术如CDN、异步处理等,可以构建出更加高效和稳定的大型网站。

    各种常用JAR包(数据库,框架,ajax......)

    再来谈谈Ajax框架的JAR包,如jQuery库的jquery.js,它是一个广泛使用的JavaScript库,提供了丰富的DOM操作、事件处理和动画效果。而像 Prototype.js 或 Dojo Toolkit 这样的其他框架,也提供了异步通信功能,使得...

    基于SpringBoot高并发商城秒杀系统项目.zip

    - 异步处理:使用消息队列(如RabbitMQ或Kafka)进行请求解耦,减轻系统压力。 - 负载均衡:Nginx等负载均衡器分发请求,避免单点压力过大。 - 限流策略:如Guava的RateLimiter或Hystrix的断路器,控制请求速率。...

    java开发完整包

    在Java开发中,JAR(Java Archive)文件是Java类库的标准打包格式,它将多个类文件、资源文件以及元数据打包成一个单一的文件,便于分发和运行。在这个"常用数据包jar"中,我们可以推测包含了一些核心的Java库和第三...

    基于 Java 13 实现的游戏服务器框架.zip

    - **Netty 框架**:作为Java领域最流行的网络编程库,Netty提供了一套完整的异步事件驱动的网络应用程序框架,可以快速构建高性能的游戏服务器。 3. **并发处理**: - **线程池**:使用ExecutorService创建线程池...

    activemq-rar-5.6.0.rar

    10. spring-context-3.0.6.RELEASE.jar:Spring框架的核心模块,用于处理应用上下文和依赖注入,可能用于管理ActiveMQ的配置和服务。 综上所述,这个rar压缩包提供了ActiveMQ 5.6.0版本的RAR文件,以及其运行所需的...

    传智播客巴巴运动网-项目中使用到的jar文件

    8. **消息队列**:如RabbitMQ或ActiveMQ的客户端,用于实现异步处理和解耦系统组件。 9. **缓存技术**:如Redis或Ehcache,用于提高数据访问速度,减少对数据库的压力。 10. **其他工具库**:如Guava、Apache ...

    基于Java的高性能、支持免费http代理池、支持横向扩展、分布式爬虫项目.zip

    2. 异步IO:利用Java的NIO(非阻塞IO)或AIO(异步IO),可以提高对大量连接的处理能力,减少系统资源的占用。 3. 缓存机制:项目可能采用了内存缓存(如Guava Cache或 caffeine)来存储常用的代理IP,减少数据库或...

    软件开发所需所有jar

    9. **消息队列库**:如RabbitMQ、Kafka客户端,实现异步处理和解耦。 10. **图形处理库**:如ImageIO、Java2D,用于图像处理和绘制。 11. **Web服务器和应用服务器**:如Tomcat、Jetty,用于运行和部署Java Web...

    ava常用Jar2

    Java常用Jar2这个主题涉及到的是Java开发中经常使用的外部库文件,这些库文件通常以.JAR(Java Archive)格式存在,用于封装多个类文件、资源文件以及元数据,便于在不同项目中复用和分发。JAR文件是Java平台的标准...

    building-scalable-distributed-systems-a3

    Java 8引入了`CompletableFuture`,它支持链式调用和组合多个异步操作,以提高系统并行处理能力。通过异步编程,我们可以让服务器在等待I/O操作完成的同时处理其他请求,显著提高响应速度。 三、响应式编程 Java中...

    Microservices

    10. **事件驱动架构**: 使用消息队列(如RabbitMQ或Kafka)实现异步通信和解耦,有助于构建松耦合的微服务系统。 以上是关于Java微服务架构的一些核心知识点,理解并掌握这些技术将有助于构建高效、可扩展和容错的...

Global site tag (gtag.js) - Google Analytics