论坛首页 Java企业应用论坛

ActiveMQ使用连接池后程序无法退出

浏览 9809 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2013-01-12   最后修改:2013-01-12
程序无法推出,代码中没有任何死循环代码,但实际线程却无法退出,运行过程是死循环?

环境:
OS - win7 64bit
JDK-7.005-32bit
ActiveMQ 5.7.0

类库:
activemq-all-5.7.0.jar
activemq-pool-5.7.0.jar
commons-pool-1.6.jar

1、连接池代码
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

/**
 * <pre>
 * <b>功能描述:</b>管理ActiveMQ的连接池
 * 
 * @author :<br>
 * 
 * <b>修改历史:</b>(修改人,修改时间,修改原因/内容)
 * 
 * </pre>
 */
public final class ActiveMQPoolsUtil {

	/**
	 * 连接
	 */
	private static PooledConnection connection;

	/**
	 * 
	 * <b>构造函数:</b>
	 * 
	 */
	private ActiveMQPoolsUtil() {

	}

	// 初始化连接池等工作
	static {
		String url = "failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=1000";
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
		activeMQConnectionFactory.setUserName("system");
		activeMQConnectionFactory.setPassword("manage");
		activeMQConnectionFactory.setBrokerURL(url);

		try {

			PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(
					activeMQConnectionFactory);

			// session数
			int maximumActive = 200;

	                pooledConnectionFactory.setMaximumActiveSessionPerConnection(maximumActive);
			pooledConnectionFactory.setIdleTimeout(120);
			pooledConnectionFactory.setMaxConnections(5);
			pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
			connection = (PooledConnection) pooledConnectionFactory.createConnection();
			// 必须start,否则无法接收消息
			connection.start();
		} catch (JMSException e) {

			e.printStackTrace();
		}
	}

	/**
	 * 
	 * <pre>
	 * <b>功能描述:</b>关闭连接
	 * 
	 * @author :
	 * <b>创建日期 :</b>2013-1-9 上午10:56:23
	 * 
	 * <b>修改历史:</b>(修改人,修改时间,修改原因/内容)
	 * 
	 * </pre>
	 */
	public static void close() {

		try {

			if (connection != null) {

				connection.close();
			}
		} catch (JMSException e) {

			e.printStackTrace();
		}
	}

	/**
	 * 
	 * <pre>
	 * <b>功能描述:</b>获取一个连接
	 * 
	 * @author :
	 * <b>创建日期 :</b>2013-1-9 上午10:56:39
	 * 
	 * @return PooledConnection 返回一个连接实例
	 * 
	 * <b>修改历史:</b>(修改人,修改时间,修改原因/内容)
	 * 
	 * </pre>
	 */
	public static PooledConnection getConnection() {

		return connection;
	}

	/**
	 * 
	 * <pre>
	 * <b>功能描述:</b>设置连接
	 * 
	 * @author :
	 * <b>创建日期 :</b>2013-1-9 上午10:57:01
	 * 
	 * @param connection 设置连接
	 * 
	 * </pre>
	 */
	public static void setConnection(PooledConnection connection) {

		ActiveMQPoolsUtil.connection = connection;
	}
}



2、生产者代码:

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.kevin.mq.apache.ok.pool.ActiveMQPoolsUtil;

/**
 * 
 * <pre>
 * <b>功能描述:</b>创建session及Topic类型的producer.
 * 
 * @author :<br>
 * 
 * <b>修改历史:</b>(修改人,修改时间,修改原因/内容)
 * 
 * </pre>
 */
public class ActiveMQTopicProducer extends Thread {

	/**
	 * 目标主题的名称
	 */
	private String topicName;

	public ActiveMQTopicProducer(String topicName) {

		this.topicName = topicName;
	}

