- 浏览: 979813 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在Spring与ActiveMQ继承这篇文章中,消息的生产和手动消费都是依赖于JmsTemplate,今天我们就来看一
JmsTemplate,
贴上JmsTemplate配置,以便理解
再来看JmsTemplate的构造
JmsTemplateResourceFactory为JmsTemplate的内部类
从JmsTemplate的构造可以看出,主要是初始化事务资源工厂,消息转换器,传输延时,优先级,消息生存时间
再来看发送消息
再来看执行回调接口
先看当会话为null时的情况:
创建连接
//JmsAccessor
创建会话
当会话不为空的情况:
//ConnectionFactoryUtils
来看从事务同步管理器获取JmsResourceHolder
//事务同步器注册到事务同步管理器
//JmsResourceSynchronization
//ResourceHolderSynchronization
绑定连接工厂与资源holder的关系
我们先理一下,TransactionSynchronizationManager,JmsResourceHolder,JmsResourceSynchronization的关系;JmsResourceHolder管理连接,会话,连接工厂,及连接与会话的映射关系,JmsResourceSynchronization为连接工厂和JmsResourceHolder及事务的包装;TransactionSynchronizationManager管理线程的事务JmsResourceSynchronization,
连接工厂和JmsResourceHolder的映射管理。
再来理一下上面这一段说了什么,JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder
获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话
回到执行会话回调接口
来看执行会话回调接口doInJms方法
生产者发送消息
再来看发送转换消息
小节一下:
JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话,然后由会话创建生产者,有生产者发送消息;对于有消息转换的,则将消息转化器包装到MessageCreator,发送时由MessageCreator,调用消息转换器的消息转化方法,
转换消息,发送消息。
再来看一JmsTemplate的消费消息
这段发送送消息相似,直接忽略掉,直接看接受消息
消费者手动消费消息分方式同样是包装成会话回调接口,会话获取与生产者发送消息的会话阶段一样,然后从会话创建消费者,消费消息,而转化消息模式,只是将消费者获取的消息通过消息转化器转换一下;
总结:
JmsTemplate的构造,主要是初始化事务资源工厂,消息转换器,传输延时,优先级,消息生存时间再来看发送消息。JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder
获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话,然后由会话创建生产者,有生产者发送消息;对于有消息转换的,则将消息转化器包装到MessageCreator,发送时由MessageCreator,调用消息转换器的消息转化方法,转换消息,发送消息。消费者手动消费消息分方式同样是包装成会话回调接口,会话获取与生产者发送消息的会话阶段一样,然后从会话创建消费者,消费消息,而转化消息模式,只是将消费者获取的消息通过消息转化器转换一下;
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在Spring与ActiveMQ继承这篇文章中,消息的生产和手动消费都是依赖于JmsTemplate,今天我们就来看一
JmsTemplate,
贴上JmsTemplate配置,以便理解
<!-- 配置Jms模板 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="defaultDestination" ref="testQueue" /> <!-- 接收消息时的超时时间 --> <!--<property name="receiveTimeout" value="10000" /> --> <!-- 消息类型转换 --> <property name="messageConverter" ref="msgConverter"></property> </bean>
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations { public static final long RECEIVE_TIMEOUT_NO_WAIT = -1L; public static final long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0L; private static final Method setDeliveryDelayMethod; private final JmsTemplateResourceFactory transactionalResourceFactory; private Object defaultDestination;//消息目的地 private MessageConverter messageConverter;//消息转换器 private boolean messageIdEnabled; private boolean messageTimestampEnabled; private boolean pubSubNoLocal; private long receiveTimeout; private long deliveryDelay;//传输延时 private boolean explicitQosEnabled; private int deliveryMode;//消息持久化模式 private int priority;//优先级 private long timeToLive;//消息生存时间 static { //获取MessageProducer的设置传输延时方法 setDeliveryDelayMethod = ClassUtils.getMethodIfAvailable(javax/jms/MessageProducer, "setDeliveryDelay", new Class[] { Long.TYPE }); } }
public abstract class JmsDestinationAccessor extends JmsAccessor { private DestinationResolver destinationResolver;//消息目的地解决器 private boolean pubSubDomain;//是否为订阅主题模式 }
public abstract class JmsAccessor implements InitializingBean { private static final Constants sessionConstants = new Constants(javax/jms/Session); private ConnectionFactory connectionFactory;//连接工厂 private boolean sessionTransacted;//是不是事务会话 private int sessionAcknowledgeMode;//会话确认模式 }
再来看JmsTemplate的构造
public JmsTemplate() { transactionalResourceFactory = new JmsTemplateResourceFactory(); messageIdEnabled = true; messageTimestampEnabled = true; pubSubNoLocal = false; receiveTimeout = 0L; deliveryDelay = 0L; explicitQosEnabled = false; deliveryMode = 2; priority = 4; timeToLive = 0L; //初始化消息转换器 initDefaultStrategies(); } protected void initDefaultStrategies() { //初始化消息转换器 setMessageConverter(new SimpleMessageConverter()); }
JmsTemplateResourceFactory为JmsTemplate的内部类
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations { private class JmsTemplateResourceFactory implements org.springframework.jms.connection.ConnectionFactoryUtils.ResourceFactory { public Connection getConnection(JmsResourceHolder holder) { return JmsTemplate.this.getConnection(holder); } public Session getSession(JmsResourceHolder holder) { return JmsTemplate.this.getSession(holder); } public Connection createConnection() throws JMSException { return JmsTemplate.this.createConnection(); } public Session createSession(Connection con) throws JMSException { return JmsTemplate.this.createSession(con); } public boolean isSynchedLocalTransactionAllowed() { return isSessionTransacted(); } final JmsTemplate this$0; private JmsTemplateResourceFactory() { this$0 = JmsTemplate.this; super(); } } }
从JmsTemplate的构造可以看出,主要是初始化事务资源工厂,消息转换器,传输延时,优先级,消息生存时间
再来看发送消息
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { //创建会话回调接口,发送消息 execute(new SessionCallback() { public Object doInJms(Session session) throws JMSException { //会话发送消息 doSend(session, destination, messageCreator); return null; } final Destination val$destination; final MessageCreator val$messageCreator; final JmsTemplate this$0; { this.this$0 = JmsTemplate.this; destination = destination1; messageCreator = messagecreator; super(); } }, false); }
再来看执行回调接口
public Object execute(SessionCallback action, boolean startConnection) throws JmsException { Connection conToClose; Session sessionToClose; Assert.notNull(action, "Callback object must not be null"); conToClose = null; sessionToClose = null; Object obj; try { //从ConnectionFactoryUtils获取事务会话 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(getConnectionFactory(), transactionalResourceFactory, startConnection); if(sessionToUse == null) { //创建连接 conToClose = createConnection(); //创建会话 sessionToClose = createSession(conToClose); if(startConnection) conToClose.start(); sessionToUse = sessionToClose; } if(logger.isDebugEnabled()) logger.debug((new StringBuilder()).append("Executing callback on JMS Session: ").append(sessionToUse).toString()); //执行会话回调接口doInJms方法 obj = action.doInJms(sessionToUse); } //关闭会话 JmsUtils.closeSession(sessionToClose); //释放连接 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); return obj; }
先看当会话为null时的情况:
创建连接
conToClose = createConnection();
//JmsAccessor
protected Connection createConnection() throws JMSException { //从ActiveMQConnectionFactory获取连接ActiveMQConnection return getConnectionFactory().createConnection(); }
创建会话
sessionToClose = createSession(conToClose);
protected Session createSession(Connection con) throws JMSException { //从ActiveMQConnection获取连接会话ActiveMQSession return con.createSession(isSessionTransacted(), getSessionAcknowledgeMode()); }
当会话不为空的情况:
//ConnectionFactoryUtils
public static Session doGetTransactionalSession(ConnectionFactory connectionFactory, ResourceFactory resourceFactory, boolean startConnection) throws JMSException { Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); Assert.notNull(resourceFactory, "ResourceFactory must not be null"); //事务同步管理器获取JmsResourceHolder JmsResourceHolder resourceHolder = (JmsResourceHolder)TransactionSynchronizationManager.getResource(connectionFactory); Connection con; if(resourceHolder != null) { //如果JmsResourceHolder存在,则从resourceFactory获取会话 Session session = resourceFactory.getSession(resourceHolder); if(session != null) { if(startConnection) { //获取启动,则从resourceFactory获取resourceHolder对应的连接 con = resourceFactory.getConnection(resourceHolder); if(con != null) //启动连接 con.start(); } return session; } if(resourceHolder.isFrozen()) return null; } if(!TransactionSynchronizationManager.isSynchronizationActive()) return null; JmsResourceHolder resourceHolderToUse = resourceHolder; if(resourceHolderToUse == null) resourceHolderToUse = new JmsResourceHolder(connectionFactory); //从resourceFactory获取resourceHolder对应的连接 con = resourceFactory.getConnection(resourceHolderToUse); Session session = null; try { boolean isExistingCon = con != null; if(!isExistingCon) { con = resourceFactory.createConnection(); resourceHolderToUse.addConnection(con); } //resourceFactory根据连接创建会话 session = resourceFactory.createSession(con); //将连接与会话关系添加到resourceHolderToUse resourceHolderToUse.addSession(session, con); if(startConnection) con.start(); } if(resourceHolderToUse != resourceHolder) { //注册同步器 TransactionSynchronizationManager.registerSynchronization(new JmsResourceSynchronization(resourceHolderToUse, connectionFactory, resourceFactory.isSynchedLocalTransactionAllowed())); //设置事务 resourceHolderToUse.setSynchronizedWithTransaction(true); //绑定连接工厂与资源holder的关系 TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse); } return session; }
来看从事务同步管理器获取JmsResourceHolder
JmsResourceHolder resourceHolder = (JmsResourceHolder)TransactionSynchronizationManager.getResource(connectionFactory); ublic abstract class TransactionSynchronizationManager { private static final ThreadLocal resources = new ThreadLocal();//资源 private static final ThreadLocal synchronizations = new ThreadLocal();//同步器 private static final Comparator synchronizationComparator = new OrderComparator(); private static final ThreadLocal currentTransactionName = new ThreadLocal();//当前事务名 private static final ThreadLocal currentTransactionReadOnly = new ThreadLocal();//事务读写 private static final ThreadLocal currentTransactionIsolationLevel = new ThreadLocal();事务级别 private static final ThreadLocal actualTransactionActive = new ThreadLocal(); public static Object getResource(Object key) { Assert.notNull(key, "Key must not be null"); Map map = (Map)resources.get(); if(map == null) return null; Object value = map.get(key); if(value != null && logger.isDebugEnabled()) logger.debug("Retrieved value [" + value + "] for key [" + key + "] bound to thread [" + Thread.currentThread().getName() + "]"); return value; } }
//事务同步器注册到事务同步管理器
public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); if(!isSynchronizationActive()) { throw new IllegalStateException("Transaction synchronization is not active"); } else { //添加到事务同步管理,同步集 List synchs = (List)synchronizations.get(); synchs.add(synchronization); return; } }
//JmsResourceSynchronization
public abstract class ConnectionFactoryUtils { private static class JmsResourceSynchronization extends ResourceHolderSynchronization { private final boolean transacted; public JmsResourceSynchronization(JmsResourceHolder resourceHolder, Object resourceKey, boolean transacted) { super(resourceHolder, resourceKey); this.transacted = transacted; } } }
//ResourceHolderSynchronization
public abstract class ResourceHolderSynchronization implements TransactionSynchronization { private final ResourceHolder resourceHolder; private final Object resourceKey; private volatile boolean holderActive; }
绑定连接工厂与资源holder的关系
TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse); //TransactionSynchronizationManager public static void bindResource(Object key, Object value) throws IllegalStateException { //从当前线程事务同步化管理获取工厂与资源holder的映射,并将映射关系添加到映射Map中 Map map = (Map)resources.get(); if(map == null) { map = new HashMap(); resources.set(map); } if(map.containsKey(key)) throw new IllegalStateException("Already value [" + map.get(key) + "] for key [" + key + "] bound to thread [" + Thread.currentThread().getName() + "]"); map.put(key, value); }
public class JmsResourceHolder extends ResourceHolderSupport { private ConnectionFactory connectionFactory;//连接工厂 private boolean frozen; private final List connections;//连接 private final List sessions;//会话 private final Map sessionsPerConnection;//连接会话映射关系 public final void addSession(Session session, Connection connection) { if(!this.sessions.contains(session)) { //添加会话 this.sessions.add(session); if(connection != null) { List sessions = (List)sessionsPerConnection.get(connection); if(sessions == null) { sessions = new LinkedList(); //添加连接会话映射关系 sessionsPerConnection.put(connection, sessions); } sessions.add(session); } } } }
我们先理一下,TransactionSynchronizationManager,JmsResourceHolder,JmsResourceSynchronization的关系;JmsResourceHolder管理连接,会话,连接工厂,及连接与会话的映射关系,JmsResourceSynchronization为连接工厂和JmsResourceHolder及事务的包装;TransactionSynchronizationManager管理线程的事务JmsResourceSynchronization,
连接工厂和JmsResourceHolder的映射管理。
再来理一下上面这一段说了什么,JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder
获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话
回到执行会话回调接口
来看执行会话回调接口doInJms方法
obj = action.doInJms(sessionToUse); public Object doInJms(Session session) throws JMSException { //会话发送消息 doSend(session, destination, messageCreator); return null; } protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException { MessageProducer producer; //创建生产者 producer = createProducer(session, destination); Message message = messageCreator.createMessage(session); //生产者发送消息 doSend(producer, message); }
生产者发送消息
doSend(producer, message); protected void doSend(MessageProducer producer, Message message) throws JMSException { if(deliveryDelay > 0L) { if(setDeliveryDelayMethod == null) throw new IllegalStateException("setDeliveryDelay requires JMS 2.0"); //如果有延时,则延时发送 ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, new Object[] { Long.valueOf(deliveryDelay) }); } if(isExplicitQosEnabled()) producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); else //无延时直接发送 producer.send(message); }
再来看发送转换消息
public void convertAndSend(Object message) throws JmsException { //获取默认目的地,即我们配置的JmsTemplate的defaultDestination Destination defaultDestination = getDefaultDestination(); if(defaultDestination != null) convertAndSend(defaultDestination, message); else convertAndSend(getRequiredDefaultDestinationName(), message); } public void convertAndSend(Destination destination, final Object message) throws JmsException { send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { //调用配置JmsTemplate的messageConverter的toMessage方法 return getRequiredMessageConverter().toMessage(message, session); } final Object val$message; final JmsTemplate this$0; { this.this$0 = JmsTemplate.this; message = obj; super(); } }); } public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { execute(new SessionCallback() { public Object doInJms(Session session) throws JMSException { //委托给doSend(Session session, Destination destination, MessageCreator messageCreator) doSend(session, destination, messageCreator); return null; } final Destination val$destination; final MessageCreator val$messageCreator; final JmsTemplate this$0; { this.this$0 = JmsTemplate.this; destination = destination1; messageCreator = messagecreator; super(); } }, false); } protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException { MessageProducer producer; producer = createProducer(session, destination); //调用配置JmsTemplate的messageConverter的toMessage方法 Message message = messageCreator.createMessage(session); if(logger.isDebugEnabled()) logger.debug((new StringBuilder()).append("Sending created message: ").append(message).toString()); doSend(producer, message); }
小节一下:
JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话,然后由会话创建生产者,有生产者发送消息;对于有消息转换的,则将消息转化器包装到MessageCreator,发送时由MessageCreator,调用消息转换器的消息转化方法,
转换消息,发送消息。
再来看一JmsTemplate的消费消息
public Message receive() throws JmsException { Destination defaultDestination = getDefaultDestination(); if(defaultDestination != null) return receive(defaultDestination); } public Message receive(Destination destination) throws JmsException { return receiveSelected(destination, null); } public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException { return (Message)execute(new SessionCallback() { public Message doInJms(Session session) throws JMSException { return doReceive(session, destination, messageSelector); } public volatile Object doInJms(Session session) throws JMSException { return doInJms(session); } final Destination val$destination; final String val$messageSelector; final JmsTemplate this$0; { this.this$0 = JmsTemplate.this; destination = destination1; messageSelector = s; super(); } }, true); }
这段发送送消息相似,直接忽略掉,直接看接受消息
doReceive(session, destination, messageSelector); protected Message doReceive(Session session, Destination destination, String messageSelector) throws JMSException { return doReceive(session, createConsumer(session, destination, messageSelector)); } protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { Message message1; long timeout = getReceiveTimeout(); JmsResourceHolder resourceHolder = (JmsResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory()); if(resourceHolder != null && resourceHolder.hasTimeout()) timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); //消费消息 Message message = doReceive(consumer, timeout); if(session.getTransacted()) { if(isSessionLocallyTransacted(session)) JmsUtils.commitIfNecessary(session); } else //如果需要客户端确认,则调用acknowledge if(isClientAcknowledge(session) && message != null) message.acknowledge(); message1 = message; JmsUtils.closeMessageConsumer(consumer); return message1; } private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException { if(timeout == -1L) return consumer.receiveNoWait(); if(timeout > 0L) return consumer.receive(timeout); else //消费者消费消息 return consumer.receive(); } public Object receiveAndConvert() throws JmsException { return doConvertFromMessage(receive()); } protected Object doConvertFromMessage(Message message) { if(message == null) break MISSING_BLOCK_LABEL_22; //直接有消息转发器,转换消息 return getRequiredMessageConverter().fromMessage(message); JMSException ex; ex; throw convertJmsAccessException(ex); return null; }
消费者手动消费消息分方式同样是包装成会话回调接口,会话获取与生产者发送消息的会话阶段一样,然后从会话创建消费者,消费消息,而转化消息模式,只是将消费者获取的消息通过消息转化器转换一下;
总结:
JmsTemplate的构造,主要是初始化事务资源工厂,消息转换器,传输延时,优先级,消息生存时间再来看发送消息。JmsTemplate发送消息的时候,是将这一过程包装成会话回调接口,然后执行会话回调接口,会话回调结构中有个一参数就是Session,这个Session的获取就是我们上面在讲的,首先从事务同步管理器获取连接工厂对应的JmsResourceHolder,如果JmsResourceHolder存在,则从JmsResourceHolder
获取会话,如果没有则直接从ActiveMQConnectionFactory获取连接及会话,然后由会话创建生产者,有生产者发送消息;对于有消息转换的,则将消息转化器包装到MessageCreator,发送时由MessageCreator,调用消息转换器的消息转化方法,转换消息,发送消息。消费者手动消费消息分方式同样是包装成会话回调接口,会话获取与生产者发送消息的会话阶段一样,然后从会话创建消费者,消费消息,而转化消息模式,只是将消费者获取的消息通过消息转化器转换一下;
发表评论
-
Spring与ActiveMQ的集成详解二
2017-01-03 10:07 2072JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6263JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6504JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ消费者详解
2017-01-01 14:38 8652JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6852JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ会话初始化详解
2016-12-31 20:26 4766JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 12003JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4272ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7776下载apache-activemq-5.12.1.tar.gz ... -
Spring与ActiveMQ的集成
2016-12-27 18:09 1714JMS与MQ详解:http://www.fx114.net/q ... -
ActiveMQ PTP模式实例二
2016-12-27 14:45 688这篇主要是测试PTP模式下的回复消息,具体测试代码如下: 队列 ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3112深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
总之,"spring配置activemq详解"是一个涵盖Spring与ActiveMQ整合的深度话题,涉及了从项目配置到消息生产和消费的全过程。通过理解和实践这些知识点,开发者能够构建出稳定、高效的分布式消息处理系统。
Spring集成ActiveMQ配置详解 Spring框架与ActiveMQ的集成,为开发者提供了一种高效、可靠的JMS消息处理机制。在企业级应用中,这种集成能够极大地提升系统的响应速度和容错能力,特别是在需要异步通信和分布式事务...
1. **Spring框架**:Spring是一个全方位的开发框架,提供了依赖注入(Dependency Injection, DI)和面向切面编程(Aspect-Oriented Programming, AOP)等核心特性。通过DI,Spring可以帮助开发者管理对象间的依赖...
在本文中,我们将深入探讨如何在Spring Boot应用中集成ActiveMQ,这是一个强大的Java消息服务(JMS)实现。首先,我们需要理解JMS的基本概念。Java消息服务(JMS)是Java平台上的一个标准API,它定义了应用程序如何...
以上就是ActiveMQ与Spring的集成配置方案,涵盖了连接工厂、JMS模板、监听容器以及消息监听器等关键元素。通过这样的配置,我们可以方便地在Spring应用中发送和接收消息,从而实现服务间的异步通信。在实际应用中,...
**Spring与ActiveMQ整合详解** 在Java开发中,Spring框架是极为重要的应用基础,而ActiveMQ作为Apache出品的一款开源消息中间件,常被用于实现应用间的异步通信和解耦。本实例代码工程"Spring+ActiveMQ整合实例代码...
本示例将详细讲解如何在Windows环境下,利用Maven构建工具,将Spring与ActiveMQ进行集成,创建一个简单的生产者-消费者模型。 **一、环境准备** 首先,确保你已经安装了Java运行环境(JDK)、Maven和Eclipse或...
**Spring与ActiveMQ整合详解** 在Java世界中,Spring框架是企业级应用开发的重要基石,而ActiveMQ则是Apache出品的一款开源消息中间件,它在分布式系统中承担着数据通信的关键角色。Spring与ActiveMQ的结合使用,能...
本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...
**ActiveMQ与Spring集成实例详解** ActiveMQ是Apache软件基金会下的一个开源项目,它是一个功能丰富的Java消息服务(JMS)提供商,支持多种协议,并且能够处理大量的并发消息传输。而Spring框架则是一个广泛使用的...
【Spring集成ActiveMQ】知识点详解 在Java企业级开发中,Spring框架的广泛使用与Apache ActiveMQ的消息中间件相结合,可以构建出高效、可靠的消息传递系统。ActiveMQ作为开源的JMS(Java Message Service)提供商,...
**二、Spring与ActiveMQ集成** 1. **引入依赖** 在Spring项目中,我们需要添加ActiveMQ的相关依赖,如`spring-jms`和`activemq-client`。在`pom.xml`或`build.gradle`文件中添加对应的Maven或Gradle依赖。 2. **...
通过分析和运行这个Demo,你可以更深入地理解ActiveMQ与Spring的集成方式,为你的项目提供一个可靠的参考模板。在实践中,你可以根据自己的需求调整配置,例如改变ActiveMQ服务器的地址、设置消息队列的大小、添加...
在Java开发中,Spring Integration提供了与ActiveMQ集成的API和配置,使得开发者能够轻松地将消息队列功能引入到Spring应用中。下面将详细介绍Spring Integration与ActiveMQ结合使用的相关知识点: 1. **Spring ...
3. **Spring与ActiveMQ的集成** - 引入Spring的JMS模块依赖,如`spring-jms`。 - 创建`ConnectionFactory`,配置连接ActiveMQ的URL、用户名和密码。 - 使用`<jee:jndi-lookup>`标签或`@Resource`注解注入`...
《Spring与ActiveMQ整合实战详解》 在Java开发领域,消息队列(Message Queue)作为解耦、异步处理和提高系统吞吐量的重要工具,被广泛应用。Spring框架以其强大的集成能力,使得与各种消息中间件如ActiveMQ的整合...
在1.2版本中,虽然相比后来的版本可能缺乏一些特性,但对于早期的Spring和ActiveMQ集成来说,它仍然是一个可靠的选择。开发者可以利用这个版本来构建基于JMS的消息系统,实现如工作队列、发布/订阅模式等消息传递...
5. **良好的 Spring 集成**:与 Spring 框架有很好的集成,可以轻松地将 ActiveMQ 集成到基于 Spring 的应用中。 6. **高性能**:相较于 JBossMQ 等其他消息中间件,ActiveMQ 的性能更优,通常能达到 JBossMQ 性能...
本教程将重点讲解如何将ActiveMQ与Spring进行整合,以创建一个高效、可靠的分布式消息系统。 一、ActiveMQ简介 ActiveMQ是一款功能强大的开源消息代理,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,适用于多种...