Donald_Draper

ActiveMQ Producer Explained

JMS (ActiveMQ) PTP and PUB/SUB mode examples: http://donald-draper.iteye.com/blog/2347445
ActiveMQ connection factory and connection in detail: http://donald-draper.iteye.com/blog/2348070
ActiveMQ session initialization: http://donald-draper.iteye.com/blog/2348341
ActiveMQ producer: http://donald-draper.iteye.com/blog/2348381
ActiveMQ consumer: http://donald-draper.iteye.com/blog/2348389
ActiveMQ startup process in detail: http://donald-draper.iteye.com/blog/2348399
How the ActiveMQ broker sends messages to consumers: http://donald-draper.iteye.com/blog/2348440
Integrating Spring with ActiveMQ: http://donald-draper.iteye.com/blog/2347638
Integrating Spring with ActiveMQ in detail, part 1: http://donald-draper.iteye.com/blog/2348449
Integrating Spring with ActiveMQ in detail, part 2: http://donald-draper.iteye.com/blog/2348461
In the previous article we covered session initialization:
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); 


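The main work in creating a session is to initialize the consumer and producer ID generators, the session's consumer and producer collections, the acknowledgement mode, and the asynchronous-dispatch flag; set the connection's transaction context; asynchronously send the session info; create the session executor; and add the session to the ActiveMQConnection's session list (a CopyOnWriteArrayList). The consumers are then started, which mainly means starting the message dispatch channel and waking up the session executor, ActiveMQSessionExecutor. Finally the session executor itself is started: if the dispatch channel has not been started yet, it is started, and if there are unconsumed messages, the executor is woken up.

Waking up the executor works as follows: ActiveMQConnection creates a TaskRunnerFactory, and the factory creates a PooledTaskRunner that wraps the ActiveMQSessionExecutor. Running the PooledTaskRunner means running ActiveMQSessionExecutor's iterate method, in which the executor obtains the session's consumers from ActiveMQSession and iterates over them, and each consumer consumes messages through its MessageListener.

As for the relationship between ActiveMQConnection, ActiveMQSession, ActiveMQMessageConsumer, and ActiveMQMessageProducer: a connection manages sessions (1-n), and a session manages consumers and producers (1-n). Each ActiveMQSession is associated with one ActiveMQSessionExecutor, through which its consumers consume messages.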

Today we continue with the message queue, the producer, and sending messages.

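// Queue: the message destination, i.e. where the message is sent.
Queue  destination = session.createQueue(qname);  
// MessageProducer: the message sender.
MessageProducer producer = session.createProducer(destination);  
// Set the producer's delivery mode; there are two options:
// DeliveryMode.PERSISTENT: queue data is saved when ActiveMQ shuts down
// DeliveryMode.NON_PERSISTENT: queue data is discarded when ActiveMQ shuts down
producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
// Build the messages; they are hard-coded here, in a real project they would come from parameters or a method call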
sendMessage(session, producer);  
session.commit();  
connection.close();  



1. First, the message queue, starting from the following statement:
Queue  destination = session.createQueue(qname);
 public Queue createQueue(String queueName)
        throws JMSException
    {
        checkClosed();
        // If the queue name starts with "ID:", create a temporary queue; otherwise create an ActiveMQQueue
        if(queueName.startsWith("ID:"))
            return new ActiveMQTempQueue(queueName);
        else
            return new ActiveMQQueue(queueName);
    }

Now let's look at ActiveMQQueue:
//ActiveMQQueue
public class ActiveMQQueue extends ActiveMQDestination
    implements Queue
{
    public static final byte DATA_STRUCTURE_TYPE = 100;
    private static final long serialVersionUID = -3885260014960795889L;
 public ActiveMQQueue(String name)
    {
        // Delegate to the parent constructor
        super(name);
    }
    public String getQueueName()
        throws JMSException
    {
        return getPhysicalName();
    }
    public byte getDestinationType()
    {
        return 1;
    }
    protected String getQualifiedPrefix()
    {
        return "queue://";
    } 
}

public abstract class ActiveMQDestination extends JNDIBaseStorable
    implements DataStructure, Destination, Externalizable, Comparable
{
    public static final String PATH_SEPERATOR = ".";
    public static final char COMPOSITE_SEPERATOR = 44;
    public static final byte QUEUE_TYPE = 1;// queue type
    public static final byte TOPIC_TYPE = 2;// topic type
    public static final byte TEMP_MASK = 4;
    public static final byte TEMP_TOPIC_TYPE = 6;
    public static final byte TEMP_QUEUE_TYPE = 5;
    public static final String QUEUE_QUALIFIED_PREFIX = "queue://";// qualified name prefix for queues
    public static final String TOPIC_QUALIFIED_PREFIX = "topic://";// qualified name prefix for topics
    public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
    public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
    public static final String IS_DLQ = "isDLQ";
    public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
    private static final long serialVersionUID = -3885260014960795889L;
    protected String physicalName;
    protected transient ActiveMQDestination compositeDestinations[];
    protected transient String destinationPaths[];
    protected transient boolean isPattern;
    protected transient int hashValue;
    protected Map options;
    protected static UnresolvedDestinationTransformer unresolvableDestinationTransformer = new DefaultUnresolvedDestinationTransformer();
 protected ActiveMQDestination(String name)
    {
        setPhysicalName(name);
    }
    // Get the destination type as a string
     public String getDestinationTypeAsString()
    {
        switch(getDestinationType())
        {
        case 1: // '\001'
            return "Queue";

        case 2: // '\002'
            return "Topic";

        case 5: // '\005'
            return "TempQueue";

        case 6: // '\006'
            return "TempTopic";

        case 3: // '\003'
        case 4: // '\004'
        default:
            throw new IllegalArgumentException((new StringBuilder()).append("Invalid destination type: ").append(getDestinationType()).toString());
        }
    }
    // Create a destination: a qualified prefix wins, otherwise defaultType decides
    public static ActiveMQDestination createDestination(String name, byte defaultType)
    {
        if(name.startsWith("queue://"))
            return new ActiveMQQueue(name.substring("queue://".length()));
        if(name.startsWith("topic://"))
            return new ActiveMQTopic(name.substring("topic://".length()));
        if(name.startsWith("temp-queue://"))
            return new ActiveMQTempQueue(name.substring("temp-queue://".length()));
        if(name.startsWith("temp-topic://"))
            return new ActiveMQTempTopic(name.substring("temp-topic://".length()));
        switch(defaultType)
        {
        case 1: // '\001'
            return new ActiveMQQueue(name);

        case 2: // '\002'
            return new ActiveMQTopic(name);

        case 5: // '\005'
            return new ActiveMQTempQueue(name);

        case 6: // '\006'
            return new ActiveMQTempTopic(name);

        case 3: // '\003'
        case 4: // '\004'
        default:
            throw new IllegalArgumentException((new StringBuilder()).append("Invalid default destination type: ").append(defaultType).toString());
        }
    }
    // Write the destination's physical name and options to the output stream
     public void writeExternal(ObjectOutput out)
        throws IOException
    {
        out.writeUTF(getPhysicalName());
        out.writeObject(options);
    }
    // Read the destination's physical name and options from the input stream
    public void readExternal(ObjectInput in)
        throws IOException, ClassNotFoundException
    {
        setPhysicalName(in.readUTF());
        options = (Map)in.readObject();
    }

}


Topic: the message destination, i.e. where the message is sent.
Topic  destination = session.createTopic(tname);  

public Topic createTopic(String topicName)
        throws JMSException
    {
        checkClosed();
        if(topicName.startsWith("ID:"))
            return new ActiveMQTempTopic(topicName);
        else
            return new ActiveMQTopic(topicName);
    }

public class ActiveMQTopic extends ActiveMQDestination
    implements Topic
{
public static final byte DATA_STRUCTURE_TYPE = 101;
    private static final long serialVersionUID = 7300307405896488588L;
    public ActiveMQTopic(String name)
    {
        super(name);
    }
    public byte getDataStructureType()
    {
        return 101;
    }
    public boolean isTopic()
    {
        return true;
    }
    public String getTopicName()
        throws JMSException
    {
        return getPhysicalName();
    }
    public byte getDestinationType()
    {
        return 2;
    }
     protected String getQualifiedPrefix()
    {
        return "topic://";
    }
}

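From ActiveMQTopic and ActiveMQQueue we can see that both are essentially ActiveMQDestination; they differ only in their qualified name prefix (queue:// versus topic://), destination type, and data structure type.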
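The prefix handling seen in createQueue, createTopic, and createDestination can be sketched without any ActiveMQ classes. This is only a minimal illustration: `DestinationSketch`, `DestinationKind`, `classify`, and `physicalName` are hypothetical names, not ActiveMQ API. Only the prefix strings and type codes are taken from the listing above.

```java
public class DestinationSketch {
    // Type codes and prefixes mirror the constants shown in ActiveMQDestination.
    enum DestinationKind {
        QUEUE(1, "queue://"),
        TOPIC(2, "topic://"),
        TEMP_QUEUE(5, "temp-queue://"),
        TEMP_TOPIC(6, "temp-topic://");

        final int typeCode;
        final String prefix;

        DestinationKind(int typeCode, String prefix) {
            this.typeCode = typeCode;
            this.prefix = prefix;
        }
    }

    // Hypothetical helper: a qualified prefix wins, otherwise the default kind applies.
    static DestinationKind classify(String name, DestinationKind defaultKind) {
        for (DestinationKind kind : DestinationKind.values()) {
            if (name.startsWith(kind.prefix)) {
                return kind;
            }
        }
        return defaultKind;
    }

    // Hypothetical helper: strips a qualified prefix, leaving the physical name.
    static String physicalName(String name) {
        for (DestinationKind kind : DestinationKind.values()) {
            if (name.startsWith(kind.prefix)) {
                return name.substring(kind.prefix.length());
            }
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(classify("topic://orders", DestinationKind.QUEUE)); // TOPIC
        System.out.println(physicalName("queue://orders"));                    // orders
    }
}
```

An unqualified name such as "orders" falls through to the default kind, which corresponds to the switch on defaultType in createDestination.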


2. Next, the session creates the producer
MessageProducer producer = session.createProducer(destination); 

We first look at the case where the destination is an ActiveMQQueue, then at ActiveMQTopic.

The destination is an ActiveMQQueue
//Create a message producer for the queue destination
public MessageProducer createProducer(Destination destination)
        throws JMSException
    {
        checkClosed();
        if(destination instanceof CustomDestination)
        {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createProducer(this);
        } else
        {
            int timeSendOut = connection.getSendTimeout();
            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination), timeSendOut);
        }
    }

//ActiveMQMessageProducer
public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport
    implements StatsCapable, Disposable
{
   protected ProducerInfo info;// producer info
    protected boolean closed;
    private final JMSProducerStatsImpl stats;// producer statistics
    private AtomicLong messageSequence;// message sequence number
    private final long startTime = System.currentTimeMillis();
    private MessageTransformer transformer;// message transformer
    private MemoryUsage producerWindow;// producer window
     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout)
        throws JMSException
    {
        super(session);
        // Create the producer info
        info = new ProducerInfo(producerId);
        // Take the producer window size from the connection
        info.setWindowSize(session.connection.getProducerWindowSize());
        if(destination != null && destination.getOptions() != null)
        {
            Map options = IntrospectionSupport.extractProperties(new HashMap(destination.getOptions()), "producer.");
            IntrospectionSupport.setProperties(info, options);
            if(options.size() > 0)
            {
                String msg = (new StringBuilder()).append("There are ").append(options.size()).append(" producer options that couldn't be set on the producer.").append(" Check the options are spelled correctly.").append(" Unknown parameters=[").append(options).append("].").append(" This producer cannot be started.").toString();
                LOG.warn(msg);
                throw new ConfigurationException(msg);
            }
        }
        info.setDestination(destination);
        if(session.connection.getProtocolVersion() >= 3 && info.getWindowSize() > 0)
        {
            producerWindow = new MemoryUsage((new StringBuilder()).append("Producer Window: ").append(producerId).toString());
            producerWindow.setExecutor(session.getConnectionExecutor());
            producerWindow.setLimit(info.getWindowSize());
            producerWindow.start();
        }
        // Persistent delivery is the default
        defaultDeliveryMode = 2;// 2 == DeliveryMode.PERSISTENT
        defaultPriority = 4;// default priority is 4
        defaultTimeToLive = 0L;
        messageSequence = new AtomicLong(0L);// create the message sequence counter
        stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
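        try
        {
            // Register the producer with the session, then send the producer info to the server through the session
            this.session.addProducer(this);
            this.session.syncSendPacket(info);
        }
        // (the handler for this try block is missing from the decompiled listing)
        setSendTimeout(sendTimeout);
        // Set the message transformer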
        setTransformer(session.getTransformer());
    }
}


//ActiveMQMessageProducerSupport
public abstract class ActiveMQMessageProducerSupport
    implements MessageProducer, Closeable
{
    protected ActiveMQSession session;// session
    protected boolean disableMessageID;
    protected boolean disableMessageTimestamp;
    protected int defaultDeliveryMode;// default delivery mode
    protected int defaultPriority;// default priority
    protected long defaultTimeToLive;
    protected int sendTimeout;
    public ActiveMQMessageProducerSupport(ActiveMQSession session)
    {
        sendTimeout = 0;
        this.session = session;
        disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
    }
}


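From the above, when a session creates a message producer it initializes the ActiveMQMessageProducer's producer info, producer statistics, message sequence number, delivery mode (persistent by default), message transformer, and producer window, and sends the producer info to the server through the session.
Now let's look at the case where the destination is an ActiveMQTopic.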
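The constructor above pulls every destination option that starts with "producer." out of the options map before applying it to the producer info. A minimal sketch of that extraction step, using only the JDK, might look like the following. `ProducerOptionsSketch` and `extractByPrefix` are hypothetical names standing in for the IntrospectionSupport call, and the option keys in `main` are illustrative only.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ProducerOptionsSketch {
    // Hypothetical stand-in for the prefix extraction step: moves every
    // "<prefix>key" entry out of the source map and returns it as "key".
    static Map<String, String> extractByPrefix(Map<String, String> source, String prefix) {
        Map<String, String> extracted = new HashMap<>();
        Iterator<Map.Entry<String, String>> it = source.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getKey().startsWith(prefix)) {
                extracted.put(entry.getKey().substring(prefix.length()), entry.getValue());
                it.remove();
            }
        }
        return extracted;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("producer.windowSize", "1024");   // illustrative key
        options.put("consumer.prefetchSize", "10");   // illustrative key
        Map<String, String> producerOptions = extractByPrefix(options, "producer.");
        System.out.println(producerOptions); // {windowSize=1024}
        System.out.println(options);         // {consumer.prefetchSize=10}
    }
}
```

In the listing, any extracted option that cannot be applied to the producer info is left in the map, and a non-empty map leads to the ConfigurationException.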
 public TopicPublisher createPublisher(Topic topic)
        throws JMSException
    {
        checkClosed();
        if(topic instanceof CustomDestination)
        {
            CustomDestination customDestination = (CustomDestination)topic;
            return customDestination.createPublisher(this);
        } else
        {
            int timeSendOut = connection.getSendTimeout();
            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic), timeSendOut);
        }
    }

//ActiveMQTopicPublisher
public class ActiveMQTopicPublisher extends ActiveMQMessageProducer
    implements TopicPublisher
{
    protected ActiveMQTopicPublisher(ActiveMQSession session, ActiveMQDestination destination, int sendTimeout)
        throws JMSException
    {
        super(session, session.getNextProducerId(), destination, sendTimeout);
    }
    public Topic getTopic()
        throws JMSException
    {
        return (Topic)super.getDestination();
    }
    public void publish(Message message)
        throws JMSException
    {
        super.send(message);
    }
    public void publish(Message message, int deliveryMode, int priority, long timeToLive)
        throws JMSException
    {
        super.send(message, deliveryMode, priority, timeToLive);
    }
    public void publish(Topic topic, Message message)
        throws JMSException
    {
        super.send(topic, message);
    }
    public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
        throws JMSException
    {
        super.send(topic, message, deliveryMode, priority, timeToLive);
    }
}

As shown above, ActiveMQTopicPublisher is essentially an ActiveMQMessageProducer; every TopicPublisher publish call is delegated to ActiveMQMessageProducer's send.

3. Sending messages
sendMessage(session, producer);
 public static void sendMessage(Session session, MessageProducer producer)  
           throws Exception {  
       for (int i = 1; i <= 5; i++) {// there is a limit: it stops working at 1000  
           TextMessage message = session.createTextMessage("Queue message sent to ActiveMQ " + i);  
           // Send the message to the destination  
           System.out.println("Sending message: " + "Queue message sent to ActiveMQ " + i);  
           producer.send(message);  
       }  
   }  



First we look at how the session creates a message, then at how the producer sends it.

The session creates the message
//ActiveMQSession
 public TextMessage createTextMessage(String text)
        throws JMSException
    {
        // Create the text message
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(text);
        // Configure the message
        configureMessage(message);
        return message;
    }


Creating the text message
ActiveMQTextMessage message = new ActiveMQTextMessage();

public class ActiveMQTextMessage extends ActiveMQMessage
    implements TextMessage
{
    public static final byte DATA_STRUCTURE_TYPE = 28;
    protected String text;
     private void copy(ActiveMQTextMessage copy)
    {
        super.copy(copy);
        copy.text = text;
    }

    public byte getDataStructureType()
    {
        return 28;
    }
    public String getJMSXMimeType()
    {
        return "jms/text-message";
    }
    public void setText(String text)
        throws MessageNotWriteableException
    {
        checkReadOnlyBody();
        this.text = text;
        setContent(null);
    }
}

Configuring the message
configureMessage(message);

protected void configureMessage(ActiveMQMessage message)
        throws IllegalStateException
    {
        checkClosed();
        // Attach the ActiveMQ connection to the message
        message.setConnection(connection);
    }

public abstract class Message extends BaseCommand
    implements MarshallAware, MessageReference
{
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
    protected MessageId messageId;// message id
    protected ActiveMQDestination originalDestination;// original destination
    protected TransactionId originalTransactionId;// original transaction id
    protected ProducerId producerId;// producer id
    protected ActiveMQDestination destination;// message destination
    protected TransactionId transactionId;// transaction id
    protected long expiration;// expiration time
    protected long timestamp;// timestamp
    protected long arrival;
    protected long brokerInTime;
    protected long brokerOutTime;
    protected String correlationId;
    protected ActiveMQDestination replyTo;// reply-to destination
    protected boolean persistent;// whether the message is persistent
    protected String type;
    protected byte priority;// priority
    protected String groupID;
    protected int groupSequence;
    protected ConsumerId targetConsumerId;// target consumer id
    protected boolean compressed;
    protected String userID;
    protected ByteSequence content;
    protected ByteSequence marshalledProperties;
    protected DataStructure dataStructure;// data structure
    protected int redeliveryCounter;// redelivery counter
    protected int size;
    protected Map properties;
    protected boolean readOnlyProperties;// whether the properties are read-only
    protected boolean readOnlyBody;
    protected transient boolean recievedByDFBridge;
    protected boolean droppable;
    protected boolean jmsXGroupFirstForConsumer;
    private transient short referenceCount;
    private transient ActiveMQConnection connection;// connection
    transient MessageDestination regionDestination;
    transient MemoryUsage memoryUsage;// memory usage
    private BrokerId brokerPath[];// broker path
    private BrokerId cluster[];// cluster
}

Now let's look at ObjectMessage:
public ObjectMessage createObjectMessage(Serializable object)
        throws JMSException
    {
        // Create the ObjectMessage
        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        // Attach the ActiveMQConnection to the message
        configureMessage(message);
        message.setObject(object);
        return message;
    }

//ActiveMQObjectMessage
public class ActiveMQObjectMessage extends ActiveMQMessage
    implements ObjectMessage
{
    public static final byte DATA_STRUCTURE_TYPE = 26;
    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
    protected transient Serializable object;
    public byte getDataStructureType()
    {
        return 26;
    }
    public String getJMSXMimeType()
    {
        return "jms/object-message";
    }
}

The producer sends the message



//ActiveMQMessageProducerSupport
 public void send(Message message)
        throws JMSException
    {
        send(getDestination(), message, defaultDeliveryMode, defaultPriority, defaultTimeToLive);
    }



//ActiveMQMessageProducer sends the message
 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
        throws JMSException
    {
        send(destination, message, deliveryMode, priority, timeToLive, null);
    }

     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete)
        throws JMSException
    {
        checkClosed();
        if(destination == null)
            if(info.getDestination() == null)
                throw new UnsupportedOperationException("A destination must be specified.");
            else
                throw new InvalidDestinationException("Don't understand null destinations");
        ActiveMQDestination dest;
        // Resolve the message destination
        if(destination.equals(info.getDestination()))
            dest = (ActiveMQDestination)destination;
        else
        if(info.getDestination() == null)
            dest = ActiveMQDestination.transform(destination);
        else
            throw new UnsupportedOperationException((new StringBuilder()).append("This producer can only send messages to: ").append(info.getDestination().getPhysicalName()).toString());
        if(dest == null)
            throw new JMSException("No destination specified");
        // Transform the message
        if(transformer != null)
        {
            // In practice this is the ActiveMQConnection's message transformer
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if(transformedMessage != null)
                message = transformedMessage;
        }
        // If there is a producer window, wait until it has enough space
        if(producerWindow != null)
            try
            {
                producerWindow.waitForSpace();
            }
            catch(InterruptedException e)
            {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        // Send the message through the session
        session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
        // Record the send in the producer statistics
        stats.onMessage();
    }
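The producer window acts as a simple form of flow control: the sender blocks in waitForSpace while the window is full, and each asynchronous send increases the usage by the message size. The sketch below models that idea with plain Java monitors. `ProducerWindowSketch` is a hypothetical class, not the ActiveMQ MemoryUsage implementation, and `decreaseUsage` being called when the broker acknowledges a message is an assumption that is not shown in the listing.

```java
public class ProducerWindowSketch {
    private final long limit;
    private long usage;

    ProducerWindowSketch(long limit) {
        this.limit = limit;
    }

    // Blocks the caller while the window is full.
    synchronized void waitForSpace() throws InterruptedException {
        while (usage >= limit) {
            wait();
        }
    }

    // Called after a send: the message now occupies part of the window.
    synchronized void increaseUsage(long bytes) {
        usage += bytes;
    }

    // Assumption: called when the broker acknowledges a message, freeing space.
    synchronized void decreaseUsage(long bytes) {
        usage = Math.max(0L, usage - bytes);
        notifyAll();
    }

    synchronized boolean isFull() {
        return usage >= limit;
    }

    synchronized long getUsage() {
        return usage;
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerWindowSketch window = new ProducerWindowSketch(100);
        window.waitForSpace();      // returns immediately, the window is empty
        window.increaseUsage(120);  // a single large message can overshoot the limit
        System.out.println(window.isFull()); // true
        window.decreaseUsage(50);
        System.out.println(window.isFull()); // false
    }
}
```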


Sending the message through the session
session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);


//ActiveMQSession
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 
            MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete)
        throws JMSException
    {
        checkClosed();
        if(destination.isTemporary() && connection.isDeleted(destination))
            throw new InvalidDestinationException((new StringBuilder()).append("Cannot publish to a deleted Destination: ").append(destination).toString());
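        // Acquire the send mutex
        synchronized(sendMutex)
        {
            // Start a transaction if needed, then get the transaction id
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            // Get the producer's message sequence number
            long sequenceNumber = producer.getMessageSequence();
            message.setJMSDeliveryMode(deliveryMode);// set the delivery mode (persistent or non-persistent)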
            long expiration = 0L;
            if(!producer.getDisableMessageTimestamp())
            {
                // Set the message timestamp and derive the expiration from the time to live
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if(timeToLive > 0L)
                    expiration = timeToLive + timeStamp;
            }
            // Set the message expiration and priority
            message.setJMSExpiration(expiration);
            message.setJMSPriority(priority);
            message.setJMSRedelivered(false);// mark the message as not redelivered
            // Convert the message to an ActiveMQMessage
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            // Set the message destination
            msg.setDestination(destination);
            // Set the message id
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
            if(msg != message)
            {
                message.setJMSMessageID(msg.getMessageId().toString());
                message.setJMSDestination(destination);
            }
            msg.setBrokerPath(null);
            msg.setTransactionId(txid);// set the transaction id
            if(connection.isCopyMessageOnSend())
                msg = (ActiveMQMessage)msg.copy();
            // Attach the connection to the message
            msg.setConnection(connection);
            msg.onSend();
            // Set the producer id on the message
            msg.setProducerId(msg.getMessageId().getProducerId());
            if(LOG.isTraceEnabled())
                LOG.trace((new StringBuilder()).append(getSessionId()).append(" sending message: ").append(msg).toString());
            if(onComplete == null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null))
            {
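                // No callback, no send timeout, no response required, and sync send not forced;
                // additionally the message is non-persistent, or async send is enabled, or a
                // transaction is active: send asynchronously
                connection.asyncSendPacket(msg);
                if(producerWindow != null)
                {
                    int size = msg.getSize();
                    // Increase the producer window usage
                    producerWindow.increaseUsage(size);
                }
            } else
            if(sendTimeout > 0 && onComplete == null)
                // Otherwise, with a send timeout and no callback, send synchronously with the timeout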
                connection.syncSendPacket(msg, sendTimeout);
            else
                connection.syncSendPacket(msg, onComplete);
        }
    }
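The branch that chooses between asyncSendPacket and syncSendPacket is easier to reason about as a pure function. The sketch below restates the condition and the expiration arithmetic from the listing. `SendModeSketch`, `usesAsyncSend`, and `expiration` are hypothetical names introduced for illustration; the boolean parameters stand for the corresponding calls in the original code.

```java
public class SendModeSketch {
    // Mirrors the condition in ActiveMQSession.send: returns true when the
    // message would go out through asyncSendPacket.
    static boolean usesAsyncSend(boolean hasCallback, int sendTimeout, boolean responseRequired,
                                 boolean alwaysSyncSend, boolean persistent,
                                 boolean useAsyncSend, boolean inTransaction) {
        return !hasCallback
                && sendTimeout <= 0
                && !responseRequired
                && !alwaysSyncSend
                && (!persistent || useAsyncSend || inTransaction);
    }

    // Expiration is an absolute time: timestamp plus time to live, or 0 when
    // timestamps are disabled or no time to live is set.
    static long expiration(long timestamp, long timeToLive, boolean timestampsDisabled) {
        if (timestampsDisabled || timeToLive <= 0L) {
            return 0L;
        }
        return timeToLive + timestamp;
    }

    public static void main(String[] args) {
        // Persistent message, no transaction, defaults elsewhere: synchronous send.
        System.out.println(usesAsyncSend(false, 0, false, false, true, false, false)); // false
        // Same message inside a transaction: asynchronous send.
        System.out.println(usesAsyncSend(false, 0, false, false, true, false, true));  // true
        System.out.println(expiration(1000L, 500L, false)); // 1500
    }
}
```

This also explains the example at the start of the article: the session there is transacted, so txid is non-null and even persistent messages take the asynchronous path, with the commit providing the synchronization point.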

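First, let's see what the session does to start a transaction when sending a message.
Start the transaction and get the transaction id: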
doStartTransaction();


protected void doStartTransaction()
        throws JMSException
    {
        if(getTransacted() && !transactionContext.isInXATransaction())
            transactionContext.begin();
    }

public class TransactionContext
    implements XAResource
{ private static final HashMap ENDED_XA_TRANSACTION_CONTEXTS = new HashMap();
    private ActiveMQConnection connection;//事务所属连接
    private final LongSequenceGenerator localTransactionIdGenerator;
    private List synchronizations;
    private Xid associatedXid;
    private TransactionId transactionId;//事务id
    private LocalTransactionEventListener localTransactionEventListener;
    private int beforeEndIndex;
 public void begin()
        throws JMSException
    {
        if(isInXATransaction())
            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
        if(transactionId == null)
        {
            synchronizations = null;
            beforeEndIndex = 0;
            // Create a new local transaction id and the transaction info
            transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, (byte)0);
            connection.ensureConnectionInfoSent();
            // Send the transaction info packet asynchronously through the connection
            connection.asyncSendPacket(info);
            if(localTransactionEventListener != null)
                localTransactionEventListener.beginEvent();
            LOG.debug("Begin:{}", transactionId);
        }
    }
}
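Note that begin() only does work when transactionId is null, so within one transaction every send after the first reuses the same id and no further begin command goes to the broker. A minimal sketch of that lazy-begin pattern follows. `LocalTxSketch` and all of its members are hypothetical, and the assumption that commit clears the id (so the next send starts a fresh transaction) is not shown in the listing above.

```java
import java.util.concurrent.atomic.AtomicLong;

public class LocalTxSketch {
    private final String connectionId;
    private final AtomicLong idGenerator = new AtomicLong();
    private String transactionId;   // null means no transaction in progress
    private int beginCommandsSent;  // counts how often a begin command would be sent

    LocalTxSketch(String connectionId) {
        this.connectionId = connectionId;
    }

    // Starts a transaction only if none is in progress.
    void begin() {
        if (transactionId == null) {
            transactionId = "TX:" + connectionId + ":" + idGenerator.incrementAndGet();
            beginCommandsSent++; // stands in for connection.asyncSendPacket(info)
        }
    }

    // Assumption: committing clears the id so the next begin() creates a new one.
    void commit() {
        transactionId = null;
    }

    String getTransactionId() {
        return transactionId;
    }

    int getBeginCommandsSent() {
        return beginCommandsSent;
    }

    public static void main(String[] args) {
        LocalTxSketch tx = new LocalTxSketch("conn-1");
        tx.begin();
        tx.begin(); // second send in the same transaction: no new id
        System.out.println(tx.getTransactionId());     // TX:conn-1:1
        System.out.println(tx.getBeginCommandsSent()); // 1
    }
}
```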

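Back to the session's asynchronous send.

If the completion callback is null and there is no send timeout (and the other conditions shown above hold), the message is sent asynchronously:
connection.asyncSendPacket(msg);

//ActiveMQConnection, asynchronous send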
 
public void asyncSendPacket(Command command)
        throws JMSException
    {
        if(isClosed())
        {
            throw new ConnectionClosedException();
        } else
        {
            // Send the command asynchronously
            doAsyncSendPacket(command);
            return;
        }
    }
//Send the command asynchronously
private void doAsyncSendPacket(Command command)
        throws JMSException
    {
        try
        {
            // As we saw earlier, transport is a chain: ResponseCorrelator wraps MutexTransport, which wraps TcpTransport
            transport.oneway(command);
        }
        catch(IOException e)
        {
            throw JMSExceptionSupport.create(e);
        }
    }

//ResponseCorrelator
public class ResponseCorrelator extends TransportFilter
{
    private final Map requestMap;
    private IntSequenceGenerator sequenceGenerator;
    private final boolean debug;
    private IOException error;
    public ResponseCorrelator(Transport next)
    {
        // next is the MutexTransport
        this(next, new IntSequenceGenerator());
    }
 public void oneway(Object o)
        throws IOException
    {
        Command command = (Command)o;
        // Assign the command its sequence id
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        next.oneway(command);
    }

//MutexTransport
public class MutexTransport extends TransportFilter
{
    private final ReentrantLock writeLock;
    private boolean syncOnCommand;
    public MutexTransport(Transport next)
    {
         // next is the TcpTransport
        super(next);
        writeLock = new ReentrantLock();
        syncOnCommand = false;
    }
    // Send the command while holding the write lock
     public void oneway(Object command)
        throws IOException
    {
        // Acquire the write lock
        writeLock.lock();
        try
        {
            next.oneway(command);
        }
        finally
        {
            // Always release the lock, even if the write fails
            writeLock.unlock();
        }
    }    
}
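The oneway call therefore passes through a chain of filters, each adding one concern before delegating to the next. The sketch below reproduces that decorator structure with simplified, hypothetical types: `Command`, `Transport`, `RecordingTransport`, `MutexFilter`, and `CorrelatorFilter` are stand-ins written for this illustration, not the ActiveMQ classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class TransportChainSketch {
    // Hypothetical, simplified stand-in for an ActiveMQ command.
    static class Command {
        int commandId;
        boolean responseRequired = true;
        final String payload;
        Command(String payload) { this.payload = payload; }
    }

    interface Transport {
        void oneway(Command command);
    }

    // Innermost transport: records what would be written to the socket.
    static class RecordingTransport implements Transport {
        final List<String> written = new ArrayList<>();
        public void oneway(Command command) {
            written.add(command.commandId + ":" + command.payload);
        }
    }

    // Serializes writers so that commands are not interleaved on the wire.
    static class MutexFilter implements Transport {
        private final Transport next;
        private final ReentrantLock writeLock = new ReentrantLock();
        MutexFilter(Transport next) { this.next = next; }
        public void oneway(Command command) {
            writeLock.lock();
            try {
                next.oneway(command);
            } finally {
                writeLock.unlock();
            }
        }
    }

    // Assigns a command id and marks the command as fire-and-forget.
    static class CorrelatorFilter implements Transport {
        private final Transport next;
        private final AtomicInteger sequence = new AtomicInteger();
        CorrelatorFilter(Transport next) { this.next = next; }
        public void oneway(Command command) {
            command.commandId = sequence.incrementAndGet();
            command.responseRequired = false;
            next.oneway(command);
        }
    }

    public static void main(String[] args) {
        RecordingTransport tcp = new RecordingTransport();
        Transport transport = new CorrelatorFilter(new MutexFilter(tcp));
        transport.oneway(new Command("a"));
        transport.oneway(new Command("b"));
        System.out.println(tcp.written); // [1:a, 2:b]
    }
}
```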

//TcpTransport

public class TcpTransport extends TransportThreadSupport
    implements Transport, Service, Runnable
{
   protected final URI remoteLocation;// remote URI
    protected final URI localLocation;// local URI
    protected final WireFormat wireFormat;
    protected int connectionTimeout;// connection timeout
    protected int soTimeout;
    protected int socketBufferSize;// socket buffer size
    protected int ioBufferSize;// I/O buffer size
    protected boolean closeAsync;
    protected Socket socket;
    protected DataOutputStream dataOut;// the socket's output stream
    protected DataInputStream dataIn;// the socket's input stream
    protected TimeStampStream buffOut;
    protected int trafficClass;
    private boolean trafficClassSet;
    protected boolean diffServChosen;
    protected boolean typeOfServiceChosen;
    protected boolean trace;
    protected String logWriterName;
    protected boolean dynamicManagement;
    protected boolean startLogging;
    protected int jmxPort;
    protected boolean useLocalHost;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;
    protected final AtomicReference stoppedLatch;
    protected volatile int receiveCounter;
    private Map socketOptions;
    private int soLinger;
    private Boolean keepAlive;// whether keep-alive is enabled
    private Boolean tcpNoDelay;// whether TCP_NODELAY is enabled
    private Thread runnerThread;
  public void oneway(Object command)
        throws IOException
    {

        checkStarted();
        // Marshal the command into dataOut. As we saw when analyzing TcpTransport startup,
        // TcpTransport opens a socket to the broker, and dataOut is that socket's output stream
        wireFormat.marshal(command, dataOut);
        // Flush the buffer
        dataOut.flush();
    }
}
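The marshal-then-flush step can be illustrated with plain JDK streams. The sketch below writes a length-prefixed frame into a DataOutputStream and reads it back. It is not the OpenWire encoding; `FramingSketch`, `marshal`, and `unmarshal` are hypothetical names, and a ByteArrayOutputStream stands in for the socket stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
    // Writes a 4-byte length followed by the UTF-8 body, then flushes.
    static byte[] marshal(String body) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the socket
            DataOutputStream dataOut = new DataOutputStream(sink);
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
            dataOut.flush();
            return sink.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the length prefix, then exactly that many bytes of body.
    static String unmarshal(byte[] frame) {
        try {
            DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(frame));
            byte[] bytes = new byte[dataIn.readInt()];
            dataIn.readFully(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = marshal("hello");
        System.out.println(frame.length);     // 9
        System.out.println(unmarshal(frame)); // hello
    }
}
```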

So what is wireFormat?
When a connection is obtained from the connection factory, the TcpTransport is built by the TransportFactory:

public abstract class TransportFactory
{
   // TRANSPORT_FACTORY_FINDER: lookup path for transport factories
   private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
    // WIREFORMAT_FACTORY_FINDER: lookup path for wire format factories
    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
    private static final ConcurrentMap TRANSPORT_FACTORYS = new ConcurrentHashMap();
    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
    private static final String THREAD_NAME_FILTER = "threadName";
   // TransportFactory creates the TcpTransport
  public Transport doConnect(URI location, Executor ex)
        throws Exception
    {
        return doConnect(location);
    }
    public Transport doConnect(URI location)
        throws Exception
    {
        try
        {
            Map options = new HashMap(URISupport.parseParameters(location));
            if(!options.containsKey("wireFormat.host"))
                options.put("wireFormat.host", location.getHost());
            // Create the WireFormat
            WireFormat wf = createWireFormat(options);
            Transport transport = createTransport(location, wf);
            Transport rc = configure(transport, wf, options);
            if(!options.isEmpty())
                throw new IllegalArgumentException((new StringBuilder()).append("Invalid connect parameters: ").append(options).toString());
            return rc;
        }
        catch(URISyntaxException e)
        {
            throw IOExceptionSupport.create(e);
        }
    }
    // Configure the Transport by wrapping it in the filter chain
     public Transport configure(Transport transport, WireFormat wf, Map options)
        throws Exception
    {
        transport = compositeConfigure(transport, wf, options);
        transport = new MutexTransport(transport);
        transport = new ResponseCorrelator(transport);
        return transport;
    }
}

Creating the WireFormat
WireFormat wf = createWireFormat(options);


  protected WireFormat createWireFormat(Map options)
        throws IOException
    {
        // Create the WireFormatFactory
        WireFormatFactory factory = createWireFormatFactory(options);
        // Create the WireFormat from the factory
        WireFormat format = factory.createWireFormat();
        return format;
    }

    protected WireFormatFactory createWireFormatFactory(Map options)
        throws IOException
    {
        String wireFormat = (String)options.remove("wireFormat");
        if(wireFormat == null)
            // Fall back to the default wire format type
            wireFormat = getDefaultWireFormatType();
        try
        {
            // This resembles how the TransportFactory is looked up:
            // WIREFORMAT_FACTORY_FINDER loads the descriptor file named "default" from
            // META-INF/services/org/apache/activemq/wireformat/ and then loads the class named
            // by its class property, in practice org.apache.activemq.openwire.OpenWireFormatFactory
            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
            IntrospectionSupport.setProperties(wff, options, "wireFormat.");
            return wff;
        }
        catch(Throwable e)
        {
            throw IOExceptionSupport.create((new StringBuilder()).append("Could not create wire format factory for: ").append(wireFormat).append(", reason: ").append(e).toString(), e);
        }
    }
    
    protected String getDefaultWireFormatType()
    {
        return "default";
    }


//Contents of the "default" descriptor file
class=org.apache.activemq.openwire.OpenWireFormatFactory
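The descriptor file is an ordinary properties file whose class entry names the factory to instantiate. A minimal sketch of that lookup pattern, using only the JDK, is shown below. `FactoryFinderSketch` and its `newInstance` method are hypothetical, the descriptor is passed in as a string instead of being read from the classpath, and `java.util.ArrayList` is used as the target class purely so the example runs without ActiveMQ on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class FactoryFinderSketch {
    // Reads the "class" property from the descriptor and instantiates it reflectively.
    static Object newInstance(String descriptorContents) {
        try {
            Properties props = new Properties();
            props.load(new StringReader(descriptorContents));
            String className = props.getProperty("class");
            if (className == null) {
                throw new IllegalArgumentException("Descriptor has no 'class' property");
            }
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create factory", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in class name; the real descriptor names OpenWireFormatFactory.
        Object factory = newInstance("class=java.util.ArrayList");
        System.out.println(factory.getClass().getName()); // java.util.ArrayList
    }
}
```

Keeping the class name in a descriptor file lets a new wire format be added by dropping a file on the classpath, without changing the code that looks it up.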


public class OpenWireFormatFactory
    implements WireFormatFactory
{
    private int version;// wire format version
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled;// whether TCP_NODELAY is enabled
    private boolean cacheEnabled;// whether caching is enabled
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxInactivityDuration;
    private long maxInactivityDurationInitalDelay;
    private int cacheSize;// cache size
    private long maxFrameSize;
    private String host;// broker host
    
    public OpenWireFormatFactory()
    {
        version = 11;
        stackTraceEnabled = true;
        tcpNoDelayEnabled = true;
        cacheEnabled = true;
        tightEncodingEnabled = true;
        maxInactivityDuration = 30000L;
        maxInactivityDurationInitalDelay = 10000L;
        cacheSize = 1024;
        maxFrameSize = 9223372036854775807L;
        host = null;
    }

    public WireFormat createWireFormat()
    {
        // Create the WireFormatInfo that describes the wire format
        WireFormatInfo info = new WireFormatInfo();
        info.setVersion(version);
        try
        {
            // Fill in the wire format options: stack traces, caching, TCP no-delay, host and so on
            info.setStackTraceEnabled(stackTraceEnabled);
            info.setCacheEnabled(cacheEnabled);
            info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
            info.setTightEncodingEnabled(tightEncodingEnabled);
            info.setSizePrefixDisabled(sizePrefixDisabled);
            info.setMaxInactivityDuration(maxInactivityDuration);
            info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
            info.setCacheSize(cacheSize);
            info.setMaxFrameSize(maxFrameSize);
            if(host != null)
                info.setHost(host);
        }
        catch(Exception e)
        {
            IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
            ise.initCause(e);
            throw ise;
        }
        // Create the OpenWireFormat
        OpenWireFormat f = new OpenWireFormat(version);
        f.setMaxFrameSize(maxFrameSize);
        f.setPreferedWireFormatInfo(info);
        return f;
    }
}

//OpenWireFormat
public final class OpenWireFormat
    implements WireFormat
{
    public static final int DEFAULT_STORE_VERSION = 11;
    public static final int DEFAULT_WIRE_VERSION = 11;
    public static final int DEFAULT_LEGACY_VERSION = 6;
    public static final long DEFAULT_MAX_FRAME_SIZE = 9223372036854775807L;
    static final byte NULL_TYPE = 0;
    private static final int MARSHAL_CACHE_SIZE = 16383;
    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
    private DataStreamMarshaller dataMarshallers[];
    private int version;
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled;// whether TCP_NODELAY is requested
    private boolean cacheEnabled;// whether caching is enabled
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxFrameSize;
    private short nextMarshallCacheIndex;
    private short nextMarshallCacheEvictionIndex;
    private Map marshallCacheMap;
    private DataStructure marshallCache[];
    private DataStructure unmarshallCache[];
    private DataByteArrayOutputStream bytesOut;// scratch output buffer
    private DataByteArrayInputStream bytesIn;
    private WireFormatInfo preferedWireFormatInfo;// preferred wire format info
    public OpenWireFormat(int i)
    {
        maxFrameSize = 9223372036854775807L;
        marshallCacheMap = new HashMap();
        marshallCache = null;
        unmarshallCache = null;
        bytesOut = new DataByteArrayOutputStream();
        bytesIn = new DataByteArrayInputStream();
        setVersion(i);
    }
}

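The analysis above shows that TcpTransport uses OpenWireFormat as its wire format.

Now back to TcpTransport sending the command.
// The command is marshalled into dataOut. As we saw earlier when analysing TcpTransport startup,
// TcpTransport opens a socket to the broker; dataOut is the DataOutputStream that wraps the socket's output stream.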

 wireFormat.marshal(command, dataOut);


//OpenWireFormat


 public synchronized void marshal(Object o, DataOutput dataOut)
        throws IOException
    {
        if(cacheEnabled)
            runMarshallCacheEvictionSweep();
        int size = 1;
        if(o != null)
        {
            // Look up the data structure type and its marshaller
            DataStructure c = (DataStructure)o;
            byte type = c.getDataStructureType();
            DataStreamMarshaller dsm = dataMarshallers[type & 255];
            if(dsm == null)
                throw new IOException((new StringBuilder()).append("Unknown data type: ").append(type).toString());
            if(tightEncodingEnabled)
            {
                BooleanStream bs = new BooleanStream();
                size += dsm.tightMarshal1(this, c, bs);
                size += bs.marshalledSize();
                if(!sizePrefixDisabled)
                    dataOut.writeInt(size);
                dataOut.writeByte(type);
                bs.marshal(dataOut);
                dsm.tightMarshal2(this, c, dataOut, bs);
            } else
            {
                DataOutput looseOut = dataOut;
                if(!sizePrefixDisabled)
                {
                    bytesOut.restart();
                    looseOut = bytesOut;
                }
                // Write the type byte to the output
                looseOut.writeByte(type);
                // Write the command's fields to the output
                dsm.looseMarshal(this, c, looseOut);
                if(!sizePrefixDisabled)
                {
                    ByteSequence sequence = bytesOut.toByteSequence();
                    dataOut.writeInt(sequence.getLength());
                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
                }
            }
        } else
        {
            if(!sizePrefixDisabled)
                dataOut.writeInt(size);
            dataOut.writeByte(0);
        }
    }
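
The loose-encoding branch above first marshals into a scratch buffer and only then writes the length followed by the bytes, because the size is not known until marshalling is done. Below is a minimal sketch of that size-prefixed framing with plain JDK streams. It is not the OpenWire format: FramingSketch and the frame layout (int length, type byte, UTF body) are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FramingSketch {
    // Assumed frame layout: [int length][byte type][UTF body],
    // where length counts the type byte plus the encoded body.
    static byte[] marshal(byte type, String body) {
        try {
            // Step 1: marshal into a scratch buffer so the size can be measured.
            ByteArrayOutputStream scratch = new ByteArrayOutputStream();
            DataOutputStream looseOut = new DataOutputStream(scratch);
            looseOut.writeByte(type);
            looseOut.writeUTF(body);
            looseOut.flush();
            byte[] payload = scratch.toByteArray();

            // Step 2: write the size prefix, then the buffered payload.
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream dataOut = new DataOutputStream(wire);
            dataOut.writeInt(payload.length);
            dataOut.write(payload, 0, payload.length);
            dataOut.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads one frame back and returns "type:body".
    static String unmarshal(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int size = in.readInt();
            byte[] payload = new byte[size];
            in.readFully(payload);
            DataInputStream body = new DataInputStream(new ByteArrayInputStream(payload));
            byte type = body.readByte();
            return type + ":" + body.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = marshal((byte) 5, "hi");
        System.out.println(frame.length + " bytes, decoded as " + unmarshal(frame));
    }
}
```

The length prefix lets the reader know how many bytes belong to one command before it starts decoding, which is why the prefix can only be skipped when both sides agree to disable it.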

Writing the command's fields to the output stream:
dsm.looseMarshal(this, c, looseOut);

//WireFormatInfoMarshaller
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut)
        throws IOException
    {
        WireFormatInfo info = (WireFormatInfo)o;
        info.beforeMarshall(wireFormat);
        super.looseMarshal(wireFormat, o, dataOut);
        looseMarshalConstByteArray(wireFormat, info.getMagic(), dataOut, 8);
        dataOut.writeInt(info.getVersion());
        looseMarshalByteSequence(wireFormat, info.getMarshalledProperties(), dataOut);
    }


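A short recap:
When a producer sends a message, it first transforms the message and then hands it to the session. The session begins a transaction and sends the transaction info through the connection, then sends the message itself through the connection. The connection actually sends through ResponseCorrelator, which sets the command id and whether a response is required, and passes the command to MutexTransport. MutexTransport acquires the write lock and delegates to TcpTransport, and TcpTransport finally marshals the command onto the socket with OpenWireFormat.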

Recording the message send event:
stats.onMessage();

//JMSEndpointStatsImpl
public class JMSEndpointStatsImpl extends StatsImpl
{
    protected CountStatisticImpl messageCount;// number of messages
    protected CountStatisticImpl pendingMessageCount;
    protected CountStatisticImpl expiredMessageCount;// number of expired messages
    protected TimeStatisticImpl messageWaitTime;// message wait time
    protected TimeStatisticImpl messageRateTime;
    public void onMessage()
    {
        if(enabled)
        {
            long start = messageCount.getLastSampleTime();
            messageCount.increment();// increment the message count
            long end = messageCount.getLastSampleTime();
            messageRateTime.addTime(end - start);
        }
    }
}

//CountStatisticImpl
public class CountStatisticImpl extends StatisticImpl
    implements CountStatistic
{
    private final AtomicLong counter;// the message counter
    private CountStatisticImpl parent;
    // Called when the producer sends a message: increment this counter and propagate to the parent
    public void increment()
    {
        if(isEnabled())
        {
            counter.incrementAndGet();
            updateSampleTime();
            if(parent != null)
                parent.increment();
        }
    }
 }
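
CountStatisticImpl.increment() rolls every increment up to its parent, so a producer-level counter also feeds the totals above it. A minimal sketch of that roll-up with plain JDK types follows; CounterSketch is a hypothetical name and the sample-time bookkeeping of the real class is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

class CounterSketch {
    private final AtomicLong counter = new AtomicLong();
    private final CounterSketch parent; // may be null for the root counter
    private volatile boolean enabled = true;

    CounterSketch(CounterSketch parent) {
        this.parent = parent;
    }

    void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    // Increments this counter and, like the decompiled code above,
    // propagates to the parent only while this counter is enabled.
    void increment() {
        if (!enabled) {
            return;
        }
        counter.incrementAndGet();
        if (parent != null) {
            parent.increment();
        }
    }

    long getCount() {
        return counter.get();
    }

    public static void main(String[] args) {
        CounterSketch session = new CounterSketch(null);
        CounterSketch producerA = new CounterSketch(session);
        CounterSketch producerB = new CounterSketch(session);
        producerA.increment();
        producerA.increment();
        producerB.increment();
        System.out.println(producerA.getCount() + " " + producerB.getCount() + " " + session.getCount());
    }
}
```

A disabled child neither counts nor propagates, so switching statistics off for one producer also removes its contribution from the parent totals.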

4. Committing the session
session.commit(); 

//ActiveMQSession
public void commit()
        throws JMSException
    {
        checkClosed();
        if(!getTransacted())
            throw new IllegalStateException("Not a transacted session");
        if(LOG.isDebugEnabled())
            LOG.debug((new StringBuilder()).append(getSessionId()).append(" Transaction Commit :").append(transactionContext.getTransactionId()).toString());
        // Commit the transaction
        transactionContext.commit();
    }

//TransactionContext
 
public void commit()
        throws JMSException
    {
        if(isInXATransaction())
            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
        try
        {
            beforeEnd();
        }
        catch(JMSException e)
        {
            rollback();
            throw e;
        }
        if(transactionId != null)
        {
            LOG.debug("Commit: {} syncCount: {}", transactionId, Integer.valueOf(synchronizations == null ? 0 : synchronizations.size()));
            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, (byte)2);
            transactionId = null;
            try
            {
                // Send the transaction commit info to the broker synchronously
                syncSendPacketWithInterruptionHandling(info);
                if(localTransactionEventListener != null)
                    localTransactionEventListener.commitEvent();
                afterCommit();
            }
            catch(JMSException cause)
            {
                LOG.info("commit failed for transaction {}", info.getTransactionId(), cause);
                if(localTransactionEventListener != null)
                    localTransactionEventListener.rollbackEvent();
                afterRollback();
                throw cause;
            }
        }
    }
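
The control flow of TransactionContext.commit() can be reduced to three moves: clear the transaction id, send the commit, then run either the commit callbacks or the rollback callbacks. The sketch below shows only that flow with plain JDK types. CommitFlowSketch and its Sender interface are hypothetical; Sender stands in for the synchronous send to the broker, and the event list stands in for the listener and synchronization callbacks.

```java
import java.util.ArrayList;
import java.util.List;

class CommitFlowSketch {
    // Hypothetical stand-in for the synchronous send of the commit command.
    interface Sender {
        void send(String transactionId) throws Exception;
    }

    final List<String> events = new ArrayList<>();
    private String transactionId;

    void begin(String id) {
        transactionId = id;
        events.add("begin " + id);
    }

    boolean inTransaction() {
        return transactionId != null;
    }

    void commit(Sender sender) {
        if (transactionId == null) {
            return; // nothing to commit
        }
        String id = transactionId;
        // The id is cleared before sending, as in the decompiled code above.
        transactionId = null;
        try {
            sender.send(id);
            events.add("afterCommit " + id);
        } catch (Exception cause) {
            // A failed commit runs the rollback callbacks and is reported to the caller.
            events.add("afterRollback " + id);
            throw new IllegalStateException("commit failed for transaction " + id, cause);
        }
    }

    public static void main(String[] args) {
        CommitFlowSketch context = new CommitFlowSketch();
        context.begin("tx-1");
        context.commit(id -> { });
        System.out.println(context.events);
    }
}
```

Either way the context ends up outside a transaction, so the outcome of one commit never leaks into the next unit of work.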



5. Closing the connection
connection.close(); 


public void close()
        throws JMSException
    {
        // Save and clear the interrupt flag; it is restored after cleanup
        boolean interrupted = Thread.interrupted();
        try
        {
            if(!closed.get() && !transportFailed.get())
                // Stop the sessions first
                doStop(false);
            synchronized(this)
            {
                if(!closed.get())
                {
                    closing.set(true);// mark the connection as closing
                    // Stop the destination source
                    if(destinationSource != null)
                    {
                        destinationSource.stop();
                        destinationSource = null;
                    }
                    if(advisoryConsumer != null)
                    {
                        advisoryConsumer.dispose();
                        advisoryConsumer = null;
                    }
                    // Stop the scheduler
                    Scheduler scheduler = this.scheduler;
                    if(scheduler != null)
                        try
                        {
                            scheduler.stop();
                        }
                        catch(Exception e)
                        {
                            throw JMSExceptionSupport.create(e);
                        }
                    // Dispose every session and track the last delivered sequence id
                    long lastDeliveredSequenceId = -1L;
                    for(Iterator i = sessions.iterator(); i.hasNext();)
                    {
                        ActiveMQSession s = (ActiveMQSession)i.next();
                        s.dispose();
                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
                    }
                    // Dispose the connection consumers
                    for(Iterator i = connectionConsumers.iterator(); i.hasNext();)
                    {
                        ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer)i.next();
                        c.dispose();
                    }
                    activeTempDestinations.clear();
                    if(isConnectionInfoSentToBroker)
                    {
                        RemoveInfo removeCommand = info.createRemoveCommand();
                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                        try
                        {
                            // Send the remove command and wait for the reply
                            doSyncSendPacket(removeCommand, closeTimeout);
                        }
                        catch(JMSException e)
                        {
                            // A request timeout while closing is tolerated; anything else is rethrown
                            if(!(e.getCause() instanceof RequestTimedOutIOException))
                                throw e;
                        }
                        // Send the shutdown command without waiting
                        doAsyncSendPacket(new ShutdownInfo());
                    }
                    started.set(false);
                    if(sessionTaskRunner != null)
                        sessionTaskRunner.shutdown();
                    closed.set(true);
                    closing.set(false);
                }
            }
        }
        finally
        {
            try
            {
                if(executor != null)
                    // Shut down the thread pool
                    ThreadPoolUtils.shutdown(executor);
            }
            catch(Throwable e)
            {
                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
            }
            // Dispose the transport
            ServiceSupport.dispose(transport);
            // Remove this connection from the factory statistics
            factoryStats.removeConnection(this);
            if(interrupted)
                Thread.currentThread().interrupt();
        }
    }

// Stop every session of the connection
 void doStop(boolean checkClosed)
        throws JMSException
    {
        if(checkClosed)
            checkClosedOrFailed();
        if(started.compareAndSet(true, false))
            synchronized(sessions)
            {
                ActiveMQSession s;
                for(Iterator i = sessions.iterator(); i.hasNext(); s.stop())
                    s = (ActiveMQSession)i.next();

            }
    }


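Summary:

ActiveMQTopic and ActiveMQQueue are both ActiveMQDestination at heart; they differ only in their JNDI handling and data structure type. When a session creates a message producer, it initializes the ActiveMQMessageProducer's producer info, producer statistics, message sequence number, delivery mode (persistent by default), message transformer and producer window, and sends the producer info to the broker through the session. ActiveMQTopicPublisher is essentially an ActiveMQMessageProducer, and a TopicPublisher delegates all message sending to ActiveMQMessageProducer.
When a producer sends a message, it first transforms the message and then hands it to the session. The session begins a transaction and sends the transaction info through the connection, then sends the message itself through the connection. The connection actually sends through ResponseCorrelator, which sets the command id and whether a response is required, and passes the command to MutexTransport. MutexTransport acquires the write lock and delegates to TcpTransport, and TcpTransport finally marshals the command with OpenWireFormat.
Closing the connection means stopping the scheduler, disposing the sessions and connection consumers, sending the remove and shutdown commands to the broker, and then shutting down the executor and the TcpTransport.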



// Memory usage
public class MemoryUsage extends Usage
{
 public MemoryUsage(String name)
    {
        this(null, name);
    }
    // Wait until space becomes available
    public boolean waitForSpace(long timeout)
        throws InterruptedException
    {
        if(parent != null && !((MemoryUsage)parent).waitForSpace(timeout))
            return false;
        usageLock.readLock().lock();
        try
        {
            if(percentUsage >= 100)
            {
                // Trade the read lock for the write lock before waiting on the condition
                usageLock.readLock().unlock();
                usageLock.writeLock().lock();
                try
                {
                    while(percentUsage >= 100)
                        waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
                    // Re-acquire the read lock before releasing the write lock
                    usageLock.readLock().lock();
                }
                finally
                {
                    usageLock.writeLock().unlock();
                }
            }
            return percentUsage < 100;
        }
        finally
        {
            usageLock.readLock().unlock();
        }
    }
    // Whether memory is full
    public boolean isFull()
    {
        if(parent != null && ((MemoryUsage)parent).isFull())
            return true;
        usageLock.readLock().lock();
        try
        {
            return percentUsage >= 100;
        }
        finally
        {
            usageLock.readLock().unlock();
        }
    }
    // Increase the memory usage
    public void increaseUsage(long value)
    {
        if(value == 0L)
            return;
        usageLock.writeLock().lock();
        try
        {
            usage += value;
            setPercentUsage(caclPercentUsage());
        }
        finally
        {
            usageLock.writeLock().unlock();
        }
        if(parent != null)
            ((MemoryUsage)parent).increaseUsage(value);
    }
    // Decrease the memory usage
    public void decreaseUsage(long value)
    {
        if(value == 0L)
            return;
        usageLock.writeLock().lock();
        try
        {
            usage -= value;
            setPercentUsage(caclPercentUsage());
        }
        finally
        {
            usageLock.writeLock().unlock();
        }
        if(parent != null)
            ((MemoryUsage)parent).decreaseUsage(value);
    }
}
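
MemoryUsage combines a read-write lock with a condition: readers check the figure cheaply, writers change it, and a blocked producer waits on the condition until space is freed. The sketch below shows that technique with plain JDK types. It is a simplification, not the ActiveMQ code: UsageSketch is a hypothetical name, there is no parent chain or percentage, the wait takes the write lock directly instead of upgrading from the read lock, and the timeout is treated as an overall deadline.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class UsageSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Only the write lock of a ReentrantReadWriteLock supports conditions.
    private final Condition spaceAvailable = lock.writeLock().newCondition();
    private final long limit;
    private long usage;

    UsageSketch(long limit) {
        this.limit = limit;
    }

    boolean isFull() {
        lock.readLock().lock();
        try {
            return usage >= limit;
        } finally {
            lock.readLock().unlock();
        }
    }

    void increaseUsage(long value) {
        lock.writeLock().lock();
        try {
            usage += value;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void decreaseUsage(long value) {
        lock.writeLock().lock();
        try {
            usage -= value;
            if (usage < limit) {
                spaceAvailable.signalAll(); // wake producers blocked in waitForSpace
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Waits up to timeoutMillis for usage to drop below the limit.
    // Returns true if there is space, false on timeout or interruption.
    boolean waitForSpace(long timeoutMillis) {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.writeLock().lock();
        try {
            while (usage >= limit && remaining > 0) {
                remaining = spaceAvailable.awaitNanos(remaining);
            }
            return usage < limit;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        UsageSketch memory = new UsageSketch(100);
        memory.increaseUsage(100);
        new Thread(() -> memory.decreaseUsage(40)).start();
        System.out.println("space available: " + memory.waitForSpace(5000));
    }
}
```

Re-checking the condition in a loop after every wake-up guards against spurious wake-ups and against another producer taking the freed space first.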

public abstract class Usage
    implements Service
{
    // Lock guarding the usage figures
    protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
    protected final Condition waitForSpaceCondition;// condition awaited until space is available
    protected int percentUsage;// usage as a percentage
    protected Usage parent;
    protected String name;
    private UsageCapacity limiter;
    private int percentUsageMinDelta;
    private final List listeners = new CopyOnWriteArrayList();
    private final boolean debug;
    private float usagePortion;
    private final List children = new CopyOnWriteArrayList();
    private final List callbacks = new LinkedList();
    private int pollingTime;
    private final AtomicBoolean started = new AtomicBoolean();
    private ThreadPoolExecutor executor;
}


// Transformer for messages and message destinations
public final class ActiveMQMessageTransformation
{
    // Transform the destination
    public static ActiveMQDestination transformDestination(Destination destination)
        throws JMSException
    {
        ActiveMQDestination activeMQDestination = null;
        if(destination != null)
        {
            if(destination instanceof ActiveMQDestination)
                return (ActiveMQDestination)destination;
            if(destination instanceof TemporaryQueue)
                activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
            else
            if(destination instanceof TemporaryTopic)
                activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
            else
            if(destination instanceof Queue)
                activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
            else
            if(destination instanceof Topic)
                activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
        }
        return activeMQDestination;
    }
    // Transform the message
    public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
        throws JMSException
    {
        if(message instanceof ActiveMQMessage)
            return (ActiveMQMessage)message;
        ActiveMQMessage activeMessage = null;
        if(message instanceof BytesMessage)
        {
            BytesMessage bytesMsg = (BytesMessage)message;
            bytesMsg.reset();
            ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
            msg.setConnection(connection);
            try
            {
                do
                    msg.writeByte(bytesMsg.readByte());
                while(true);
            }
            catch(MessageEOFException messageeofexception) { }
            catch(JMSException jmsexception) { }
            activeMessage = msg;
        } else
        if(message instanceof MapMessage)
        {
            MapMessage mapMsg = (MapMessage)message;
            ActiveMQMapMessage msg = new ActiveMQMapMessage();
            msg.setConnection(connection);
            String name;
            for(Enumeration iter = mapMsg.getMapNames(); iter.hasMoreElements(); msg.setObject(name, mapMsg.getObject(name)))
                name = iter.nextElement().toString();

            activeMessage = msg;
        } else
        if(message instanceof ObjectMessage)
        {
            ObjectMessage objMsg = (ObjectMessage)message;
            ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
            msg.setConnection(connection);
            msg.setObject(objMsg.getObject());
            msg.storeContent();
            activeMessage = msg;
        } else
        if(message instanceof StreamMessage)
        {
            StreamMessage streamMessage = (StreamMessage)message;
            streamMessage.reset();
            ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
            msg.setConnection(connection);
            Object obj = null;
            try
            {
                while((obj = streamMessage.readObject()) != null) 
                    msg.writeObject(obj);
            }
            catch(MessageEOFException messageeofexception1) { }
            catch(JMSException jmsexception1) { }
            activeMessage = msg;
        } else
        if(message instanceof TextMessage)
        {
            TextMessage textMsg = (TextMessage)message;
            ActiveMQTextMessage msg = new ActiveMQTextMessage();
            msg.setConnection(connection);
            msg.setText(textMsg.getText());
            activeMessage = msg;
        } else
        if(message instanceof BlobMessage)
        {
            BlobMessage blobMessage = (BlobMessage)message;
            ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
            msg.setConnection(connection);
            if(connection != null)
                msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
            try
            {
                msg.setURL(blobMessage.getURL());
            }
            catch(MalformedURLException malformedurlexception) { }
            activeMessage = msg;
        } else
        {
            activeMessage = new ActiveMQMessage();
            activeMessage.setConnection(connection);
        }
        copyProperties(message, activeMessage);
        return activeMessage;
    }

    public static void copyProperties(Message fromMessage, Message toMessage)
        throws JMSException
    {
        toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
        toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
        toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
        toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
        toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
        toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
        toMessage.setJMSType(fromMessage.getJMSType());
        toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
        toMessage.setJMSPriority(fromMessage.getJMSPriority());
        toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
        String name;
        Object obj;
        for(Enumeration propertyNames = fromMessage.getPropertyNames(); propertyNames.hasMoreElements(); toMessage.setObjectProperty(name, obj))
        {
            name = propertyNames.nextElement().toString();
            obj = fromMessage.getObjectProperty(name);
        }

    }
}
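
One detail of transformDestination is easy to miss: the temporary types are tested before the plain ones. In JMS, TemporaryQueue extends Queue, so testing for Queue first would classify every temporary queue as an ordinary queue. The sketch below reproduces that ordering with hypothetical interfaces (SketchDestination, SketchQueue, SketchTemporaryQueue, SketchTopic) so it runs without a JMS library; the returned prefixes are only labels for the example.

```java
// Hypothetical interfaces mirroring the shape of the JMS hierarchy.
interface SketchDestination {
    String name();
}

interface SketchQueue extends SketchDestination {
}

// A temporary queue is also a queue, as in JMS.
interface SketchTemporaryQueue extends SketchQueue {
}

interface SketchTopic extends SketchDestination {
}

class DestinationKindSketch {
    // Most specific type first: the temporary check must precede the queue check.
    static String kindOf(SketchDestination destination) {
        if (destination == null) {
            return null;
        }
        if (destination instanceof SketchTemporaryQueue) {
            return "temp-queue://" + destination.name();
        }
        if (destination instanceof SketchQueue) {
            return "queue://" + destination.name();
        }
        if (destination instanceof SketchTopic) {
            return "topic://" + destination.name();
        }
        return null; // unknown destination type
    }

    public static void main(String[] args) {
        SketchTemporaryQueue temp = () -> "reply-1";
        SketchQueue queue = () -> "orders";
        System.out.println(kindOf(temp));
        System.out.println(kindOf(queue));
    }
}
```

Swapping the first two checks would still compile, which is what makes this kind of ordering mistake hard to spot.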




public interface DeliveryMode
{
    public static final int NON_PERSISTENT = 1;
    public static final int PERSISTENT = 2;
}







package java.io;

import java.io.ObjectOutput;
import java.io.ObjectInput;

/**
 * Only the identity of the class of an Externalizable instance is
 * written in the serialization stream and it is the responsibility
 * of the class to save and restore the contents of its instances.
 *
 * The writeExternal and readExternal methods of the Externalizable
 * interface are implemented by a class to give the class complete
 * control over the format and contents of the stream for an object
 * and its supertypes. These methods must explicitly
 * coordinate with the supertype to save its state. These methods supersede
 * customized implementations of writeObject and readObject methods.<br>
 *
 * Object Serialization uses the Serializable and Externalizable
 * interfaces.  Object persistence mechanisms can use them as well.  Each
 * object to be stored is tested for the Externalizable interface. If
 * the object supports Externalizable, the writeExternal method is called. If the
 * object does not support Externalizable and does implement
 * Serializable, the object is saved using
 * ObjectOutputStream. <br> When an Externalizable object is
 * reconstructed, an instance is created using the public no-arg
 * constructor, then the readExternal method called.  Serializable
 * objects are restored by reading them from an ObjectInputStream.<br>
 *
 * An Externalizable instance can designate a substitution object via
 * the writeReplace and readResolve methods documented in the Serializable
 * interface.<br>
 *
 * @author  unascribed
 * @see java.io.ObjectOutputStream
 * @see java.io.ObjectInputStream
 * @see java.io.ObjectOutput
 * @see java.io.ObjectInput
 * @see java.io.Serializable
 * @since   JDK1.1
 */
public interface Externalizable extends java.io.Serializable {
    /**
     * The object implements the writeExternal method to save its contents
     * by calling the methods of DataOutput for its primitive values or
     * calling the writeObject method of ObjectOutput for objects, strings,
     * and arrays.
     *
     * @serialData Overriding methods should use this tag to describe
     *             the data layout of this Externalizable object.
     *             List the sequence of element types and, if possible,
     *             relate the element to a public/protected field and/or
     *             method of this Externalizable class.
     *
     * @param out the stream to write the object to
     * @exception IOException Includes any I/O exceptions that may occur
     */
    void writeExternal(ObjectOutput out) throws IOException;

    /**
     * The object implements the readExternal method to restore its
     * contents by calling the methods of DataInput for primitive
     * types and readObject for objects, strings and arrays.  The
     * readExternal method must read the values in the same sequence
     * and with the same types as were written by writeExternal.
     *
     * @param in the stream to read data from in order to restore the object
     * @exception IOException if I/O errors occur
     * @exception ClassNotFoundException If the class for an object being
     *              restored cannot be found.
     */
    void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
}
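
As a small illustration of the Externalizable contract quoted above, the sketch below writes an object with writeExternal and restores it with readExternal through the standard object streams. PointSketch is a hypothetical class made up for this example; the only requirements taken from the interface documentation are the public no-arg constructor and reading the fields in the order they were written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

class PointSketch implements Externalizable {
    private static final long serialVersionUID = 1L;
    private int x;
    private int y;

    // Deserialization creates the instance through this public no-arg constructor.
    public PointSketch() {
    }

    PointSketch(int x, int y) {
        this.x = x;
        this.y = y;
    }

    int getX() {
        return x;
    }

    int getY() {
        return y;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(x);
        out.writeInt(y);
    }

    // Fields are read in exactly the order writeExternal wrote them.
    @Override
    public void readExternal(ObjectInput in) throws IOException {
        x = in.readInt();
        y = in.readInt();
    }

    // Serializes the point to a byte array and reads it back.
    static PointSketch roundTrip(PointSketch point) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(point);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PointSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PointSketch copy = roundTrip(new PointSketch(3, 4));
        System.out.println(copy.getX() + "," + copy.getY());
    }
}
```

Unlike plain Serializable, nothing is written automatically here: a field left out of writeExternal is simply absent after the round trip.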


package java.io;

/**
 * ObjectOutput extends the DataOutput interface to include writing of objects.
 * DataOutput includes methods for output of primitive types, ObjectOutput
 * extends that interface to include objects, arrays, and Strings.
 *
 * @author  unascribed
 * @see java.io.InputStream
 * @see java.io.ObjectOutputStream
 * @see java.io.ObjectInputStream
 * @since   JDK1.1
 */
public interface ObjectOutput extends DataOutput, AutoCloseable {
    /**
     * Write an object to the underlying storage or stream.  The
     * class that implements this interface defines how the object is
     * written.
     *
     * @param obj the object to be written
     * @exception IOException Any of the usual Input/Output related exceptions.
     */
    public void writeObject(Object obj)
      throws IOException;

    /**
     * Writes a byte. This method will block until the byte is actually
     * written.
     * @param b the byte
     * @exception IOException If an I/O error has occurred.
     */
    public void write(int b) throws IOException;

    /**
     * Writes an array of bytes. This method will block until the bytes
     * are actually written.
     * @param b the data to be written
     * @exception IOException If an I/O error has occurred.
     */
    public void write(byte b[]) throws IOException;

    /**
     * Writes a sub array of bytes.
     * @param b the data to be written
     * @param off       the start offset in the data
     * @param len       the number of bytes that are written
     * @exception IOException If an I/O error has occurred.
     */
    public void write(byte b[], int off, int len) throws IOException;

    /**
     * Flushes the stream. This will write any buffered
     * output bytes.
     * @exception IOException If an I/O error has occurred.
     */
    public void flush() throws IOException;

    /**
     * Closes the stream. This method must be called
     * to release any resources associated with the
     * stream.
     * @exception IOException If an I/O error has occurred.
     */
    public void close() throws IOException;
}
