`
chenzehe
  • 浏览: 539458 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

BoneCP源码——BoneCP中使用的第三方包 jsr166y、 LinkedTransferQueue队列、fork-join框架

 
阅读更多

 

BoneCP主要使用了下面几种第三方包:

1、Google Guava library   The Guava project contains several of Google's core libraries that we rely on in our Java-based projects: collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, and so forth.

2、SLF4J  日志管理

3、jsr166y  Doug Lea写的一个并发包,代码量不多,已合并到JDK7的JUC包中。BoneCP中使用该包下的LinkedTransferQueue队列来保存Connection对象:

			if (config.getMaxConnectionsPerPartition() == config.getMinConnectionsPerPartition()){
				// if we have a pool that we don't want resized, make it even faster by ignoring
				// the size constraints.
				connectionHandles = queueLIFO ? new LIFOQueue<ConnectionHandle>() :  new LinkedTransferQueue<ConnectionHandle>();
			} else {
				connectionHandles = queueLIFO ? new LIFOQueue<ConnectionHandle>(this.config.getMaxConnectionsPerPartition()) : new BoundedLinkedTransferQueue<ConnectionHandle>(this.config.getMaxConnectionsPerPartition());
			}

 这段神奇的代码还没研究明白用意,当设置每个partition中的最大连接数和最小连接数相等时,保存Connection对象的队列就使用无参构造函数,也就是队列的最大值为Integer.MAX_VALUE,如果不相等则队列的最大值为用户设置的最大值。

BoneCP只使用到jsr166y中的3个类,合成到它自己的jar里:

 

 

TransferQueue和LinkedTransferQueue

著名的Java并发编程大师Doug lea在JDK7的并发包里新增一个队列集合类LinkedTransferQueue,

TransferQueue接口继承自BlockingQueue接口,LinkedTransferQueue是TransferQueue的实现,它基于单向双端链表,是个无界链表,它的Node仅持有其下一个节点的引用,是一个典型FIFO队列,如下为保存无线的Node类实现:

    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting,存放当前线程,当执行LockSupport.park(this)时,当前线程被阻塞,当执行LockSupport.unpark(node.waiter)时,该节点对应的线程将解除阻塞

        // CAS methods for fields
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
        ......
     }

 

该类对链表的操作不加锁,而是采用第三方的sun.misc.Unsafe的CAS操作来保证同步:

    private static final Unsafe UNSAFE = getUnsafe();    
    // CAS methods for fields
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }
    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }
    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

 该类中操作链表的方法都调用下面方法来实现:

    public E poll() {
        return xfer(null, false, NOW, 0);
    }
 
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry: for (;;) {                     // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return this.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
 四个特殊变量NOW、ASYNC、SYNC、TIMED在poll、put等方法调用efer方法时使用:
    /*
     * Possible values for "how" argument in xfer method.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer,用于不带超时的poll和tryTransfer方法
    private static final int ASYNC = 1; // for offer, put, add,用于offer,put和add方法
    private static final int SYNC  = 2; // for transfer, take,用于transfer和take方法
    private static final int TIMED = 3; // for timed poll, tryTransfer,用于带超时的poll和tryTransfer方法
 
插入元素过程
LinkedTransferQueue插入元素,元素都插入到队尾,方法有下面几种:
    //在队尾插入元素,由于队列是无界的所以不会阻塞
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }
    
    //在队尾插入元素,永远返回true
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    //同上
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    //同上
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    //若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。 
    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }
    
    //若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

    //若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
 弹出元素有下面方法:
  public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }
 
ThreadLocalRandom
ThreadLocalRandom是一个可以独立使用的、用于生成随机数的类。继承自Random,但性能超过Random。其内部采用了ThreadLocal来返回实例对象:
    private static final ThreadLocal<ThreadLocalRandom> localRandom =
        new ThreadLocal<ThreadLocalRandom>() {
            protected ThreadLocalRandom initialValue() {
                return new ThreadLocalRandom();
            }
    };
    ThreadLocalRandom() {
        super();
        initialized = true;
    }
    public static ThreadLocalRandom current() {
        return localRandom.get();
    }
 使用ThreadLocal来保存ThreadLocalRandom对象对于线程是私有的,这样在重置种子对象时就不用加锁:
    //Random中的
    synchronized public void setSeed(long seed) {
        seed = (seed ^ multiplier) & mask;
        this.seed.set(seed);
    	haveNextNextGaussian = false;
    }

    //ThreadLocalRandom中的方法
    public void setSeed(long seed) {
        if (initialized)
            throw new UnsupportedOperationException();
        rnd = (seed ^ multiplier) & mask;
    }
 而且ThreadLocalRandom中还采用了追加字节码到64个字节的形式,避免线程对缓存的竞争:
    // Padding to help avoid memory contention among seed updates in
    // different TLRs in the common case that they are located near
    // each other.
    private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
为什么追加64字节能够提高并发编程的效率呢?
因为对于英特尔酷睿i7,酷睿, Atom和NetBurst, Core Solo和Pentium M处理器的L1,L2或L3缓存的高速缓存行是64个字节宽,不支持部分填充缓存行,这意味着如果队列的头节点和尾节点都不足64字节的话,处理器会将它们都读到同一个高速缓存行中,在多处理器下每个处理器都会缓存同样的头尾节点,当一个处理器试图修改头接点时会将整个缓存行锁定,那么在缓存一致性机制的作用下,会导致其他处理器不能访问自己高速缓存中的尾节点,而队列的入队和出队操作是需要不停修改头接点和尾节点,所以在多处理器的情况下将会严重影响到队列的入队和出队效率
 什么是Fork-Join模式?

Fork-Join是把一个任务递归的分解成多个子任务,直到每个子问题都足够小,然后把这些问题放入队列中等待处理(fork步骤),接下来等待所有子问题的结果(join步骤),最后把多个结果合并到一起。

 

 更多的Fork-Join学习资料:

JDK 7 中的 Fork/Join 模式

Java 理论与实践: 应用 fork-join 框架

Java 理论与实践: 应用 fork-join 框架,第 2 部分

 

 例子:在一个文本里有N多个数据,使用多线程最快求和

这个是之前用JUC包实现的一个问题,使用Fork-Join模式很容易解决上JUC时的问题:

此算法的缺点有待改进的地方是结果汇总时是被动去检测,而不是某个结果计算完成后主动去汇总,既然是分段计算,如果数据量足够大时,应该采用递归去实现分段汇总会更好

 注:下面代码在JDK7下运行

/**
 * Huisou.com Inc.
 * Copyright (c) 2011-2012 All Rights Reserved.
 */

package thread;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * @description
 * 
 * @author chenzehe
 * @email hljuczh@163.com
 * @create 2013-3-19 上午10:12:31
 */

public class CalculateWithForkJoin {
	public static void main(String[] args) {
		int[] numbers = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18 };
		NumbersStructure numbersStructure = new NumbersStructure(numbers, 0, numbers.length);
		int threshold = 5;
		int nThreads = 5;
		CalculateForkJoinTask calculateForkJoinTask = new CalculateForkJoinTask(numbersStructure, threshold);
		ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
		forkJoinPool.invoke(calculateForkJoinTask);
		int sum = calculateForkJoinTask.sum;
		System.out.println(sum);
	}
}

class CalculateForkJoinTask extends RecursiveAction {
	private static final long		serialVersionUID	= -3958126944793236011L;
	private final int				threshold;
	private final NumbersStructure	numbersStructure;
	public int						sum;
	
	public CalculateForkJoinTask(NumbersStructure numbersStructure, int threshold) {
		this.numbersStructure = numbersStructure;
		this.threshold = threshold;
	}
	
	@Override
	protected void compute() {
		if (numbersStructure.size < threshold) {
			sum = numbersStructure.calculateSum();
		}
		else {
			int midpoint = numbersStructure.size / 2;
			CalculateForkJoinTask left = new CalculateForkJoinTask(numbersStructure.subproblem(0, midpoint), threshold);
			CalculateForkJoinTask right = new CalculateForkJoinTask(numbersStructure.subproblem(midpoint + 1, numbersStructure.size), threshold);
			invokeAll(left, right);
			sum = left.sum + right.sum;
		}
	}
	
}

class NumbersStructure {
	private final int[]	numbers;
	private final int	start;
	private final int	end;
	public final int	size;
	
	public NumbersStructure(int[] numbers, int start, int end) {
		this.numbers = numbers;
		this.start = start;
		this.end = end;
		this.size = end - start;
	}
	
	/**
	 * 求和
	 */
	public int calculateSum() {
		int sum = 0;
		for (int i = start; i <= end && i < numbers.length; i++) {
			sum += numbers[i];
		}
		return sum;
	}
	
	/**
	 * 问题分解
	 */
	public NumbersStructure subproblem(int subStart, int subEnd) {
		return new NumbersStructure(numbers, start + subStart, start + subEnd);
	}
}

 

 

 

 

分享到:
评论

相关推荐

    BoneCP.zip_BoneCP_BoneCP 源码_jdbc BoneCP_oracle

    在源码中,你应该能看到如何创建和配置` BoneCPConfig`对象,然后使用这个配置来初始化` BoneCP`实例。此外,还会有示例展示如何在代码中获取和释放连接,以及如何进行数据库操作。 在MySQL实例中,配置会有所不同...

    数据库连接池BoneCP源码分析报告

    数据库连接池是Java应用中广泛使用的技术,它能有效地管理和复用数据库连接,提高系统性能并减少资源消耗。 BoneCP是一款高效、轻量级的数据库连接池实现,它以其优秀的性能和灵活的配置赢得了开发者们的青睐。这篇...

    数据库连接池BoneCP源码分析报告.doc

    BoneCP 是一个轻量级、高性能的 Java 数据库连接池,本文将深入剖析其源码,理解其工作原理和实现机制。 1. **BoneCP 简介** BoneCP 是由 Jeff Smith 创建的一个开源数据库连接池,它设计的目标是提供比 C3P0 和 ...

    bonecp 0.7.1 jar包以及源码

    3. **bonecp-0.7.1-rc2.jar**:这个jar文件包含了 BoneCP 0.7.1-rc2的所有编译后的类和库,使得开发者可以在项目中直接引用 BoneCP 的功能,无需自行编译源码。 4. **bonecp-0.7.1-rc2-sources.jar**:源码jar文件...

    bonecp-0.8.0.RELEASE.jar

    开发者会将这个JAR添加到项目类路径中,以便在程序中使用BoneCP进行数据库连接管理。 2. **bonecp-provider-0.7.0.jar**:这个文件可能是一个早期版本的BoneCP提供者,用于提供特定的功能或者适配不同的环境。版本...

    基于Java的实例源码-数据库连接池 BoneCP.zip

    `jsr166y`目录可能包含了JSR 166y的兼容库,这是一个Java并发API的扩展, BoneCP可能利用其中的并发工具类来优化连接池的线程安全性和性能。例如,`java.util.concurrent.locks`包中的锁机制可能被用于确保在多线程...

    bonecp-0.8.0.RELEASE-API文档-中英对照版.zip

    赠送jar包:bonecp-0.8.0.RELEASE.jar; 赠送原API文档:bonecp-0.8.0.RELEASE-javadoc.jar; 赠送源代码:bonecp-0.8.0.RELEASE-sources.jar; 赠送Maven依赖信息文件:bonecp-0.8.0.RELEASE.pom; 包含翻译后的API...

    java数据库连接池-bonecp java源码

    Java数据库连接池(Database Connection Pool)是Java应用中用于管理数据库连接的重要技术,它通过复用已存在的数据库连接,减少了创建和销毁连接的开销,从而显著提升了应用程序的性能和效率。BoneCP是一款高效的...

    BoneCP的xml使用实例

    在这个实例中,我们将学习如何通过XML配置文件来使用BoneCP,以及如何在Java代码中加载这个配置。 首先,我们需要理解BoneCP的配置文件`bonecp-config.xml`。这个文件是 BoneCP 连接池的核心,用于定义数据库连接的...

    bonecp-0.7.0.jar

    bonecp-0.7.0.jar bonecp-0.7.0.jar bonecp-0.7.0.jar bonecp-0.7.0.jar bonecp-0.7.0.jar bonecp-0.7.0.jar bonecp-0.7.0.jar bonecp-0.7.0.jar

    bonecpdemo 含jar包源码

    【骨CP(BoneCP)连接池详解】 BoneCP是一款高效的、开源的JDBC数据库连接池,它被设计成轻量级且易于集成到其他项目中。在Java开发中,数据库连接池是必不可少的组件,它提高了数据库操作的性能,通过复用已建立...

    bonecp连接池demo

    在实际项目中,还可以结合Spring框架来集成 BoneCP,通过配置文件或注解方式管理数据源,简化代码并提高可维护性。不过,需要注意的是,BoneCP项目已经停止维护,对于新项目,可能需要考虑使用HikariCP、Druid等更...

    连接池bonecp-0.8.1

    bonecp-0.8.1-20131105.191813-1.jar bonecp-jdk-compat-0.8.1-20131105.191752-1.jar bonecp-provider-0.8.1-20131105.191842-1.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.7.jar

    BoneCP数据源应用

    BoneCP 数据源是一种高效、快速的数据连接池技术,它被设计用于提高应用程序处理数据库连接的性能和效率。在Java环境中,数据库连接池是管理数据库连接的关键组件,它减少了创建和销毁连接的开销,从而提升了整体...

    BoneCP 连接池所用到的jar集合

    在项目中使用BoneCP,需要配置其属性,如连接池大小、超时时间等,这通常通过Java代码或XML配置文件完成。然后,在应用程序中通过`DataSource`接口获取数据库连接。 6. **最佳实践** - **合理设置连接池参数**:...

    BoneCP所需依赖包

    在这个“BoneCP所需依赖包”中,包含了使用BoneCP时必要的第三方库,让我们逐一解析这些依赖: 1. **guava-r07.jar**:这是Google的Guava库的一个版本,提供了大量的Java集合类和实用工具,如并发库、缓存机制、...

    基于Java的数据库连接池 BoneCP.zip

    JSR166是Java并发包(java.util.concurrent)的一部分, BoneCP 可能使用了其中的一些并发工具,如`Future`和`ExecutorService`,来实现高效的线程管理和任务调度,保证在高并发环境下稳定地提供数据库连接。...

    bonecp0.7 所有jar包

    3. **其他可能的依赖**:虽然在描述中没有明确指出,但 BoneCP 可能还需要其他的库来支持其功能,如JDBC驱动(对应于特定的数据库,如MySQL、Oracle等),这些驱动通常需要单独添加到项目中,以确保 BoneCP 能够正确...

    基于java的开发源码-数据库连接池 BoneCP.zip

    基于java的开发源码-数据库连接池 BoneCP.zip 基于java的开发源码-数据库连接池 BoneCP.zip 基于java的开发源码-数据库连接池 BoneCP.zip 基于java的开发源码-数据库连接池 BoneCP.zip 基于java的开发源码-数据库...

Global site tag (gtag.js) - Google Analytics