`
chenzehe
  • 浏览: 539483 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

BoneCP源码——BoneCP中使用的多线程

 
阅读更多
1、asyncExecutor 可缓存线程池,用于异步的创建一个Connection对象,返回Future类型对象
	/** Executor service for obtaining a connection in an asynchronous fashion. */
	private ExecutorService asyncExecutor;
	/**
	 * Constructor.
	 * @param config Configuration for pool
	 * @throws SQLException on error
	 */
	public BoneCP(BoneCPConfig config) throws SQLException {
		......
		//在构造函数中初始化
		this.asyncExecutor = Executors.newCachedThreadPool();
		......
        }

	/** Obtain a connection asynchronously by queueing a request to obtain a connection in a separate thread. 
	 * 
	 *  Use as follows:<p>
	 *      Future&lt;Connection&gt; result = pool.getAsyncConnection();<p>
	 *       ... do something else in your application here ...<p>
	 *      Connection connection = result.get(); // get the connection<p>
	 *      
	 * @return A Future task returning a connection. 
	 */
	public Future<Connection> getAsyncConnection(){

		return this.asyncExecutor.submit(new Callable<Connection>() {

			public Connection call() throws Exception {
				return getConnection();
			}});
	}

 

2、releaseHelper 用于关闭Connection对象的线程池,该池程池中的线程为守护线程(Daemon Thread)

/** pointer to the thread containing the release helper threads. */
	private ExecutorService releaseHelper;

	/**
	 * Constructor.
	 * @param config Configuration for pool
	 * @throws SQLException on error
	 */
	public BoneCP(BoneCPConfig config) throws SQLException {
                ......
                //Gets number of release-connection helper threads to create per partition
    		int helperThreads = config.getReleaseHelperThreads();
		this.releaseHelperThreadsConfigured = helperThreads > 0;
                //If set to true, config has specified the use of statement release helper threads.		
		this.statementReleaseHelperThreadsConfigured = config.getStatementReleaseHelperThreads() > 0;
		this.config = config;
		String suffix = "";
		if (config.getPoolName()!=null) {
			suffix="-"+config.getPoolName();
		}			
		if (this.releaseHelperThreadsConfigured){
			this.releaseHelper = Executors.newFixedThreadPool(helperThreads*config.getPartitionCount(), new CustomThreadFactory("BoneCP-release-thread-helper-thread"+suffix, true));
		}
                ......
          }

 如果用户设置了以独立线程来关闭Connection对象,才创建该线程池。

ThreadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

public interface ThreadFactory {
    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}
 CustomThreadFactory为BoneCP里对ThreadFactory的一个实现,并可设置线程是否为守护线程(Daemon Thread):
package com.jolbox.bonecp;
......
public class CustomThreadFactory
        implements ThreadFactory, UncaughtExceptionHandler {
    public CustomThreadFactory(String threadName, boolean daemon){
        this.threadName = threadName;
        this.daemon = daemon;
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, this.threadName);
        t.setDaemon(this.daemon);
        t.setUncaughtExceptionHandler(this);
        return t;
    }
    ......
}
 
3、keepAliveScheduler 该线程池用于定期地测试connection的活性,即用它发送一条简单的SQL,并关闭故障的Connection,每个分区一个线程,此线程池中的线程也为守护线程。
	/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
	 * activity on the connection.
	 */
	private ScheduledExecutorService keepAliveScheduler;
        this.keepAliveScheduler =  Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-keep-alive-scheduler"+suffix, true));
 newScheduledThreadPool创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求,相当于Timer。
			if (config.getIdleConnectionTestPeriodInMinutes() > 0 || config.getIdleMaxAgeInMinutes() > 0){
				
				final Runnable connectionTester = new ConnectionTesterThread(connectionPartition, this.keepAliveScheduler, this, config.getIdleMaxAge(TimeUnit.MILLISECONDS), config.getIdleConnectionTestPeriod(TimeUnit.MILLISECONDS), queueLIFO);
				long delayInMinutes = config.getIdleConnectionTestPeriodInMinutes();
				if (delayInMinutes == 0L){
					delayInMinutes = config.getIdleMaxAgeInMinutes();
				}
				if (config.getIdleMaxAgeInMinutes() != 0 && config.getIdleConnectionTestPeriodInMinutes() != 0 && config.getIdleMaxAgeInMinutes() < delayInMinutes){
					delayInMinutes = config.getIdleMaxAgeInMinutes();
				}
				this.keepAliveScheduler.schedule(connectionTester, delayInMinutes, TimeUnit.MINUTES);
			}

 如果用户没有设置了下面两个属性小于1就启动该线程池的任务:

	/** Connections older than this are sent a keep-alive statement. */
	private long idleConnectionTestPeriodInSeconds = 240*60; 
	/** Maximum age of an unused connection before it is closed off. */ 
	private long idleMaxAgeInSeconds =  60*60; 

 实例化一个ConnectionTesterThread类型的Runnable对象,该对象中也持有此线程池的引用,用于在run方法中启动下次任务,此对象的run方法负责对异常的Connection对象和超出闲置时间的对象进行close并定期给Connection对象发送简单SQL语句:

// send a keep-alive, close off connection if we fail.
if (!this.pool.isConnectionHandleAlive(connection)){
    closeConnection(connection);
    continue; 
}

 

4、maxAliveScheduler  该线程池用于给每个分区创建一个线程定期的检查Connection对象是否过期,此线程池中的线程也是守护线程

	/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
	 * activity on the connection.
	 */
	private ScheduledExecutorService maxAliveScheduler;
        this.maxAliveScheduler =  Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-max-alive-scheduler"+suffix, true));

 如果用户设置了下面属性大于0则使用该线程池:

	/** A connection older than maxConnectionAge will be destroyed and purged from the pool. */
	private long maxConnectionAgeInSeconds = 0;
 
			if (config.getMaxConnectionAgeInSeconds() > 0){
				final Runnable connectionMaxAgeTester = new ConnectionMaxAgeThread(connectionPartition, this.maxAliveScheduler, this, config.getMaxConnectionAge(TimeUnit.MILLISECONDS), queueLIFO);
				this.maxAliveScheduler.schedule(connectionMaxAgeTester, config.getMaxConnectionAgeInSeconds(), TimeUnit.SECONDS);
			}
 先实例化一个ConnectionMaxAgeThread类型的Runnable对象,该对象定期的对超过maxConnectionAge类型的对象进行关闭:
if (connection.isExpired(currentTime)){
    // kill off this connection
    closeConnection(connection);
    continue;
}
 ConnectionMaxAgeThread对象中也对该线程池持有引用来启动下次全任务。

 

5、connectionsScheduler  该线程池用于观察每个分区,根据需要动态的创建新的Connection对象或者清理过剩的,也为守护线程

/** Executor for threads watching each partition to dynamically create new threads/kill off excess ones.
	 */
	private ExecutorService connectionsScheduler;
	this.connectionsScheduler =  Executors.newFixedThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-pool-watch-thread"+suffix, true));

        // watch this partition for low no of threads
	this.connectionsScheduler.execute(new PoolWatchThread(connectionPartition, this));

 

 6、closeConnectionExecutor  该线程池用于监控那些失败的close操作

	/** Threads monitoring for bad connection requests. */
	private ExecutorService closeConnectionExecutor;
        this.closeConnectionWatch = config.isCloseConnectionWatch();
	if (this.closeConnectionWatch){
		logger.warn(THREAD_CLOSE_CONNECTION_WARNING);
		this.closeConnectionExecutor =  Executors.newCachedThreadPool(new CustomThreadFactory("BoneCP-connection-watch-thread"+suffix, true));
	}

 在getConection()方法中如果用户设置了下面属性则启用该线程:

	/** If set to true, create a new thread that monitors a connection and displays warnings if application failed to 
	 * close the connection.
	 */
	protected boolean closeConnectionWatch = false;

 

 

		if (this.closeConnectionWatch){ // a debugging tool
			watchConnection(result);
		}
 
	/** Starts off a new thread to monitor this connection attempt.
	 * @param connectionHandle to monitor 
	 */
	private void watchConnection(ConnectionHandle connectionHandle) {
		String message = captureStackTrace(UNCLOSED_EXCEPTION_MESSAGE);
		this.closeConnectionExecutor.submit(new CloseThreadMonitor(Thread.currentThread(), connectionHandle, message, this.closeConnectionWatchTimeoutInMs));
	}
 
//	@Override
	public void run() {
		try {
			this.connectionHandle.setThreadWatch(Thread.currentThread());
			// wait for the thread we're monitoring to die off.
			this.threadToMonitor.join(this.closeConnectionWatchTimeout);
			if (!this.connectionHandle.isClosed() 
					&& this.threadToMonitor.equals(this.connectionHandle.getThreadUsingConnection())
				){
				logger.error(this.stackTrace);
			}
		} catch (Exception e) {
			// just kill off this thread
			if (this.connectionHandle != null){ // safety
				this.connectionHandle.setThreadWatch(null);
			}
		} 
	}
 

 

分享到:
评论

相关推荐

    数据库连接池BoneCP源码分析报告

    2. 连接管理:BoneCP通过线程安全的数据结构(如ConcurrentHashMap)来管理连接,确保在多线程环境下正确地分配和回收连接。 3. 连接的获取与释放:应用通过调用getConnection()方法从池中获取连接,使用完毕后调用...

    数据库连接池BoneCP源码分析报告.doc

    - **高效率**:BoneCP 通过多线程和高效的对象复用来实现高并发下的快速连接获取。 - **低内存占用**:相较于其他连接池,BoneCP 在空闲时占用更少的内存。 - **自动关闭连接**:自动检测并关闭无效的数据库连接...

    java数据库连接池-bonecp java源码

    2. **线程安全**: BoneCP确保在多线程环境下,每个线程获取到的数据库连接都是安全的,不会引发数据竞争或死锁。 3. **公平性**:BoneCP采用了公平的连接分配策略,使得等待连接的线程按照请求的顺序获得连接,...

    Bonecp实例

    1. **线程安全**:BoneCP通过内部的锁机制确保了多线程环境下的安全性,避免了连接泄露和竞态条件等问题。 2. **高性能**: BoneCP采用预分配连接的方式,减少了数据库连接的创建和释放开销,提高了处理速度。 3. **...

    基于Java的实例源码-数据库连接池 BoneCP.zip

    例如,`java.util.concurrent.locks`包中的锁机制可能被用于确保在多线程环境下对数据库连接的正确访问。 最后,`META-INF`目录通常包含项目或库的元数据,比如服务注册信息和服务提供者接口(SPI)。在这个案例中...

    基于Bonecp连接池的jdbc工具类及jar包

    首先, BoneCP连接池的设计目标是提高性能,减少内存消耗,并通过线程安全的连接复用来提高多线程环境下的并发能力。它采用了一些高级策略,如预分配和缓存连接,以确保在高负载情况下也能快速响应数据库请求。 ...

    Bonecp连接池java项目源代码(eclipse版)

    2. **线程安全**:BoneCP保证了在多线程环境下的安全性,确保每个线程都能获取到自己的连接,避免了线程间的资源竞争。 3. **高性能**:BoneCP采用了一些优化策略,如预分配连接、连接的公平分配等,以提高数据库...

    基于SSM架构实现的大型分布式购物网站-B2C项目源码+项目说明.zip

    方案2:在负载均衡服务器中运行一个精灵线程,预测服务器压力过大时会自动把session转移压力过小的服务器中。 3、做专门的图片服务器。使用一个http服务器,Apache.或者Nginx。使用ftp服务上传图片,vsftpd ##...

    java开源包8

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包1

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包11

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包2

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包3

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包6

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包5

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包10

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包4

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包7

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包9

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包101

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

Global site tag (gtag.js) - Google Analytics