`

《并发编程实践》读书笔记(2)

    博客分类:
  • Book
阅读更多
前一篇

构建并发应用程序

两种实现

所有线程在单一线程中顺序执行, 会产生糟糕的响应性和吞吐量; 每任务每线程会给资源管理带来麻烦.

Excutor基于生产者-消费者模式, 提交任务的执行者是生产者, 执行任务的线程是消费者, 如果要在你的程序中实现一个生产者-消费者的设计, 使用Excutor通常是最简单的方式.

两种实现方式:
每线程每任务
public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

一个线程所有任务:
public class WithinThreadExecutor implements Executor {
    public void execute(Runnable command) {
        command.run();
    }
}


线程池
Executors有很多静态的工厂方法:
newFixedThreadPool创建一个定长的线城市, 每当提交一个任务就创建一个线程, 直到达到池的最大长度, 这时线程池会保持长度不再变化.

newCachedThreadPool 创建一个可缓存的线程池, 如果当前线程的长度超过了处理的需要时, 它可以灵活的回收空闲的线程, 当需求增加时, 它可以灵活的增加新的线程, 而并不会对池的长度做任何限制.

newSingleThreadExecutor 创建一个单线程化的executor, 它只创建唯一的工作者线程来执行任务, 如果这个线程异常结束, 会有另一个取代它, 但是任务会保存在一个queue中等待执行.

newScheduleThreadPool 创建一个定长的线程池, 而且支持定时的以及周期性执行任务, 类似timer.

Executor生命周期
作为Executor的继承者ExecutorService有三种状态: running, shuting down, terminated.
创建之后是running.
调用shutdown, 将停止接受新的任务, 同时等待已经提交的任务完成, 包括尚未完成的任务
调用showdownNow会启动一个强制的关闭过程, 尝试取消所有运行中的任务和排在队列中尚未开始的任务.

对于关闭后提交到ExecutorService中的任务, 会被rejected execution handler处理.

所有任务完成之后, 进入terminated状态, 可以调用awaitTermination等待ExecutorService到达终止状态, 也可以轮询检查isTerminated判断是否终止. 通常shutdown会紧随awaitTermination之后, 这样可以产生同步地关闭ExecutorService的效果.

Timer只会创建一个线程来执行所有task, 如果一个task非常耗时, 会导致其他的task的实效准确性出问题. 而Executors.newScheduledThreadPool()创建的ScheduledThreadPoolExecutor则不会有这样的问题

Timer的另一个问题是, 如果TimeTask抛出未检查异常, 将导致不可预料的行为. 因为Timer并不捕获异常.在JDK5.之后几乎没有必要使用Timer了.

可以使用DelayQueue来实现自己的调度服务, 它使BlockingQueue的一种实现. 其内部包含了一个Delayed对象的容器. Delayed是一种延迟时间对象, 只有元素过期后,它才会让你执行take获取元素(这个元素实现了Delayed接口.)

在给多个工作者划分相异任务时, 各个任务的大小可能完全不同, 比如给两个工作者划分了任务a和任务b, 但是a执行花费的时间是b执行时间的10倍, 那么整个过程仅仅加速了9%而已. 最后多个工作者之间划分任务, 总会涉及到一些任务协调上的开销, 为了使得划分任务是值得的, 这一开销不能多于通过并行性带来的生产力的提高.

将大量相互独立且同类的任务进行并发的处理, 会将程序的任务量分配到不同的任务中, 这样才能真正获得性能上的提升.

CompletionService用来将Executor与BlockingQueue进行结合, 将Callable任务提交给它执行, 然后使用类似队列中的take和poll在结果完整时获得这个结果.

ExecutorCompletionService在构造函数中创建一个BlockingQueue用来保存结果, 在计算完成时会调用FutureTask的done()方法, 而用来包装任务的QueueFuture会复写done方法, 用来将结果置入BlockingQueue.
public class CompletionRenderer {
    class ImageInfo {
        public ImageData downloadImage() {
            return null;
        }
    }

    class ImageData {
    }

    private final ExecutorService executor = Executors.newCachedThreadPool();

    void renderPage(String source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        CompletionService<ImageData> service = new ExecutorCompletionService<ImageData>(executor);

        for (final ImageInfo imageInfo : imageInfos) {
            service.submit(new Callable<ImageData>() {
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }

        renderText(source);

        for (int i = 0; i < imageInfos.size(); i++) {
            Future<ImageData> f;
            try {
                f = service.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    private void renderImage(ImageData imageData) {
    }

    private void renderText(String source) {
    }

    private List<ImageInfo> scanForImageInfo(String source) {
        return null;
    }

}


还有另外一种方式通过invokeAll将多个任务提交到一个ExecutorService中, 并在指定的时间内执行完成, 超过时限后, 任务要么正常完成,  要么被取消. 可以通过get和isCancelled来查明属于那一种情况.

任务的取消
在多任务协作机制中, 有一种会通过设置取消标志, 任务会定期查看: 如果发现标志被设置过, 任务就会提前结束.
这里有一个例子:
public class PrimeGenerator implements Runnable {
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;
    @Override
    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        this.cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    public static void main(String[] args) throws Exception {
        PrimeGenerator g = new PrimeGenerator();
        new Thread(g).start();
        try {
            Thread.sleep(10);
        } finally {
            g.cancel();
        }
        System.out.println(g.get());
    }
}


对中断的理解应该是:它并不会真正中断一个正在运行的线程; 它仅仅只是发送中断请求.

中断通常是实现取消最明智的选择.
中断结合BlockingQueue的例子:
public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    public PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {

        BigInteger p = BigInteger.ONE;
        while (!Thread.currentThread().isInterrupted()) {
            queue.put(p.nextProbablePrime());
        }
        } catch (InterruptedException e) {

        }
    }

    public void cancel() {
        interrupt();
    }
}


如果一个方法需要处理一批任务, 并在所有任务结束前不返回, 那么它可以通过使用私有的Executor来简化服务的生命周期, 其中Executor的寿命限定在该方法中:
    boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final AtomicBoolean hasNewMail = new AtomicBoolean(false); // 使用Atmic去掉volatile是因为内部Runnable访问hasNewMail标识, 那么必须是final的, 这样才能避免被修改
        try {
            for (final String host : hosts) {
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        if (checkMail(host)) {
                            hasNewMail.set(true);
                        }
                    }

                });
            }
        } finally {
            exec.shutdown(); //关闭
            exec.awaitTermination(timeout, unit); //等待结束
        }
        return hasNewMail.get();
    }


Executor的shutdownNow方法强行关闭一个ExecutorService时, 它试图取消正在进行的任务, 并返回那些已经提交, 但未开始的任务清单, 这样这些任务可以通过日志存起来, 或者存起来等待进一步处理. 注意, shutdownNow返回的Runnable的对象可能并不是提交给ExecutorService的相同的对象: 它们可能是经过保障的已提交的任务的实例.

线程池
当任务是同类的, 独立的时候, 线程池才会有最佳的工作表现.

大多数平台类库中的阻塞方法, 都同时有限时的和非限时的两个版本, 比如Thread.join, BlockingQueue.put, CountDownLatch.await, Selector.select. 如果出现超等待, 你可以把任务标识为失败, 中止它或者把它重新放回队列, 准备之后执行.

如果线程池频频被阻塞的任务充满, 它同样也可能是池太小的一个信号.

对于计算密集的任务, 一个有N个处理器的系统通常需要有N+1个线程的线程池来获得最优的利用率(当计算时出现一个页错误或者其他原因而暂停, 刚好有一个额外的线程, 可以确保这种情况下CPU周期不会中断工作)

ThreadPoolExecutor
核心池大小, 最大池大小, 存活时间共同管理着线程的创建与销毁. 即使没有任务执行, 池的大小也等于核心池的大小, 并且直到工作队列充满前, 池都不会创建更多的线程. 最大池大小是可以同时活动的线程的上限, 如果一个线程已经闲置的时间超过了存活时间, 它将成为一个被收回的候选者, 如果当前的池的大小超过了核心池的大小, 线程会终止该候选者.

newFixedThreadPool工厂为请求的池设置了核心池的大小和最大池的大小, 而且它永远不会超时;
newCachedThreadPool工厂将最大池的大小设置为Integer.MAX_VALUE, 核心池的大小设置为0, 超时时间设置为1分钟, 这样创建出来的无限扩大的线程池会在需求量减少的情况下减少线程数量.

newFixedThreadPool和newSingleThreadExecutor默认使用的是一个无限的LinkedBlockingQueue, 如果所有的工作者线程都处于忙碌状态, 任务将会在队列中等候, 如果任务持续地到达, 超过了它被执行的速度, 队列会无限地增加.

对于庞大或者无限的池, 你可以使用SynchronousQueue, 完全绕开队列, 将任务直接从生产者移交到工作者线程, 因为SynchronousQueue并不是一个真正的队列, 而是一种管理直接在线程间移交信息的机制.

只有当池的大小是无限的, 或者可以接受任务被拒绝, SynchronousQueue才是一个有实际价值的选择, newCachedThreadPool工厂就是用了SynchronousQueue.

只有当任务彼此独立时, 才能使有限线程池或者有限工作队列的使用是合理的. 倘若任务之间是互相依赖, 有限的线程池就有可能引起线程饥饿死锁; 使用一个无限的池配置可以避免这类问题, 就像newCachedThreadPool所作的.

饱和策略
默认的终止(abort)策略会引起execute抛出RejectedExecutionException: 调用者可以捕获这个隐藏然后编写满足自己需求的处理代码
当最新提交的任务不能进入队列等待执行时, 遗弃(discard)策略会默认放弃这个任务.
遗弃最旧(discard-oldest)策略选择丢弃的任务是本应该接下来就应该执行的任务, 该策略还会尝试去重新提交新任务.
调用者运行(caller-runs)策略的实现方式, 既不会丢弃哪个任务, 也不会抛出任何异常. 它会把一些任务退回到调用者那里, 从此缓解新任务流. 他不会在池线程中执行最新提交的任务, 但是他会在一个调用了execute的线程中执行.
当工作队列充满后, 并没有预置的饱和策略来阻塞execute. 但是, 使用Semaphore信号量可以实现这个效果. Semaphore会限制任务注入率.
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semahpore;

    public BoundedExecutor(Executor exec, int bound) {
        super();
        this.exec = exec;
        this.semahpore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException {
        semahpore.acquire();
        try {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semahpore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semahpore.release();
        }
    }
}


定制ThreadPoolExecutor
大多数通过狗展函数传给ThreadPoolExecutor的参数(核心池大小, 最大池大小, 活动时间, 拒绝处理器) 都可以在创建之后通过setter进行设置, 但是如果通过Executors的unconfigableExecutorService的工厂方法得到一个现有的ExecutorService, 并对它进行了包装, 它只暴露出ExecutoeService的方法, 因此不能进行进一步的配置. 如果你将ExecutorService暴露给你不信任的代码, 不希望它被修改, 可以用一个unconfigableExecutorService包装它.

扩展ThreadPoolExecutor
ThreadPoolExecutor提供了一个钩子方法(afterExecute, beforeExecute, terminated), 无论任务是正常地从run返回, 还似乎抛出一个异常, afterExecute都会被调用, 如果抛出一个Error则不会, 如果任务抛出一个RuntimeException, 任务不会被执行, afterExecute也不会被调用. terminated钩子会在县城池完成关闭动作后被掉哦了给, 也就是当所有任务已完成并且所有工作者线程也已经关闭后, 会执行, 这样可以用来释放Executor在生命周期中分配的一些资源, 还可以发出通知, 记录日志或者完成统计信息.
一个对执行时间进行log的例子:
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTask = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            totalTime.addAndGet(taskTime);
            numTask.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTask.get()));
        }finally {
            super.terminated();
        }
    }
}


并行递归算法
当每个迭代彼此独立, 并且完成循环体中每个迭代的工作, 意义都足够重大, 足以弥补管理一个新任务的开销时, 这个顺序循环是适合并行化的.
将串行执行转换成并行执行:
    void processSequentially(List<Element> element) {
        for (Element e : element) {
            process(e);
        }
    }

    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    process(e);
                }
            });
        }
        exec.invokeAll(null); // 如果需要等到所有处理结束之后才能继续执行可以加上这句
    }

将顺序递归转换成并行递归:
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> node : nodes) {
            results.add(node.compute());
            sequentialRecursive(node.getChildren(), results);
        }
    }

    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
        for (final Node<T> node : nodes) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(node.compute());
                }
            });
            parallelRecursive(exec, nodes, results);
        }
    }

    public <T> Collection<T> getParallelResult(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }


笔记3
分享到:
评论
1 楼 guozhigang 2011-09-29  
通常shutdown会紧随awaitTermination之后,应该改成 shutdown 后面紧随 awaitTermination吧。

相关推荐

    java并发编程实践pdf笔记

    Java并发编程实践是Java开发中不可或缺的一个领域,它涉及到如何高效、正确地处理多线程环境中的任务。这本书的读书笔记涵盖了多个关键知识点,旨在帮助读者深入理解Java并发编程的核心概念。 1. **线程和进程的...

    [原]Java并发编程实践-读书笔记

    《Java并发编程实践》这本书是Java开发者深入理解并发编程的重要参考。以下是对书中关键知识点的总结: 1. **线程和进程的区别** - **线程**:是程序执行的最小单位,一个进程中可以有多个线程,它们共享同一块...

    读书笔记-Java并发编程实战-基础篇

    在Java并发编程中,数据的封装与访问控制、线程安全性的考量、同步...以上知识点是从提供的文件内容中提取出的,旨在帮助理解Java并发编程的基础概念和技术实践。掌握这些概念对于开发高效且可靠的并发程序至关重要。

    JUC并发编程学习笔记(硅谷)

    Java并发编程是Java开发中的重要领域,特别是在大型分布式系统或者高并发应用中,对线程安全和性能优化的理解与...源码分析部分可能包含了一些并发编程的实例代码,通过阅读和实践这些代码,能进一步加深对JUC的理解。

    GoLang学习资源_学习笔记和并发编程实战

    通过阅读"Go语言学习笔记",你可以建立起对Go语言全面的认知,然后借助"Go并发编程实战"来深化对并发编程的理解,结合实际编写示例程序,将理论知识转化为实践经验。 在学习过程中,建议先从基础语法开始,掌握变量...

    java编程思想读书笔记

    这份读书笔记记录了读者在研读此书过程中的理解和体会,涵盖了从基础语法到高级特性的全面解析。以下是笔记中可能涉及的一些关键知识点: 1. **Java语言简介**:Java是一种跨平台的、面向对象的编程语言,由Sun ...

    实战Java高并发程序设计-试读

    《实战Java高并发程序设计》是一本专注于Java并发编程实践的书籍,试读版提供了前两章的内容,为读者提供了一个初步了解并发编程基础的窗口。在Java领域,并发编程是构建高性能、高效率系统的关键技术,对于软件开发...

    java并发源码集合-learnJava:《java8》、《java并发编程艺术》读书笔记、练习,jdk集合包源码阅读、练习

    这个名为"java并发源码集合-learnJava"的资源是针对深入理解Java并发编程和集合包源码的一份宝贵资料,主要涵盖了《Java 8》、《Java并发编程艺术》两本书的读书笔记和实践练习。通过学习这些内容,开发者可以更好地...

    王者归来之Thinking in java读书笔记

    通过阅读《王者归来之Thinking in Java读书笔记》,你可以系统地掌握Java编程的核心知识,理解编程思想,提高解决问题的能力。无论是初学者还是经验丰富的开发者,都能从中受益匪浅。这本书不仅提供了理论知识,还有...

    A Note about 《C++ 并发编程实战》.zip

    《C++ 并发编程实战》是一本深入探讨C++多线程和并发...通过阅读《C++ 并发编程实战》,读者不仅可以掌握C++并发编程的基本概念和工具,还能了解到实际应用中的最佳实践和陷阱,提升编写高效、安全的多线程程序的能力。

    编程笔记文案

    在这样的文档中,作者可能按照不同的主题进行了分类,比如有单独的章节分别讲述Python的数据结构、Java的并发编程、JavaScript的DOM操作,或者关于数据库查询语言SQL的使用心得。 总的来说,这份“编程笔记文案”是...

    Java Concurrency In Practice Learning Note

    《Java并发编程实践学习笔记》是一份深入探讨Java并发编程的资源,主要涵盖了在实际开发中如何有效地管理和利用多线程。这篇博文链接虽然没有提供具体的内容,但从标题来看,我们可以推断它会讨论Java并发编程的最佳...

    Golang 笔记 第四版 高清版.zip

    笔记会深入讨论goroutines和channels,这是Go实现并发编程的主要工具。goroutine是一种轻量级线程,而channel则用于goroutine间的通信,这种机制被称为“ CSP (Communicating Sequential Processes)”模型,它使得...

    The Art of Multiprocessor Programming 读书笔记

    《多处理器编程的艺术》是一本深度剖析并发编程原理与实践的优秀著作。通过对共享内存同步、互斥机制、并发数据结构以及多核硬件特性的全面解读,读者可以系统地掌握多线程编程的核心知识,提升在复杂多核环境下的...

    vxworks读书笔记

    这份读书笔记虽然笼统,但涉及了VxWorks的核心特性和开发中的重要概念。对于熟悉和使用VxWorks的工程师来说,这些知识点是理解和驾驭这个操作系统的基础。通过深入学习和实践,开发者能够更好地利用VxWorks的强大...

    mldn学习笔记 — 网络编程

    《mldn学习笔记——网络编程》这篇博客主要探讨了计算机网络编程的相关概念和技术,结合提供的文件《215_网络编程.pdf》,我们可以深入学习这一主题。网络编程是IT领域中的核心部分,它涉及到如何通过网络进行数据...

    网络编程(学习笔记)

    "学习笔记"文件可能包含了关于以上概念的详细解释、实例代码和练习题,这些都是深入理解网络编程的关键。你应该仔细阅读并尝试自己编写简单的客户端和服务端程序,加深理论知识的理解,并通过实践提升技能。 总结来...

    Qt学习全程笔记—版权所有

    QT.doc和QT.docx是笔记文本,详细记录了每个主题的学习要点和实践心得,而QT.ppt则包含了讲课的幻灯片,图文并茂地展示了关键概念和技术。 通过这份资料,无论是对Qt的初学者还是有一定经验的开发者,都能从中受益...

    个人学习JUC代码笔记总集

    这个压缩包文件“个人学习JUC代码笔记总集”显然是一个个人的学习资源,记录了对JUC组件的理解和应用实例,特别适合已经有一定Java基础,想要深入学习并发编程的开发者。 JUC的主要目标是简化并发编程,提高多线程...

Global site tag (gtag.js) - Google Analytics