`

java线程池Executor简单学习与解析

    博客分类:
  • java
阅读更多
线程池与对象池的学习中,个人感觉线程池是将线程转移到内部一直在运行在容器中的线程中来运行,减少的是线程run时间,而不是创建时间,将其引用至新线程,而不需要重新分配资源

1.线程池的类与接口关系构造

Executor-->Executoservice->AbstractExcetorService->ThreadpoolExecutor

Executor->ExecutorService->SchedulEXecutorService+ThreadpoolExceutor->ScheduleThreadpoolExecutor

Executors.new***ThreadPoolExecutor(...)根据不同参数来构造线程池:主要分为5种

newfixThreadPoolExecutor,newSingleThreadExecutor ,newCachedThreadPool以threadpoolexecutor构造完成

newSingleThreadScheduledExecutor,newThreadScheduledExecutor以ScheduleThreadpoolExecutor()构造完成

2.线程池的一些内部源码查看

A.ThreadPoolExecutor()线程池

内 部容器使用blockingQueue,使用Condition配合reentrantlock来监听对象的状态信息,从而实现Queue的消费-生产模 式的构建实现,也保证了线程安全,内部常量使用atomic(基于CAS算法实现原子性操作安全,AMINO框架提供更多类型)或者Static

B.ScheduleThreadPoolExecutor()

是在Threadpoolexecutor基础上实现,主要区别为可以周期内定时执行Runnable任务

内部容器使用DelayedWorkQueue对blockingQueue进行代理使用进行封装,里面使用RunnableScheduledFuture这个接口,能够执行线程,返回future,设置执行时间(可能不准确),使用future模式,Futuretask进行异步结果计算,Callable(Task)类似于thread任务执行单元,但是可以返回参数,实现future模式,所有执行线程的方法均调用schedule(..),

以下是来源于jdk API中一个带方法的类,它设置了 ScheduledExecutorService ,在 1 小时内每 10 秒钟蜂鸣一次:

import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
    private final ScheduledExecutorService scheduler =
       Executors.newScheduledThreadPool(1);

    public void beepForAnHour() {
        final Runnable beeper = new Runnable() {
                public void run() { System.out.println("beep"); }
            };
        final ScheduledFuture<?> beeperHandle =
            scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);<!---设置->
        scheduler.schedule(new Runnable() {
                public void run() { beeperHandle.cancel(true); }
            }, 60 * 60, SECONDS);
    }


线程安全控制

public int drainTo(Collection<? super Runnable> c) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            final ReentrantLock lock = this.lock;//重入锁
            lock.lock();
            try {
                RunnableScheduledFuture first;
                int n = 0;
                while ((first = pollExpired()) != null) {
                    c.add(first);
                    ++n;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
在ScheduleThreadPoolExecutor.invokeall(Callable()任务列表) 返回future列表代表返回结果集
.submit(Callbale<T>/Runnable)提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。体现使用Callable和future模式的优势
一般执行使用void Execute(Runnable())
内部周期定时操作的实现:
DelayedWorkQueue延时调用比如在poll方法中long timeout, TimeUnit unit,传入时间参数,延时的实现,使用对线程对象的锁机制下关联的
Condition available.awaitNanos(nanos)进行控制.可能不准确,个人整体去看了一遍,但是一步一步实现困难,各种继承和调用关系图实在有点困难,以下是poll()带代码


        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture first = queue[0];
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        if (nanos <= 0)
                            return null;
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }



线程池scheduleThreadPoolExecutors
分为几个模块 处理线程(真是去执行,复用的线程),工作队列(workdelayqueue) ,executor设置的一些策略 ,执行任务单元scheduledfututretask

1处理线程,这个倒是没有什么特别的地方,思想就是复用线程,但是说在用的时候,设置下对于有界且当前队列不能容纳时的策略handler(jdk具体提供实现ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy),之前倒是看到过对于当前工作队列不能容纳时,采纳双队列(添加预备队列,给工作队列使用,但是也有满的时候,看应用场景)
2.工作队列,workdelayqueue(里面包含子属性priorityqueue),对于虽然之前对于delayqueue有看过,但也是扫了眼。总的来说,使用delyaedqueue进行延时控制,取队列子元素在priorityqueue(存在优先级的原因)。比如dealyedqueue现在从priorityqueue取一个工作节点(如果没有,说明工作队列已经为空,肯定在一直在等待去获取节点),肯定要判断,拿出优先级最高的节点,然后使用getDelay(TimeUnit.NANOSECONDS)获取间隔时间,再按照dealyedqueue 的延时实现办法,reeentrantloack加conditiond.awaitNanos();执行完成之后(其中使用condition来进行控制,因为本身是假等待,不释放cpu的,可以相应中断等操作,这里是因为在等待的过程中,如果优先列队发生更改的话,可能要去执行其他的任务单元,那么这时候就需要停止当前等待执行,转而去执行优先级高的,则问题就是当前线程要去响应级别更高的线程,要去通知它,唤醒它,加上之前使用重入锁,正好也用上condition进行控制,一般情况下synchornized与object.notify/wait结合使用,lock与condition结合使用) ,相当于已经完成一次延时调度执行了,那么周期性执行怎么搞呢?在外面做一次判断是否为周期性,片段时间做一次计算,得出下次执行的距离时间,并添加到workqueue中,这一步其实也是在线程池执行的时候前一起执行的
3.executors的执行策略:其实就是线程池大小,类型,线程工程这些,使用的时候用的多
4.执行任务单元,schedulefuturework,可以说是具备future的特性,也可以去执行线程,callback,能周期执行,delay接口处理延时,comparable处理在有限队列中延时比较,从字面意思来看也是能知道的future的主要用于处理异步计算,有需要调用的时候获取处理结果(这个没有处理完,就去调用返回是null还是会阻塞,这个具体要看,像本身调用future是阻塞,有些框架或者中间件是返回null的),callback类似于线程,任务执行单元,但是具备有返回结果,一般结合future使用。


然后,就是对于executors每次操作都是submit或者execute,然后不管是有没有处理完成返回值,都要专门建一个future对象进行追踪,使用
CompletionService可以简化这一操作,提交和处理返回(你只需要等到有处理完future之后就行相当于等待执行不管是哪个future,当然如果你要进行细致或者每个future不一样要么你做一次判断,要么就不用这个方式)

public void run() {
        while (!end) {
            try {
                Future<String> result = service.poll(20, TimeUnit.SECONDS);
                if (result != null) {
                    String report = result.get();
                    System.out.printf("ReportReceiver: Report Received:%s\n", report);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.printf("ReportSender: End\n");
    }

对于timertask timer相当于一个管理者,里面有线程去工作(同步单线程),延时处理时使用计算出当前时间和相对时间,比较两个值,当不相等时就重复判断,然后执行,并且使用对象的notify与wait来进行通行进行队列下一个任务处理

可以参考这个博客 讲了整个运行过程非常好
http://ju.outofmemory.cn/entry/25473
讲了接口以及对应的运行情况和流程,很不错
http://shift-alt-ctrl.iteye.com/blog/1841088


代码均来自于OPEN-JDK中java.lang 和java.util.concurrent包

来源: <http://183.56.168.160:8080/blog/admin-index.do#article/article>
分享到:
评论

相关推荐

    掌握并发的钥匙:Java Executor框架深度解析

    # 掌握并发的钥匙:Java Executor框架深度解析 Java作为一种广泛应用的编程语言,自1995年由Sun Microsystems公司(现属Oracle公司)首次发布以来,已经发展成为软件开发领域的重要工具。Java的设计目标包括跨平台...

    线程池原理-ThreadPoolExecutor源码解析

    线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor

    Java 线程池详解及创建简单实例

    本文将深入解析Java线程池的工作原理,并给出创建简单实例的步骤。 线程池的核心在于`java.util.concurrent`包中的`ExecutorService`接口,它是执行任务的中心接口。`ExecutorService`扩展了`Executor`,提供了更...

    java线程池常用方法.docx

    ### Java线程池常用方法详解 #### 一、引言 自Java 5引入并发包`java.util.concurrent`以来,线程的管理与控制变得更为高效与便捷。在这个包中,`Executor`框架成为了线程管理和调度的核心。通过`Executor`,我们...

    Java并发编程利器:Executor框架深度解析与应用实践

    Java通过引入Executor框架,为并发任务的执行提供了一种高效、灵活的管理机制。本文将深入探讨Executor框架的设计哲学、核心组件,并结合实例演示如何利用这一框架提升程序的性能和响应性。 注意事项和最佳实践 合理...

    Java 线程池框架

    Java线程池框架是Java并发处理的核心工具,它允许开发者高效地管理多个并发任务,避免了频繁创建和销毁线程的开销。线程池通过维护一组可重用的线程来提高系统的性能和响应性。本文将深入探讨Java线程池的结构、常见...

    Java并发之串行线程池实例解析

    Java并发之串行线程池实例解析 Java并发之串行线程池实例解析是Java并发编程中非常重要的一部分,今天我们就来详细介绍如何实现一个串行的线程池实例。在介绍之前,首先我们需要了解什么是串行线程池。串行线程池是...

    使用Java匿名内部类实现一个简单的线程池.txt

    ### 使用Java匿名内部类实现一个简单的线程池 #### 一、引言 在现代软件开发中,线程池是管理并发任务的重要工具之一。它能够有效地控制运行中的线程数量,合理分配系统资源,避免频繁创建销毁线程带来的性能开销...

    JDK1.5线程池源码及详细注释

    在Java并发编程中,线程池(ThreadPoolExecutor)是一个至关重要的工具,它允许开发者有效地管理线程资源,提高系统的性能和响应性。JDK 1.5引入了java.util.concurrent包,其中包含了线程池的实现,使得并发编程...

    JDK线程池和Spring线程池的使用实例解析

    JDK线程池和Spring线程池的使用实例解析 JDK线程池和Spring线程池是两种常用的线程池实现,它们都提供了线程池的功能,但它们在使用和配置上有所不同。下面我们将详细介绍JDK线程池和Spring线程池的使用实例解析。 ...

    JAVA课程学习笔记.doc

    本篇学习笔记将深入解析Java线程池的框架、结构、原理以及相关源码,帮助读者全面理解线程池的工作机制。 1. 线程池模块结构 线程池框架分为多层结构,其中包括核心实现类、辅助类和接口等组件。例如,`sun.nio.ch....

    13-Java并发编程学习宝典.zip

    3. **Executor框架** - "20 其实不用造轮子—Executor框架详解-慕课专栏.html":讲解了Java的`ExecutorService`和`ThreadPoolExecutor`,这是管理和控制线程执行的重要工具,可以有效地管理线程池,提高系统性能。...

    基于Spring中的线程池和定时任务功能解析

    基于Spring中的线程池和定时任务功能解析 Spring框架提供了线程池和定时任务执行的抽象接口:TaskExecutor和TaskScheduler来支持异步执行任务和定时执行任务功能。TaskExecutor是一个抽象接口,定义了execute方法,...

    JAVA 资料集合

    这不仅包括了对Java线程池基本概念的讲解,如Executor框架,还可能涉及了如何配置和优化线程池,以适应不同的并发需求。例如,文档可能会详细介绍如何合理设置线程池大小、处理任务队列溢出、以及监控线程池的状态和...

    美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇.doc

    美团动态线程池实践思路开源项目(DynamicTp)线程池源码解析及通知告警篇 本文详细介绍了美团动态线程池实践思路开源项目(DynamicTp)的通知告警模块,该模块提供了多种通知告警功能,每一个通知项都可以独立配置...

    Java并发编程原理与实战

    单例问题与线程安全性深入解析.mp4 理解自旋锁,死锁与重入锁.mp4 深入理解volatile原理与使用.mp4 JDK5提供的原子类的操作以及实现原理.mp4 Lock接口认识与使用.mp4 手动实现一个可重入锁.mp4 ...

    java简单UDP通信代码详解

    本资料主要针对初学者,通过简单的代码示例来讲解如何在Java中实现UDP通信,特别是多线程通信SERVER的构建。 首先,我们需要理解UDP的基本概念。UDP不建立连接,发送数据前不需要确认对方是否在线,也不保证数据包...

    JAVA线程根据给定URL生成网页快照

    Java的`java.util.concurrent`包提供了线程池(ThreadPoolExecutor)来管理和控制并发执行的任务。我们可以通过创建一个固定大小的线程池,然后将每个URL的抓取任务提交给线程池: ```java ExecutorService ...

    hippo4j动态线程池框架 v1.5.0.zip

    hippo4j动态线程池框架是一款高效、灵活的Java线程池管理工具,主要用于优化多线程环境下的任务调度与执行。该框架的核心在于其动态调整线程池大小的能力,能够根据系统的实时负载状况,自动调整线程池的配置,以...

    java concurrent 包 详细解析

    Java并发包(java.concurrent)是Java平台中处理多线程编程的核心工具包,它提供了丰富的类和接口,使得开发者能够高效、安全地编写多线程程序。这个包的设计目标是提高并发性能,减少同步代码的复杂性,并提供高级...

Global site tag (gtag.js) - Google Analytics