`

ActiveMQ中ActiveMQBytesMessage类型可能会丢失数据的问题及解决

 
阅读更多

ActiveMQBytesMessage类型的消息在特殊情况下会丢失数据,就是在被拷贝前设置消息的某个属性。下面是测试代码:

 producer代码

MessageProducer producer;
//initialize Connection, Session, Producer
......

byte[] bs = "bytes message".getBytes();
BytesMessage message = session.createBytesMessage();
message.writeBytes(bs);

for(int i=0; i< 10; i++){
    message.setLongProperty("sendTime", System.currentTimeMillis());
    try{
        producer.send(message);
    }catch(Exception e){
         e.printStackTrace();
    }
    
}

 Consumer代码

MessageConsumer consumer
//initailize Connection, Session, MessageConsumer
for(int i=0; i<10; i++){
    ActiveMQBytesMessage message = (ActiveMQBytesMessage)consumer.receive(60*1000);
    long sendTime = message.getLongProperty("sendTime");
    System.out.println("sendtime:" + sendTime);
    ByteSequence bs = message.getMessage().getContent();
    System.out.println("bytes data:" + new String(bs.getData()));
}

 

期待的结果:
consumer在所有接收到的消息中得到bytes数据

 

实际结果:
只有第一条消息有bytes数据,其他消息都丢失了bytes数据,而long property的值都没有丢失。

 

分析:
ActiveMQ在发送消息的时候会拷贝消息,实际发送的是消息的拷贝。在拷贝之前会调用storeContent() ,ActiveMQBytesMessage中的属性DataOutputStream dataOut 会被关闭,dataOut 会被置为null,dataOut中的值会被set到content属性中。如果不设置消息的属性,这个逻辑是没有问题的。当设置消息属性时,setLongProperty 方法中会调用setObjectProperty() ,然后调用initializeWriting(),在initializeWriting()中DataOutputStream dataOut会再次被创建。这样,当消息第二次被拷贝的时候,DataOutputStream dataOut不是null,而是EMPTY。由于不是null,dataOut的值会被set到content,这样content的值就被清空了。

 

根据JMS规范的要求:

 3.9 Access to Sent Messages
 After sending a message, a client may retain and modify it without affecting the message 
that has been sent. The same message object may be sent multiple times. During the 
execution of its sending method, the message must not be changed by the client. If it is 
modified, the result of the send is undefined.

 

消息发送后,原消息不应该受到影响。根据这个要求,目前的实现是个bug

建议:

 

在initializeWriting()中把content的数据设置到DataOutputStream dataOut中,这样就保持消息中bytes数据不变。

 

我的fix代码

 

ActiveMQBytesMessage :
private void initializeWriting() throws JMSException {
669 The original code
......
701 
//fix code
if(this.content !=null && this.content.length >0){
try
{ this.dataOut.write(this.content.getData()); }
catch(IOException ioe)
{ throw JMSExceptionSupport.create(ioe); }
}
702 }

 

 这个bug 已经提交给APACHE,还没有被修复。

 

更新:该bug已经被修复,最新的Patch:

diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 6de35aa..923e0e1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -856,6 +856,42 @@
             }
             this.dataOut = new DataOutputStream(os);
         }
+
+        restoreOldContent();
+    }
+
+    private void restoreOldContent() throws JMSException {
+        // For a message that already had a body and was sent we need to restore the content
+        // if the message is used again without having its clearBody method called.
+        if (this.content != null && this.content.length > 0) {
+            try {
+                ByteSequence toRestore = this.content;
+                if (compressed) {
+                    InputStream is = new ByteArrayInputStream(toRestore);
+                    int length = 0;
+                    try {
+                        DataInputStream dis = new DataInputStream(is);
+                        length = dis.readInt();
+                        dis.close();
+                    } catch (IOException e) {
+                        throw JMSExceptionSupport.create(e);
+                    }
+                    is = new InflaterInputStream(is);
+                    DataInputStream input = new DataInputStream(is);
+
+                    byte[] buffer = new byte[length];
+                    input.readFully(buffer);
+                    toRestore = new ByteSequence(buffer);
+                }
+
+                this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
+                // Free up the buffer from the old content, will be re-written when
+                // the message is sent again and storeContent() is called.
+                this.content = null;
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+        }
     }
 
     protected void checkWriteOnlyBody() throws MessageNotReadableException {
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index f9dda6c..e809d03 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -118,6 +118,7 @@
     protected transient DataInputStream dataIn;
     protected transient int remainingBytes = -1;
 
+    @Override
     public Message copy() {
         ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
         copy(copy);
@@ -132,6 +133,7 @@
         copy.dataIn = null;
     }
 
+    @Override
     public void onSend() throws JMSException {
         super.onSend();
         storeContent();
@@ -151,10 +153,12 @@
         }
     }
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public String getJMSXMimeType() {
         return "jms/stream-message";
     }
@@ -171,6 +175,7 @@
      *                 due to some internal error.
      */
 
+    @Override
     public void clearBody() throws JMSException {
         super.clearBody();
         this.dataOut = null;
@@ -191,6 +196,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public boolean readBoolean() throws JMSException {
         initializeReading();
         try {
@@ -233,6 +239,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public byte readByte() throws JMSException {
         initializeReading();
         try {
@@ -282,6 +289,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public short readShort() throws JMSException {
         initializeReading();
         try {
@@ -335,6 +343,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public char readChar() throws JMSException {
         initializeReading();
         try {
@@ -382,6 +391,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public int readInt() throws JMSException {
         initializeReading();
         try {
@@ -438,6 +448,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public long readLong() throws JMSException {
         initializeReading();
         try {
@@ -496,6 +507,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public float readFloat() throws JMSException {
         initializeReading();
         try {
@@ -544,6 +556,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public double readDouble() throws JMSException {
         initializeReading();
         try {
@@ -596,6 +609,7 @@
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public String readString() throws JMSException {
         initializeReading();
         try {
@@ -696,6 +710,7 @@
      * @see #readObject()
      */
 
+    @Override
     public int readBytes(byte[] value) throws JMSException {
 
         initializeReading();
@@ -769,6 +784,7 @@
      * @see #readBytes(byte[] value)
      */
 
+    @Override
     public Object readObject() throws JMSException {
         initializeReading();
         try {
@@ -849,6 +865,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeBoolean(boolean value) throws JMSException {
         initializeWriting();
         try {
@@ -867,6 +884,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeByte(byte value) throws JMSException {
         initializeWriting();
         try {
@@ -885,6 +903,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeShort(short value) throws JMSException {
         initializeWriting();
         try {
@@ -903,6 +922,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeChar(char value) throws JMSException {
         initializeWriting();
         try {
@@ -921,6 +941,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeInt(int value) throws JMSException {
         initializeWriting();
         try {
@@ -939,6 +960,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeLong(long value) throws JMSException {
         initializeWriting();
         try {
@@ -957,6 +979,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeFloat(float value) throws JMSException {
         initializeWriting();
         try {
@@ -975,6 +998,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeDouble(double value) throws JMSException {
         initializeWriting();
         try {
@@ -993,6 +1017,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeString(String value) throws JMSException {
         initializeWriting();
         try {
@@ -1019,6 +1044,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeBytes(byte[] value) throws JMSException {
         writeBytes(value, 0, value.length);
     }
@@ -1039,6 +1065,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeBytes(byte[] value, int offset, int length) throws JMSException {
         initializeWriting();
         try {
@@ -1062,6 +1089,7 @@
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeObject(Object value) throws JMSException {
         initializeWriting();
         if (value == null) {
@@ -1102,6 +1130,7 @@
      * @throws JMSException if an internal error occurs
      */
 
+    @Override
     public void reset() throws JMSException {
         storeContent();
         this.bytesOut = null;
@@ -1111,7 +1140,7 @@
         setReadOnlyBody(true);
     }
 
-    private void initializeWriting() throws MessageNotWriteableException {
+    private void initializeWriting() throws JMSException {
         checkReadOnlyBody();
         if (this.dataOut == null) {
             this.bytesOut = new ByteArrayOutputStream();
@@ -1122,6 +1151,19 @@
                 os = new DeflaterOutputStream(os);
             }
             this.dataOut = new DataOutputStream(os);
+        }
+
+        // For a message that already had a body and was sent we need to restore the content
+        // if the message is used again without having its clearBody method called.
+        if (this.content != null && this.content.length > 0) {
+            try {
+                this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength());
+                // Free up the buffer from the old content, will be re-written when
+                // tbe message is sent again and storeContent() is called.
+                this.content = null;
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
         }
     }
 
@@ -1153,6 +1195,7 @@
         super.compress();
     }
 
+    @Override
     public String toString() {
         return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
     }

 

 

分享到:
评论

相关推荐

    ActiveMQ问题解决记录

    这篇博客"ActiveMQ问题解决记录"可能涵盖了作者在使用ActiveMQ过程中遇到的问题以及解决这些问题的经验分享。 首先,让我们了解一下ActiveMQ的基本概念。ActiveMQ作为消息中间件,它充当应用程序之间的桥梁,负责...

    ActiveMQ消息过期时间设置和自动清除解决方案

    ### ActiveMQ 消息过期时间设置与自动清除解决方案 #### 概述 在消息队列的场景下,为了防止消息长时间滞留在队列中占用资源或者为了满足业务上对消息时效性的需求,通常需要对消息设定过期时间。本文档详细介绍了...

    采用Spring整合activeMQ与quartz的JMS数据同步实例

    4. **实现数据同步逻辑**:在Quartz的Job类中,编写数据同步的业务逻辑,可能包括从数据库获取新数据、通过JMS模板将数据发送到ActiveMQ队列,以及在接收端监听队列并处理接收到的数据。 5. **测试与部署**:完成...

    activemq问题总结

    本文将围绕ActiveMQ常见的问题进行总结,以帮助开发者更好地理解和解决实际操作中的问题。 **一、ActiveMQ基本概念** 1. **生产者(Producer)**:发布消息到消息队列的客户端。 2. **消费者(Consumer)**:从消息...

    ActiveMQ(中文)参考手册

    ActiveMQ(中文)参考手册 ActiveMQ(中文)参考手册 ActiveMQ(中文)参考手册 ActiveMQ(中文)参考手册

    ActiveMQ高并发处理方案

    在使用Spring框架集成ActiveMQ时,可能会遇到队列中积压了大量数据但只有一个消费者在处理的情况。这通常是因为ActiveMQ默认的预取策略导致数据分布不均匀。 ##### 原因分析 ActiveMQ使用了一种称为“预取策略”的...

    ActiveMQ消息中间件面试专题1

    总结来说,ActiveMQ 提供了一套强大的消息传递机制,但需要对它的特性和机制有深入理解,才能有效地预防和解决可能出现的问题。通过理解服务器宕机时的数据存储策略,处理丢消息的策略,优化持久化消息的发送,调整...

    ACTIVEMQ C#下的例子

    描述中的"activemq 传送数据流发送文件"意味着我们将文件的内容转换为数据流,并通过ActiveMQ进行传输。这涉及到文件的读取和流操作,如使用`FileStream`读取文件,然后将`FileStream`转换为`MemoryStream`,最后将`...

    7道消息队列ActiveMQ面试题!

    在使用ActiveMQ的过程中,可能会遇到消息丢失的问题,尤其是在网络不稳定或者客户端/服务器异常关闭的情况下。为了避免消息丢失,推荐使用持久化消息,或者在必要时开启事务,确保消息被正确处理。对于非持久化消息...

    WebSocket协议接收ActiveMQ

    WebSocket与ActiveMQ的结合,使得实时、低延迟的数据交换成为可能,特别是对于需要实时推送消息到客户端的应用来说,这是一个非常理想的选择。 WebSocket协议的核心特点包括: 1. 长连接:一旦WebSocket连接建立,...

    ActiveMQ5.0 监视的JSP支持中文

    8. **社区资源**:描述中提到的博客链接是IT社区分享的资源,说明开发者可以借助社区的力量解决问题,学习最佳实践。 综上所述,了解和实现"ActiveMQ5.0 监视的JSP支持中文"涉及了Web开发、Java编程、消息中间件的...

    activemq书籍及工具

    activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具 activemq书籍及工具

    activemq ajax方式demo

    在本示例中,"activemq ajax方式demo"是一个使用Ajax技术与ActiveMQ进行交互的简单应用演示,主要目的是解决在Web通信过程中可能出现的中文乱码问题。让我们详细了解一下这个知识点。 首先,ActiveMQ是Apache软件...

    ActiveMQ_使用failover模式进行连接切换时,线程断开

    为了解决上述问题,本节将详细介绍问题产生的原因及可能的解决策略。 ##### 2.1 问题根源分析 - **Failover机制的本质**:Failover机制的核心在于实现客户端和服务端之间的动态切换,确保即使某一个服务端节点不可...

    ActiveMQ消息服务器 v6.0.1.zip

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本文中,我们将深入探讨ActiveMQ v6.0.1的核心特性、应用...

    ActiveMQ入门及深入使用的例子

    ActiveMQ是中国最流行的开源消息中间件之一,它基于Java Message Service (JMS) 规范,为分布式系统提供高效、可靠的...通过实际操作和不断实践,你可以更好地理解和利用ActiveMQ的特性,解决复杂系统中的通信问题。

    ActiveMQ常见消息类型.docx

    在ActiveMQ中,消息传递是基于Java Message Service (JMS)规范的,它定义了不同种类的消息类型,以便于在应用程序之间传输数据。以下是JMS规范中的五种主要消息类型,以及它们在ActiveMQ中的应用: 1. **...

    ActiveMQ开发规范及方案

    ActiveMQ开发规范及方案 ActiveMQ是一种流行的开源消息队列 middleware,广泛应用于分布式系统中。作为一种消息队列 middleware,ActiveMQ提供了许多功能,例如支持多种消息协议、事务支持、持久化机制等。为了确保...

    jmx监控activeMQ监控

    在生产环境中,jmx监控是非常重要的,可以帮助开发者和运维人员实时监控ActiveMQ的运行状态,快速检测和解决问题。同时,jmx监控也可以与其他监控工具集成,提供更加全面的监控功能。 jmx监控ActiveMQ监控是一种...

    activeMQ收发工具.rar

    ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...

Global site tag (gtag.js) - Google Analytics