论坛首页 Java企业应用论坛

召唤activeMQ大神,数据丢失!!!!!!

浏览 6403 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2015-03-06  

 

召唤activeMQ大神!最近由于项目需要,使用activemq5.10.0,但是发现了丢失数据的现象,特来求证。

 

需求如下:

需求1,队列之间资源独立,不能因为某个队列消费缓慢导致其生产者生产的消息累计,将内存/硬盘耗尽使其它队列不能使用,所以需要配置指定每个队列的资源大小,互不影响;

需求2,使用异步发送方式,加快生产消息速度,经测试持久化消息(默认同步发送)与非持久化消息(默认异步发送)速度相差一个数量级,非持久化方式符合需求;

需求3,使用非持久化消息(异步发送方式),在activeMQ服务器不出问题的情况下,不允许丢失数据。

 

测试发现的问题:

异步发送非持久化信息的时候,使用内存游标方式<vmQueueCursor>将非持久化消息放到内存中,并使用流量控制,配置单个队列最大内存大小为50MB,在<systemUsage>中配置sendFailIfNoSpaceAfterTimeOut=5000,在单个队列内存达到限制时(此时没有开启消费者)按理生产者应该报内存不足异常或者阻塞,但是测得生产者没有报异常,只是产生阻塞,且继续缓慢发送,为啥?

如果接着开启消费者,按理消息将被消费,内存得到缓解,生产者会继续发送消息,但是测得最终收到的消息与发送的消息总数不等,消息丢失,如何解决?

 

分析如下:

当使用异步发送消息(非持久化消息默认使用异步发送)时,activeMQ可能不会进行ack确认,导致生产者无法得知队列内存耗尽无法报错,但是不应该出现消息丢失啊,难道内存用尽时存在处理BUG导致消息丢失?

 

进一步分析:

依据以上分析,当队列内存到达限制(50MB),生产者会阻塞但继续缓慢发送消息直至最终完全堵塞,查询资料使用setProducerWindowSize()对生产者“获得ack前”能够发送的消息量进行限制,推测此功能可打开异步发送批量ack确认功能,这里设置为setProducerWindowSize(1024)。从新测试,当生产者消息堵塞,然后开启消费者消费消息,按理说内存会得到恢复,生产者继续发送消息,但是测得生产者不能继续发送消息,生产者和消费者均不能允许貌似互锁,为啥?

 

本人一直尝试各种配置,并未找到一种高效并且不会丢失消息的方案,因此求助版面各位大神推荐一种好的配置,难道是activeMQBUG

 

如果是BUG请英文好的大神帮忙提交到activeMQ开源社区,万分感谢,急等中ing

 

附上测试程序与配置文件:

 

生产者代码:(运行时开启一个线程调用它即可)

 

public class Sender implements Runnable {
	int threadId;
	public Sender(int threadId) {
		this.threadId = threadId;
	}

