对于大数据量关联的业务处理逻辑,比较直接的想法就是用JDK提供的并发包去解决多线程情况下的业务数据处理。线程池可以提供很好的管理线程的方式,并且可以提高线程利用率,并发包中的原子计数在多线程的情况下可以让我们避免去写一些同步代码。
这里就先把jdk并发包中的线程池处理器ThreadPoolExecutor 以原子计数类AomicInteger 和倒数计时锁CountDownLatch的一些常用api介绍下。
一、ThreadPoolExecutor
JDK建议我们用工厂方法来创建线程池。Executors有四种工厂方法
1
、Executors.newCachedThreadPool
2
、Executors.newFixedThreadPool
3
、Executors.newSingleThreadExecutor
4
、Executors.newScheduledThreadPool()
当然也可以自己订制线程池来管理线程。下面简单举个订制线程池的构造器例子:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(ThreadPoolUtil.CORE_POOL_SIZE, ThreadPoolUtil.MAX_POOL_SIZE, ThreadPoolUtil.KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(ThreadPoolUtil.WORK_QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());
这里结合execute的实现来简单解释一下参数的含义。
1、corePoolSize:核心线程的数目。也就是线程池维护的最小线程数目。如果Idle time超过了keepAliveTime,线程实例就会被终止,移出线程池。对于核心线程数目的大小需要根据不同的应用场景和CPU的情况来具体确定,以保证最佳的线程数。
2、maximumPoolSize:线程池维护的最大线程数目。对于无界队列,这个是形同虚设的。在有界队列中可以通过该项设置特殊的handler来处理线程。
3、keepAliveTime 以及unit是用来控制idleTime的。即线程池维护线程所允许的空闲时间以及时间单位。
4、
workQueue任务队列:有多种队列可供选择,SynchronousQueue
:直接提交类型的队列。不保持线程而是直接创建。LinkedBlockingQueue:无界队列,维持线程数目在corePoolSize之内不会创建新的线程,而是放到队列中,适合任务独立的线程类型。ArrayBlockingQueue:有界队列。有助于防止资源耗尽但是需要调整好合适的队列size以及最大线程数目。
5、policy:
处理策略。也就是当任务数超过
maximumPoolSize时的处理策略。可以自己定义单独的handler也可以用线程池提供的。
AbortPolicy:抛出java.util.concurrent.RejectedExecutionException异常
CallerRunsPolicy:重试添加当前的任务,他会自动重复调用execute()方法
DiscardOldestPolicy:抛弃旧的任务
DiscardPolicy:抛弃当前的任务
通过以上几点可以看一下execute方法的内部。其处理的顺序基本上是corePoolSize会先填满,然后任务会放到队列中,有空闲的线程就会将队列中的任务“offer”出来执行,当队列也满了的时候就会交给handler来处理任务。
public void execute(Runnable command) {
//任务为空的时候抛出空
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//添加线程到线程池中运行或者如果size>core放到队列中
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
//对于超出maxmum的线程,调用对应的reject策略
reject(command); // is shutdown or saturated
}
}
二、 AomicInteger计数器
在多线程的情况下写一个计数器用同步的方式比较麻烦。并发包提供的AomicInteger将程序员从写一些同步代码中解脱出来。AomicInteger中最重要的方法就是incrementAndGet()。看一下源码:
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
其中compareAndSet方法是保证不会重复计数的关键。其中调用了CAS原语的操作,使得效率和准确性得以保证,但是起上一步在读next的时候如果另一个线程修改了当前值还是会出现问题,但这种问题的几率不高,而且也比通过synchronized和notify要更简便直接。每次调用incrementAndGet就会使AomicInteger的值加一返回,调用起来也很方便。
三、 CountDownLatch
可以配合AomicInteger一起进行倒数计时。其控制了指定数目线程全部执行完相应的逻辑后进行减数计算,知道数值为0之后唤醒后续线程继续执行。在这次应用中控制所有的AomicInteger计数结束后,也即所有子线程执行结束之后,再运行父线程来返回结果。CountDownLatch 中常用的两个方法就是await()和countDown()。顾名思义,await方法会阻塞线程往后执行,而去继续执行子线程。countDown()也是原语操作。每次调用都会将latch的数值减一,其初始数值可以在构造其中指定,一般是需执行线程的size。在调用的时候为了确保其能countDown,建议将其放到finally代码块中。保证计数与否都要countDown,要不父线程就永远被阻塞在那里了。countDown调用的内部静态类Sync继承的AbstractQueuedSynchronizer的releaseShared()方法。在tryReleaseShared(1)返回true的时候会进一步执行 doReleaseShared()方法。看一下其内部处理:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
Node是用来标记等待状态。如果是head就保证等待状态不被取消。如果是tail表示是等待队列的尾元素。通过CAS原语操作来判断其在操作系统中的waitStatus。
相关推荐
Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind Java并发包思维导图.xmind
`Callable`和`Future`的组合是Java并发包中一个强大的工具,它弥补了`Runnable`接口的不足,使得异步编程可以获取到返回值并处理异常。同时,`ExecutorService`作为线程池的实现,为并发编程提供了一种优雅的解决...
在高并发的场景下,JDK提供的并发包为我们提供了许多支持线程安全的数据结构,这使得在多线程环境下编程变得更加容易和高效。接下来,我们来深入探讨几个在并发编程中经常用到的数据结构,以及它们的概念、原理、...
Java 5并发包(`java.util.concurrent`,简称`Concurrent`包)是Java平台中用于多线程编程的重要组成部分,它提供了丰富的并发工具类,极大地简化了在多线程环境下的编程工作。这个包的设计目标是提高并发性能,减少...
17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 AtomicBoolean 23. 原子性整型 ...
ThreadPoolExecutor类是Java并发包中一个非常重要的类,它提供了一个线程池的实现,我们可以使用它来创建一个串行的线程池。 首先,我们需要创建一个队列来存储Runnable对象,这里我们使用ArrayDeque来实现队列。...
线程安全、volatile关键字、原子性、并发包、死锁、线程池学习笔记
- 大量创建ThreadLocal变量,并为每个变量的value分配大对象,观察内存使用情况和内存溢出的风险。 - 执行remove操作,之后通过调试工具检查内存中的对象是否被回收,以此来验证ThreadLocal的内存泄漏问题。 ...
Java 7并发包最新思维导图,基于《深入浅出Java concurrency》修改
在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...
这个名为"java-core"的资源集重点关注了几个关键领域:集合框架、多线程、线程池、并发包以及非阻塞I/O(NIO)。让我们逐一深入探讨这些主题。 1. **Java集合框架**: Java集合框架是处理对象集合的一组接口和类,...
4. **并发集合**:Java并发包`java.util.concurrent`中提供了一系列并发安全的集合,如`ConcurrentHashMap`、`CopyOnWriteArrayList`和`BlockingQueue`等。这些集合在多线程环境下能保证数据一致性,避免数据竞争...
`CountDownLatch`是Java并发包`java.util.concurrent`中的一个重要工具类,用于实现线程间的同步。它基于计数器的概念,初始化时设置一个非负的计数值,然后通过调用`countDown()`方法来递减这个计数器。主线程或...
Java并发编程库(Java Util Concurrency,简称JUC)是Java平台中用于高效并发处理的重要工具包,包含在`java.util.concurrent`包下。JUC提供了丰富的并发原语,如线程池、同步器、并发容器等,极大地简化了多线程...
在Java编程中,`ExecutorService`是Java并发包(`java.util.concurrent`)中的核心接口,它提供了一种管理和控制线程的方式。在处理Socket连接时,尤其是TCP和UDP通信,`ExecutorService`可以帮助我们有效地利用系统...
`AtomicInteger`是Java并发包`java.util.concurrent.atomic`中的一个原子类型类,它提供了在多线程环境下安全地修改整型变量的方法。`AtomicInteger`的核心特性是其内部维护的`value`字段是volatile的,这确保了多...
3. **同步工具类**:Java并发包`java.util.concurrent`中的工具类,如`Semaphore`(信号量)、`CyclicBarrier`(回环栅栏)、`CountDownLatch`(倒计时器)和`FutureTask`(未来任务)等,提供了更灵活的线程同步和...
java.util.concurrent包提供了创建并发应用程序的工具,本资源主要是对该api进行详细的解读,并对api的使用做出安全高效的引用建议.