`
jwin
  • 浏览: 26978 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

内存异步及JMS异步框架实现

    博客分类:
  • 2008
阅读更多

      一般来说,对于那些处理时间比较长,需要马上响应并且做成异步后不会影响其它流程的做成异步。比如用户注册成功后发邮件给用户,这个可以在用户注册成功后马上返回,而不需要等到邮件发送成功才返回。异步有二种方式:内存异步和JMS异步。

     内存异步即把要处理的请求放到内存队列中,然后由多个线程去消费。这种方式性能比较高,但是会存在请求丢失和内存溢出的风险。比如服务器突然down机,那么队列中未处理完的请求就会丢失;如果请求处理时间太长,并且请求一直在增加,即生产者速度要大于消费者速度时,就存在内存溢出的风险。这种适合对数据丢失不敏感,并且生产者速度要小于消费者速度的场合。

    JMS异步即把要处理的请求先持久化到数据库,然后多个线程去消费。这种方式性能相对来说要慢一些,但是不会出现请求丢失和内存溢出的情况。这种适合不允许数据丢失,请求处理时间比较长的场合。

 

异步框架,使用元数据和AOP的方式,将内存异步和JMS异步统一起来。

package com.konceptusa.infinet.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 方法异步
 * @author Jwin
 *
 */
@Retention(RetentionPolicy.RUNTIME)
@Target( {ElementType.METHOD })
@Documented
public @interface Async
{
	//异步队列名称,默认为类名.方法名
	String queueName() default "";
	//消费者线程数
	int threadCount() default 10;
	//队列报警值,仅对内存异步有效
	int warningQueueSize() default 100;
	//是否使用jms异步,默认为内存异步
	boolean jmsAsync() default false;
}

 

 

 

package com.konceptusa.infinet.imsupport.aop;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import com.konceptusa.framework.asyn.AsynCallInfo;
import com.konceptusa.framework.asyn.jms.AsynExecuteMessageListener;
import com.konceptusa.framework.core.jmsservice.ObjectMessageSenderImpl;
import com.konceptusa.framework.core.jmsservice.SendMessageFailedException;
import com.konceptusa.infinet.annotation.Async;

/**
 * 异步AOP
 * @author Jwin
 *
 */
public class AsyncInterceptor implements DisposableBean
{
	private final static Log LOG = LogFactory.getLog(AsyncInterceptor.class);
	private Map<String, ThreadPoolExecutor> executorMap = new HashMap<String, ThreadPoolExecutor>();
	private Map<String, ObjectMessageSenderImpl> messageSenderMap = new HashMap<String, ObjectMessageSenderImpl>();
	private Map<String, DefaultMessageListenerContainer> listerContainerMap = new HashMap<String, DefaultMessageListenerContainer>();
	private ConnectionFactory connectionFactory;
	public Object async(final ProceedingJoinPoint pjp, Async async) throws Throwable
	{
		if(!async.jmsAsync())
		{
			memAsync(pjp, async);			
		}
		else
		{
			jmsAsync(pjp,async);
		}
		return null;
	}
	
	private void jmsAsync(ProceedingJoinPoint pjp, Async async)
	{
		ObjectMessageSenderImpl objectMessageSender = null;
		String queueName = getQueueName(pjp, async);
		synchronized (messageSenderMap)
		{
			objectMessageSender = messageSenderMap.get(queueName);
			if(objectMessageSender == null)
			{
				LOG.info("init jms queueName " + queueName + " thread count " + async.threadCount() );
				objectMessageSender = new ObjectMessageSenderImpl();
				Queue queue = new ActiveMQQueue(queueName);
				JmsTemplate jmsTemplate = new JmsTemplate();
				jmsTemplate.setConnectionFactory(connectionFactory);
				jmsTemplate.setDefaultDestination(queue);
				objectMessageSender.setJmsTemplate(jmsTemplate);
				messageSenderMap.put(queueName, objectMessageSender);
				DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
				defaultMessageListenerContainer.setConcurrentConsumers(async.threadCount());
				defaultMessageListenerContainer.setConnectionFactory(connectionFactory);
				defaultMessageListenerContainer.setDestination(queue);
				defaultMessageListenerContainer.setCacheLevelName("CACHE_CONSUMER");
				defaultMessageListenerContainer.setMaxMessagesPerTask(20);
				AsynExecuteMessageListener messageListener = new AsynExecuteMessageListener();
				messageListener.setTarget(pjp.getTarget());
				defaultMessageListenerContainer.setMessageListener(messageListener);
				defaultMessageListenerContainer.afterPropertiesSet();
				listerContainerMap.put(queueName, defaultMessageListenerContainer);
			}			
		}
		AsynCallInfo asynCallInfo = new AsynCallInfo(pjp.getSignature().getName(),pjp.getArgs());
		objectMessageSender.asynSendMessage(asynCallInfo);
	}

	private String getQueueName(ProceedingJoinPoint pjp, Async async)
	{
		String queueName = async.queueName();
		//如果没有指定队列名,则以 类名.方法名 为队列名
		if(StringUtils.isBlank(queueName))
		{
			queueName = pjp.getTarget().getClass().getSimpleName() + "." + pjp.getSignature().getName();
		}
		return queueName;
	}
	private void memAsync(final ProceedingJoinPoint pjp, Async async)
	{
		ThreadPoolExecutor executor = null;
		final String queueName = getQueueName(pjp, async);
		synchronized (executorMap)
		{
			executor = executorMap.get(queueName);
			if(executor == null)
			{
				LOG.info("init mem queueName " + queueName + " thread count " + async.threadCount() + " warning queue size " + async.warningQueueSize());
				executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(async.threadCount());
				executorMap.put(queueName, executor);
			}
		}
		Thread thread = new Thread(){
			@Override
			public void run()
			{
				try
				{
					pjp.proceed();
				} catch (Throwable e)
				{
					LOG.error("async call fail", e);
				}
			}
		};
		executor.execute(thread);
		int size = executor.getQueue().size();
		if( size >= async.warningQueueSize())
		{
			LOG.warn("queueName " + queueName + " exceeds warning queue size " + async.warningQueueSize() + " current size " + size);
		}
	}
	public void destroy() throws Exception
	{
		for(String key : executorMap.keySet())
		{			
			LOG.info("destroy mem queue " + key);
			ThreadPoolExecutor executor = executorMap.get(key);
			executor.shutdown();
			executor.awaitTermination(2, TimeUnit.SECONDS);
		}
		for(String key : listerContainerMap.keySet())
		{			
			LOG.info("destroy jms queue " + key);
			DefaultMessageListenerContainer container = listerContainerMap.get(key);
			container.destroy();
		}
		executorMap.clear();
		listerContainerMap.clear();
	}
	public void setConnectionFactory(ConnectionFactory jmsConnectionFactory)
	{
		this.connectionFactory = jmsConnectionFactory;
	}
}

 

 

对需要异步的方法加入@Aysnc 即可实现异步

 

内存异步

	@Async
	public String memAsyncFind(Long userid)
	{
		System.out.println("mem asyncFind userid " + userid);
		return userid.toString();
	} 

 

JMS异步

	@Async(jmsAsync=true)
	public String jmsAsyncFind(Long userid)
	{
		System.out.println("jms asyncFind userid " + userid);
		return userid.toString();
	} 

 

AOP配置

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:util="http://www.springframework.org/schema/util"
	xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="
   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
   http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd
   http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
   http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
   http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
   "
	default-autowire="byName">
 	<bean id="asyncInterceptor"
		class="com.konceptusa.infinet.imsupport.aop.AsyncInterceptor">
	</bean>
	
	<aop:config>
		<aop:pointcut id="asyncPointcut"
				expression="execution(* com.konceptusa.infinet.test.service..*.*(..)) and @annotation(async)" />
		<aop:aspect id="asyncAspect" ref="asyncInterceptor">
			<aop:around method="async" pointcut-ref="asyncPointcut" />
		</aop:aspect>
	</aop:config>	
	
</beans>

Activemq配置 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.org/config/1.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

	<broker xmlns="http://activemq.org/config/1.0" brokerName="test"
		dataDirectory="${activemq.data}/data/test">
		<persistenceAdapter>
			<journaledJDBC journalLogFiles="10"
				dataDirectory="${activemq.data}/persistence"
				dataSource="#activemq-ds" createTablesOnStartup="true"
				useDatabaseLock="false" />
		</persistenceAdapter>

		<systemUsage>
			<systemUsage>
				<memoryUsage>
					<memoryUsage limit="100 mb"
						percentUsageMinDelta="20" />
				</memoryUsage>
				<storeUsage>
					<storeUsage limit="1 gb" />
				</storeUsage>
			</systemUsage>
		</systemUsage>
	</broker>

	<bean id="activemq-ds"
		class="com.mchange.v2.c3p0.ComboPooledDataSource"
		destroy-method="close">
		<property name="driverClass" value="${database.driverName.mq}" />
		<property name="jdbcUrl" value="${database.url.mq}" />
		<property name="user" value="${database.user.mq}" />
		<property name="password" value="${database.password.mq}" />
		<property name="initialPoolSize" value="${database.initialSize}" />
		<property name="minPoolSize" value="${database.initialSize}" />
		<property name="maxPoolSize" value="${database.maxActive}" />
		<property name="maxIdleTime" value="${database.maxIdleTime}" />
		<property name="acquireIncrement" value="${database.acquireIncrement}" />
		<property name="numHelperThreads" value="${database.numHelperThreads}" />
		<property name="automaticTestTable"
			value="${database.automaticTestTable}" />
		<property name="maxStatements" value="${database.maxStatements}" />
		<property name="maxStatementsPerConnection"
			value="${database.maxStatementsPerConnection}" />
		<property name="idleConnectionTestPeriod"
			value="${database.idleConnectionTestPeriod}" />
	</bean>

	<connectionFactory xmlns="http://activemq.org/config/1.0"
		id="jmsConnectionFactory" brokerURL="vm://test" />

	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory"
			ref="jmsConnectionFactory" />
	</bean>
</beans>
 

 

7
1
分享到:
评论
2 楼 WLLT 2011-11-10  
就博文就是那项目里用的!
1 楼 WLLT 2011-11-10  
    你好写的不要错转了,兄弟我目前在修复的一个项目是你搞的 啊,我看项目署名就是你,有几个问题想问你。看到加我QQ254885640 希望成为朋友!

相关推荐

    JMS与Spring之二(用message listener container异步收发消息)

    在Spring中,实现异步消息监听器有三种方式:实现 javax.jms.MessageListener 接口、实现 Spring 的 SessionAwareMessageListener 和捆绑一个标准 POJO 到 Spring 的 MessageListenerAdapter 类上。这三种方法在消息...

    使用Spring JMS轻松实现异步消息传递.pdf

    Java 消息服务 (JMS) API 是实现异步消息传递的标准,但其原始使用方式涉及复杂的资源管理和异常处理。 传统的 JMS 实现涉及多个步骤,包括 JNDI(Java Naming and Directory Interface)查询来查找队列连接工厂和...

    使用Spring JMS轻松实现异步消息传递.docx

    Spring 框架提供了对 Java 消息服务(Java Message Service, JMS)的支持,使得开发者能够轻松地在应用程序中实现异步消息传递。 JMS 是一个标准接口,用于在分布式环境中交换消息。通过 Spring JMS,我们可以利用...

    JMS之Spring +activeMQ实现消息队列

    "JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步处理中的...

    Spring JMS使异步消息变得简单.doc

    【Spring JMS】是Spring框架中的一个模块,用于简化Java消息服务(JMS)的使用,使得异步消息处理变得更加简单和灵活。Spring JMS通过提供一个模板类,抽象了JMS API的底层细节,让开发者能够更加专注于消息的处理...

    tomcat spring jms 异步消息传递入门实例

    本教程将带你逐步了解如何利用Tomcat、Spring和JMS(Java Message Service)构建一个简单的异步消息传递入门实例。 首先,让我们来理解一下核心组件: 1. **Tomcat**:这是一个流行的开源Java Servlet容器,用于...

    spring jms tomcat 异步消息传递入门实例

    在这个"spring jms tomcat 异步消息传递入门实例"中,我们将探讨如何在Spring框架下利用JMS和Tomcat实现异步消息传递,以提高系统性能和可扩展性。 首先,理解异步消息传递的概念至关重要。在同步通信中,发送方...

    使用SpringJMS轻松实现异步消息传递

    Spring框架则简化了使用JEE组件(包括JMS)的任务。它提供的模板机制隐藏了典型的JMS实现的细节,这样开发人员可以集中精力放在处理消息的实际工作中,而不用担心如何去创建,访问或清除JMS资源。本文将对SpringJMSAP

    应用openJMS实现JMS消息发布于订阅

    在IT行业中,Java消息服务(Java Message Service,简称JMS)是一种标准接口,用于应用程序之间的异步通信。它允许数据以消息的形式在不同的应用程序之间传递,从而实现解耦和可扩展性。OpenJMS是开源的JMS提供者之...

    javaEE 异步消息处理

    总结起来,JavaEE异步消息处理通过JMS和消息队列实现了客户端和服务端之间的解耦和高效通信,利用线程池和异步处理提升了系统的并发性和响应能力。开发者可以利用各种框架和工具,如EJB的MDB或Spring的JMS支持,轻松...

    JMS

    Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准API。它提供了一种面向消息中间件(Message-Oriented Middleware, MOM)的标准接口,使得应用程序可以发送、接收和...

    [重要]基于Websphere MQ持久化消息实现异步转同步—方案一

    【标题】:“基于Websphere MQ持久化消息实现异步转同步—方案一” 在分布式系统中,异步处理和消息队列(Message Queuing,MQ)是提高系统可扩展性和解耦组件的重要手段。Websphere MQ是IBM提供的一款强大、可靠的...

    结合Spring2.0和ActiveMQ进行异步消息调用

    Spring框架和ActiveMQ的结合使用,为开发者提供了强大的异步消息传递能力。本文将深入探讨如何结合Spring 2.0与ActiveMQ来实现异步消息调用,并分享相关知识点。 首先,Spring 2.0是一个广泛使用的Java应用框架,它...

    前端开源库-jms-deploy

    JMS是一种标准接口,用于在分布式环境中交换消息,它允许不同的应用程序之间进行异步通信。 在使用`jms-deploy`之前,开发者需要对JMS有一定的理解。JMS的核心概念包括消息生产者(Message Producers)、消息消费者...

    C2风格的框架程序

    使用JMS(Java Message Service)或者RabbitMQ等消息中间件实现异步通信;使用多线程或异步回调机制来处理并发任务;使用设计模式如工厂模式、观察者模式等增强框架的灵活性。 7. **测试与调试**:C2架构的可测试性...

    weblogic与jms+spring

    在WebLogic中整合JMS(Java Message Service)和Spring框架,可以实现异步消息处理,提高系统的响应速度和并发处理能力。JMS 是一个为Java平台设计的消息中间件接口,它允许应用程序通过消息传递进行通信,而Spring...

    JMS概念及原理,简明扼要

    JMS为Java开发者提供了一种强大而灵活的消息处理框架,通过支持不同的消息模型、提供丰富的API以及实现异步通信等功能,使得基于Java的应用程序能够高效地处理大量消息。理解JMS的基本概念和原理对于构建可靠的...

    [重要]基于Websphere MQ持久化消息实现异步转同步—方案二

    标题中的“基于Websphere MQ持久化消息实现异步转同步—方案二”是指在分布式系统中,通过使用Websphere MQ(WebSphere Message Broker,一种消息中间件)来处理异步通信,并通过消息的持久化特性,确保消息在异常...

Global site tag (gtag.js) - Google Analytics