	public void run() {

		Session session = null;
		MessageProducer producer = null;

		try {

			session = ActiveMQPoolsUtil.getConnection().createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

			Destination destination = session.createTopic(this.topicName);

			System.out.println("session info ->" + session);

			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			TextMessage message = session.createTextMessage("hello message!");
			producer.send(message);
			System.out.println("meassage sended out!");

                       // 使用完必须要关闭,否则消息会被阻塞,无法发送出去
			session.close();
			ActiveMQPoolsUtil.close();

			while (this.isAlive()) {
				System.out.println("Thread - " + this.getName()
						+ " still alive.......");
				Thread.sleep(1000);
			}
			
			// 必须显示调用才能推出程序
			// System.exit(0);
		} catch (JMSException e) {

			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}


3、测试代码:
public class TestMain {
	public static void main(String[] args) {
		// for (int i = 0; i < 200; i++) {
		(new ActiveMQQueueProducer("kevin")).start();
		// }
	}
}


程序中没有任何死循环的代码。正常结果是程序发送完消息后,线程就结束了,程序退出。
但实际是线程一直在允许,并不释放连接。调用session.close;connection.close都没有用!

只能线程里显示调用System.exit(0); 才释放连接。

运行结果如下图:
log4j:WARN No appenders could be found for logger (org.apache.activemq.thread.TaskRunnerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Pool 129 -> PooledConnection { ConnectionPool[ActiveMQConnection {id=ID:kevin-5284-1357967657675-1:1,clientId=ID:kevin-5284-1357967657675-0:1,started=true}] }
session info ->PooledSession { ActiveMQSession {id=ID:kevin-5284-1357967657675-1:1:1,started=true} }
 --> meassage sended out!
connection closed ...
Thread - Thread-0 still alive.......
Thread - Thread-0 still alive.......
Thread - Thread-0 still alive.......
Thread - Thread-0 still alive.......
.........
.........
程序无法退出......


MQ控制台显示连接无法释放,一直保持着及Eelipse控制台上也显示程序没有停止
MQ控制台的连接:


Eelipse控制台



如果在线程的run方法最后,也就是执行完任务后,显示调用system.exit(0);
则程序可以推出,连接也会被释放
  • 大小: 20.4 KB
  • 大小: 115.1 KB
   发表时间:2013-01-12  
对于Consumer来说,可以理解,因为他需要不断的监听消息是否有连接,所有可能需要保持与MQ的连接;

但是对于Producer来说,可能我只是发送完消息(假设不需要应当什么之列,或者不管是否发送成功)就行了。那么为什么消费者还一直推出,还要继续保持连接呢?

而且如果程序不退出,就会不断的消耗内存,那么迟早内存也会摆耗干吧?
0 请登录后投票
   发表时间:2013-01-16  
哥们,麻烦共享工程研究下,谢谢
0 请登录后投票
   发表时间:2013-01-17  
while (this.isAlive()) {   
System.out.println("Thread - " + this.getName()   
+ " still alive.......");   
Thread.sleep(1000);   
}



你这里不是一直循环吗,怎么退出
0 请登录后投票
   发表时间:2013-01-18  
Shen.Yiyang 写道
while (this.isAlive()) {   
System.out.println("Thread - " + this.getName()   
+ " still alive.......");   
Thread.sleep(1000);   
}



你这里不是一直循环吗,怎么退出



晕死了,我当然知道这里是死循环了,但这只是我的测试代码啊。
抱歉,可能是忘记删除了这个段测试代码!

因为没有家这段代码时候,程序一直退不出来,所以新增了这个测试代码,
把这段代码注释掉了,同时把system.exit(0);也注释掉,程序换是退不出来

案例,线程中没有任何死循环代码,run方法执行完后就会退出,但是就是无法退出,查看mq的控制台,连接移植保持着
0 请登录后投票
   发表时间:2013-01-18  
lvwenwen 写道
哥们,麻烦共享工程研究下,谢谢


哥们,这代基本已经可以运行了啊!连接池的代码已经给出来了, 生产者的代码也已经出来了啊。
直接创建一个工程,然后创建这两个类,引用相关mq的类库就ok了

当然,我原来的程序时做了封装的,但是逼近时公司项目,放出代码不好,所以就弄了个简单版本,只要能重现问题就可以了,所以搞了个最简单的版本出来!



0 请登录后投票
   发表时间:2013-01-18  
我怀疑可能是连接池的问题,因为既然使用了连接池,就不应该让使用者直接操作connection,即关闭它,如果释放连接,应该通过连接池的某个类的某个方法来控制,否则连接池也没有什么意义了。

不过我还没有看源码,所以暂时还试不确定这个东东是不是这样的!
0 请登录后投票
   发表时间:2013-01-18  
可能跟你的连接池有关,你试一下不用连接池跑一次,看能不能正常退出。
另外,(new ActiveMQQueueProducer("kevin")).start(); 这里应该是ActiveMQTopicProducer吧?
0 请登录后投票
   发表时间:2013-01-18  
xieyanhua 写道
Shen.Yiyang 写道
while (this.isAlive()) {   
System.out.println("Thread - " + this.getName()   
+ " still alive.......");   
Thread.sleep(1000);   
}



你这里不是一直循环吗,怎么退出



晕死了,我当然知道这里是死循环了,但这只是我的测试代码啊。
抱歉,可能是忘记删除了这个段测试代码!

因为没有家这段代码时候,程序一直退不出来,所以新增了这个测试代码,
把这段代码注释掉了,同时把system.exit(0);也注释掉,程序换是退不出来

案例,线程中没有任何死循环代码,run方法执行完后就会退出,但是就是无法退出,查看mq的控制台,连接移植保持着

哪有你这样测试的。。。你把一个原先会结束的线程搞成不能结束的,不是更加找不到原来的问题吗;
MQ连接池很可能有其他后台线程在管理连接,不是你生产者的问题。
0 请登录后投票
   发表时间:2013-01-18  
Shen.Yiyang 写道
xieyanhua 写道
Shen.Yiyang 写道
while (this.isAlive()) {   
System.out.println("Thread - " + this.getName()   
+ " still alive.......");   
Thread.sleep(1000);   
}



你这里不是一直循环吗,怎么退出



晕死了,我当然知道这里是死循环了,但这只是我的测试代码啊。
抱歉,可能是忘记删除了这个段测试代码!

因为没有家这段代码时候,程序一直退不出来,所以新增了这个测试代码,
把这段代码注释掉了,同时把system.exit(0);也注释掉,程序换是退不出来

案例,线程中没有任何死循环代码,run方法执行完后就会退出,但是就是无法退出,查看mq的控制台,连接移植保持着

哪有你这样测试的。。。你把一个原先会结束的线程搞成不能结束的,不是更加找不到原来的问题吗;
-- 呵呵,其实我原来的代码不是这样的,工程里没有这样的代码,只是后面为了保密的问题,我把这个ActiveMQTopicProducer 程序修改了很多,原来封装的东西都已经取消了。


MQ连接池很可能有其他后台线程在管理连接,不是你生产者的问题。
-- 我先也是这样的,就像前面说的,如果直接使用connection并关掉它,可能连接池就无法获知了,所以正常来说,既然连接池负责管理所有connection,那么释放连接必然也应该有链接池来负责,而不应该直接调用connection的close来释放连接。


事实上也证明了,直接调用connection.close()来释放连接,也是不行的,实际连接还试保持的。通过mq的控制可以看到。

我的想法跟你像的一样,连接不释放的问题,不是我程序的问题,而是QM连接池中有其他后台线程在管理着。只是这个链接必须要释放,如果不释放资源就无法释放,这是个很大的问题。
刚开始以为是旧版本的bug(MQ连接池的jar为4.x的版本,后面换到了5.7版本的谅解池,也是一样的)

只是目前我还没有找到怎么释放这个链接的方法,有时间再看看源码,不过如果有谁知道,那就更好了!
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics