一般来说,对于那些处理时间比较长,需要马上响应并且做成异步后不会影响其它流程的做成异步。比如用户注册成功后发邮件给用户,这个可以在用户注册成功后马上返回,而不需要等到邮件发送成功才返回。异步有二种方式:内存异步和JMS异步。
内存异步即把要处理的请求放到内存队列中,然后由多个线程去消费。这种方式性能比较高,但是会存在请求丢失和内存溢出的风险。比如服务器突然down机,那么队列中未处理完的请求就会丢失;如果请求处理时间太长,并且请求一直在增加,即生产者速度要大于消费者速度时,就存在内存溢出的风险。这种适合对数据丢失不敏感,并且生产者速度要小于消费者速度的场合。
JMS异步即把要处理的请求先持久化到数据库,然后多个线程去消费。这种方式性能相对来说要慢一些,但是不会出现请求丢失和内存溢出的情况。这种适合不允许数据丢失,请求处理时间比较长的场合。
异步框架,使用元数据和AOP的方式,将内存异步和JMS异步统一起来。
package com.konceptusa.infinet.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 方法异步
* @author Jwin
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target( {ElementType.METHOD })
@Documented
public @interface Async
{
//异步队列名称,默认为类名.方法名
String queueName() default "";
//消费者线程数
int threadCount() default 10;
//队列报警值,仅对内存异步有效
int warningQueueSize() default 100;
//是否使用jms异步,默认为内存异步
boolean jmsAsync() default false;
}
package com.konceptusa.infinet.imsupport.aop;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import com.konceptusa.framework.asyn.AsynCallInfo;
import com.konceptusa.framework.asyn.jms.AsynExecuteMessageListener;
import com.konceptusa.framework.core.jmsservice.ObjectMessageSenderImpl;
import com.konceptusa.framework.core.jmsservice.SendMessageFailedException;
import com.konceptusa.infinet.annotation.Async;
/**
* 异步AOP
* @author Jwin
*
*/
public class AsyncInterceptor implements DisposableBean
{
private final static Log LOG = LogFactory.getLog(AsyncInterceptor.class);
private Map<String, ThreadPoolExecutor> executorMap = new HashMap<String, ThreadPoolExecutor>();
private Map<String, ObjectMessageSenderImpl> messageSenderMap = new HashMap<String, ObjectMessageSenderImpl>();
private Map<String, DefaultMessageListenerContainer> listerContainerMap = new HashMap<String, DefaultMessageListenerContainer>();
private ConnectionFactory connectionFactory;
public Object async(final ProceedingJoinPoint pjp, Async async) throws Throwable
{
if(!async.jmsAsync())
{
memAsync(pjp, async);
}
else
{
jmsAsync(pjp,async);
}
return null;
}
private void jmsAsync(ProceedingJoinPoint pjp, Async async)
{
ObjectMessageSenderImpl objectMessageSender = null;
String queueName = getQueueName(pjp, async);
synchronized (messageSenderMap)
{
objectMessageSender = messageSenderMap.get(queueName);
if(objectMessageSender == null)
{
LOG.info("init jms queueName " + queueName + " thread count " + async.threadCount() );
objectMessageSender = new ObjectMessageSenderImpl();
Queue queue = new ActiveMQQueue(queueName);
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory);
jmsTemplate.setDefaultDestination(queue);
objectMessageSender.setJmsTemplate(jmsTemplate);
messageSenderMap.put(queueName, objectMessageSender);
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConcurrentConsumers(async.threadCount());
defaultMessageListenerContainer.setConnectionFactory(connectionFactory);
defaultMessageListenerContainer.setDestination(queue);
defaultMessageListenerContainer.setCacheLevelName("CACHE_CONSUMER");
defaultMessageListenerContainer.setMaxMessagesPerTask(20);
AsynExecuteMessageListener messageListener = new AsynExecuteMessageListener();
messageListener.setTarget(pjp.getTarget());
defaultMessageListenerContainer.setMessageListener(messageListener);
defaultMessageListenerContainer.afterPropertiesSet();
listerContainerMap.put(queueName, defaultMessageListenerContainer);
}
}
AsynCallInfo asynCallInfo = new AsynCallInfo(pjp.getSignature().getName(),pjp.getArgs());
objectMessageSender.asynSendMessage(asynCallInfo);
}
private String getQueueName(ProceedingJoinPoint pjp, Async async)
{
String queueName = async.queueName();
//如果没有指定队列名,则以 类名.方法名 为队列名
if(StringUtils.isBlank(queueName))
{
queueName = pjp.getTarget().getClass().getSimpleName() + "." + pjp.getSignature().getName();
}
return queueName;
}
private void memAsync(final ProceedingJoinPoint pjp, Async async)
{
ThreadPoolExecutor executor = null;
final String queueName = getQueueName(pjp, async);
synchronized (executorMap)
{
executor = executorMap.get(queueName);
if(executor == null)
{
LOG.info("init mem queueName " + queueName + " thread count " + async.threadCount() + " warning queue size " + async.warningQueueSize());
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(async.threadCount());
executorMap.put(queueName, executor);
}
}
Thread thread = new Thread(){
@Override
public void run()
{
try
{
pjp.proceed();
} catch (Throwable e)
{
LOG.error("async call fail", e);
}
}
};
executor.execute(thread);
int size = executor.getQueue().size();
if( size >= async.warningQueueSize())
{
LOG.warn("queueName " + queueName + " exceeds warning queue size " + async.warningQueueSize() + " current size " + size);
}
}
public void destroy() throws Exception
{
for(String key : executorMap.keySet())
{
LOG.info("destroy mem queue " + key);
ThreadPoolExecutor executor = executorMap.get(key);
executor.shutdown();
executor.awaitTermination(2, TimeUnit.SECONDS);
}
for(String key : listerContainerMap.keySet())
{
LOG.info("destroy jms queue " + key);
DefaultMessageListenerContainer container = listerContainerMap.get(key);
container.destroy();
}
executorMap.clear();
listerContainerMap.clear();
}
public void setConnectionFactory(ConnectionFactory jmsConnectionFactory)
{
this.connectionFactory = jmsConnectionFactory;
}
}
对需要异步的方法加入@Aysnc 即可实现异步
内存异步
@Async
public String memAsyncFind(Long userid)
{
System.out.println("mem asyncFind userid " + userid);
return userid.toString();
}
JMS异步
@Async(jmsAsync=true)
public String jmsAsyncFind(Long userid)
{
System.out.println("jms asyncFind userid " + userid);
return userid.toString();
}
AOP配置
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
"
default-autowire="byName">
<bean id="asyncInterceptor"
class="com.konceptusa.infinet.imsupport.aop.AsyncInterceptor">
</bean>
<aop:config>
<aop:pointcut id="asyncPointcut"
expression="execution(* com.konceptusa.infinet.test.service..*.*(..)) and @annotation(async)" />
<aop:aspect id="asyncAspect" ref="asyncInterceptor">
<aop:around method="async" pointcut-ref="asyncPointcut" />
</aop:aspect>
</aop:config>
</beans>
Activemq配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.org/config/1.0" brokerName="test"
dataDirectory="${activemq.data}/data/test">
<persistenceAdapter>
<journaledJDBC journalLogFiles="10"
dataDirectory="${activemq.data}/persistence"
dataSource="#activemq-ds" createTablesOnStartup="true"
useDatabaseLock="false" />
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="100 mb"
percentUsageMinDelta="20" />
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" />
</storeUsage>
</systemUsage>
</systemUsage>
</broker>
<bean id="activemq-ds"
class="com.mchange.v2.c3p0.ComboPooledDataSource"
destroy-method="close">
<property name="driverClass" value="${database.driverName.mq}" />
<property name="jdbcUrl" value="${database.url.mq}" />
<property name="user" value="${database.user.mq}" />
<property name="password" value="${database.password.mq}" />
<property name="initialPoolSize" value="${database.initialSize}" />
<property name="minPoolSize" value="${database.initialSize}" />
<property name="maxPoolSize" value="${database.maxActive}" />
<property name="maxIdleTime" value="${database.maxIdleTime}" />
<property name="acquireIncrement" value="${database.acquireIncrement}" />
<property name="numHelperThreads" value="${database.numHelperThreads}" />
<property name="automaticTestTable"
value="${database.automaticTestTable}" />
<property name="maxStatements" value="${database.maxStatements}" />
<property name="maxStatementsPerConnection"
value="${database.maxStatementsPerConnection}" />
<property name="idleConnectionTestPeriod"
value="${database.idleConnectionTestPeriod}" />
</bean>
<connectionFactory xmlns="http://activemq.org/config/1.0"
id="jmsConnectionFactory" brokerURL="vm://test" />
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsConnectionFactory" />
</bean>
</beans>
分享到:
相关推荐
在Spring中,实现异步消息监听器有三种方式:实现 javax.jms.MessageListener 接口、实现 Spring 的 SessionAwareMessageListener 和捆绑一个标准 POJO 到 Spring 的 MessageListenerAdapter 类上。这三种方法在消息...
Java 消息服务 (JMS) API 是实现异步消息传递的标准,但其原始使用方式涉及复杂的资源管理和异常处理。 传统的 JMS 实现涉及多个步骤,包括 JNDI(Java Naming and Directory Interface)查询来查找队列连接工厂和...
Spring 框架提供了对 Java 消息服务(Java Message Service, JMS)的支持,使得开发者能够轻松地在应用程序中实现异步消息传递。 JMS 是一个标准接口,用于在分布式环境中交换消息。通过 Spring JMS,我们可以利用...
"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步处理中的...
【Spring JMS】是Spring框架中的一个模块,用于简化Java消息服务(JMS)的使用,使得异步消息处理变得更加简单和灵活。Spring JMS通过提供一个模板类,抽象了JMS API的底层细节,让开发者能够更加专注于消息的处理...
本教程将带你逐步了解如何利用Tomcat、Spring和JMS(Java Message Service)构建一个简单的异步消息传递入门实例。 首先,让我们来理解一下核心组件: 1. **Tomcat**:这是一个流行的开源Java Servlet容器,用于...
在这个"spring jms tomcat 异步消息传递入门实例"中,我们将探讨如何在Spring框架下利用JMS和Tomcat实现异步消息传递,以提高系统性能和可扩展性。 首先,理解异步消息传递的概念至关重要。在同步通信中,发送方...
Spring框架则简化了使用JEE组件(包括JMS)的任务。它提供的模板机制隐藏了典型的JMS实现的细节,这样开发人员可以集中精力放在处理消息的实际工作中,而不用担心如何去创建,访问或清除JMS资源。本文将对SpringJMSAP
在IT行业中,Java消息服务(Java Message Service,简称JMS)是一种标准接口,用于应用程序之间的异步通信。它允许数据以消息的形式在不同的应用程序之间传递,从而实现解耦和可扩展性。OpenJMS是开源的JMS提供者之...
总结起来,JavaEE异步消息处理通过JMS和消息队列实现了客户端和服务端之间的解耦和高效通信,利用线程池和异步处理提升了系统的并发性和响应能力。开发者可以利用各种框架和工具,如EJB的MDB或Spring的JMS支持,轻松...
Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。它提供了一种面向消息中间件(Message-Oriented Middleware, MOM)的标准接口,使得应用程序可以发送、接收和...
【标题】:“基于Websphere MQ持久化消息实现异步转同步—方案一” 在分布式系统中,异步处理和消息队列(Message Queuing,MQ)是提高系统可扩展性和解耦组件的重要手段。Websphere MQ是IBM提供的一款强大、可靠的...
Spring框架和ActiveMQ的结合使用,为开发者提供了强大的异步消息传递能力。本文将深入探讨如何结合Spring 2.0与ActiveMQ来实现异步消息调用,并分享相关知识点。 首先,Spring 2.0是一个广泛使用的Java应用框架,它...
JMS是一种标准接口,用于在分布式环境中交换消息,它允许不同的应用程序之间进行异步通信。 在使用`jms-deploy`之前,开发者需要对JMS有一定的理解。JMS的核心概念包括消息生产者(Message Producers)、消息消费者...
使用JMS(Java Message Service)或者RabbitMQ等消息中间件实现异步通信;使用多线程或异步回调机制来处理并发任务;使用设计模式如工厂模式、观察者模式等增强框架的灵活性。 7. **测试与调试**:C2架构的可测试性...
在WebLogic中整合JMS(Java Message Service)和Spring框架,可以实现异步消息处理,提高系统的响应速度和并发处理能力。JMS 是一个为Java平台设计的消息中间件接口,它允许应用程序通过消息传递进行通信,而Spring...
JMS为Java开发者提供了一种强大而灵活的消息处理框架,通过支持不同的消息模型、提供丰富的API以及实现异步通信等功能,使得基于Java的应用程序能够高效地处理大量消息。理解JMS的基本概念和原理对于构建可靠的...
标题中的“基于Websphere MQ持久化消息实现异步转同步—方案二”是指在分布式系统中,通过使用Websphere MQ(WebSphere Message Broker,一种消息中间件)来处理异步通信,并通过消息的持久化特性,确保消息在异常...