- 浏览: 987739 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
前言:
JmsTemplate的构造,主要是初始化事务资源工厂,消息转换器,传输延时,优先级,消息生存时间再来看发送消息。JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,
然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话,然后由会话创建生产者,有生产者发送消息;
对于有消息转换的,则将消息转化器包装到MessageCreator,发送时由MessageCreator,调用消息转换器的消息转化方法,转换消息,发送消息。消费者手动消费消息分方式同样是包装成会话回调接口,会话获取与生产者发送消息的会话阶段一样,然后从会话创建消费者,消费消息,而转化消息模式,只是将消费者获取的消息通过消息转化器转换一下;
上一篇说了JMSTemplate发送与消费消息,今天这篇看一下监听器:
配置片段
我们回到的DefaultMessageListenerContainer构造
DefaultMessageListenerContainer构造,主要初始化允许并发消费者的数量,每个任务最大消费消息等;由于JmsAccessor实现了InitializingBean,我们从afterPropertiesSet方法看起,在
AbstractJmsListeningContainer中发现了afterPropertiesSet
再看AbstractMessageListenerContainer
再看初始化
//DefaultMessageListenerContainer
创建默认任务执行器
taskExecutor = createDefaultTaskExecutor();
来看AbstractPollingMessageListenerContainer的初始化
//AbstractJmsListeningContainer
看DefaultMessageListenerContainer初始化
再看AsyncMessageListenerInvoker
创建监听器消费者
p
通知消费者从未消费消息通道获取分发消息
message = receiveMessage(consumerToUse);
//ActiveMQMessageConsumer
发送拉消息命令
从未消费消息通道分发消息
调用Invoker,实际为空,待拓展
通知会话,消费消息
总结:
DefaultMessageListenerContainer构造,主要初始化允许并发消费者的数量,每个任务最大消费消息等;然后初始化任务执行器,创建消息监听器Invoker,交由任务线程去执行;消息监听器Invoker主要做的工作,就是获取消息监听器,通知消费者从未消费消息通道获取分发消息,然后遍历消息监听器,有消监听器的onMessage方法消费消息。
附:Spring事务管理,这个以后有时间,研究一下,猜测思想就是事务同步管理器,根据ThreadLocal管理每个线程的事务管理和连接工厂及连接资源Holder映射关系,每一次连接事务的执行,先从事务同步管理器,获取连接资源Holder,如果有,获取事务的状态,没有则看一下需要事务管理,如果需要,则将连接工厂及连接资源Holder映射关系绑定到事务同步管理器;每一次事务提交都是从事务同步管理器获取连接工厂对应的连接资源Holder,从连接资源Holder获取连接,有连接提交事务或回滚。
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
前言:
JmsTemplate的构造,主要是初始化事务资源工厂,消息转换器,传输延时,优先级,消息生存时间再来看发送消息。JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,
然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话,然后由会话创建生产者,有生产者发送消息;
对于有消息转换的,则将消息转化器包装到MessageCreator,发送时由MessageCreator,调用消息转换器的消息转化方法,转换消息,发送消息。消费者手动消费消息分方式同样是包装成会话回调接口,会话获取与生产者发送消息的会话阶段一样,然后从会话创建消费者,消费消息,而转化消息模式,只是将消费者获取的消息通过消息转化器转换一下;
上一篇说了JMSTemplate发送与消费消息,今天这篇看一下监听器:
配置片段
<!-- 消息监听方式 --> <bean id="topicSubMesListener" class="com.activemq.listener.TopicSubscriberMessageListener"/> <bean id="testMsgTopiclistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="destination" ref="testTopic" /> <property name="messageListener" ref="topicSubMesListener" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean>
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer { public static final String DEFAULT_THREAD_NAME_PREFIX = (new StringBuilder()).append(ClassUtils.getShortName(org/springframework/jms/listener/DefaultMessageListenerContainer)).append("-").toString(); public static final long DEFAULT_RECOVERY_INTERVAL = 5000L; public static final int CACHE_NONE = 0; public static final int CACHE_CONNECTION = 1; public static final int CACHE_SESSION = 2; public static final int CACHE_CONSUMER = 3; public static final int CACHE_AUTO = 4; private static final Constants constants = new Constants(org/springframework/jms/listener/DefaultMessageListenerContainer); private Executor taskExecutor;//任务执行线程 private long recoveryInterval; private int cacheLevel; private int concurrentConsumers;//允许并发消费者的数量 private int maxConcurrentConsumers; private int maxMessagesPerTask;//每个任务最大消费消息 private int idleConsumerLimit; private int idleTaskExecutionLimit; private final Set scheduledInvokers = new HashSet();//HashSet<AsyncMessageListenerInvoker> private int activeInvokerCount; private int registeredWithDestination; private volatile boolean recovering; private Runnable stopCallback; private Object currentRecoveryMarker; private final Object recoveryMonitor = new Object(); }
public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer { public static final long DEFAULT_RECEIVE_TIMEOUT = 1000L; private final MessageListenerContainerResourceFactory transactionalResourceFactory = new MessageListenerContainerResourceFactory(); private boolean sessionTransactedCalled; private boolean pubSubNoLocal; private PlatformTransactionManager transactionManager; private DefaultTransactionDefinition transactionDefinition; private long receiveTimeout;//接受超时时间 private volatile Boolean commitAfterNoMessageReceived; }
public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer { private volatile Object destination;//目的地 private volatile String messageSelector; private volatile Object messageListener;//消息监听器 private boolean subscriptionDurable; private String durableSubscriptionName; private ExceptionListener exceptionListener; private ErrorHandler errorHandler; private boolean exposeListenerSession; private boolean acceptMessagesWhileStopping; }
public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor implements SmartLifecycle, BeanNameAware, DisposableBean public abstract class JmsDestinationAccessor extends JmsAccessor { private DestinationResolver destinationResolver; private boolean pubSubDomain;//是否是主题订阅模式 }
public abstract class JmsAccessor implements InitializingBean { private static final Constants sessionConstants = new Constants(javax/jms/Session); private ConnectionFactory connectionFactory;//连接工厂 private boolean sessionTransacted;//事务处于事务中 private int sessionAcknowledgeMode;//会话确认模式 }
我们回到的DefaultMessageListenerContainer构造
public DefaultMessageListenerContainer() { recoveryInterval = 5000L; cacheLevel = 4; concurrentConsumers = 1; maxConcurrentConsumers = 1; maxMessagesPerTask = -2147483648; idleConsumerLimit = 1; idleTaskExecutionLimit = 1; activeInvokerCount = 0; registeredWithDestination = 0; recovering = false; currentRecoveryMarker = new Object(); }
DefaultMessageListenerContainer构造,主要初始化允许并发消费者的数量,每个任务最大消费消息等;由于JmsAccessor实现了InitializingBean,我们从afterPropertiesSet方法看起,在
AbstractJmsListeningContainer中发现了afterPropertiesSet
public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer { private volatile Object destination;//目的地 private volatile String messageSelector; private volatile Object messageListener;//消息监听器 private boolean subscriptionDurable; private String durableSubscriptionName; private ExceptionListener exceptionListener; private ErrorHandler errorHandler; private boolean exposeListenerSession; private boolean acceptMessagesWhileStopping; } public void afterPropertiesSet() { //调用父类的afterPropertiesSet,即JmsAccessor super.afterPropertiesSet(); //验证配置 validateConfiguration(); initialize(); } //看JmsAccessor public void afterPropertiesSet() { //获取工厂 if(getConnectionFactory() == null) throw new IllegalArgumentException("Property 'connectionFactory' is required"); } //主要是检查连接工厂配置 public ConnectionFactory getConnectionFactory() { return connectionFactory; }
再看AbstractMessageListenerContainer
protected void validateConfiguration() { nothing }
再看初始化
//DefaultMessageListenerContainer
public void initialize() { //设置缓存级别 if(cacheLevel == 4) cacheLevel = getTransactionManager() == null ? 3 : 0; synchronized(lifecycleMonitor) { if(taskExecutor == null) //创建默认任务执行器 taskExecutor = createDefaultTaskExecutor(); else if((taskExecutor instanceof SchedulingTaskExecutor) && ((SchedulingTaskExecutor)taskExecutor).prefersShortLivedTasks() && maxMessagesPerTask == -2147483648) maxMessagesPerTask = 10; } super.initialize(); } //获取事务管理器 protected final PlatformTransactionManager getTransactionManager() { return transactionManager; }
创建默认任务执行器
taskExecutor = createDefaultTaskExecutor();
protected TaskExecutor createDefaultTaskExecutor() { String beanName = getBeanName(); String threadNamePrefix = beanName == null ? DEFAULT_THREAD_NAME_PREFIX : (new StringBuilder()).append(beanName).append("-").toString(); return new SimpleAsyncTaskExecutor(threadNamePrefix); } public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable { public static final int UNBOUNDED_CONCURRENCY = -1; public static final int NO_CONCURRENCY = 0; private final ConcurrencyThrottleAdapter concurrencyThrottle; private ThreadFactory threadFactory;//线程工厂 }
来看AbstractPollingMessageListenerContainer的初始化
public void initialize() { //初始化事务 if(!sessionTransactedCalled && (transactionManager instanceof ResourceTransactionManager) && !TransactionSynchronizationUtils.sameResourceFactory((ResourceTransactionManager)transactionManager, getConnectionFactory())) super.setSessionTransacted(true); if(transactionDefinition.getName() == null) transactionDefinition.setName(getBeanName()); super.initialize(); }
//AbstractJmsListeningContainer
initialize();
public void initialize() throws JmsException { try { synchronized(lifecycleMonitor) { active = true; //通知监听容器已启动,释放lifecycleMonitor锁 lifecycleMonitor.notifyAll(); } //委托给doInitialize doInitialize(); } catch(JMSException ex) { synchronized(sharedConnectionMonitor) { ConnectionFactoryUtils.releaseConnection(sharedConnection, getConnectionFactory(), autoStartup); sharedConnection = null; } throw convertJmsAccessException(ex); } } //待父类扩展 protected abstract void doInitialize() throws JMSException;
看DefaultMessageListenerContainer初始化
protected void doInitialize() throws JMSException { synchronized(lifecycleMonitor) { for(int i = 0; i < concurrentConsumers; i++) //等待lifecycleMonitor锁,然后创建异步消息监听执行器 scheduleNewInvoker(); } }
//创建异步消息监听执行器 private void scheduleNewInvoker() { //创建消息监听器Invoker,添加到scheduledInvokers Set中 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); if(rescheduleTaskIfNecessary(invoker)) scheduledInvokers.add(invoker); }
再看AsyncMessageListenerInvoker
AsyncMessageListenerInvoker为DefaultMessageListenerContainer的内部类 public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer { private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable { private Session session;//会话 private MessageConsumer consumer;//消费者 private Object lastRecoveryMarker; private boolean lastMessageSucceeded; private int idleTaskExecutionCount; private volatile boolean idle; final DefaultMessageListenerContainer this$0; private AsyncMessageListenerInvoker() { this$0 = DefaultMessageListenerContainer.this; super(); idleTaskExecutionCount = 0; idle = true; } public void run() { boolean messageReceived; synchronized(lifecycleMonitor) { activeInvokerCount++;//激活消息监听执行器 lifecycleMonitor.notifyAll(); } messageReceived = false; if(maxMessagesPerTask < 0) { messageReceived = executeOngoingLoop(); } else { for(int messageCount = 0; isRunning() && messageCount < maxMessagesPerTask; messageCount++) //调用监听器执行器 messageReceived = invokeListener() || messageReceived; } synchronized(lifecycleMonitor) { decreaseActiveInvokerCount(); lifecycleMonitor.notifyAll(); } if(!messageReceived) idleTaskExecutionCount++; else idleTaskExecutionCount = 0; synchronized(lifecycleMonitor) { if(!shouldRescheduleInvoker(idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { scheduledInvokers.remove(this); if(DefaultMessageListenerContainer.this.Object.isDebugEnabled()) DefaultMessageListenerContainer.this.Object.debug((new StringBuilder()).append("Lowered scheduled invoker count: ").append(scheduledInvokers.size()).toString()); lifecycleMonitor.notifyAll(); clearResources(); } else if(isRunning()) { int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); if(nonPausedConsumers < 1) DefaultMessageListenerContainer.this.Object.error("All scheduled consumers have been paused, probably due to tasks having been rejected. Check your thread pool configuration! Manual recovery necessary through a start() call."); else if(nonPausedConsumers < getConcurrentConsumers()) DefaultMessageListenerContainer.this.Object.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers."); } } } //调用监听器执行器 private boolean executeOngoingLoop() throws JMSException { boolean messageReceived = false; boolean active = true; do { if(!active) break; synchronized(lifecycleMonitor) { boolean interrupted = false; boolean wasWaiting = false; while((active = isActive()) && !isRunning()) { if(interrupted) throw new IllegalStateException("Thread was interrupted while waiting for a restart of the listener container, but container is still stopped"); if(!wasWaiting) decreaseActiveInvokerCount(); wasWaiting = true; try { lifecycleMonitor.wait(); } catch(InterruptedException ex) { Thread.currentThread().interrupt(); interrupted = true; } } if(wasWaiting) activeInvokerCount++; if(scheduledInvokers.size() > maxConcurrentConsumers) active = false; } if(active) //调用监听器执行器 messageReceived = invokeListener() || messageReceived; } while(true); return messageReceived; } //减少激活消费者的数量 private void decreaseActiveInvokerCount() { activeInvokerCount--; if(stopCallback != null && activeInvokerCount == 0) { stopCallback.run(); stopCallback = null; } } //初始化会话和消费 private void initResourcesIfNecessary() throws JMSException { if(getCacheLevel() <= 1) { updateRecoveryMarker(); } else { if(session == null && getCacheLevel() >= 2) { updateRecoveryMarker(); //创建会话 session = createSession(getSharedConnection()); } if(consumer == null && getCacheLevel() >= 3) { //创建消费者 consumer = createListenerConsumer(session); synchronized(lifecycleMonitor) { registeredWithDestination++; } } } } private void updateRecoveryMarker() { synchronized(recoveryMonitor) { lastRecoveryMarker = currentRecoveryMarker; } } //清除消费者及会话 private void clearResources() { if(sharedConnectionEnabled()) { synchronized(sharedConnectionMonitor) { JmsUtils.closeMessageConsumer(consumer); JmsUtils.closeSession(session); } } else { JmsUtils.closeMessageConsumer(consumer); JmsUtils.closeSession(session); } if(consumer != null) synchronized(lifecycleMonitor) { registeredWithDestination--; } consumer = null; session = null; } public boolean isLongLived() { return maxMessagesPerTask < 0; } public void setIdle(boolean idle) { this.idle = idle; } public boolean isIdle() { return idle; } } //调用监听器执行器 private boolean invokeListener() throws JMSException { //初始化会话和消费 initResourcesIfNecessary(); boolean messageReceived = receiveAndExecute(this, session, consumer); lastMessageSucceeded = true; return messageReceived; }
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException { if(transactionManager != null) { //如果处于事务中,则管理事务 TransactionStatus status = transactionManager.getTransaction(transactionDefinition); boolean messageReceived; try { messageReceived = doReceiveAndExecute(invoker, session, consumer, status); } transactionManager.commit(status); return messageReceived; } else { //执行消费者监听器Invoker return doReceiveAndExecute(invoker, session, consumer, null); } } protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException { Connection conToClose; Session sessionToClose; MessageConsumer consumerToClose; conToClose = null; sessionToClose = null; consumerToClose = null; Session sessionToUse; boolean transactional; MessageConsumer consumerToUse; Message message; boolean exposeResource; sessionToUse = session; transactional = false; if(sessionToUse == null) { //获取事务会话 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(getConnectionFactory(), transactionalResourceFactory, true); transactional = sessionToUse != null; } //如果事务会话为空,则 if(sessionToUse == null) { Connection conToUse; if(sharedConnectionEnabled()) { //获取共享连接 conToUse = getSharedConnection(); } else { //创建连接 conToUse = createConnection(); conToClose = conToUse; conToUse.start(); } //创建会话 sessionToUse = createSession(conToUse); sessionToClose = sessionToUse; } consumerToUse = consumer; if(consumerToUse == null) { //创建监听器消费者 consumerToUse = createListenerConsumer(sessionToUse); consumerToClose = consumerToUse; } //通知消费者从未消费消息通道获取分发消息 message = receiveMessage(consumerToUse); if(message == null) break MISSING_BLOCK_LABEL_434; // messageReceived(invoker, sessionToUse); //如果事务同步管理有对应的连接工厂资源 exposeResource = !transactional && isExposeListenerSession() && !TransactionSynchronizationManager.hasResource(getConnectionFactory()); if(exposeResource) //则绑定连接工厂,和资源Holder TransactionSynchronizationManager.bindResource(getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); //通知会话,消费消息 doExecuteListener(sessionToUse, message); Throwable ex; ex; if(status != null) { if(logger.isDebugEnabled()) logger.debug((new StringBuilder()).append("Rolling back transaction because of listener exception thrown: ").append(ex).toString()); status.setRollbackOnly(); } }
创建监听器消费者
p
rotected MessageConsumer createListenerConsumer(Session session) throws JMSException { Destination destination = getDestination(); if(destination == null) destination = resolveDestinationName(session, getDestinationName()); return createConsumer(session, destination); }
通知消费者从未消费消息通道获取分发消息
message = receiveMessage(consumerToUse);
protected Message receiveMessage(MessageConsumer consumer) throws JMSException { return receiveTimeout >= 0L ? consumer.receive(receiveTimeout) : consumer.receive(); }
//ActiveMQMessageConsumer
public javax.jms.Message receive(long timeout) throws JMSException { checkClosed(); checkMessageListener(); if(timeout == 0L) return receive(); //发送拉消息命令 sendPullCommand(timeout); if(timeout > 0L) { MessageDispatch md; if(info.getPrefetchSize() == 0) md = dequeue(-1L); else //从未消费消息通道分发消息 md = dequeue(timeout); if(md == null) { return null; } else { beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); } } else { return null; } }
发送拉消息命令
protected void sendPullCommand(long timeout) throws JMSException { clearDeliveredList(); if(info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); } }
从未消费消息通道分发消息
private MessageDispatch dequeue(long timeout) throws JMSException { long deadline; deadline = 0L; if(timeout > 0L) deadline = System.currentTimeMillis() + timeout; MessageDispatch md = unconsumedMessages.dequeue(timeout); }
调用Invoker,实际为空,待拓展
messageReceived(invoker, sessionToUse);
通知会话,消费消息
doExecuteListener(sessionToUse, message);
protected void doExecuteListener(Session session, Message message) throws JMSException { try { //通知会话,消费消息 invokeListener(session, message); } }
protected void invokeListener(Session session, Message message) throws JMSException { Object listener = getMessageListener(); if(listener instanceof MessageListener) //通知监听器,消费消息 doInvokeListener((MessageListener)listener, message); } protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { //监听器消费消息 listener.onMessage(message); }
总结:
DefaultMessageListenerContainer构造,主要初始化允许并发消费者的数量,每个任务最大消费消息等;然后初始化任务执行器,创建消息监听器Invoker,交由任务线程去执行;消息监听器Invoker主要做的工作,就是获取消息监听器,通知消费者从未消费消息通道获取分发消息,然后遍历消息监听器,有消监听器的onMessage方法消费消息。
附:Spring事务管理,这个以后有时间,研究一下,猜测思想就是事务同步管理器,根据ThreadLocal管理每个线程的事务管理和连接工厂及连接资源Holder映射关系,每一次连接事务的执行,先从事务同步管理器,获取连接资源Holder,如果有,获取事务的状态,没有则看一下需要事务管理,如果需要,则将连接工厂及连接资源Holder映射关系绑定到事务同步管理器;每一次事务提交都是从事务同步管理器获取连接工厂对应的连接资源Holder,从连接资源Holder获取连接,有连接提交事务或回滚。
public interface TransactionStatus extends SavepointManager { public abstract boolean isNewTransaction(); public abstract boolean hasSavepoint(); public abstract void setRollbackOnly(); public abstract boolean isRollbackOnly(); public abstract boolean isCompleted(); }
public interface PlatformTransactionManager { public abstract TransactionStatus getTransaction(TransactionDefinition transactiondefinition) throws TransactionException; public abstract void commit(TransactionStatus transactionstatus) throws TransactionException; public abstract void rollback(TransactionStatus transactionstatus) throws TransactionException; }
public class JmsTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { public JmsTransactionManager(ConnectionFactory connectionFactory) { this(); setConnectionFactory(connectionFactory); afterPropertiesSet(); } public void afterPropertiesSet() { if(getConnectionFactory() == null) throw new IllegalArgumentException("Property 'connectionFactory' is required"); else return; } protected Object doGetTransaction() { JmsTransactionObject txObject = new JmsTransactionObject(); txObject.setResourceHolder((JmsResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory())); return txObject; } protected boolean isExistingTransaction(Object transaction) { JmsTransactionObject txObject = (JmsTransactionObject)transaction; return txObject.getResourceHolder() != null; } protected void doBegin(Object transaction, TransactionDefinition definition) { if(definition.getIsolationLevel() != -1) throw new InvalidIsolationLevelException("JMS does not support an isolation level concept"); JmsTransactionObject txObject = (JmsTransactionObject)transaction; Connection con = null; Session session = null; try { con = createConnection(); session = createSession(con); if(logger.isDebugEnabled()) logger.debug((new StringBuilder()).append("Created JMS transaction on Session [").append(session).append("] from Connection [").append(con).append("]").toString()); txObject.setResourceHolder(new JmsResourceHolder(getConnectionFactory(), con, session)); txObject.getResourceHolder().setSynchronizedWithTransaction(true); int timeout = determineTimeout(definition); if(timeout != -1) txObject.getResourceHolder().setTimeoutInSeconds(timeout); TransactionSynchronizationManager.bindResource(getConnectionFactory(), txObject.getResourceHolder()); } catch(Throwable ex) { if(con != null) try { con.close(); } catch(Throwable ex2) { } throw new CannotCreateTransactionException("Could not create JMS transaction", ex); } } protected Object doSuspend(Object transaction) { JmsTransactionObject txObject = (JmsTransactionObject)transaction; txObject.setResourceHolder(null); return TransactionSynchronizationManager.unbindResource(getConnectionFactory()); } protected void doResume(Object transaction, Object suspendedResources) { JmsResourceHolder conHolder = (JmsResourceHolder)suspendedResources; TransactionSynchronizationManager.bindResource(getConnectionFactory(), conHolder); } protected void doCommit(DefaultTransactionStatus status) { JmsTransactionObject txObject = (JmsTransactionObject)status.getTransaction(); Session session = txObject.getResourceHolder().getSession(); try { if(status.isDebug()) logger.debug((new StringBuilder()).append("Committing JMS transaction on Session [").append(session).append("]").toString()); session.commit(); } catch(TransactionRolledBackException ex) { throw new UnexpectedRollbackException("JMS transaction rolled back", ex); } catch(JMSException ex) { throw new TransactionSystemException("Could not commit JMS transaction", ex); } } protected void doRollback(DefaultTransactionStatus status) { JmsTransactionObject txObject = (JmsTransactionObject)status.getTransaction(); Session session = txObject.getResourceHolder().getSession(); try { if(status.isDebug()) logger.debug((new StringBuilder()).append("Rolling back JMS transaction on Session [").append(session).append("]").toString()); session.rollback(); } catch(JMSException ex) { throw new TransactionSystemException("Could not roll back JMS transaction", ex); } } }
发表评论
-
Spring与ActiveMQ的集成详解一
2017-01-02 17:19 4602JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6292JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6519JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ消费者详解
2017-01-01 14:38 8673JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6874JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ会话初始化详解
2016-12-31 20:26 4796JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 12063JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4287ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7800下载apache-activemq-5.12.1.tar.gz ... -
Spring与ActiveMQ的集成
2016-12-27 18:09 1732JMS与MQ详解:http://www.fx114.net/q ... -
ActiveMQ PTP模式实例二
2016-12-27 14:45 700这篇主要是测试PTP模式下的回复消息,具体测试代码如下: 队列 ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3130深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
2. **配置Spring与ActiveMQ的整合**: - 在Spring的XML配置文件(如`activemq-consumer.xml`和`activemq-producer.xml`)中,我们可以定义JMS的ConnectionFactory和Destination(队列或主题)。 - `activemq-...
Spring集成ActiveMQ配置详解 Spring框架与ActiveMQ的集成,为开发者提供了一种高效、可靠的JMS消息处理机制。在企业级应用中,这种集成能够极大地提升系统的响应速度和容错能力,特别是在需要异步通信和分布式事务...
本案例将详细讲解如何将Spring与ActiveMQ整合,以提升系统的可扩展性和解耦性。 1. **Spring框架**:Spring是一个全方位的开发框架,提供了依赖注入(Dependency Injection, DI)和面向切面编程(Aspect-Oriented ...
在本文中,我们将深入探讨如何在Spring Boot应用中集成ActiveMQ,这是一个强大的Java消息服务(JMS)实现。首先,我们需要理解JMS的基本概念。Java消息服务(JMS)是Java平台上的一个标准API,它定义了应用程序如何...
**Spring与ActiveMQ整合详解** 在Java开发中,Spring框架是极为重要的应用基础,而ActiveMQ作为Apache出品的一款开源消息中间件,常被用于实现应用间的异步通信和解耦。本实例代码工程"Spring+ActiveMQ整合实例代码...
以上就是ActiveMQ与Spring的集成配置方案,涵盖了连接工厂、JMS模板、监听容器以及消息监听器等关键元素。通过这样的配置,我们可以方便地在Spring应用中发送和接收消息,从而实现服务间的异步通信。在实际应用中,...
**二、Spring与ActiveMQ的集成** 1. **创建Maven项目** 在IDE中新建一个Maven项目,添加Spring和ActiveMQ相关的依赖。在`pom.xml`文件中,你需要包含以下依赖: ```xml <groupId>org.springframework ...
**Spring与ActiveMQ整合详解** 在Java世界中,Spring框架是企业级应用开发的重要基石,而ActiveMQ则是Apache出品的一款开源消息中间件,它在分布式系统中承担着数据通信的关键角色。Spring与ActiveMQ的结合使用,能...
前几章我们分别利用spring rmi、httpinvoker、httpclient、...2) Spring jms和activemq相关依赖引入 3) Spring整合activemq配置 4) 定义消息发布者(生产者) 5) 定义消息订阅者(消费者) 6) Spring mvc配置 7) 实例测试
【Spring集成ActiveMQ】知识点详解 在Java企业级开发中,Spring框架的广泛使用与Apache ActiveMQ的消息中间件相结合,可以构建出高效、可靠的消息传递系统。ActiveMQ作为开源的JMS(Java Message Service)提供商,...
**ActiveMQ与Spring集成实例详解** ActiveMQ是Apache软件基金会下的一个开源项目,它是一个功能丰富的Java消息服务(JMS)提供商,支持多种协议,并且能够处理大量的并发消息传输。而Spring框架则是一个广泛使用的...
**二、Spring与ActiveMQ集成** 1. **引入依赖** 在Spring项目中,我们需要添加ActiveMQ的相关依赖,如`spring-jms`和`activemq-client`。在`pom.xml`或`build.gradle`文件中添加对应的Maven或Gradle依赖。 2. **...
**ActiveMQ5.1+Spring2.5 Demo详解** ActiveMQ是Apache软件基金会下的一个开源项目,它是一款功能强大的消息中间件,支持多种消息协议,如AMQP、STOMP、OpenWire等。在版本5.1中,ActiveMQ提供了一流的消息传输性能...
在Java开发中,Spring Integration提供了与ActiveMQ集成的API和配置,使得开发者能够轻松地将消息队列功能引入到Spring应用中。下面将详细介绍Spring Integration与ActiveMQ结合使用的相关知识点: 1. **Spring ...
3. **Spring与ActiveMQ的集成** - 引入Spring的JMS模块依赖,如`spring-jms`。 - 创建`ConnectionFactory`,配置连接ActiveMQ的URL、用户名和密码。 - 使用`<jee:jndi-lookup>`标签或`@Resource`注解注入`...
《Spring与ActiveMQ整合实战详解》 在Java开发领域,消息队列(Message Queue)作为解耦、异步处理和提高系统吞吐量的重要工具,被广泛应用。Spring框架以其强大的集成能力,使得与各种消息中间件如ActiveMQ的整合...
《ActiveMQ与Spring整合详解及1.2版本的使用》 在Java消息服务(Java Message Service,JMS)领域,Apache ActiveMQ是一个广泛使用的开源消息代理,它提供了一个高效的、可扩展的消息传递平台。而Spring框架是Java...
5. **良好的 Spring 集成**:与 Spring 框架有很好的集成,可以轻松地将 ActiveMQ 集成到基于 Spring 的应用中。 6. **高性能**:相较于 JBossMQ 等其他消息中间件,ActiveMQ 的性能更优,通常能达到 JBossMQ 性能...
二、Spring框架与JMS集成 Spring框架提供了对JMS的全面支持,包括配置JMS模板、消息监听器容器、事务管理等。通过Spring,我们可以轻松地将消息生产和消费集成到业务逻辑中,实现灵活的消息处理。 三、整合步骤 1. ...