`
leichenlei
  • 浏览: 128841 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发(七、高级并发对象)

    博客分类:
  • java
 
阅读更多

到目前,这课程从一开就集中在java 平台的部分低级API 。这些API 适合于适合于非常基础的任务,但是高级构建需要更高级的任务。特别是在今天充分利用多核心系统上的并发应用。

在这一章,我们将考虑5.0 版本的java 平台引进的一些高级并发特点的。大多数特性是在java.util.concurrent 包下实现的。在Java 集合框架中也有新的并发数据结构。

l  Lock 对象支持的锁方案简化了并发应用。

l  Executors 定义了一个创建和管理线程的高级APIExecutor 实现在java.util.concurrent 中,提供了大规模应用的线程管理。

l  Concurrent collections 使管理大数据集合变得简单,能有效同步的需要。

l  Atomic variables 具有减少并发需要的功能,能避免内存不一致错误。

l  ThreadLocalRandom JDK 7 )提供了多线程伪随机数。

(一) 锁对象

同步代码依赖于一个简单的可重入锁。这种锁使用简单,但有很多限制。Java.util.concurrent.locks 包提供了更复杂的锁方案。我们不会详细看这个包,而是关注最基础的接口Lock

    锁对象的工作和同步代码的隐式锁很像。使用隐式锁,只能一个线程一次能够拥一个锁。锁对象也支持wait/notify 机制,通过他的Condition 对象。

    锁对象比隐式锁最大的好处是能够收回获取锁的尝试。tryLock 方法能够实现当锁不能立即使用或者超时时收回。lockInterruptibly 方法是在锁被获取之前,如果另一个线程发送一个中断时收回。

    让我们使用锁对象来解决Liveness 中的死锁问题。AlphonseGaston 已经训练他们自己去注意鞠躬。我们做出了这个改善的模型,通过要求我们的Friend 对象在继续鞠躬之前必须先获得所有参与者的锁。Safelock 是一个改善模型的源代码。为了展示这个方案的作用,我们假设AlphonseGaston 沉醉于能够安全鞠躬的新发现,以至于不停的鞠躬。

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.Random;

public class Safelock {

     static class Friend {

         private final String name;

         private final Lock lock = new ReentrantLock();

         public Friend(String name) {

              this.name = name;

         }

         public String getName() {

              return this.name;

         }

         public boolean impendingBow(Friend bower) {

              Boolean myLock = false;

              Boolean yourLock = false;

              try {

                   myLock = lock.tryLock();

                   yourLock = bower.lock.tryLock();

              } finally {

                   if (! (myLock && yourLock)) {

                       if (myLock) {

                            lock.unlock();

                       }

                       if (yourLock) {

                            bower.lock.unlock();

                       }

                   }

              }

              return myLock && yourLock;

         }

         public void bow(Friend bower) {

              if (impendingBow(bower)) {

                   try {

                       System.out.format("%s: %s has"+ " bowed to me!%n", this.name, bower.getName());

                       bower.bowBack(this);

                   } finally {

                       lock.unlock();

                       bower.lock.unlock();

                   }

              } else {

                   System.out.format("%s: %s started"+ " to bow to me, but saw that"

                       + " I was already bowing to"+ " him.%n",

                       this.name, bower.getName());

              }

         }

         public void bowBack(Friend bower) {

              System.out.format("%s: %s has" +" bowed back to me!%n",

                   this.name, bower.getName());

         }

     }

     static class BowLoop implements Runnable {

         private Friend bower;

         private Friend bowee;

         public BowLoop(Friend bower, Friend bowee) {

              this.bower = bower;

              this.bowee = bowee;

         }

         public void run() {

              Random random = new Random();

              for (;;) {

                   try {

                       Thread.sleep(random.nextInt(10));

                   } catch (InterruptedException e) {}

                   bowee.bow(bower);

              }

         }

     }

     public static void main(String[] args) {

         final Friend alphonse =new Friend("Alphonse");

         final Friend gaston =new Friend("Gaston");

         new Thread(new BowLoop(alphonse, gaston)).start();

         new Thread(new BowLoop(gaston, alphonse)).start();

     }

}

(二) Executors

在之前提供的所有例子中,都和新线程和任务之间有关,或者是Runnable 的对象,或者是Thread 对象。这只适用于小的应用,但不适合大规模应用,把线程管理和创建和应用分离式有意义的。这样方法封装在叫做执行器的对象里。接下俩详细介绍执行器。

·         Executor 接口定义了三种执行器对象类型。

·         Thread Pools 是最常有的一种实现。

·         Fork/Join 是一个有利于多处理器的框架(JDK7

1. Executor 接口

java.util.concurrent 包定义了三种执行器接口:

·         Executor, 一个简单的支持启动新任务的接口。

·         ExecutorServiceExecutor 的子接口,提供了管理个人任务和执行器生存周期的附加功能。

·         ScheduledExecutorService ,一个ExecutorService 的子接口,支持任务执行周期的and/or 功能。

典型的,传给executor 对的参数是三种接口类型的一种,而不是executor 类型。

1) Executor 接口

Executor 接口提供一个单独方法,execute ,给一个普通的线程创建方案提供顺序替换。如果r 是一个Runnable 的对象,eExecutor 对象。

    


(new Thread(r)).start();


替换成e.execute(r);








然而,execute 没有什么特殊的地方。一个创建新线程和立即启动它的低级方案。依赖于Executor 的实现,execute 做了同样的事情,但是,它很可能使用已经存在的工作线程去运行r ,或者将r 放到等待队列里,直到工作线程可用。(我们将在Thread Pools 中描述工作线程)。

    java.util.concurrent 中执行器的实现,重复利用了高级的ExecutorServiceScheduleExecutorService 接口,索然他们也是基于Executor 接口的。

2) ExecutorService 接口

ExecutorService 接口相似地实现execute 方法,但是有更有用的提交方法。像execute 一样,接收一个Runable 对象,但是也接收Callable 对象,它可以允许线程有返回值。它会返回Future 对象,用来取Callable 的返回值和管理CallableRunnable 任务的状态。

ExecuteService 也提供了提交Callable 对象的大集合方法,最后,ExecutorService 提供一些执行器关闭的管理方法。支持立即停止,任务正确处理中断。

3) ScheduleExecutorService 接口

ScheduleExecutorService 接口为它的父接口ExecutorService 补充了计划,可以执行一个延迟来执行Runnable 或者Callable 任务。还有,这个接口定义了scheduleAtFixedRatescheduleWithFixedDelay ,可以按照定义间隔重复执行指定任务。

2. 线程池

Java.util.concurrent 中的大多数执行器实现使用由工作线程组成的线程池。这种线程分别来自于RunnableCallable ,他们经常执行联合任务。

    使用工作线程,减小了线程创建的开销。线程对象大量内存,正在大规模应用中,分配和销毁线程对象会消耗大量的内存管理开销。

一个普遍使用的线程池是固定线程池(fixed thread pool )。这种类型的线程池有运行固定的线程数;如果正在使用的线程由于某种原因终止,会自动替换成一个新的线程。任务经过一个内存队列提交,当任务数大于线程数,队列保存多出的任务。

固定线程池的一个重要优势是使用它是优雅降低(degrade gracefully )。要理解它,想象一下web 服务器每个HTTP 请求使用一个单独的线程。如果应用简单的为每个HTTP 请求创建一个线程,系统接受的线程数比它能够处理的多,这样系统将停止响应所有请求,当这些线程的开销超出了系统处理能力。给线程的创建加个限制,应用不会立即处理所有请求,但是会再能力范围内立即处理。

·         使用创建固定线程池创建执行器的简单方法是调用在java.util.concurrent.Executors 中的newFixedThreadPool 工厂方法,这个类也提供下列工厂方法:

·         NewCachedThreadPool 的方法创建一个可扩展线程池的执行器。它适合于启动很多短声明周期任务的应用。

·         NewSingleThreadExecutor 方法创建单线程执行器。

·         还有几个工厂方法是上面执行器的ScheduledExecutorService 版本。

如果上面的工厂方法不能满足你的需要,java.util.concurrent.ThreadPoolExecutor 或者java.util.concurrent.ThreadPoolExecutor 将给你提供附加功能。

3. Fork/Join

Java SE 7 中的新特性,fork/join 框架是一个ExecutorService 接口的实现,帮你利用多处理器(系统)。它可以强行进入较小的递归快。目的是使用所有可用的处理能力去增加你的应用的性能。

和任何ExecutorService 一样,fork/join 把任务分配给线程池里的工作线程。Fork/join 框架的不同是使用工作- 抢断(work-stealing )算法。工作线程可以抢断其他正在工作线程的任务。

Fork/join 框架的核心是ForkJoinPool 类,AbstractExecutorService 的一个扩展。ForkJoinPool 实现了work-stealing 算法能够执行ForkJoinTasks

1)   基本应用

Fork/join 的使用很简单。第一步是写代码执行一个工作的一部分,你的代码可能如下:

if (my portion of the work is small enough)

do the work directly

else

split my work into two pieces

invoke the two pieces and wait for the results

 

包装这段代码做为ForkJoinTask 的子类,通常做为更专业的类型RecursiceTask( 返回结果) 或者RecursiveAction

你的ForkJoinTask 准备好之后,创建所有工程完成的表示和把他传给ForkJoinPool 实例的invoke 方法

2) 模糊到清晰

为了帮助你理解Fork/join 框架如何工作,看一个简单的例子。假设你想让一个图像模糊。整数数组代表原始图像,一个整数代表一个像素的颜色值。被模糊之后的图像也是同样大小的整数数组。

通过一个改变一个像素来完整模糊。每个像素变成它周围像素的平均值(红,绿,蓝部分被平均),结构放到目的数组中。这是一个可能的实现。

public class ForkBlur extends RecursiveAction {

     private int[] mSource;

     private int mStart;

     private int mLength;

     private int[] mDestination;

     // Processing window size, should be odd.

     private int mBlurWidth = 15;

     public ForkBlur(int[] src, int start, int length, int[] dst) {

         mSource = src;

         mStart = start;

         mLength = length;

         mDestination = dst;

     }

     protected void computeDirectly() {

         int sidePixels = (mBlurWidth - 1) / 2;

         for (int index = mStart; index < mStart + mLength; index++) {

              // Calculate average.

              float rt = 0, gt = 0, bt = 0;

              for (int mi = -sidePixels; mi <= sidePixels; mi++) {

                   int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);

                   int pixel = mSource[mindex];

                   rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth;

                   gt += (float)((pixel & 0x0000ff00) >>  8) / mBlurWidth;

                   bt += (float)((pixel & 0x000000ff) >>  0) / mBlurWidth;

              }

              // Re-assemble destination pixel.

              int dpixel = (0xff000000    ) | (((int)rt) << 16) | (((int)gt) <<  8) | (((int)bt) <<  0);

              mDestination[index] = dpixel;

         }

     }

  ...

现在,你实现了抽象compute() 方法,或者直接执行blur 或者分成两个小的任务。一个简单的数组长度阀值,决定直接执行还是分段执行。

protected static int sThreshold = 100000;

protected void compute() {

     if (mLength < sThreshold) {

         computeDirectly();

         return;

     }

     int split = mLength / 2;

     invokeAll(new ForkBlur(mSource, mStart, split, mDestination),

     new ForkBlur(mSource, mStart + split, mLength - split, mDestination)) ;

}

如果前面的方法在RecursiveAction 类的子类里,设置在ForkJoinPool 中运行时简单的。

创建一个代表所有工作完成的任务。

// source image pixels are in src

// destination image pixels are in dst

ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

Create the ForkJoinPool that will run the task.

ForkJoinPool pool = new ForkJoinPool();

Run the task.

pool.invoke(fb);

完整的代码,包括在windows 中展示原图片到目标图片的扩展代码,看ForkBlur 类。

(三) 并发集合

Java.util.concurrent 包包括一些附加java 框架集合。大多数可以通过接口分类:

·         BlockingQueue 定义了一个先进先出数据结构,当你向一个慢队列中添加或者检索一个空队列,它会阻塞或者超时

·         ConcurrentMap java.util.Map 的子接口,定义了很有用的原子操作。这些操作移除或替换一个key-vlaue ,如果key 存在,或者添加一个key-value 如果key 不存在。标记这些操作是原子的,避免使用同步。ConcurrentMap 的标准多用途实现是ConcurrentHashMap ,是HashMap 的并发模式。

·         ConcurrentNavigableMap ConcurrentMap 的子接口,它支持近似匹配。ConcurrentNavigableMap 的标准一般用途实现是ConcurrentSkipListMap ,它是TreeMap 的并发模式。

所有这些结合避免内存不一致,通过在添加元素操作和以后的移除或者访问操作之间建立happens-before 关系。

(四) 原子变量

java.util.concurrent.atomic 包定义了对单个变量的原子操作。所有类有getset 方法,像对volatile 变量读写一样工作。就是说,set 操作有happens-before 关系和后面的对同变量的get 操作。原子的compareAndSet 方法也有内存一致功能,就像使用简单的原子算法、应用于integer 原子变量的方法。

为了看这个包的使用,让我们回到最初我们演示线程冲突的Counter 类:

class Counter {

    private int c = 0;

    public void increment() {

        c++;

    }

    public void decrement() {

         c--;

    }

    public int value() {

        return c;

    }

}

使Counter 避免线程冲突的一个方法是,给它的方法加同步,看 SynchronizedCounter

 

class SynchronizedCounter {

    private int c = 0;

    public synchronized void increment() {

        c++;

    }

    public synchronized void decrement() {

        c--;

    }

    public synchronized int value() {

        return c;

    }

}

这个简单的类,同步是可接受的解决办法。但是对于更复杂的类,我们想要避免同步带来的对活跃性的影响。将域替换成AtomicInteger ,不用同步来避免线程冲突,正如 AtomicCounter

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {

    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {

        c.incrementAndGet();

    }

    public void decrement() {

        c.decrementAndGet();

    }

    public int value() {

        return c.get();

    }

}

(五) 并发随机数

JDK7 中,java.util.concurrent 包括一个方便的类,ThreadLocalRandom ,应用于多线程或者ForkJoinTasks 中使用随机数的工程。

    对于并发访问,用ThreadLocalRandom 代替Math.random() 会减少竞争,性能更好。

你要做的只是调用ThreadLocalRandom.current() ,然后调用它的方法返回随机数,这是一个列子:

int r = ThreadLocalRandom.current() .nextInt(4, 77);





 

 

 

进一步阅读

Concurrent Programming in Java : Design Principles and Pattern (2nd Edition) 》, Doug Lea. 一个权威专家, 也是java 平台并发框架的架构师。

Java Concurrency in Practice 》 ,Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea. 一个适合初学者的指导手册。

Effective Java Programming Language Guide (2nd Edition) 》,Joshua Bloch. Though this is a general programming guide 尽管这是一个程序设计入门, 但是它每章都含有并发程序的最好的例子。

Concurrency: State Models & Java Programs (2nd Edition) 》, by Jeff Magee and Jeff Kramer. 通过建模和实际例子介绍并发程序设计

Java Concurrent Animated : 展示并发特点的动画片。

 

分享到:
评论

相关推荐

    java并发编程实战(英文版)

    ### Java并发编程实战知识点概述 #### 一、Java并发特性详解 在《Java并发编程实战》这本书中,作者深入浅出地介绍了Java 5.0和Java 6中新增的并发特性。这些特性旨在帮助开发者更高效、安全地编写多线程程序。书中...

    java并发编程

    Java并发编程是Java开发者必须掌握的关键技能之一,它涉及到如何在多线程环境中高效、安全地执行程序。并发编程能够充分利用多核处理器的计算能力,提高应用程序的响应速度和整体性能。《Java编程并发实战》这本书是...

    Java并发编程实战

    第1章 简介 1.1 并发简史 1.2 线程的优势 ...第四部分 高级主题 第13章 显式锁 第14章 构建自定义的同步工具 第15章 原子变量与非阻塞同步机制 第16章 Java内存模型 附录A 并发性标注 参考文献

    java并发实战中文文档

    通过阅读《Java并发实战》,开发者不仅可以掌握Java并发编程的基础,还能学习到高级技巧和最佳实践,从而在实际项目中更好地利用并发提高程序性能。这个高清且带有目录的文档,无疑将为学习和工作带来极大的便利。

    java并发库高级应用源码--张孝祥

    在《java并发库高级应用源码--张孝祥》中,我们将会深入探讨Java中的线程管理和并发控制策略,这对于我们理解和优化多线程程序至关重要。 首先,Java中的`Thread`类是实现并发的基础,它代表了一个独立的执行线程。...

    JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf

    根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...

    Java并发编程从入门到精通源码.rar

    此外,Java并发包`java.util.concurrent`提供了丰富的高级并发工具,如`ExecutorService`、`Future`、`Callable`、`Semaphore`、`CyclicBarrier`等。这些工具可以帮助开发者更高效、安全地管理线程,实现线程池、...

    java高级并发编程32例

    "java高级并发编程32例"这个主题涵盖了Java并发机制的深入理解和实践,旨在帮助Java开发者,尤其是那些追求架构师职位的人员,提升他们在并发编程方面的专业素养。 并发是指一个系统能够同时处理多个任务或执行流的...

    Java并发编程学习笔记

    Java并发编程是指在Java语言中编写多线程和多任务执行的程序,以便更高效地利用计算机的多核处理器资源。并发编程是Java高级编程技能中的重要组成部分,尤其是在需要处理大量数据、提供快速响应、实现高吞吐量和高可...

    java 并发编程实践

    在Java编程领域,并发编程是一项核心...以上内容涵盖了Java并发编程的多个关键点,从基础知识到高级技巧,希望对你理解和应用Java并发编程有所帮助。通过深入学习和实践,你将能够有效地编写出高效、可靠的并发程序。

    JAVA并发编程实践

    根据给定文件的信息“JAVA并发编程实践”以及其描述为“Java并发学习资料”,我们可以从中提炼出关于Java并发编程的一些核心知识点。Java并发编程是Java高级特性之一,它允许开发者编写能够同时执行多个任务的程序,...

    Java并发编程实践 高清扫描版

    总的来说,《Java并发编程实践》涵盖了Java并发编程的各个方面,从基础到高级,从理论到实践,是提升Java并发编程能力的必备读物。通过学习这本书,开发者可以更好地应对并发挑战,编写出高效、可靠的多线程程序。

    java并发编程实践

    ### Java并发编程实践知识点详解 #### 一、Java并发编程基础 ##### 1.1 并发与并行概念区分 在Java并发编程实践中,首先需要理解“并发”与“并行”的区别。“并发”指的是多个任务同时进行,但实际上可能是在多...

    java并发编程艺术

    《Java并发编程艺术》这本书是Java开发者深入理解多线程编程的重要参考资料。它全面而深入地探讨了Java平台上的并发编程技术,对于提升程序性能、优化系统资源利用以及解决多线程环境中的复杂问题有着极大的帮助。...

    Java并发机制的底层实现原理.pdf

    Java并发机制的底层实现原理涉及到多个方面,包括了本地内存与线程安全的问题、volatile关键字的使用、synchronized关键字的原理以及Java并发在处理器层面是如何实现的。通过这些机制,Java能够有效地管理多线程环境...

    JAVA并发编程实战.pdf

    根据提供的文件信息:“JAVA并发编程实战.pdf”,我们可以深入探讨与Java并发编程相关的多个核心知识点。 ### Java并发编程基础 #### 1. 并发与并行 - **并发(Concurrency)**:指一个程序中存在多个执行序列(如...

    java并发编程实践(第一版)

    Java还提供了一些高级并发对象,如AtomicInteger、AtomicReference等原子类,它们提供了不可分割的读写操作,保证了操作的原子性。在高并发情况下,这些原子类比传统的锁机制更加轻量级和高效。 ### 并发编程中的...

    java并发编程分享

    Java并发编程是Java开发中的重要领域,它涉及到多线程、同步机制、线程池以及并发集合等核心概念。在Java中,并发编程是提高系统性能和资源利用率的关键技术,尤其是在处理大量I/O操作或者计算密集型任务时。本文将...

    JAVA并发编程实践.pdf

    ### Java并发高级特性 #### 1. Future和Callable - **Future**:代表异步计算的结果。 - **FutureTask**:包装Callable或Runnable的Future实现。 #### 2. CountDownLatch和CyclicBarrier - **CountDownLatch**:...

Global site tag (gtag.js) - Google Analytics