召唤activeMQ大神!最近由于项目需要,使用activemq5.10.0,但是发现了丢失数据的现象,特来求证。
需求如下:
需求1,队列之间资源独立,不能因为某个队列消费缓慢导致其生产者生产的消息累计,将内存/硬盘耗尽使其它队列不能使用,所以需要配置指定每个队列的资源大小,互不影响;
需求2,使用异步发送方式,加快生产消息速度,经测试持久化消息(默认同步发送)与非持久化消息(默认异步发送)速度相差一个数量级,非持久化方式符合需求;
需求3,使用非持久化消息(异步发送方式),在activeMQ服务器不出问题的情况下,不允许丢失数据。
测试发现的问题:
异步发送非持久化信息的时候,使用内存游标方式<vmQueueCursor>将非持久化消息放到内存中,并使用流量控制,配置单个队列最大内存大小为50MB,在<systemUsage>中配置sendFailIfNoSpaceAfterTimeOut=5000,在单个队列内存达到限制时(此时没有开启消费者)按理生产者应该报内存不足异常或者阻塞,但是测得生产者没有报异常,只是产生阻塞,且继续缓慢发送,为啥?
如果接着开启消费者,按理消息将被消费,内存得到缓解,生产者会继续发送消息,但是测得最终收到的消息与发送的消息总数不等,消息丢失,如何解决?
分析如下:
当使用异步发送消息(非持久化消息默认使用异步发送)时,activeMQ可能不会进行ack确认,导致生产者无法得知队列内存耗尽无法报错,但是不应该出现消息丢失啊,难道内存用尽时存在处理BUG导致消息丢失?
进一步分析:
依据以上分析,当队列内存到达限制(50MB),生产者会阻塞但继续缓慢发送消息直至最终完全堵塞,查询资料使用setProducerWindowSize()对生产者“获得ack前”能够发送的消息量进行限制,推测此功能可打开异步发送批量ack确认功能,这里设置为setProducerWindowSize(1024)。从新测试,当生产者消息堵塞,然后开启消费者消费消息,按理说内存会得到恢复,生产者继续发送消息,但是测得生产者不能继续发送消息,生产者和消费者均不能允许貌似互锁,为啥?
本人一直尝试各种配置,并未找到一种高效并且不会丢失消息的方案,因此求助版面各位大神推荐一种好的配置,难道是activeMQ的BUG?
如果是BUG请英文好的大神帮忙提交到activeMQ开源社区,万分感谢,急等中ing。
附上测试程序与配置文件:
生产者代码:(运行时开启一个线程调用它即可)
public class Sender implements Runnable { int threadId; public Sender(int threadId) { this.threadId = threadId; } @Override public void run() { try { String user = "admin"; String password = "activemq"; String url = "tcp://192.168.18.151:61616"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); //配置AlwaysSyncSend,非持久化在资源耗尽的时候将得到通知,虽然速度会变慢. connectionFactory.setProducerWindowSize(1024); connectionFactory.setSendAcksAsync(true);//这个方法有什么用? // connectionFactory.setAlwaysSyncSend(true);//同步 // connectionFactory.setProducerWindowSize(1024*1024); connectionFactory.setUseAsyncSend(true);//异步 Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("test."+this.threadId); MessageProducer producer = session.createProducer(destination); Date date = new Date(); long t1 = date.getTime(); System.out.println("connThread"+threadId+" 开始时间:"+new Date()); int p = 0; producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 100000; i++) { TextMessage message = session.createTextMessage(); message.setText(Integer.toString(i)); //Thread.sleep(1); try { producer.send(message); p++; if(p%1000 == 0){ // session.commit(); System.out.println("connThread"+threadId+" 已发送:"+p+"条,用时:"+(new Date().getTime() - t1)); } // System.out.println("connThread"+threadId+" 已发送:"+p+"条,用时:"+(new Date().getTime() - t1)); } catch (Exception e) { e.printStackTrace(); } // session.commit(); //System.out.println("--发送消息:" + date); } System.out.println("connThread"+threadId+" 共发送"+p+"条,结束时间:"+new Date()); // System.out.println("connThread"+threadId+" 共用时:"+(new Date().getTime() - t1)); //session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
消费者代码:
public class ConnConsumer { public static int threadId = 0; public static void main(String[] args) throws JMSException, InterruptedException{ for(int i=0; i<15; i++){ ConnConsumer.receive(threadId++); } Thread.sleep(3*24*60*60*1000); } public static void receive(int threadId) throws InterruptedException{ try { String user = "admin"; String password = "activemq"; String url = "tcp://192.168.18.151:61616"; // ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); //异步分发,当使用listener时是异步,receive是同步 // connectionFactory.setDispatchAsync(true); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST."+threadId); MessageConsumer message = session.createConsumer(destination); message.setMessageListener(new MsgListener(threadId)); //Thread.sleep(3*24*60*60*1000); //session.close(); //connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
public class MsgListener implements MessageListener { int threadId; int count = 0; Date date = new Date(); long t1 = date.getTime(); int sum = 0; public MsgListener(int threadId) { System.out.println("threadId:"+threadId+"启动监听"); this.threadId = threadId; } @Override public void onMessage(Message msg) { TextMessage message = (TextMessage) msg; // try { // sum += Integer.parseInt(message.getText()); // } catch (NumberFormatException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } catch (JMSException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } //System.out.println("--收到消息:" + new Date(message.getLong("count"))); count ++; if(count%1000 == 0){ System.out.println("connThread"+threadId+" 接收:"+count+"条,用时:"+(new Date().getTime() - t1)); }else if(count == 1){ System.out.println("connThread"+threadId+" 开始时间:"+new Date()); } // System.out.println("sum is " + sum); //session.commit(); } }
配置文件:
…. <policyEntry queue=">" producerFlowControl="true" memoryLimit="50mb"> <!-- 使用VM游标,非持久化消息放在内存中,达到指定内存后停掉生产者--> <pendingQueuePolicy> <vmQueueCursor/> <pendingQueuePolicy> </policyEntry> …. <systemUsage> <!-- 当资源耗尽时,生产者阻塞5秒后,API报错--> <systemUsage sendFailIfNoSpaceAfterTimeOut="5000"> <memoryUsage> <memoryUsage percentOfJvmHeap="70" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> …