ActiveMQBytesMessage类型的消息在特殊情况下会丢失数据,就是在被拷贝前设置消息的某个属性。下面是测试代码:
producer代码
MessageProducer producer; //initialize Connection, Session, Producer ...... byte[] bs = "bytes message".getBytes(); BytesMessage message = session.createBytesMessage(); message.writeBytes(bs); for(int i=0; i< 10; i++){ message.setLongProperty("sendTime", System.currentTimeMillis()); try{ producer.send(message); }catch(Exception e){ e.printStackTrace(); } }
Consumer代码
MessageConsumer consumer //initailize Connection, Session, MessageConsumer for(int i=0; i<10; i++){ ActiveMQBytesMessage message = (ActiveMQBytesMessage)consumer.receive(60*1000); long sendTime = message.getLongProperty("sendTime"); System.out.println("sendtime:" + sendTime); ByteSequence bs = message.getMessage().getContent(); System.out.println("bytes data:" + new String(bs.getData())); }
期待的结果:
consumer在所有接收到的消息中得到bytes数据
实际结果:
只有第一条消息有bytes数据,其他消息都丢失了bytes数据,而long property的值都没有丢失。
分析:
ActiveMQ在发送消息的时候会拷贝消息,实际发送的是消息的拷贝。在拷贝之前会调用storeContent() ,ActiveMQBytesMessage中的属性DataOutputStream dataOut 会被关闭,dataOut 会被置为null,dataOut中的值会被set到content属性中。如果不设置消息的属性,这个逻辑是没有问题的。当设置消息属性时,setLongProperty 方法中会调用setObjectProperty() ,然后调用initializeWriting(),在initializeWriting()中DataOutputStream dataOut会再次被创建。这样,当消息第二次被拷贝的时候,DataOutputStream dataOut不是null,而是EMPTY。由于不是null,dataOut的值会被set到content,这样content的值就被清空了。
根据JMS规范的要求:
3.9 Access to Sent Messages After sending a message, a client may retain and modify it without affecting the message that has been sent. The same message object may be sent multiple times. During the execution of its sending method, the message must not be changed by the client. If it is modified, the result of the send is undefined.
消息发送后,原消息不应该受到影响。根据这个要求,目前的实现是个bug
建议:
在initializeWriting()中把content的数据设置到DataOutputStream dataOut中,这样就保持消息中bytes数据不变。
我的fix代码
ActiveMQBytesMessage : private void initializeWriting() throws JMSException { 669 The original code ...... 701 //fix code if(this.content !=null && this.content.length >0){ try { this.dataOut.write(this.content.getData()); } catch(IOException ioe) { throw JMSExceptionSupport.create(ioe); } } 702 }
这个bug 已经提交给APACHE,还没有被修复。
更新:该bug已经被修复,最新的Patch:
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index 6de35aa..923e0e1 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -856,6 +856,42 @@ } this.dataOut = new DataOutputStream(os); } + + restoreOldContent(); + } + + private void restoreOldContent() throws JMSException { + // For a message that already had a body and was sent we need to restore the content + // if the message is used again without having its clearBody method called. + if (this.content != null && this.content.length > 0) { + try { + ByteSequence toRestore = this.content; + if (compressed) { + InputStream is = new ByteArrayInputStream(toRestore); + int length = 0; + try { + DataInputStream dis = new DataInputStream(is); + length = dis.readInt(); + dis.close(); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + is = new InflaterInputStream(is); + DataInputStream input = new DataInputStream(is); + + byte[] buffer = new byte[length]; + input.readFully(buffer); + toRestore = new ByteSequence(buffer); + } + + this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength()); + // Free up the buffer from the old content, will be re-written when + // the message is sent again and storeContent() is called. + this.content = null; + } catch (IOException ioe) { + throw JMSExceptionSupport.create(ioe); + } + } } protected void checkWriteOnlyBody() throws MessageNotReadableException { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java index f9dda6c..e809d03 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java @@ -118,6 +118,7 @@ protected transient DataInputStream dataIn; protected transient int remainingBytes = -1; + @Override public Message copy() { ActiveMQStreamMessage copy = new ActiveMQStreamMessage(); copy(copy); @@ -132,6 +133,7 @@ copy.dataIn = null; } + @Override public void onSend() throws JMSException { super.onSend(); storeContent(); @@ -151,10 +153,12 @@ } } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public String getJMSXMimeType() { return "jms/stream-message"; } @@ -171,6 +175,7 @@ * due to some internal error. */ + @Override public void clearBody() throws JMSException { super.clearBody(); this.dataOut = null; @@ -191,6 +196,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public boolean readBoolean() throws JMSException { initializeReading(); try { @@ -233,6 +239,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public byte readByte() throws JMSException { initializeReading(); try { @@ -282,6 +289,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public short readShort() throws JMSException { initializeReading(); try { @@ -335,6 +343,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public char readChar() throws JMSException { initializeReading(); try { @@ -382,6 +391,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public int readInt() throws JMSException { initializeReading(); try { @@ -438,6 +448,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public long readLong() throws JMSException { initializeReading(); try { @@ -496,6 +507,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public float readFloat() throws JMSException { initializeReading(); try { @@ -544,6 +556,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public double readDouble() throws JMSException { initializeReading(); try { @@ -596,6 +609,7 @@ * @throws MessageNotReadableException if the message is in write-only mode. */ + @Override public String readString() throws JMSException { initializeReading(); try { @@ -696,6 +710,7 @@ * @see #readObject() */ + @Override public int readBytes(byte[] value) throws JMSException { initializeReading(); @@ -769,6 +784,7 @@ * @see #readBytes(byte[] value) */ + @Override public Object readObject() throws JMSException { initializeReading(); try { @@ -849,6 +865,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBoolean(boolean value) throws JMSException { initializeWriting(); try { @@ -867,6 +884,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeByte(byte value) throws JMSException { initializeWriting(); try { @@ -885,6 +903,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeShort(short value) throws JMSException { initializeWriting(); try { @@ -903,6 +922,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeChar(char value) throws JMSException { initializeWriting(); try { @@ -921,6 +941,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeInt(int value) throws JMSException { initializeWriting(); try { @@ -939,6 +960,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeLong(long value) throws JMSException { initializeWriting(); try { @@ -957,6 +979,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeFloat(float value) throws JMSException { initializeWriting(); try { @@ -975,6 +998,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeDouble(double value) throws JMSException { initializeWriting(); try { @@ -993,6 +1017,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeString(String value) throws JMSException { initializeWriting(); try { @@ -1019,6 +1044,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value) throws JMSException { writeBytes(value, 0, value.length); } @@ -1039,6 +1065,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeBytes(byte[] value, int offset, int length) throws JMSException { initializeWriting(); try { @@ -1062,6 +1089,7 @@ * @throws MessageNotWriteableException if the message is in read-only mode. */ + @Override public void writeObject(Object value) throws JMSException { initializeWriting(); if (value == null) { @@ -1102,6 +1130,7 @@ * @throws JMSException if an internal error occurs */ + @Override public void reset() throws JMSException { storeContent(); this.bytesOut = null; @@ -1111,7 +1140,7 @@ setReadOnlyBody(true); } - private void initializeWriting() throws MessageNotWriteableException { + private void initializeWriting() throws JMSException { checkReadOnlyBody(); if (this.dataOut == null) { this.bytesOut = new ByteArrayOutputStream(); @@ -1122,6 +1151,19 @@ os = new DeflaterOutputStream(os); } this.dataOut = new DataOutputStream(os); + } + + // For a message that already had a body and was sent we need to restore the content + // if the message is used again without having its clearBody method called. + if (this.content != null && this.content.length > 0) { + try { + this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength()); + // Free up the buffer from the old content, will be re-written when + // tbe message is sent again and storeContent() is called. + this.content = null; + } catch (IOException ioe) { + throw JMSExceptionSupport.create(ioe); + } } } @@ -1153,6 +1195,7 @@ super.compress(); } + @Override public String toString() { return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }"; }
相关推荐
这篇博客"ActiveMQ问题解决记录"可能涵盖了作者在使用ActiveMQ过程中遇到的问题以及解决这些问题的经验分享。 首先,让我们了解一下ActiveMQ的基本概念。ActiveMQ作为消息中间件,它充当应用程序之间的桥梁,负责...
### ActiveMQ 消息过期时间设置与自动清除解决方案 #### 概述 在消息队列的场景下,为了防止消息长时间滞留在队列中占用资源或者为了满足业务上对消息时效性的需求,通常需要对消息设定过期时间。本文档详细介绍了...
4. **实现数据同步逻辑**:在Quartz的Job类中,编写数据同步的业务逻辑,可能包括从数据库获取新数据、通过JMS模板将数据发送到ActiveMQ队列,以及在接收端监听队列并处理接收到的数据。 5. **测试与部署**:完成...
本文将围绕ActiveMQ常见的问题进行总结,以帮助开发者更好地理解和解决实际操作中的问题。 **一、ActiveMQ基本概念** 1. **生产者(Producer)**:发布消息到消息队列的客户端。 2. **消费者(Consumer)**:从消息...
ActiveMQ(中文)参考手册 ActiveMQ(中文)参考手册 ActiveMQ(中文)参考手册 ActiveMQ(中文)参考手册
在使用Spring框架集成ActiveMQ时,可能会遇到队列中积压了大量数据但只有一个消费者在处理的情况。这通常是因为ActiveMQ默认的预取策略导致数据分布不均匀。 ##### 原因分析 ActiveMQ使用了一种称为“预取策略”的...
总结来说,ActiveMQ 提供了一套强大的消息传递机制,但需要对它的特性和机制有深入理解,才能有效地预防和解决可能出现的问题。通过理解服务器宕机时的数据存储策略,处理丢消息的策略,优化持久化消息的发送,调整...
描述中的"activemq 传送数据流发送文件"意味着我们将文件的内容转换为数据流,并通过ActiveMQ进行传输。这涉及到文件的读取和流操作,如使用`FileStream`读取文件,然后将`FileStream`转换为`MemoryStream`,最后将`...
在使用ActiveMQ的过程中,可能会遇到消息丢失的问题,尤其是在网络不稳定或者客户端/服务器异常关闭的情况下。为了避免消息丢失,推荐使用持久化消息,或者在必要时开启事务,确保消息被正确处理。对于非持久化消息...
WebSocket与ActiveMQ的结合,使得实时、低延迟的数据交换成为可能,特别是对于需要实时推送消息到客户端的应用来说,这是一个非常理想的选择。 WebSocket协议的核心特点包括: 1. 长连接:一旦WebSocket连接建立,...
8. **社区资源**:描述中提到的博客链接是IT社区分享的资源,说明开发者可以借助社区的力量解决问题,学习最佳实践。 综上所述,了解和实现"ActiveMQ5.0 监视的JSP支持中文"涉及了Web开发、Java编程、消息中间件的...
activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具
在本示例中,"activemq ajax方式demo"是一个使用Ajax技术与ActiveMQ进行交互的简单应用演示,主要目的是解决在Web通信过程中可能出现的中文乱码问题。让我们详细了解一下这个知识点。 首先,ActiveMQ是Apache软件...
为了解决上述问题,本节将详细介绍问题产生的原因及可能的解决策略。 ##### 2.1 问题根源分析 - **Failover机制的本质**:Failover机制的核心在于实现客户端和服务端之间的动态切换,确保即使某一个服务端节点不可...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本文中,我们将深入探讨ActiveMQ v6.0.1的核心特性、应用...
ActiveMQ是中国最流行的开源消息中间件之一,它基于Java Message Service (JMS) 规范,为分布式系统提供高效、可靠的...通过实际操作和不断实践,你可以更好地理解和利用ActiveMQ的特性,解决复杂系统中的通信问题。
在ActiveMQ中,消息传递是基于Java Message Service (JMS)规范的,它定义了不同种类的消息类型,以便于在应用程序之间传输数据。以下是JMS规范中的五种主要消息类型,以及它们在ActiveMQ中的应用: 1. **...
ActiveMQ开发规范及方案 ActiveMQ是一种流行的开源消息队列 middleware,广泛应用于分布式系统中。作为一种消息队列 middleware,ActiveMQ提供了许多功能,例如支持多种消息协议、事务支持、持久化机制等。为了确保...
在生产环境中,jmx监控是非常重要的,可以帮助开发者和运维人员实时监控ActiveMQ的运行状态,快速检测和解决问题。同时,jmx监控也可以与其他监控工具集成,提供更加全面的监控功能。 jmx监控ActiveMQ监控是一种...
ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...