`
Donald_Draper
  • 浏览: 972801 次
社区版块
存档分类
最新评论

ActiveMQ Broker发送消息给消费者过程详解

阅读更多
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
引言:
从activemq脚本,可以看出启动ActiveMQ实际是启动,bin文件夹下的其实activemq.jar
包中有一个类为Main,这就是active的启动入口,Main主要是加载lib目录和ClassPath,初始化
类加载器,委托给ShellCommand,由ShellCommand根据命令描述去执行,如果是Version和HELP,
则打印信息,若是启动命令,则通过XBeanBrokerFactory创建BrokerService,这个过程主要利用的Spring的bean容器机制,然后启动BrokerService,主要启动持久化适配器,JMX连接,上下文关系器,最后启动所有网络连接,及TcpTransport连接TransportConnector,默认使用的是openwire:tcp,所以我们就看一下TcpTransportServer,TcpTransportServer有TcpTransportFactory创建并配置OpenWire协议转换器,启动TcpTransportServer,就是从ServerSocketFactory获取ServerSocket,并绑定ip和port,监听连接
,并设置ServerSocket的监听器org.apache.activemq.transport.nio.SelectorManager.Listener,这个用的是java nio。
前一篇文章中,说过ActiveMQ启动过程,今天看一下TcpTransportServer与ActiveMQConnection如何交互,如何将消息发送给消费者,从TransportConnector启动开始。

//TransportConnector
//启动TCP监听
public void start()
        throws Exception
    {
        broker = brokerService.getBroker();
        brokerInfo.setBrokerName(broker.getBrokerName());
        brokerInfo.setBrokerId(broker.getBrokerId());
        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
        brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
	//添加TcpTransportServer的监听器
        getServer().setAcceptListener(new TransportAcceptListener() {
            public void onAccept(final Transport transport)
            {
                try
                {
                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
                        public void run()
                        {
                            try
                            {
                                if(!brokerService.isStopping())
                                {
				   //创建TransportConnector,并启动
                                    Connection connection = createConnection(transport);
                                    connection.start();
				  }
                            }
                        }

                        final Transport val$transport;
                        final _cls1 this$1;
                    {
                        this$1 = _cls1.this;
                        transport = transport1;
                        super();
                    }
                    });
                }
            }
            final TransportConnector this$0;
            {
                this$0 = TransportConnector.this;
                super();
            }
        });
	//启动TcpTransportServer
        getServer().setBrokerInfo(brokerInfo);
        getServer().start();
        DiscoveryAgent da = getDiscoveryAgent();
        if(da != null)
        {
            da.registerService(getPublishableConnectString());
            da.start();
        }
        if(enableStatusMonitor)
        {
            statusDector = new TransportStatusDetector(this);
            statusDector.start();
        }
    }

//创建TransportConnector连接
 protected Connection createConnection(Transport transport)
        throws IOException
    {
        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null : taskRunnerFactory, brokerService.getTaskRunnerFactory());
        boolean statEnabled = getStatistics().isEnabled();
        answer.getStatistics().setEnabled(statEnabled);
        answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
        return answer;
    }

我们再来看TcpTransportServer的启动
getServer().start();
这个启动有两层含义一是启动TcpTransportServer线程,而是启动Service,
先来看第一层含义
//TcpTransportServer
public void run()
    {
        final ServerSocketChannel chan = serverSocket.getChannel();
        if(chan != null)
            try
            {
	        //如果socket通道存在,则设置通道选择器
                chan.configureBlocking(false);
                selector = SelectorManager.getInstance().register(chan, new org.apache.activemq.transport.nio.SelectorManager.Listener() {
                    public void onSelect(SelectorSelection sel)
                    {
                        try
                        {
                            SocketChannel sc = chan.accept();
                            if(sc != null)
                                if(isStopped() || getAcceptListener() == null)
                                    sc.close();
                                else
                                if(useQueueForAccept)
                                    socketQueue.put(sc.socket());
                                else
                                    handleSocket(sc.socket());
                        }
                    }
                    final ServerSocketChannel val$chan;
                    final TcpTransportServer this$0;
            {
                this$0 = TcpTransportServer.this;
                chan = serversocketchannel;
                super();
            }
                });
                selector.setInterestOps(16);
                selector.enable();
            }
        else
            do
            {
	       //如果socket通道不存在,则serverSocket接受连接,并处理Socket连接
                if(isStopped())
                    break;
                Socket socket = null;
                try
                {
                    socket = serverSocket.accept();
                    if(socket != null)
                        if(isStopped() || getAcceptListener() == null)
                            socket.close();
                        else
                        if(useQueueForAccept)
                            socketQueue.put(socket);
                        else
                            handleSocket(socket);
                }
            } while(true);
    }

protected final void handleSocket(Socket socket)
    {
        boolean closeSocket = true;
        try
        {
            if(currentTransportCount.get() >= maximumConnections)
                throw new ExceededMaximumConnectionsException("Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml)");
            HashMap options = new HashMap();
            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
            options.put("trace", Boolean.valueOf(trace));
            options.put("soTimeout", Integer.valueOf(soTimeout));
            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
            options.put("logWriterName", logWriterName);
            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
            options.put("startLogging", Boolean.valueOf(startLogging));
            options.putAll(transportOptions);
            WireFormat format = wireFormatFactory.createWireFormat();
	    //创建transport
            Transport transport = createTransport(socket, format);
            closeSocket = false;
	    //将transport添加到ServiceSupport监听器列表中
            if(transport instanceof ServiceSupport)
                ((ServiceSupport)transport).addServiceListener(this);
            Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
	    //TcpTransportServer监听器接受连接transport,
            getAcceptListener().onAccept(configuredTransport);
            currentTransportCount.incrementAndGet();
        }
    }

    protected Transport createTransport(Socket socket, WireFormat format)
        throws IOException
    {
        return new TcpTransport(format, socket);
    }


TcpTransportServer父类栈

public class TcpTransportServer extends TransportServerThreadSupport
    implements ServiceListener

public abstract class TransportServerThreadSupport extends TransportServerSupport
    implements Runnable

public abstract class TransportServerSupport extends ServiceSupport
    implements TransportServer


再来看Service层的启动
public abstract class ServiceSupport
    implements Service
{
    private AtomicBoolean started;
    private AtomicBoolean stopping;
    private AtomicBoolean stopped;
    private List serviceListeners;//service监听器

    public void start()
        throws Exception
    {
        boolean success;
        if(!started.compareAndSet(false, true))
            break MISSING_BLOCK_LABEL_93;
        success = false;
        stopped.set(false);
        preStart();
	//doStart为抽象函数,待父类扩展
        doStart();
        success = true;
        started.set(success);
        break MISSING_BLOCK_LABEL_54;
        Exception exception;
        exception;
        started.set(success);
        throw exception;
        ServiceListener l;
	//启动所有Service监听,在TcpServe处理Socket的连接中(handleSocket),
	//将transport添加到ServiceSupport监听器列表中
        for(Iterator i$ = serviceListeners.iterator(); i$.hasNext(); l.started(this))
            l = (ServiceListener)i$.next();

    }
}

而TcpTransport也是service,来看他的启动,也有两层含义,第一启动TcpTransport线程,而启动Service,

先看第一层

//初始化Socket,及数据输入输出流,已经过,这里不再将
protected void doStart()
        throws Exception
    {
        //连接
        connect();
        stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }

    protected void connect()
        throws Exception
    {
        InetSocketAddress localAddress = null;
        InetSocketAddress remoteAddress = null;
        if(localLocation != null)
            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
        if(remoteLocation != null)
        {
            String host = resolveHostName(remoteLocation.getHost());
            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
        }
        trafficClassSet = setTrafficClass(socket);
        if(socket != null)
        {
            if(localAddress != null)
                socket.bind(localAddress);
            if(remoteAddress != null)
                if(connectionTimeout >= 0)
                    socket.connect(remoteAddress, connectionTimeout);
                else
                    socket.connect(remoteAddress);
        } else
        if(localAddress != null)
            socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
        else
            socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
        initialiseSocket(socket);
        initializeStreams();
    }


再看TcpTransport线程的启动

public class TcpTransport extends TransportThreadSupport
    implements Transport, Service, Runnable
{
 public void run()
    {
        LOG.trace((new StringBuilder()).append("TCP consumer thread for ").append(this).append(" starting").toString());
        runnerThread = Thread.currentThread();
	//如果TcpTransport
        for(; !isStopped(); doRun());
        ((CountDownLatch)stoppedLatch.get()).countDown();
    }
}
 protected void doRun()
        throws IOException
    {
        try
        {
	    //读取命令
            Object command = readCommand();
	    //处理命令
            doConsume(command);
        }
    }

读取命令
Object command = readCommand();
 protected Object readCommand()
        throws IOException
    {
        //通过wireFormat解析字节流
        return wireFormat.unmarshal(dataIn);
    }

//OpenWireFormat
public final class OpenWireFormat
    implements WireFormat
{
    private DataStreamMarshaller dataMarshallers[];
    private int version;
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled;
    private boolean cacheEnabled;
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxFrameSize;
    private short nextMarshallCacheIndex;
    private short nextMarshallCacheEvictionIndex;
    private Map marshallCacheMap;
    private DataStructure marshallCache[];//命令字节流发送缓存
    private DataStructure unmarshallCache[];//命令字节流解析缓存
    private DataByteArrayOutputStream bytesOut;//数据输入流
    private DataByteArrayInputStream bytesIn;//数据输出流
    private WireFormatInfo preferedWireFormatInfo;//协议格式信息
public synchronized Object unmarshal(ByteSequence sequence)
        throws IOException
    {
        //从二进制字节流读取数据到缓存,记录读取位置
        bytesIn.restart(sequence);
        if(!sizePrefixDisabled)
        {
            int size = bytesIn.readInt();
            if(sequence.getLength() - 4 == size);
            if((long)size > maxFrameSize)
                throw new IOException((new StringBuilder()).append("Frame size of ").append(size / 1048576).append(" MB larger than max allowed ").append(maxFrameSize / 1048576L).append(" MB").toString());
        }
        //解析输入流,转为为command
	Object command = doUnmarshal(bytesIn);
        return command;
    }
}

//DataByteArrayInputStream
public final class DataByteArrayInputStream extends InputStream
    implements DataInput
{

    private byte buf[];
    private int pos;
    private int offset;
    //从二进制字节流读取数据到缓存,记录读取位置
       public void restart(ByteSequence sequence)
    {
        buf = sequence.getData();
        pos = sequence.getOffset();
    }
}

//OpenWireFormat
解析输入流,转为为command
public Object doUnmarshal(DataInput dis)
        throws IOException
    {
        //获取命令类型
        byte dataType = dis.readByte();
        if(dataType != 0)
        {
	    //创建命令字节流对应大小的字节流处理器,DataStreamMarshaller为WireFormatInfoMarshaller
            DataStreamMarshaller dsm = dataMarshallers[dataType & 255];
            if(dsm == null)
                throw new IOException((new StringBuilder()).append("Unknown data type: ").append(dataType).toString());
	    //创建命令对应的数据结构
	    Object data = dsm.createObject();
            if(tightEncodingEnabled)
            {
                BooleanStream bs = new BooleanStream();
                bs.unmarshal(dis);
                dsm.tightUnmarshal(this, data, dis, bs);
            } else
            {
	        //解析命令字节流
                dsm.looseUnmarshal(this, data, dis);
            }
            return data;
        } else
        {
            return null;
        }
    }

//WireFormatInfoMarshaller
 public class WireFormatInfoMarshaller extends BaseDataStreamMarshaller
{
    //创建命令对应的数据结构
    public DataStructure createObject()
    {
        return new WireFormatInfo();
    }
    //设置WireFormat格式下,命令命令对应的魔数,版本信息 
public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn)
        throws IOException
    {
        //解析命令字节流,委托给BaseDataStreamMarshaller
        super.looseUnmarshal(wireFormat, o, dataIn);
        WireFormatInfo info = (WireFormatInfo)o;
	//如果命令字节流属性不为null,则初始化命令字节流
        info.beforeUnmarshall(wireFormat);
        info.setMagic(looseUnmarshalConstByteArray(dataIn, 8));
        info.setVersion(dataIn.readInt());
	//设置命令字节流属性
        info.setMarshalledProperties(looseUnmarshalByteSequence(dataIn));
        info.afterUnmarshall(wireFormat);
    }
}

//WireFormatInfo
public class WireFormatInfo
    implements Command, MarshallAware
{
public static final byte DATA_STRUCTURE_TYPE = 1;
    private static final int MAX_PROPERTY_SIZE = 4096;
    private static final byte MAGIC[] = {
        65, 99, 116, 105, 118, 101, 77, 81
    };
    protected byte magic[];
    protected int version;
    protected ByteSequence marshalledProperties;
    protected transient Map properties;
    private transient Endpoint from;
    private transient Endpoint to;
    public void beforeMarshall(WireFormat wireFormat)
        throws IOException
    {
        if(marshalledProperties == null && properties != null)
        {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(baos);
            MarshallingSupport.marshalPrimitiveMap(properties, os);
            os.close();
            marshalledProperties = baos.toByteSequence();
        }
    }
    //设置命令字节流属性
  public void setMarshalledProperties(ByteSequence marshalledProperties)
    {
        this.marshalledProperties = marshalledProperties;
    }
}

//BaseDataStreamMarshaller
public abstract class BaseDataStreamMarshaller
    implements DataStreamMarshaller
{
   public static final Constructor STACK_TRACE_ELEMENT_CONSTRUCTOR;

    static 
    {
        Constructor constructor = null;
        try
        {
            constructor = java/lang/StackTraceElement.getConstructor(new Class[] {
                java/lang/String, java/lang/String, java/lang/String, Integer.TYPE
            });
        }
        catch(Throwable throwable) { }
        STACK_TRACE_ELEMENT_CONSTRUCTOR = constructor;
    }
    //待扩展
     public void looseUnmarshal(OpenWireFormat openwireformat, Object obj, DataInput datainput)
        throws IOException
    {
    }
}

}

回到TcpTransport处理命令
doConsume(command);

处理命令
public void doConsume(Object command)
    {
        if(command != null)
            if(transportListener != null)
	    //如果transport监听器不为空,则处理命令
                transportListener.onCommand(command);
            else
                LOG.error((new StringBuilder()).append("No transportListener available to process inbound command: ").append(command).toString());
    }

public abstract class TransportSupport extends ServiceSupport
    implements Transport
{
     TransportListener transportListener;//transport监听器
     public void doConsume(Object command)
    {
        if(command != null)
            if(transportListener != null)
                transportListener.onCommand(command);
            else
                LOG.error((new StringBuilder()).append("No transportListener available to process inbound command: ").append(command).toString());
    }
}

再回到看ActiveMQConnection实现transportListener
public class ActiveMQConnection
    implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection
{
 public void onCommand(Object o)
    {
        final Command command = (Command)o;
        if(!closed.get() && command != null)
            try
            {
                command.visit(new CommandVisitorAdapter() {
                    //分发消息
                    public Response processMessageDispatch(MessageDispatch md)
                        throws Exception
                    {
                        waitForTransportInterruptionProcessingToComplete();
			//根据分发消息id,获取消费者,然后消费者,消费消息
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)dispatchers.get(md.getConsumerId());
                        if(dispatcher != null)
                        {
                            Message msg = md.getMessage();
                            if(msg != null)
                            {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage(null);
                                md.setMessage(msg);
                            }
			    //分发消息
                            dispatcher.dispatch(md);
                    }
                    //处理生产者恢复消息
                    public Response processProducerAck(ProducerAck pa)
                        throws Exception
                    {
                        if(pa != null && pa.getProducerId() != null)
                        {
                            ActiveMQMessageProducer producer = (ActiveMQMessageProducer)producers.get(pa.getProducerId());
                            if(producer != null)
                                producer.onProducerAck(pa);
                        }
                        return null;
                    }
                    //处理broker
                    public Response processBrokerInfo(BrokerInfo info)
                        throws Exception
                    {
                        brokerInfo = info;
                        brokerInfoReceived.countDown();
                        optimizeAcknowledge = brokerInfo.isFaultTolerantConfiguration() ? 0 : 1;
                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
                        return null;
                    }
                    //处理连接错误
                    public Response processConnectionError(final ConnectionError error)
                        throws Exception
                    {
                        executor.execute(new Runnable() {

                            public void run()
                            {
                                onAsyncException(error.getException());
                            }

                            final ConnectionError val$error;
                            final _cls3 this$1;

                    
                    {
                        this$1 = _cls3.this;
                        error = connectionerror;
                        super();
                    }
                        });
                        return null;
                    }
                    //处理控制命令
                    public Response processControlCommand(ControlCommand command)
                        throws Exception
                    {
                        onControlCommand(command);
                        return null;
                    }
                    //处理连接命令
                    public Response processConnectionControl(ConnectionControl control)
                        throws Exception
                    {
                        onConnectionControl((ConnectionControl)command);
                        return null;
                    }
                    //处理消费控制命令
                    public Response processConsumerControl(ConsumerControl control)
                        throws Exception
                    {
                        onConsumerControl((ConsumerControl)command);
                        return null;
                    }

                    public Response processWireFormat(WireFormatInfo info)
                        throws Exception
                    {
                        onWireFormatInfo((WireFormatInfo)command);
                        return null;
                    }

                    final Command val$command;
                    final ActiveMQConnection this$0;

            
            {
                this$0 = ActiveMQConnection.this;
                command = command1;
                super();
            }
                });
            }
	//启动所有连接注册的监听器
        TransportListener listener;
        for(Iterator iter = transportListeners.iterator(); iter.hasNext(); listener.onCommand(command))
            listener = (TransportListener)iter.next();

    }

    处理消费控制命令
     protected void onConsumerControl(ConsumerControl command)
    {
        if(command.isClose())
        {
            ActiveMQSession session;
            for(Iterator i$ = sessions.iterator(); i$.hasNext(); session.close(command.getConsumerId()))
                session = (ActiveMQSession)i$.next();

        } else
        {
            Iterator i$;
            ActiveMQSession session;
	    //设置会话消费者抓取数据大小
            for(i$ = sessions.iterator(); i$.hasNext(); session.setPrefetchSize(command.getConsumerId(), command.getPrefetch()))
                session = (ActiveMQSession)i$.next();
            i$ = connectionConsumers.iterator();
            do
            {
                if(!i$.hasNext())
                    break;
                ActiveMQConnectionConsumer connectionConsumer = (ActiveMQConnectionConsumer)i$.next();
                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
                if(consumerInfo.getConsumerId().equals(command.getConsumerId()))
		    //设置消费抓取数据大小
                    consumerInfo.setPrefetchSize(command.getPrefetch());
            } while(true);
        }
    }


//启动所有连接注册的监听器
TransportListener listener;
for(Iterator iter = transportListeners.iterator(); iter.hasNext(); listener.onCommand(command))
listener = (TransportListener)iter.next();

在ActiveMQConnection构造中有这么一段
 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats)
        throws Exception
    {
        //transport的监听器为ActiveMQConnection
        this.transport.setTransportListener(this);
        stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }


实际上调用的是ActiveMQConnection的onCommand
ResponseCorrelator
public void onCommand(Object o)
    {
        Command command = null;
        if(o instanceof Command)
            command = (Command)o;
        else
            throw new ClassCastException((new StringBuilder()).append("Object cannot be converted to a Command,  Object: ").append(o).toString());
        if(command.isResponse())
        {
            Response response = (Response)command;
            FutureResponse future = null;
            synchronized(requestMap)
            {
                future = (FutureResponse)requestMap.remove(Integer.valueOf(response.getCorrelationId()));
            }
            if(future != null)
                future.set(response);
            else
            if(debug)
                LOG.debug((new StringBuilder()).append("Received unexpected response: {").append(command).append("}for command id: ").append(response.getCorrelationId()).toString());
        } else
        {
            getTransportListener().onCommand(command);
        }
    }



回到ActiveMQConnection的onCommand
 //分发消息
                    public Response processMessageDispatch(MessageDispatch md)
                        throws Exception
                    {
                        waitForTransportInterruptionProcessingToComplete();
			//根据分发消息id,获取消费者,然后消费者,消费消息
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)dispatchers.get(md.getConsumerId());
                        if(dispatcher != null)
                        {
                            Message msg = md.getMessage();
                            if(msg != null)
                            {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
				//设置分发消息连接
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage(null);
                                md.setMessage(msg);
                            }
			     //分发消息
                            dispatcher.dispatch(md);
                    }


public class ActiveMQConnectionConsumer
    implements ConnectionConsumer, ActiveMQDispatcher
{
   private ActiveMQConnection connection;//连接
    private ServerSessionPool sessionPool;//会话池
    private ConsumerInfo consumerInfo;//消费者信息
    private boolean closed;
    public void dispatch(MessageDispatch messageDispatch)
    {
        ServerSession serverSession;
        ActiveMQSession session;
        messageDispatch.setConsumer(this);
	//获取护花
        serverSession = sessionPool.getServerSession();
        Session s = serverSession.getSession();
        session = null;
        if(s instanceof ActiveMQSession)
            session = (ActiveMQSession)s;
        else
        if(s instanceof ActiveMQTopicSession)
        {
            ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
            session = (ActiveMQSession)topicSession.getNext();
        } else
        if(s instanceof ActiveMQQueueSession)
        {
            ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
            session = (ActiveMQSession)queueSession.getNext();
        } else
        {
            connection.onClientInternalException(new JMSException((new StringBuilder()).append("Session pool provided an invalid session type: ").append(s.getClass()).toString()));
            return;
        }
        try
        {
	    //会话分发消息,这个前面已说过,就是
            session.dispatch(messageDispatch);
            serverSession.start();
        }
        catch(JMSException e)
        {
            connection.onAsyncException(e);
        }
        return;
    }
}


//ActiveMQSession
public void dispatch(MessageDispatch messageDispatch)
    {
        try
        {
	   //会话执行,执行消息分发
            executor.execute(messageDispatch);
        }
    }

//ActiveMQSessionExecutor
public class ActiveMQSessionExecutor
    implements Task
{
 private final ActiveMQSession session;
    private final MessageDispatchChannel messageQueue;//未消费消息队列
    private boolean dispatchedBySessionPool;
    private volatile TaskRunner taskRunner;
    private boolean startedOrWarnedThatNotStarted;
    void execute(MessageDispatch message)
        throws InterruptedException
    {
        if(!startedOrWarnedThatNotStarted)
        {
            ActiveMQConnection connection = session.connection;
            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
            if(connection.isStarted() || aboutUnstartedConnectionTimeout < 0L)
            {
                startedOrWarnedThatNotStarted = true;
            } else
            {
                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
                if(elapsedTime > aboutUnstartedConnectionTimeout)
                {
                    LOG.warn((new StringBuilder()).append("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: ").append(connection).append(" Received: ").append(message).toString());
                    startedOrWarnedThatNotStarted = true;
                }
            }
        }
        if(!session.isSessionAsyncDispatch() && !dispatchedBySessionPool)
        {
	    //如果不是异步分发消息,则直接分发消息
            dispatch(message);
        } else
        {
	    //将分发消息添加到未分发消息队列
            messageQueue.enqueue(message);
            wakeup();
        }
    }
}

我们来看同步,获取会话消费者,遍历消费,消费消息
void dispatch(MessageDispatch message)
    {
        Iterator i$ = session.consumers.iterator();
        do
        {
            if(!i$.hasNext())
                break;
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i$.next();
            ConsumerId consumerId = message.getConsumerId();
            if(!consumerId.equals(consumer.getConsumerId()))
                continue;
            consumer.dispatch(message);
            break;
        } while(true);
    }

上面分发消息这一段,我们在前几篇有说过,这一就不在讲。
总结:
TransportConnector的启动,主要为添加TcpTransportServer的监听器TransportAcceptListener,监听器主要任务是接受连接,并启动连接TransportConnector,在启动的过程启动一个TcpTransportServer,并启动TcpTransportServer,TcpTransportServer监听连接请求,如果有连接请求,则创建连接,同时启动连接的Transport,Transport启动过程主要是读取命令,然后交由TransportListener处理,实际为ActiveMQConnection,如果命令为消息分发命令则有ActiveMQConnectionConsumer根据分化消息获取消费者信息,并从ActiveMQConnectionConsumer获取连接会话,然后由会话来分发消息,最后交由会话执行器分发消息。
0
0
分享到:
评论
1 楼 cuixianfei521 2017-04-20  
你好博主,我想问下  activemq 其实就是个中间件,就是把 发布者 发送的消息进行转发到不同的订阅者,可是 activemq 怎么知道  有几个发布代理和订阅代理 是处在启动的状态。522351468 这是我的QQ 想让博主 帮忙出个思路

相关推荐

    ActiveMQ消息过期时间设置和自动清除解决方案

    为了确保消息处理的高效性,ActiveMQ提供了慢消费者策略来处理那些处理消息速度过慢的消费者。该策略可以通过定期检查所有慢速消费者并在达到一定阈值时中断它们,以提高系统的整体性能。 ##### 配置示例 ```xml ...

    ActiveMQ 配置文件详解

    **ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...

    activemq消息中间件-视频教程

    4. **API使用**:通过Java API演示如何创建生产者和消费者,发送和接收消息,以及设置消息属性和过滤条件。 5. **消息持久化**:讲解ActiveMQ如何实现消息的持久化,确保在网络故障或服务器重启后仍能恢复未处理的...

    ActiveMQ消息传送机制以及ACK机制详解

    它们的协同中心就是ActiveMQbroker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异步RPC/数据交换的功能。随着ActiveMQ的不断发展,支持了越来越多的特性,也解决开发者在各种场景下使用ActiveMQ...

    SpringBoot集成ActiveMQ实例详解.docx

    生产者使用`JmsTemplate`发送消息,消费者可以通过实现`MessageListener`接口或使用`@JmsListener`注解来监听消息。例如: ```java @Autowired private JmsTemplate jmsTemplate; public void sendMessage(String ...

    ActiveMQ高并发处理方案

    ActiveMQ使用了一种称为“预取策略”的机制来决定向消费者发送多少条消息。默认情况下,每个消费者的预取数量为1000条,这意味着在没有特别配置的情况下,消费者将预取最多1000条消息到本地缓冲区中。当一个消费者...

    高可用之ActiveMQ集群:网络连接模式(network connector)详解.docx

    每个 broker 实例可以共享队列和消费者信息,使得消息可以在集群内的任何节点间流动,提高了消息传递的可靠性和效率。 **ActiveMQ部署拓扑结构** ActiveMQ 集群有多种部署模式,包括: 1. **嵌入模式**:在一个应用...

    activemq 入门示例代码

    // 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); // 注册消息监听器 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message ...

    ActiveMQ+Spring完整详解例子

    5. **创建消息消费者** - 创建`MessageListener`接口实现类,重写`onMessage`方法,处理接收到的消息。 - 在消息监听器容器中注册监听器,关联到特定的队列或主题。 6. **ActiveMQ的高级特性** - **持久化**: ...

    ActiveMQ开发教程

    在ActiveMQ中,当消费者成功处理完一条消息后,需要向Broker发送确认消息。这通常被称为“签收”(Acknowledge)。如果不进行签收,Broker可能会重新发送该消息,以防消息丢失。 **1.5 ActiveMQ消息传送模式** ...

    ActiveMQ消息服务配置

    ### ActiveMQ消息服务配置详解 #### 一、ActiveMQ配置概览 ActiveMQ是一款非常流行的开源消息中间件,它基于Java开发,支持多种消息传递模式,如点对点(P2P)、发布/订阅(Pub/Sub)等。本文将详细介绍ActiveMQ的配置...

    ActiveMq-JMS好用实例详解

    ### ActiveMQ-JMS好用实例详解 #### 一、ActiveMQ简介及特点 **ActiveMQ** 是一个非常流行的开源消息中间件,它基于 **Java消息服务(JMS)** 规范,能够提供高度可靠的消息传递机制。ActiveMQ 以其丰富的功能集、...

    【BAT必备】activeMQ面试题

    消息队列是一种点对点的通信模型,在这种模型下,消息被发送到特定的队列中,然后由一个或多个消费者接收。每个消息只能被一个消费者消费。当消费者成功接收并处理完消息后,消息就会从队列中移除。 **2.3 什么是...

    activeMQ实例

    1. **持久化**:ActiveMQ支持消息持久化,即使Broker重启,未消费的消息也能恢复。 2. **事务**:Session支持事务操作,确保消息的一致性。 3. **消息优先级**:可以设置消息优先级,高优先级的消息优先被处理。 4. ...

    Spring+ActiveMQ整合实例代码工程

    对于消息消费者,我们可以通过定义`MessageListener`接口的实现类来监听特定的队列或主题。Spring会自动管理这些监听器,并在有新消息到达时调用其`onMessage()`方法。例如: ```java public class ...

    ActiveMQ集群:网络连接模式(network connector)详解.docx

    ActiveMQ 通过 network connector 实现了分布式队列的目的, broker 实例之间可以共享队列和消费者列表。Network Connector 的配置主要包括两个方面:URI 配置和 networkConnector 配置。 URI 配置:URI 是 ...

    activeMQ学习资料

    - **消息队列**:ActiveMQ的核心是消息队列,它允许应用程序之间通过发送和接收消息进行通信,解耦了生产者和消费者。 - **JMS接口**:ActiveMQ实现了JMS接口,提供了消息生产者、消费者、目的地(队列和主题)等...

Global site tag (gtag.js) - Google Analytics