	@Override
	public void run() {
		try {
			String user = "admin";
			String password = "activemq";
			String url = "tcp://192.168.18.151:61616";
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url);
			
			//配置AlwaysSyncSend,非持久化在资源耗尽的时候将得到通知,虽然速度会变慢.
			connectionFactory.setProducerWindowSize(1024);
			connectionFactory.setSendAcksAsync(true);//这个方法有什么用?
//			connectionFactory.setAlwaysSyncSend(true);//同步
//			connectionFactory.setProducerWindowSize(1024*1024);
			connectionFactory.setUseAsyncSend(true);//异步
			Connection connection = connectionFactory.createConnection();
			connection.start();
			Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue("test."+this.threadId);
			MessageProducer producer = session.createProducer(destination);
			Date date = new Date();
			long t1 = date.getTime();
			System.out.println("connThread"+threadId+" 开始时间:"+new Date());
			int p = 0;
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			for (int i = 0; i < 100000; i++) {
				TextMessage message = session.createTextMessage();
				message.setText(Integer.toString(i));
				//Thread.sleep(1);
				try {
					producer.send(message);
					p++;
					if(p%1000 == 0){
//						session.commit();
						System.out.println("connThread"+threadId+" 已发送:"+p+"条,用时:"+(new Date().getTime() - t1));
					}
//					System.out.println("connThread"+threadId+" 已发送:"+p+"条,用时:"+(new Date().getTime() - t1));
				} catch (Exception e) {
					e.printStackTrace();
				}
//				session.commit();
				
				//System.out.println("--发送消息:" + date);
			}
			
			System.out.println("connThread"+threadId+" 共发送"+p+"条,结束时间:"+new Date());
//			System.out.println("connThread"+threadId+" 共用时:"+(new Date().getTime() - t1));
			//session.commit();
			session.close();
			connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

 消费者代码:

public class ConnConsumer {
	public static int threadId = 0;
	public static void main(String[] args) throws JMSException, InterruptedException{
		for(int i=0; i<15; i++){
			ConnConsumer.receive(threadId++);
		}
		Thread.sleep(3*24*60*60*1000);
	}
	
	public static void receive(int threadId) throws InterruptedException{
		try {
			String user = "admin";
			String password = "activemq";
			String url = "tcp://192.168.18.151:61616";  //
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url);
			//异步分发,当使用listener时是异步,receive是同步
//			connectionFactory.setDispatchAsync(true);
			Connection connection = connectionFactory.createConnection();
			connection.start();
			final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue("TEST."+threadId);
			MessageConsumer message = session.createConsumer(destination);
			message.setMessageListener(new MsgListener(threadId));
			//Thread.sleep(3*24*60*60*1000);
			//session.close();
			//connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

 

public class MsgListener implements MessageListener {
	int threadId;
	int count = 0;
	Date date = new Date();
	long t1 = date.getTime();
	
	int sum = 0;
	
	public MsgListener(int threadId) {
		System.out.println("threadId:"+threadId+"启动监听");
		this.threadId = threadId;
	}

	@Override
	public void onMessage(Message msg) {
		TextMessage message = (TextMessage) msg;
//		try {
//			sum += Integer.parseInt(message.getText());
//		} catch (NumberFormatException e) {
//			// TODO Auto-generated catch block
//			e.printStackTrace();
//		} catch (JMSException e) {
//			// TODO Auto-generated catch block
//			e.printStackTrace();
//		}
		//System.out.println("--收到消息:" + new Date(message.getLong("count")));
		
		count ++;
		if(count%1000 == 0){
			System.out.println("connThread"+threadId+" 接收:"+count+"条,用时:"+(new Date().getTime() - t1));
		}else if(count == 1){
			System.out.println("connThread"+threadId+" 开始时间:"+new Date());
		}
//		System.out.println("sum is " + sum);
		//session.commit();
	}

}

  配置文件:

 

….
<policyEntry queue=">" producerFlowControl="true" memoryLimit="50mb">
		<!-- 使用VM游标,非持久化消息放在内存中,达到指定内存后停掉生产者-->
                <pendingQueuePolicy>
			<vmQueueCursor/>
		<pendingQueuePolicy>
</policyEntry>
….
<systemUsage>
		<!-- 当资源耗尽时,生产者阻塞5秒后,API报错-->
            <systemUsage sendFailIfNoSpaceAfterTimeOut="5000">
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
…

 

 

   发表时间:2015-03-10  
上传个工程,这样别人调试也方便
0 请登录后投票
   发表时间:2015-03-11  
如果使用异步消息的方式,setProducerWindowSize配置应该是无效的,可以在官方文档在核实一下.


消息丢失,除开MQ本身存储的环节,也有可能是在消费端未成功处理导致的。
0 请登录后投票
   发表时间:2015-03-12  
楼主用activeMQ实现什么功能
0 请登录后投票
   发表时间:2015-03-13  
MQ的话解决不了 丢失问题吧,服务应该是不保证不丢失的.
0 请登录后投票
   发表时间:2015-03-18   最后修改:2015-03-18
设置connection的exceptionlistener就可以了
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics