- 浏览: 987834 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在上面一篇我们说过,ActiveMQ的ActiveMQConnectionFactory,ActiveMQConnection,TcpTransport,
ActiveMQConnectionFactory的创建过程,主要为初始化为broker url,用户密码,是否压缩、异步发送消息、支持消息优先级、非阻塞传输,最大线程数,生产窗口大小等属性;从ActiveMQConnectionFactory创建连接,首先通过TcpTransportFacotory创建TcpTransport,然后保证成待锁机制的MutexTransport,最后包装成ResponseCorrelator;根据TcpTransport和ActiveMQConnection状态管理器JMSStatsImpl创建ActiveMQConnection,创建ActiveMQConnection过程中,主要是是否异步分发消息,线程执行器,连接状态管理器,调度器等;然后
设置连接用户密码通过ConnectionInfo,配置是否支持消息优先级、非阻塞传输,最大线程数,生产窗口大小,Transport监听器transportListener;最后启动TcpTransport和Connection,启动TcpTransport主要是初始化socket,ip,端口,输入输出缓存区,输入输出流DataI/OnputStream,启动连接主要启动会话ActiveMQSession。
实例主要生产者代码片段:
今天来看一下,ActiveMQSession会话,消息队列和订阅主题,生产者及发送消息
从下面这句开始:
//ActiveMQConnection
从上面可以看出会话的创建主要最的工作是,初始化消费者,生产者id产生器,会话消费者,生产者队列,消息确认模式,是否异步分发,设置连接事务上下文,异步发送会话信息,新建消息会话执行器,会话添加到ActiveMQConnection的会话队列CopyOnWriteArrayList。
结合上一篇和这一篇说以一下ActiveMQConnection与ActiveMQSession,ActiveMQMessageConsumer,
ActiveMQMessageProducer的关系,连接管理会话(1-n),会话管理消息者与生产者(1-n)。
下面再看一下会话的启动
启动
会话启动分两部分,启动消费者,让消费者消费消息,启动会话执行器
先来看启动消费者
来看ActiveMQMessageConsumer的启动
在ActiveMQMessageConsumer的构造中有对消息通道unconsumedMessages的初始化
如果支持消息优先级,则为SimplePriorityMessageDispatchChannel,否则则为
FifoMessageDispatchChannel
我们来看一下FifoMessageDispatchChannel
再看SimplePriorityMessageDispatchChannel
SimplePriorityMessageDispatchChannel的启动与FifoMessageDispatchChannel相同;
回到会话启动中的执行器启动
//ActiveMQSession
//ActiveMQSessionExecutor
从这一句可以看出通过ActiveMQConnection来创建的,为TaskRunnerFactory
再来看为TaskRunnerFactory如何创建任务运行器
回到createTaskRunner
我们来看PooledTaskRunner
//PooledTaskRunner
在PooledTaskRunner的构造中可以看到
//运行任务
//ActiveMQSessionExecutor implements Task
再看ActiveMQMessageConsumer的dispatch(message)
总结:
会话的创建主要最的工作是,初始化消费者,生产者id产生器,会话消费者,生产者队列,消息确认模式,是否异步分发,设置连接事务上下文,异步发送会话信息,新建消息会话执行器,会话添加到ActiveMQConnection的会话队列CopyOnWriteArrayList;然后启动消费则,主要是启动消息分发通道,唤醒会话执行器ActiveMQSessionExecutor;最后启动会话执行器ActiveMQSessionExecutor,在启动会话执行器时,如果消息分发通道处于未启动状态,则启动消息分发通道,如果有未消费的消息,唤醒消息执行器,唤醒主要做的做工作是
ActiveMQConnection创建任务执行TaskRunnerFactory,有任务执行工厂TaskRunnerFactory,有任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor
iterate的函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,然后遍历消费者,消费者通过MessageListener消费消息。
ActiveMQConnection与ActiveMQSession,ActiveMQMessageConsumer,
ActiveMQMessageProducer的关系,连接管理会话(1-n),会话管理消息者与生产者(1-n)。ActiveMQSession关联一个ActiveMQSessionExecutor,由会话执行器,消费者消费消息。
//MessageListener
//MessageDispatch
//Session
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在上面一篇我们说过,ActiveMQ的ActiveMQConnectionFactory,ActiveMQConnection,TcpTransport,
ActiveMQConnectionFactory的创建过程,主要为初始化为broker url,用户密码,是否压缩、异步发送消息、支持消息优先级、非阻塞传输,最大线程数,生产窗口大小等属性;从ActiveMQConnectionFactory创建连接,首先通过TcpTransportFacotory创建TcpTransport,然后保证成待锁机制的MutexTransport,最后包装成ResponseCorrelator;根据TcpTransport和ActiveMQConnection状态管理器JMSStatsImpl创建ActiveMQConnection,创建ActiveMQConnection过程中,主要是是否异步分发消息,线程执行器,连接状态管理器,调度器等;然后
设置连接用户密码通过ConnectionInfo,配置是否支持消息优先级、非阻塞传输,最大线程数,生产窗口大小,Transport监听器transportListener;最后启动TcpTransport和Connection,启动TcpTransport主要是初始化socket,ip,端口,输入输出缓存区,输入输出流DataI/OnputStream,启动连接主要启动会话ActiveMQSession。
实例主要生产者代码片段:
ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); Connection 启动 connection.start(); System.out.println("Connection is start..."); //创建一个session //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。 //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Queue :消息的目的地;消息发送给谁. Queue destination = session.createQueue(qname); MessageProducer:消息发送者 MessageProducer producer = session.createProducer(destination); //设置生产者的模式,有两种可选 //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存 //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); connection.close();
今天来看一下,ActiveMQSession会话,消息队列和订阅主题,生产者及发送消息
从下面这句开始:
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//ActiveMQConnection
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { //检查连接有没关闭 checkClosedOrFailed(); ensureConnectionInfoSent(); if(!transacted) { //如果transacted为非事务,而acknowledgeMode为事务SESSION_TRANSACTED,抛出异常 if(acknowledgeMode == 0) throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); //acknowledgeMode不在0-3范围之内,这 if(acknowledgeMode < 0 || acknowledgeMode > 4) throw new JMSException((new StringBuilder()).append("invalid acknowledgeMode: ").append(acknowledgeMode).append(". Valid values are Session.AUTO_ACKNOWLEDGE (1), ").append("Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)").toString()); } return new ActiveMQSession(this, getNextSessionId(), transacted ? 0 : acknowledgeMode != 0 ? acknowledgeMode : 1, isDispatchAsync(), isAlwaysSessionAsync()); } //来看ActiveMQSession的构造 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { public static final int INDIVIDUAL_ACKNOWLEDGE = 4; public static final int MAX_ACK_CONSTANT = 4; private static final Logger LOG = LoggerFactory.getLogger(org/apache/activemq/ActiveMQSession); private final ThreadPoolExecutor connectionExecutor;//连接线程执行器 protected int acknowledgementMode; //通知模式 protected final ActiveMQConnection connection;//MQ连接 protected final SessionInfo info;//会话信息 protected final LongSequenceGenerator consumerIdGenerator;//消费者id产生器 protected final LongSequenceGenerator producerIdGenerator;//生产者id产生器 protected final LongSequenceGenerator deliveryIdGenerator; protected final ActiveMQSessionExecutor executor; protected final AtomicBoolean started; //是否启动 protected final CopyOnWriteArrayList consumers;//消费者 protected final CopyOnWriteArrayList producers;//生产者 protected boolean closed; private volatile boolean synchronizationRegistered; protected boolean asyncDispatch; protected boolean sessionAsyncDispatch; protected final boolean debug; protected final Object sendMutex;//发送互斥锁 protected final Object redeliveryGuard; private final AtomicBoolean clearInProgress; private MessageListener messageListener;//消息监听器 private final JMSSessionStatsImpl stats; private TransactionContext transactionContext; private DeliveryListener deliveryListener; private MessageTransformer transformer; private BlobTransferPolicy blobTransferPolicy; private long lastDeliveredSequenceId; final AtomicInteger clearRequestsCounter; protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { consumerIdGenerator = new LongSequenceGenerator(); producerIdGenerator = new LongSequenceGenerator(); deliveryIdGenerator = new LongSequenceGenerator(); started = new AtomicBoolean(false);//启动状态 consumers = new CopyOnWriteArrayList();//消费者 producers = new CopyOnWriteArrayList();//生产者 sendMutex = new Object();//发送互斥锁 redeliveryGuard = new Object(); clearInProgress = new AtomicBoolean(); lastDeliveredSequenceId = -2L; clearRequestsCounter = new AtomicInteger(0); debug = LOG.isDebugEnabled(); this.connection = connection; acknowledgementMode = acknowledgeMode;//消息确认模式 this.asyncDispatch = asyncDispatch; this.sessionAsyncDispatch = sessionAsyncDispatch; info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); setTransactionContext(new TransactionContext(connection));//设置连接事务上下文 stats = new JMSSessionStatsImpl(producers, consumers); this.connection.asyncSendPacket(info);//异步发送会话信息 setTransformer(connection.getTransformer()); setBlobTransferPolicy(connection.getBlobTransferPolicy()); connectionExecutor = connection.getExecutor();//获取连接执行器 executor = new ActiveMQSessionExecutor(this);//新建消息会话执行器 connection.addSession(this);//将会话添加到ActiveMQ连接的的会话队列CopyOnWriteArrayList if(connection.isStarted()) //启动 start(); } }
从上面可以看出会话的创建主要最的工作是,初始化消费者,生产者id产生器,会话消费者,生产者队列,消息确认模式,是否异步分发,设置连接事务上下文,异步发送会话信息,新建消息会话执行器,会话添加到ActiveMQConnection的会话队列CopyOnWriteArrayList。
结合上一篇和这一篇说以一下ActiveMQConnection与ActiveMQSession,ActiveMQMessageConsumer,
ActiveMQMessageProducer的关系,连接管理会话(1-n),会话管理消息者与生产者(1-n)。
下面再看一下会话的启动
启动
protected void start() throws JMSException { started.set(true); ActiveMQMessageConsumer c; //启动消费者 for(Iterator iter = consumers.iterator(); iter.hasNext(); c.start()) c = (ActiveMQMessageConsumer)iter.next(); //启动消息会话执行器 executor.start(); }
会话启动分两部分,启动消费者,让消费者消费消息,启动会话执行器
先来看启动消费者
来看ActiveMQMessageConsumer的启动
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { protected final ActiveMQSession session; protected final ConsumerInfo info;//消费者信息 protected final MessageDispatchChannel unconsumedMessages;//消息分发通道(未消费消费的消息) protected final LinkedList deliveredMessages = new LinkedList();//需要传输消息 private PreviouslyDeliveredMap previouslyDeliveredMessages; private int deliveredCounter;//传输消息计数器 private int additionalWindowSize; private long redeliveryDelay;//消息传输延时 private int ackCounter;//消息回复计时器 private int dispatchedCount;//消息分发计时器 private final AtomicReference messageListener = new AtomicReference();//消息监听器引用 private final JMSConsumerStatsImpl stats;//消费者状态信息管理器 private final String selector;选择器 private boolean synchronizationRegistered; private final AtomicBoolean started = new AtomicBoolean(false);//启动状态 private MessageAvailableListener availableListener; private RedeliveryPolicy redeliveryPolicy;//传输策略 private boolean optimizeAcknowledge; private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService;//执行器 private MessageTransformer transformer; private boolean clearDeliveredList; AtomicInteger inProgressClearRequiredFlag; private MessageAck pendingAck;//消息回复 private long lastDeliveredSequenceId; private IOException failureError; private long optimizeAckTimestamp; private long optimizeAcknowledgeTimeOut; private long optimizedAckScheduledAckInterval; private Runnable optimizedAckTask;//消息回复优化任务 private long failoverRedeliveryWaitPeriod;//当Master宕机时,消息传输等待时间 private boolean transactedIndividualAck; private boolean nonBlockingRedelivery;//是否非阻塞传输 private boolean consumerExpiryCheckEnabled; public void start() throws JMSException { if(unconsumedMessages.isClosed()) { return; } else { started.set(true); //启动消息分发通道 unconsumedMessages.start(); //唤醒会话执行器 session.executor.wakeup(); return; } }
在ActiveMQMessageConsumer的构造中有对消息通道unconsumedMessages的初始化
如果支持消息优先级,则为SimplePriorityMessageDispatchChannel,否则则为
FifoMessageDispatchChannel
if(session.connection.isMessagePrioritySupported()) unconsumedMessages = new SimplePriorityMessageDispatchChannel(); else unconsumedMessages = new FifoMessageDispatchChannel();
我们来看一下FifoMessageDispatchChannel
public class FifoMessageDispatchChannel implements MessageDispatchChannel { private final Object mutex = new Object();//消息分发通道互斥量 private final LinkedList list = new LinkedList(); private boolean closed; private boolean running;//运行状态 public void start() { synchronized(mutex) { running = true; //唤醒所有等待消息分发锁的线程 mutex.notifyAll(); } } }
再看SimplePriorityMessageDispatchChannel
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel { private static final Integer MAX_PRIORITY = Integer.valueOf(10); private final Object mutex = new Object(); private final LinkedList lists[]; private boolean closed; private boolean running; private int size; public void start() { synchronized(mutex) { running = true; mutex.notifyAll(); } } }
SimplePriorityMessageDispatchChannel的启动与FifoMessageDispatchChannel相同;
回到会话启动中的执行器启动
//ActiveMQSession
protected void start() throws JMSException { started.set(true); ActiveMQMessageConsumer c; //启动消费者 for(Iterator iter = consumers.iterator(); iter.hasNext(); c.start()) c = (ActiveMQMessageConsumer)iter.next(); //启动消息会话执行器 executor.start(); }
//ActiveMQSessionExecutor
public class ActiveMQSessionExecutor implements Task { private final ActiveMQSession session;//MQ会话 private final MessageDispatchChannel messageQueue;//消息分发通道 private boolean dispatchedBySessionPool;//是否依靠会话池分发消息 private volatile TaskRunner taskRunner;//任务执行器 private boolean startedOrWarnedThatNotStarted; ActiveMQSessionExecutor(ActiveMQSession session) { this.session = session; if(this.session.connection != null && this.session.connection.isMessagePrioritySupported()) messageQueue = new SimplePriorityMessageDispatchChannel(); else messageQueue = new FifoMessageDispatchChannel(); } } //ActiveMQSessionExecutor synchronized void start() { //如果消息分发通道处于未启动状态,则启动消息分化通道 if(!messageQueue.isRunning()) { //这个在上面已经看到 messageQueue.start(); //如果有为消费的消息,则唤醒 if(hasUncomsumedMessages()) wakeup(); } } //判断是否有未消费的消息 public boolean hasUncomsumedMessages() { return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); } //唤醒 public void wakeup() { label0: { if(dispatchedBySessionPool) break MISSING_BLOCK_LABEL_134; if(!session.isSessionAsyncDispatch()) break label0; TaskRunner taskRunner; try { label1: { taskRunner = this.taskRunner; if(taskRunner != null) break MISSING_BLOCK_LABEL_105; synchronized(this) { if(this.taskRunner != null) break MISSING_BLOCK_LABEL_90; if(isRunning()) break label1; } return; } } catch(InterruptedException e) { Thread.currentThread().interrupt(); break MISSING_BLOCK_LABEL_134; } } //创建会话任务执运行器 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, (new StringBuilder()).append("ActiveMQ Session: ").append(session.getSessionId()).toString()); taskRunner = this.taskRunner; activemqsessionexecutor; JVM INSTR monitorexit ; break MISSING_BLOCK_LABEL_105; exception; throw exception; taskRunner.wakeup(); break MISSING_BLOCK_LABEL_134; }
从这一句可以看出通过ActiveMQConnection来创建的,为TaskRunnerFactory
session.connection.getSessionTaskRunner().createTaskRunner
public TaskRunnerFactory getSessionTaskRunner() { synchronized(this) { if(sessionTaskRunner == null) { sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", 7, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler); } } return sessionTaskRunner; }
再来看为TaskRunnerFactory如何创建任务运行器
public class TaskRunnerFactory implements Executor { private ExecutorService executor;//执行器 private int maxIterationsPerRun; private String name; private int priority;//优先级 private boolean daemon; private final AtomicLong id; private boolean dedicatedTaskRunner; private long shutdownAwaitTermination; private final AtomicBoolean initDone;//是否初始化 private int maxThreadPoolSize;//最大线程池大小 private RejectedExecutionHandler rejectedTaskHandler;//当线程达到,线程池大小的拒绝策略 private ClassLoader threadClassLoader; public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) { id = new AtomicLong(0L); shutdownAwaitTermination = 30000L; initDone = new AtomicBoolean(false); this.maxThreadPoolSize = 2147483647; rejectedTaskHandler = null; this.name = name; this.priority = priority; this.daemon = daemon; this.maxIterationsPerRun = maxIterationsPerRun; this.dedicatedTaskRunner = dedicatedTaskRunner; this.maxThreadPoolSize = maxThreadPoolSize; } } TaskRunnerFactory public TaskRunner createTaskRunner(Task task, String name) { //初始化 init(); //如果线程池为空,则创建线程执行器 if(executor != null) return new PooledTaskRunner(executor, task, maxIterationsPerRun); else return new DedicatedTaskRunner(task, name, priority, daemon); } public void init() { //如果未初始化,则设置状态为已初始化 if(initDone.compareAndSet(false, true)) { //判断是否用专业执行器,是则创建DedicatedTaskRunner if(dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) executor = null; else if(executor == null) //创建默认执行器 executor = createDefaultExecutor(); LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor); } } //创建默认执行器 protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { String threadName = (new StringBuilder()).append(name).append("-").append(id.incrementAndGet()).toString(); Thread thread = new Thread(runnable, threadName); thread.setDaemon(daemon); thread.setPriority(priority); if(threadClassLoader != null) thread.setContextClassLoader(threadClassLoader); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { TaskRunnerFactory.LOG.error("Error in thread '{}'", t.getName(), e); } final _cls1 this$1; { this$1 = _cls1.this; super(); } }); TaskRunnerFactory.LOG.trace("Created thread[{}]: {}", threadName, thread); return thread; } final TaskRunnerFactory this$0; { this$0 = TaskRunnerFactory.this; super(); } }); if(rejectedTaskHandler != null) rc.setRejectedExecutionHandler(rejectedTaskHandler); return rc; }
回到createTaskRunner
public TaskRunner createTaskRunner(Task task, String name) { //初始化 init(); //如果线程池为空,则创建线程执行器 if(executor != null) return new PooledTaskRunner(executor, task, maxIterationsPerRun); else return new DedicatedTaskRunner(task, name, priority, daemon); }
我们来看PooledTaskRunner
class PooledTaskRunner implements TaskRunner { private final int maxIterationsPerRun;//每次执行允许最大线程数 private final Executor executor;//执行器 private final Task task; private final Runnable runable; private boolean queued;//是否为队列 private boolean shutdown; private boolean iterating; private volatile Thread runningThread; public PooledTaskRunner(Executor executor, final Task task, int maxIterationsPerRun) { this.executor = executor; this.maxIterationsPerRun = maxIterationsPerRun; this.task = task; runable = new Runnable() { public void run() { runningThread = Thread.currentThread(); runTask(); PooledTaskRunner.LOG.trace("Run task done: {}", task); runningThread = null; break MISSING_BLOCK_LABEL_70; Exception exception; exception; PooledTaskRunner.LOG.trace("Run task done: {}", task); runningThread = null; throw exception; } final Task val$task; final PooledTaskRunner this$0; { this$0 = PooledTaskRunner.this; task = task1; super(); } }; } }
//PooledTaskRunner
public void wakeup() throws InterruptedException { label0: { synchronized(runable) { if(!queued && !shutdown) break label0; } return; } queued = true; if(!iterating) //执行runable executor.execute(runable); runnable; JVM INSTR monitorexit ; goto _L1 exception; throw exception; _L1: }
在PooledTaskRunner的构造中可以看到
runable = new Runnable() { public void run() { runningThread = Thread.currentThread(); //运行任务 runTask(); PooledTaskRunner.LOG.trace("Run task done: {}", task); runningThread = null; } final Task val$task; final PooledTaskRunner this$0; { this$0 = PooledTaskRunner.this; task = task1; super(); } };
//运行任务
final void runTask() { boolean done = false; int i = 0; do { if(i >= maxIterationsPerRun) break; LOG.trace("Running task iteration {} - {}", Integer.valueOf(i), task); //运行任务的iterate,而任务实际为ActiveMQSessionExecutor if(!task.iterate()) { done = true; break; } i++; } while(true);
//ActiveMQSessionExecutor implements Task
public boolean iterate() { for(Iterator i$ = session.consumers.iterator(); i$.hasNext();) { ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i$.next(); if(consumer.iterate()) return true; } //从消息队列中获取消息,消息出消息队列 MessageDispatch message = messageQueue.dequeueNoWait(); if(message == null) { return false; } else { //分发消息 dispatch(message); return !messageQueue.isEmpty(); } } //ActiveMQSessionExecutor void dispatch(MessageDispatch message) { Iterator i$ = session.consumers.iterator(); do { if(!i$.hasNext()) break; ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i$.next(); ConsumerId consumerId = message.getConsumerId(); if(!consumerId.equals(consumer.getConsumerId())) continue; //遍历会话的消费者,分发消息,实际调用的是ActiveMQMessageConsumer的dispatch(message) consumer.dispatch(message); break; } while(true); } //ActiveMQMessageConsumer public boolean iterate() { //private final AtomicReference messageListener = new AtomicReference(); MessageListener listener = (MessageListener)messageListener.get(); if(listener != null) { //如果消息消费者注册的消息监听器,则未消费消息通道则从消息队列取出消息,分发消息 MessageDispatch md = unconsumedMessages.dequeueNoWait(); if(md != null) { //分发消息 dispatch(md); return true; } } return false; }
再看ActiveMQMessageConsumer的dispatch(message)
public void dispatch(MessageDispatch md) { //包装分发消息 ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); try { boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if(!expired) //如果消息没有过期,则消息消费者通过消息监听器MessageListener消费消息 listener.onMessage(message); afterMessageIsConsumed(md, expired); } if(!unconsumedMessages.isRunning()) session.connection.rollbackDuplicate(this, md.getMessage()); //将消息添加到未分发消息通道 unconsumedMessages.enqueue(md); if(redeliveryExpectedInCurrentTransaction(md, true)) { LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage()); if(transactedIndividualAck) immediateIndividualTransactedAck(md); else //发送回复消息 session.sendAck(new MessageAck(md, (byte)0, 1)); } }
总结:
会话的创建主要最的工作是,初始化消费者,生产者id产生器,会话消费者,生产者队列,消息确认模式,是否异步分发,设置连接事务上下文,异步发送会话信息,新建消息会话执行器,会话添加到ActiveMQConnection的会话队列CopyOnWriteArrayList;然后启动消费则,主要是启动消息分发通道,唤醒会话执行器ActiveMQSessionExecutor;最后启动会话执行器ActiveMQSessionExecutor,在启动会话执行器时,如果消息分发通道处于未启动状态,则启动消息分发通道,如果有未消费的消息,唤醒消息执行器,唤醒主要做的做工作是
ActiveMQConnection创建任务执行TaskRunnerFactory,有任务执行工厂TaskRunnerFactory,有任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor
iterate的函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,然后遍历消费者,消费者通过MessageListener消费消息。
ActiveMQConnection与ActiveMQSession,ActiveMQMessageConsumer,
ActiveMQMessageProducer的关系,连接管理会话(1-n),会话管理消息者与生产者(1-n)。ActiveMQSession关联一个ActiveMQSessionExecutor,由会话执行器,消费者消费消息。
//MessageListener
public interface MessageListener { public abstract void onMessage(Message message); }
//MessageDispatch
public class MessageDispatch extends BaseCommand { protected ConsumerId consumerId;//消费id protected ActiveMQDestination destination;//目的地 protected Message message;//消息 protected int redeliveryCounter;//消息传输计数器 protected transient long deliverySequenceId; protected transient Object consumer;//消费者 protected transient TransmitCallback transmitCallback; protected transient Throwable rollbackCause; }
//Session
public interface Session extends Runnable { public static final int AUTO_ACKNOWLEDGE = 1; public static final int CLIENT_ACKNOWLEDGE = 2; public static final int DUPS_OK_ACKNOWLEDGE = 3; public static final int SESSION_TRANSACTED = 0; public abstract ObjectMessage createObjectMessage() throws JMSException; public abstract ObjectMessage createObjectMessage(Serializable serializable) throws JMSException; public abstract StreamMessage createStreamMessage() throws JMSException; public abstract TextMessage createTextMessage() throws JMSException; public abstract TextMessage createTextMessage(String s) throws JMSException; public abstract boolean getTransacted() throws JMSException; public abstract int getAcknowledgeMode() throws JMSException; public abstract void commit() throws JMSException; public abstract void rollback() throws JMSException; public abstract void close() throws JMSException; public abstract void recover() throws JMSException; public abstract MessageListener getMessageListener() throws JMSException; public abstract void setMessageListener(MessageListener messagelistener) throws JMSException; public abstract void run(); public abstract MessageProducer createProducer(Destination destination) throws JMSException; public abstract MessageConsumer createConsumer(Destination destination) throws JMSException; public abstract MessageConsumer createConsumer(Destination destination, String s) throws JMSException; public abstract MessageConsumer createConsumer(Destination destination, String s, boolean flag) throws JMSException; public abstract Queue createQueue(String s) throws JMSException; public abstract Topic createTopic(String s) throws JMSException; public abstract TopicSubscriber createDurableSubscriber(Topic topic, String s) throws JMSException; public abstract TopicSubscriber createDurableSubscriber(Topic topic, String s, String s1, boolean flag) throws JMSException; public abstract QueueBrowser createBrowser(Queue queue) throws JMSException; public abstract QueueBrowser createBrowser(Queue queue, String s) throws JMSException; public abstract TemporaryQueue createTemporaryQueue() throws JMSException; public abstract TemporaryTopic createTemporaryTopic() throws JMSException; public abstract void unsubscribe(String s) throws JMSException; }
发表评论
-
Spring与ActiveMQ的集成详解二
2017-01-03 10:07 2092JMS(ActiveMQ) PTP和PUB/SUB模 ... -
Spring与ActiveMQ的集成详解一
2017-01-02 17:19 4604JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6292JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6519JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ消费者详解
2017-01-01 14:38 8673JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6874JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 12063JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4288ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7801下载apache-activemq-5.12.1.tar.gz ... -
Spring与ActiveMQ的集成
2016-12-27 18:09 1732JMS与MQ详解:http://www.fx114.net/q ... -
ActiveMQ PTP模式实例二
2016-12-27 14:45 700这篇主要是测试PTP模式下的回复消息,具体测试代码如下: 队列 ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3130深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
在本文中,我们将深入探讨ActiveMQ的使用、安装及其初始化页面的设置。 首先,ActiveMQ是一个高性能、可靠的分布式消息传递系统,允许应用程序之间通过消息进行异步通信。这种通信模式能够提高系统的可伸缩性和解耦...
在"spring配置activemq详解"这个主题中,我们将探讨如何在Spring项目中配置和使用ActiveMQ。以下是对这个主题的详细说明: 1. **配置ActiveMQ**: - 首先,我们需要在项目中引入ActiveMQ的相关依赖,这通常通过在`...
标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...
**ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...
Linux 环境下 ActiveMQ 持久化、集群环境搭建详解 在 Linux 环境下搭建 ActiveMQ 持久化和集群环境是一种复杂的任务,需要对 Linux 操作系统、Java 环境、ActiveMQ 等方面有深入的了解。以下是搭建 ActiveMQ 持久化...
- **持久化**: 可以配置消息持久化,即使服务器重启,消息也不会丢失。 - **事务**: 支持JMS事务,确保消息的可靠投递。 - **优先级**: 消息可以设置优先级,高优先级的消息优先被消费。 - **消息选择器**: 消费...
### ActiveMQ 5.3.1 整合应用服务器详解 #### 一、概述 在当前的软件开发环境中,消息中间件的应用越来越广泛。ActiveMQ作为一款开源的消息中间件,因其稳定性和灵活性而受到广大开发者的青睐。本文将详细介绍如何...
### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...
ActiveMQ是中国最流行的开源消息中间件之一,它基于...在提供的`TestForActiveMQ`源码中,可能包含了更复杂的示例,比如消息的持久化、事务处理或者网络集群配置,通过学习这些源码,能更深入地掌握ActiveMQ的用法。
Apache ActiveMQ是业界广泛使用的开源消息中间件,它支持多种协议,如AMQP、STOMP、MQTT等,且提供了消息持久化功能,确保在系统故障后仍能恢复消息,保持数据完整性。本主题主要围绕“activemq消息持久化所需Jar包...
### ActiveMQ-JMS好用实例详解 #### 一、ActiveMQ简介及特点 **ActiveMQ** 是一个非常流行的开源消息中间件,它基于 **Java消息服务(JMS)** 规范,能够提供高度可靠的消息传递机制。ActiveMQ 以其丰富的功能集、...
在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它能有效地解耦各个服务,提高系统的响应速度和并发...在实际应用中,可以根据需求调整配置,如设置消息持久化、事务管理、错误处理等高级特性。
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
**ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...
本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. **ActiveMQ与持久化**: - ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库...
**ActiveMQ的activemq.xml配置详解** ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循Java消息服务(JMS)规范,提供可靠的消息传递功能。`activemq.xml`是ActiveMQ的核心配置文件,它定义了服务器的...
ActiveMQ不仅支持基本的JMS功能,还包含许多额外的特性,如支持多种协议(如AMQP、STOMP等)、事务处理、消息优先级和持久化等。 在Spring Boot中集成ActiveMQ非常简便,因为Spring Boot提供了`spring-boot-starter...
标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...
ActiveMQ支持消息持久化,即使服务器重启,也不会丢失未处理的消息,增强了系统的可靠性。 八、KahaDB 原理 KahaDB是ActiveMQ的默认存储引擎,用于持久化消息,提供快速的读写性能和数据一致性保障。 九、关键的6...
5. **连接工厂与会话**:了解JMS中的连接工厂和会话对象,它们是与消息服务器建立连接并创建消息通道的基础。 6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费...