Source: http://ifeve.com/generic-concurrent-object-pool/
Source of English: http://www.javacodegeeks.com/2012/09/a-generic-and-concurrent-object-pool.html
这篇文章里我们主要讨论下如何在Java里实现一个对象池。最近几年,Java虚拟机的性能在各方面都得到了极大的提升,因此对大多数对象而言,已经没有必要通过对象池来提高性能了。根本的原因是,创建一个新的对象的开销已经不像过去那样昂贵了。
然而,还是有些对象,它们的创建开销是非常大的,比如线程,数据库连接等这些非轻量级的对象。在任何一个应用程序里面,我们肯定会用到不止一个这样的对象。如果有一种很方便的创建管理这些对象的池,使得这些对象能够动态的重用,而客户端代码也不用关心它们的生命周期,还是会很给力的。
在真正开始写代码前,我们先来梳理下一个对象池需要完成哪些功能。
- 如果有可用的对象,对象池应当能返回给客户端。
- 客户端把对象放回池里后,可以对这些对象进行重用。
- 对象池能够创建新的对象来满足客户端不断增长的需求。
- 需要有一个正确关闭池的机制来确保关闭后不会发生内存泄露。
不用说了,上面几点就是我们要暴露给客户端的连接池的接口的基本功能。
我们的声明的接口如下:
package com.test.pool; /** * Represents a cached pool of objects. * * @author Swaranga * * @param < T > the type of object to pool. */ public interface Pool< T > { /** * Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation. * * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available. * In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown * when the thread waits for an object to become available. * * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not. * If any object is available the call returns it * else the call returns < code >null< /code >. * * The validity of the objects are determined using the * {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >. * * @return T one of the pooled objects. */ T get(); /** * Releases the object and puts it back to the pool. * * The mechanism of putting the object back to the pool is * generally asynchronous, * however future implementations might differ. * * @param t the object to return to the pool */ void release(T t); /** * Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources. * Releasing resources are done * via the < code >invalidate()< /code > * method of the {@link Validator} interface. */ void shutdown(); }
为了能够支持任意对象,上面这个接口故意设计得很简单通用。它提供了从池里获取/返回对象的方法,还有一个关闭池的机制,以便释放对象。
现在我们来实现一下这个接口。开始动手之前,值得一提的是,一个理想的release方法应该先尝试检查下这个客户端返回的对象是否还能重复使用。如果是的话再把它扔回池里,如果不是,就舍弃掉这个对象。我们希望这个Pool接口的所有实现都能遵循这个规则。在开始具体的实现类前,我们先创建一个抽象类,以便限制后续的实现能遵循这点。我们实现的抽象类就叫做AbstractPool,它的定义如下:
package com.test.pool; /** * Represents an abstract pool, that defines the procedure * of returning an object to the pool. * * @author Swaranga * * @param < T > the type of pooled objects. */ abstract class AbstractPool < T > implements Pool < T > { /** * Returns the object to the pool. * The method first validates the object if it is * re-usable and then puts returns it to the pool. * * If the object validation fails, * some implementations * will try to create a new one * and put it into the pool; however * this behaviour is subject to change * from implementation to implementation * */ @Override public final void release(T t) { if(isValid(t)) { returnToPool(t); } else { handleInvalidReturn(t); } } protected abstract void handleInvalidReturn(T t); protected abstract void returnToPool(T t); protected abstract boolean isValid(T t); }
在上面这个类里,我们让对象池必须得先验证对象后才能把它放回到池里。具体的实现可以自由选择如何实现这三种方法,以便定制自己的行为。它们根据自己的逻辑来决定如何判断一个对象有效,无效的话应该怎么处理(handleInvalidReturn方法),怎么把一个有效的对象放回到池里(returnToPool方法)。
有了上面这几个类,我们就可以着手开始具体的实现了。不过还有个问题,由于上面这些类是设计成能支持通用的对象池的,因此具体的实现不知道该如何验证对象的有效性(因为对象都是泛型的)。因此我们还需要些别的东西来帮助我们完成这个。
我们需要一个通用的方法来完成对象的校验,而具体的实现不必关心对象是何种类型。因此我们引入了一个新的接口,Validator,它定义了验证对象的方法。这个接口的定义如下:
package com.test.pool; /** * Represents the functionality to * validate an object of the pool * and to subsequently perform cleanup activities. * * @author Swaranga * * @param < T > the type of objects to validate and cleanup. */ public static interface Validator < T > { /** * Checks whether the object is valid. * * @param t the object to check. * * @return <code>true</code> * if the object is valid else <code>false</code>. */ public boolean isValid(T t); /** * Performs any cleanup activities * before discarding the object. * For example before discarding * database connection objects, * the pool will want to close the connections. * This is done via the * <code>invalidate()</code> method. * * @param t the object to cleanup */ public void invalidate(T t); }
上面这个接口定义了一个检验对象的方法,以及一个把对象置为无效的方法。当准备废弃一个对象并清理内存的时候,invalidate方法就派上用场了。值得注意的是这个接口本身没有任何意义,只有当它在对象池里使用的时候才有意义,所以我们把这个接口定义到Pool接口里面。这和Java集合库里的Map和Map.Entry是一样的。所以我们的Pool接口就成了这样:
package com.test.pool; /** * Represents a cached pool of objects. * * @author Swaranga * * @param < T > the type of object to pool. */ public interface Pool< T > { /** * Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation. * * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available. * In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown * when the thread waits for an object to become available. * * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not. * If any object is available the call returns it * else the call returns < code >null< /code >. * * The validity of the objects are determined using the * {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >. * * @return T one of the pooled objects. */ T get(); /** * Releases the object and puts it back to the pool. * * The mechanism of putting the object back to the pool is * generally asynchronous, * however future implementations might differ. * * @param t the object to return to the pool */ void release(T t); /** * Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources. * Releasing resources are done * via the < code >invalidate()< /code > * method of the {@link Validator} interface. */ void shutdown(); /** * Represents the functionality to * validate an object of the pool * and to subsequently perform cleanup activities. * * @author Swaranga * * @param < T > the type of objects to validate and cleanup. */ public static interface Validator < T > { /** * Checks whether the object is valid. * * @param t the object to check. * * @return <code>true</code> * if the object is valid else <code>false</code>. */ public boolean isValid(T t); /** * Performs any cleanup activities * before discarding the object. * For example before discarding * database connection objects, * the pool will want to close the connections. * This is done via the * <code>invalidate()</code> method. * * @param t the object to cleanup */ public void invalidate(T t); } }
准备工作已经差不多了,在最后开始前我们还需要一个终极武器,这才是这个对象池的杀手锏。就是“能够创建新的对象”。我们的对象池是泛型的,因此它们得知道如何去生成新的对象来填充这个池子。这个功能不能依赖于对象池本身,必须要有一个通用的方式来创建新的对象。通过一个ObjectFactory的接口就能完成这个,它只有一个“如何创建新的对象”的方法。我们的ObjectFactory接口如下:
package com.test.pool; /** * Represents the mechanism to create * new objects to be used in an object pool. * * @author Swaranga * * @param < T > the type of object to create. */ public interface ObjectFactory < T > { /** * Returns a new instance of an object of type T. * * @return T an new instance of the object of type T */ public abstract T createNew(); }
我们的工具类都已经搞定了,现在可以开始真正实现我们的Pool接口了。因为我们希望这个池能在并发程序里面使用,所以我们会创建一个阻塞的对象池,当没有对象可用的时候,让客户端先阻塞住。我们的阻塞机制是让客户端一直阻塞直到有对象可用为止。这样的话导致我们还需要再增加一个只阻塞一定时间的方法,如果在超时时间到来前有对象可用则返回,如果超时了就返回null而不是一直等待下去。这样的实现有点类似Java并发库里的LinkedBlockingQueue,因此真正实现前我们再暴露一个接口,BlockingPool,类似于Java并发库里的BlockingQueue接口。
这里是BlockingQueue的声明:
package com.test.pool; import java.util.concurrent.TimeUnit; /** * Represents a pool of objects that makes the * requesting threads wait if no object is available. * * @author Swaranga * * @param < T > the type of objects to pool. */ public interface BlockingPool < T > extends Pool < T > { /** * Returns an instance of type T from the pool. * * The call is a blocking call, * and client threads are made to wait * indefinitely until an object is available. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented. * * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available, * the current implementations * sets the interrupted state of the thread * to <code>true</code> and returns null. * However this is subject to change * from implementation to implementation. * * @return T an instance of the Object * of type T from the pool. */ T get(); /** * Returns an instance of type T from the pool, * waiting up to the * specified wait time if necessary * for an object to become available.. * * The call is a blocking call, * and client threads are made to wait * for time until an object is available * or until the timeout occurs. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented. * * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available, * the current implementations * set the interrupted state of the thread * to <code>true</code> and returns null. * However this is subject to change * from implementation to implementation. * * * @param time amount of time to wait before giving up, * in units of <tt>unit</tt> * @param unit a <tt>TimeUnit</tt> determining * how to interpret the * <tt>timeout</tt> parameter * * @return T an instance of the Object * of type T from the pool. * * @throws InterruptedException * if interrupted while waiting */ T get(long time, TimeUnit unit) throws InterruptedException; }
BoundedBlockingPool的实现如下:
package com.test.pool; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public final class BoundedBlockingPool < T > extends AbstractPool < T > implements BlockingPool < T > { private int size; private BlockingQueue < T > objects; private Validator < T > validator; private ObjectFactory < T > objectFactory; private ExecutorService executor = Executors.newCachedThreadPool(); private volatile boolean shutdownCalled; public BoundedBlockingPool( int size, Validator < T > validator, ObjectFactory < T > objectFactory) { super(); this.objectFactory = objectFactory; this.size = size; this.validator = validator; objects = new LinkedBlockingQueue < T >(size); initializeObjects(); shutdownCalled = false; } public T get(long timeOut, TimeUnit unit) { if(!shutdownCalled) { T t = null; try { t = objects.poll(timeOut, unit); return t; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return t; } throw new IllegalStateException( 'Object pool is already shutdown'); } public T get() { if(!shutdownCalled) { T t = null; try { t = objects.take(); } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return t; } throw new IllegalStateException( 'Object pool is already shutdown'); } public void shutdown() { shutdownCalled = true; executor.shutdownNow(); clearResources(); } private void clearResources() { for(T t : objects) { validator.invalidate(t); } } @Override protected void returnToPool(T t) { if(validator.isValid(t)) { executor.submit(new ObjectReturner(objects, t)); } } @Override protected void handleInvalidReturn(T t) { } @Override protected boolean isValid(T t) { return validator.isValid(t); } private void initializeObjects() { for(int i = 0; i < size; i++) { objects.add(objectFactory.createNew()); } } private class ObjectReturner < E > implements Callable < Void > { private BlockingQueue < E > queue; private E e; public ObjectReturner(BlockingQueue < E > queue, E e) { this.queue = queue; this.e = e; } public Void call() { while(true) { try { queue.put(e); break; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } } return null; } } }
上面是一个非常基本的对象池,它内部是基于一个LinkedBlockingQueue来实现的。这里唯一比较有意思的方法就是returnToPool。因为内部的存储是一个LinkedBlockingQueue实现的,如果我们直接把返回的对象扔进去的话,如果队列已满可能会阻塞住客户端。不过我们不希望客户端因为把对象放回池里这么个普通的方法就阻塞住了。所以我们把最终将对象插入到队列里的任务作为一个异步的的任务提交给一个Executor来执行,以便让客户端线程能立即返回。
现在我们将在自己的代码中使用上面这个对象池,用它来缓存数据库连接。我们需要一个校验器来验证数据库连接是否有效。
下面是这个JDBCConnectionValidator:
package com.test; import java.sql.Connection; import java.sql.SQLException; import com.test.pool.Pool.Validator; public final class JDBCConnectionValidator implements Validator < Connection > { public boolean isValid(Connection con) { if(con == null) { return false; } try { return !con.isClosed(); } catch(SQLException se) { return false; } } public void invalidate(Connection con) { try { con.close(); } catch(SQLException se) { } } }
还有一个JDBCObjectFactory,它将用来生成新的数据库连接对象:
package com.test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import com.test.pool.ObjectFactory; public class JDBCConnectionFactory implements ObjectFactory < Connection > { private String connectionURL; private String userName; private String password; public JDBCConnectionFactory( String driver, String connectionURL, String userName, String password) { super(); try { Class.forName(driver); } catch(ClassNotFoundException ce) { throw new IllegalArgumentException( 'Unable to find driver in classpath', ce); } this.connectionURL = connectionURL; this.userName = userName; this.password = password; } public Connection createNew() { try { return DriverManager.getConnection( connectionURL, userName, password); } catch(SQLException se) { throw new IllegalArgumentException( 'Unable to create new connection', se); } } }
现在我们用上述的Validator和ObjectFactory来创建一个JDBC的连接池:
package com.test; import java.sql.Connection; import com.test.pool.Pool; import com.test.pool.PoolFactory; public class Main { public static void main(String[] args) { Pool < Connection > pool = new BoundedBlockingPool < Connection > ( 10, new JDBCConnectionValidator(), new JDBCConnectionFactory('', '', '', '') ); //do whatever you like } }
为了犒劳下能读完整篇文章的读者,我这再提供另一个非阻塞的对象池的实现,这个实现和前面的唯一不同就是即使对象不可用,它也不会让客户端阻塞,而是直接返回null。具体的实现在这:
package com.test.pool; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Semaphore; public class BoundedPool < T > extends AbstractPool < T > { private int size; private Queue < T > objects; private Validator < T > validator; private ObjectFactory < T > objectFactory; private Semaphore permits; private volatile boolean shutdownCalled; public BoundedPool( int size, Validator < T > validator, ObjectFactory < T > objectFactory) { super(); this.objectFactory = objectFactory; this.size = size; this.validator = validator; objects = new LinkedList < T >(); initializeObjects(); shutdownCalled = false; } @Override public T get() { T t = null; if(!shutdownCalled) { if(permits.tryAcquire()) { t = objects.poll(); } } else { throw new IllegalStateException( 'Object pool already shutdown'); } return t; } @Override public void shutdown() { shutdownCalled = true; clearResources(); } private void clearResources() { for(T t : objects) { validator.invalidate(t); } } @Override protected void returnToPool(T t) { boolean added = objects.add(t); if(added) { permits.release(); } } @Override protected void handleInvalidReturn(T t) { } @Override protected boolean isValid(T t) { return validator.isValid(t); } private void initializeObjects() { for(int i = 0; i < size; i++) { objects.add(objectFactory.createNew()); } } }
考虑到我们现在已经有两种实现,非常威武了,得让用户通过工厂用具体的名称来创建不同的对象池了。工厂来了:
package com.test.pool; import com.test.pool.Pool.Validator; /** * Factory and utility methods for * {@link Pool} and {@link BlockingPool} classes * defined in this package. * This class supports the following kinds of methods: * * <ul> * <li> Method that creates and returns a default non-blocking * implementation of the {@link Pool} interface. * </li> * * <li> Method that creates and returns a * default implementation of * the {@link BlockingPool} interface. * </li> * </ul> * * @author Swaranga */ public final class PoolFactory { private PoolFactory() { } /** * Creates a and returns a new object pool, * that is an implementation of the {@link BlockingPool}, * whose size is limited by * the <tt> size </tt> parameter. * * @param size the number of objects in the pool. * @param factory the factory to create new objects. * @param validator the validator to * validate the re-usability of returned objects. * * @return a blocking object pool * bounded by <tt> size </tt> */ public static < T > Pool < T > newBoundedBlockingPool( int size, ObjectFactory < T > factory, Validator < T > validator) { return new BoundedBlockingPool < T > ( size, validator, factory); } /** * Creates a and returns a new object pool, * that is an implementation of the {@link Pool} * whose size is limited * by the <tt> size </tt> parameter. * * @param size the number of objects in the pool. * @param factory the factory to create new objects. * @param validator the validator to validate * the re-usability of returned objects. * * @return an object pool bounded by <tt> size </tt> */ public static < T > Pool < T > newBoundedNonBlockingPool( int size, ObjectFactory < T > factory, Validator < T > validator) { return new BoundedPool < T >(size, validator, factory); } }
现在我们的客户端就能用一种可读性更强的方式来创建对象池了:
package com.test; import java.sql.Connection; import com.test.pool.Pool; import com.test.pool.PoolFactory; public class Main { public static void main(String[] args) { Pool < Connection > pool = PoolFactory.newBoundedBlockingPool( 10, new JDBCConnectionFactory('', '', '', ''), new JDBCConnectionValidator()); //do whatever you like } }
好吧,终于写完了,尽情使用和完善它吧,或者再多加几种实现。
相关推荐
用Java实现一个通用并发对象池.pdf
"Go Commons Pool"就是这样一个专门用于Golang的通用对象池库,它旨在提高程序运行效率,降低系统资源的频繁分配与回收带来的开销。 对象池的基本思想是预先创建一定数量的对象,并存储在一个池中,当程序需要使用...
下面是一个简单的Java对象池实现,以`ObjectPool`类为例: ```java import java.util.ArrayList; import java.util.List; public class ObjectPool<T> { private List<T> pool; private ObjectFactory<T> ...
通过以上内容的详细介绍,我们不仅了解了Java对象池技术的基本原理及其在提高程序性能方面的应用,还掌握了实现一个完整对象池所需要的关键技术和类。这为开发高效、稳定的Java应用程序奠定了坚实的基础。
在提供的代码片段中,可以看到一个基于 Delphi 的对象池实现示例。下面是对其中关键部分的解析: ##### 1. 类结构定义 ```delphi TObjectPool = class private type TPoolItem = class private Instance: T...
本文将深入探讨“jobjectpool”项目,这是一个专为Java设计的通用并发对象池实现。我们将讨论对象池的基本原理、其在并发环境中的应用以及如何使用“jobjectpool”库。 对象池的基本概念是基于对象生命周期管理的...
Pool2 提供了 `GenericObjectPool` 类作为默认的对象池实现。这个类包含了一些关键属性,如最大活动对象数(maxTotal)、最大单个类别对象数(maxIdle)和最小空闲对象数(minIdle)。这些参数可以调整以优化性能和...
本文章介绍了如何通过`ConnectionBean`和`PoolBean`两个类来实现一个简单的连接池。这种设计模式能够有效管理数据库连接资源,提高系统的并发处理能力。需要注意的是,在实际项目中,还需要考虑更多的细节问题,如...
这个"java数据连接池通用类(范例)"可能是为了提供一个可以适用于多种连接池实现的通用模板,方便开发者快速集成和管理数据库连接。 首先,我们要理解数据连接池的工作原理。当应用程序启动时,连接池会预先初始化...
本资源名为“最新版加注释通用JDBC数据库连接池”,显然它提供了一个带有详细注释的通用JDBC数据库连接池实现,这将有助于开发者理解和使用。这里我们将详细讲解数据库连接池的概念、工作原理以及如何使用。 **...
《C++ UMP:一个通用的线程安全内存池》 在C++编程中,内存管理是至关重要的一环。为了提高程序性能并优化资源利用,内存池技术应运而生。本文将深入探讨UMP(通用线程安全内存池)的概念、实现原理以及它在C++中的...
2. **对象池实现**:除了API,Commons Pool还提供了一些预定义的对象池实现,如`GenericObjectPool`,这是一个通用的对象池实现,支持基本的池策略,如最大活动对象数、空闲对象的最大数量等。 3. **池化策略**:...
Apache Commons Pool 2(在提供的文件`commons-pool2-2.3.jar`中)是一个通用的对象池服务,它为创建对象池提供了基础框架。Kafka生产者连接池可能会使用这个库来实现连接的创建、管理和回收。对象池的基本操作包括...
"一个通用数据库操作通用类"的标题表明这是一个可以应用于多种场景的数据库操作工具,它可能封装了SQL查询和其他相关功能,使得开发者无需直接编写SQL语句,也能实现对数据库的操作。描述中的“可以直接调用,本人试...
Apache Commons Pool是Apache软件基金会的一个开源项目,它提供了一个通用的对象池实现,使得开发者可以方便地在自己的应用中使用对象池技术。这里的"commons-pool-1.4-src.zip"是一个源代码压缩包,包含了Apache ...
最后,`commons-pool.jar`是Apache Commons Pool库,它是一个通用的对象池服务,为其他组件(如DBCP)提供对象池实现。对象池是数据库连接池的核心,它负责创建对象、维护对象池、分配和回收对象。通过对象池,可以...
- **GenericObjectPool**:这是一个通用的对象池实现,它管理对象的分配和回收。你可以通过配置参数来调整池的行为,比如最大池大小、空闲超时等。 - **PoolableWrapper**:如果你的对象不直接实现Poolable接口,...
而`commons-pool`是Apache Commons的另一个项目,它的主要功能是提供一个通用的对象池服务。在数据库连接池中,`commons-pool`负责对象(如数据库连接)的池化管理,它实现了对象的创建、分配、回收和检测等核心功能...
- Commons Pool:Apache的一个通用对象池库,可以用来创建任何可池化的对象,包括数据库连接。 3. **如何使用连接池** 使用数据库连接池通常需要以下步骤: - 引入连接池依赖:在项目的Maven或Gradle配置文件中...
否则,连接池管理器将创建一个新的连接对象,并将其添加到连接池中。应用程序使用完毕连接后,需要将连接对象返回连接池,以便下一个应用程序可以使用。 在服务器端,连接池管理器可以在服务器启动时初始化,并在...