`

java线程新特性

 
阅读更多

ExecutorService

package thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ExecutorsServiceThread {

    public static void main(String[] args) {
        //ExecutorService pool = Executors.newFixedThreadPool(2);//固定大小的线程池
        //ExecutorService pool = Executors.newSingleThreadExecutor();//单个工作线程,以上两个线程池都是固定大小的,添加线程超过大小限制后就会自动增加到队列中等待,一旦有线程池中有完成的线程,则排队等待的线程就会入池执行
        //ExecutorService pool = Executors.newCachedThreadPool();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        //ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();//单任务延迟线程池
        //ThreadPoolExecutor pool2=new ThreadPoolExecutor(corePoolSize, //池中保存的线程数,包括空闲线程
        //maximumPoolSize,//池中允许的最大线程数
        //keepAliveTime, //当线程数大于核心时,此为终止前 多余的空闲线程等待新任务的最长时间。
        //TimeUnit.DAYS, //时间的单位
        //workQueue);//任务队列
        BlockingQueue<Runnable> block = new ArrayBlockingQueue<Runnable>(20);
        ThreadPoolExecutor pool3 = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, block);
        Thread t1 = new ExecutorsThread("t1");
        Thread t2 = new ExecutorsThread("t2");
        Thread t3 = new ExecutorsThread("t3");
        Thread t4 = new ExecutorsThread("t4");
        Thread t5 = new ExecutorsThread("t5");
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t1);
        pool.schedule(t5, 10, TimeUnit.SECONDS);//参数:要执行的任务,从现在开始延迟的时间,时间的单位
        pool.shutdown();
        pool3.execute(t1);
        pool3.execute(t2);
        pool3.execute(t3);
        pool3.execute(t4);
        pool3.shutdown();
//        ThreadFactory factory=new ThreadFactory() {
//
//            @Override
//            public Thread newThread(Runnable r) {
//                return new Thread(r);
//            }
//        };
//        factory.
    }
}

class ExecutorsThread extends Thread {

    public String name;

    public ExecutorsThread(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        //while (true) {
        System.out.println(name + " is running......");
        try {
            sleep(1000);
            // }
        } catch (InterruptedException ex) {
            Logger.getLogger(ExecutorsThread.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

 带返回结果的线程

//下面的executer 还有一个 List<Future<T>>  invokeAll(Collection<Callable<T>> tasks)方法,执行完所有任务后可将结果保存致list中返回,比便后续处理

package thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableThread {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executer = Executors.newFixedThreadPool(2);
        CallThread callable1 = new CallThread("1111");
        CallThread callable2 = new CallThread("2222");
        Future<String>f1=executer.submit(callable1);//当执Callable任务后将返回结果保存到Future对象中
        Future<String>f2=executer.submit(callable2);
        System.out.println(f1.get());
        System.out.println(f2.get());
        executer.shutdown();
    }
}
/**
 * 有返回值的任务
 */
class CallThread implements Callable<String> {//可返回值的任务必须实现Callble接口
    private String name = "";
    public CallThread(String name) {
        this.name = name;
    }

    @Override
    public String call() throws Exception {
        return name;
    }
}
 

阻塞队列

 

package thread;
//
//import java.util.Random;

import java.lang.reflect.Field;
import java.util.Random;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BlockingQueueThread {

    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(5);
        ExecutorService pool = Executors.newFixedThreadPool(15);
        PutThread[] blockThread = new PutThread[10];
        TakeThread[] takeThread = new TakeThread[5];
        Lock lock = new ReentrantLock();
        Lock takeLock = new ReentrantLock();
        for (int i = 0; i < takeThread.length; i++) {
            takeThread[i] = new TakeThread(queue, "thread" + i, takeLock);
            pool.execute(takeThread[i]);
        }
        for (int i = 0; i < blockThread.length; i++) {
            blockThread[i] = new PutThread(queue, "thread" + i, lock);//在此处takeThread和putThread内加不同锁不能保证打印时的正确性,因为执行put后
                                                                      //可能已经有take线程从queue中取数据。因此打印的结果看似add都在一块,但数据不是递增的
                                                                      //如果加同一个锁的话那就会阻塞:当put满或take空时queue会让其等待,但有拿着锁,而take
                                                                      //或put进不到queue中无法继续执行,一直等待。
                                                                      //结论:用BlockQueue put 或take 还想同时拿到其内部数据多少时不可行的。
                                                                      //以上结论是不对滴:以下注释中有个非主流的做法
            pool.execute(blockThread[i]);
        }
        pool.shutdown();
    }
}

class PutThread extends Thread {

    private BlockingQueue queue;
    private String name;
    private Lock lock;

    public PutThread(BlockingQueue queue, String name, Lock lock) {
        this.queue = queue;
        this.name = name;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            Integer i = new Random().nextInt(10) + 1;
            try {
                queue.put(i);//此处应用put take 。如果用add时,成功返回true,失败则抛出异常
            } catch (InterruptedException ex) {
                Logger.getLogger(PutThread.class.getName()).log(Level.SEVERE, null, ex);
            } finally {
                lock.unlock();
            }
            System.out.println(this.getName() + " add " + i + " ......" + queue.size());
        }

    }
}

class TakeThread extends Thread {

    private BlockingQueue queue;
    private String name;
    private Lock lock;

    ;

    public TakeThread(BlockingQueue queue, String name, Lock lock) {
        this.queue = queue;
        this.name = name;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            lock.lock();
            Integer i = 0;
            try {
                i = (Integer) queue.take();
                System.out.println(this.getName() + " take " + i + " ......" + queue.size());
            } catch (InterruptedException ex) {
                Logger.getLogger(PutThread.class.getName()).log(Level.SEVERE, null, ex);
            } finally {
                lock.unlock();
            }

        }
    }
}
//public class BlockingQueueThread {
//
//    public static ReentrantLock getLock(BlockingQueue queue) throws Exception {
//        Field fields[] = queue.getClass().getDeclaredFields();
//        for (Field f : fields) {
//            f.setAccessible(true);
//            if (f.getName().equals("lock")) {
//                return (ReentrantLock) f.get(queue);
//            }
//        }
//        return null;
//    }
//    public static void main(String[] args) throws Exception {
//        BlockingQueue queue = new ArrayBlockingQueue(10);
//        ExecutorService pool = Executors.newFixedThreadPool(15);
//        PutThread[] blockThread = new PutThread[10];
//        TakeThread[] takeThread = new TakeThread[5];
//        Object lock = new Object();
//        Object takeLock = new Object();
//        try {
//            lock = getLock(queue);
//            takeLock = lock;
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
//        for (int i = 0; i < takeThread.length; i++) {
//            takeThread[i] = new TakeThread(queue, "thread" + i, takeLock);
//            pool.execute(takeThread[i]);
//        }
//        for (int i = 0; i < blockThread.length; i++) {
//            blockThread[i] = new PutThread(queue, "thread" + i, lock);
//            pool.execute(blockThread[i]);
//        }
//        pool.shutdown();
//    }
//}
//
//class PutThread extends Thread {
//
//    private BlockingQueue queue;
//    private String name;
//    private Object lock;
//
//    public PutThread(BlockingQueue queue, String name, Object lock) {
//        this.queue = queue;
//        this.name = name;
//        this.lock = lock;
//    }
//
//    @Override
//    public void run() {
//        while (true) {
//            try {
//                Thread.sleep(1000);
//            } catch (Exception ex) {
//                ex.printStackTrace();
//            }
//            // synchronized (lock) {
//            try {
//                ((ReentrantLock) lock).lock();
//                Integer i = new Random().nextInt(10) + 1;
//                try {
//                    queue.put(i);// 此处应用put take 。如果用add时,成功返回true,失败则抛出异常
//                } catch (InterruptedException ex) {
//                    Logger.getLogger(PutThread.class.getName()).log(
//                            Level.SEVERE, null, ex);
//                }
//                System.out.println(this.getName() + " add " + i + " ......"
//                        + queue.size());
//            } catch (Exception e) {
//                e.printStackTrace();
//            } finally {
//                ((ReentrantLock) lock).unlock();
//            }
//            // }
//        }
//
//    }
//}
//
//class TakeThread extends Thread {
//
//    private BlockingQueue queue;
//    private String name;
//    private Object lock;
//
//    public TakeThread(BlockingQueue queue, String name, Object lock) {
//        this.queue = queue;
//        this.name = name;
//        this.lock = lock;
//    }
//
//    @Override
//    public void run() {
//        while (true) {
//            try {
//                Thread.sleep(1000);
//            } catch (Exception ex) {
//                ex.printStackTrace();
//            }
//            // synchronized (lock) {
//            try {
//                ((ReentrantLock) lock).lock();
//                Integer i = 0;
//                try {
//                    i = (Integer) queue.take();
//                } catch (InterruptedException ex) {
//                    Logger.getLogger(PutThread.class.getName()).log(
//                            Level.SEVERE, null, ex);
//                }
//                System.out.println(this.getName() + " take " + i + " ......"
//                        + queue.size());
//            } catch (Exception e) {
//                e.printStackTrace();
//            } finally {
//                ((ReentrantLock) lock).unlock();
//            }
//            // }
//        }
//    }
//}

 

 

每任务每线程

package thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CompletionServiceThread {

    public static void main(String[] args) {
        List<String> abc = new ArrayList<String>();
        abc.add("11");
        abc.add("233322");
        abc.add("3rdddddr33");
        abc.add("4rr44");
        abc.add("54554545");
        abc.add("5555544444444444");
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService service = new ExecutorCompletionService(executor);
        for (final String s : abc) {//每个字符串分别获得一个线程进行处理,假设处理字符串需要时间较长
            service.submit(new Callable() {//提交一批希望得到结果的任务
                @Override
                public Object call() throws Exception {
                    Thread.sleep(5000);
                    if(s.equals("11")){
                        throw new Exception();
                    }
                    return Thread.currentThread().getName()+" "+s + "   " + s.length() + "";
                }
            });
        }
        for(int i=0;i<abc.size();i++){
            try {
                Future<String> future=service.take();//检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
                String a=future.get();
                System.out.println(a);//处理已完成的任务
            } catch (ExecutionException ex) {
                Logger.getLogger(CompletionServiceThread.class.getName()).log(Level.SEVERE, null, ex);
            } catch (InterruptedException ex) {
                Logger.getLogger(CompletionServiceThread.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        executor.shutdown();
    }
}
/**
 * 某次运行结果:
 * pool-1-thread-1 11   2
   pool-1-thread-2 233322   6
   pool-1-thread-1 3rdddddr33   10
   pool-1-thread-2 4rr44   5
   pool-1-thread-1 54554545   8
   pool-1-thread-2 5555544444444444   16
 */

 

主线程等从线程

package thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BarrierThread {

    public static void main(String[] args) {
        ThreadFactory a;
        CyclicBarrier barrier = new CyclicBarrier(3, new MainThread());//当有一个线程需要等待其他线程指向完毕后才能执行时可用此类
        ExecutorService pool = Executors.newFixedThreadPool(3);
        pool.execute(new SubThread(barrier));
        pool.execute(new SubThread(barrier));
        pool.execute(new SubThread(barrier));
        pool.shutdown();
    }
}

class MainThread extends Thread {

    @Override
    public void run() {
        System.out.println("I'm here....");
    }
}

class SubThread extends Thread {

    private CyclicBarrier barrier;

    public SubThread(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            System.out.println(this.getName() + " is running " + i);
        }
        try {
            barrier.await();
        } catch (InterruptedException ex) {
            Logger.getLogger(SubThread.class.getName()).log(Level.SEVERE, null, ex);
        } catch (BrokenBarrierException ex) {
            Logger.getLogger(SubThread.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

 awaitTermination方法的使用

package thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class AwaitTerminationThread {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(2);

        service.submit(new Runnable() {

            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    System.out.println(Thread.currentThread().getName()+"bbbbb " + i);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        });
        service.submit(new Runnable() {
//           new Thread.UncaughtExceptionHandler(){
//               
//           };
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    System.out.println(Thread.currentThread().getName()+" aaaaaa " + i);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        });
        service.shutdown();
        try {
            service.awaitTermination(10, TimeUnit.SECONDS);//开始10秒时如果任务未完成一直阻塞让service submit的任务先执行完
        } catch (InterruptedException ex) {
            Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex);
        }
        while (true) {
            System.out.println("11111");
        }
    }
}

集合迭代时抛出的异常

package thread;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ConcurrentModificationExceptionTest {
    public static void main(String[] args) {
        List<String> list=new ArrayList<String>();
        list.add("1111");
        list.add("222");
        list.add("3333");
        Iterator iterator=list.iterator();
        while(iterator.hasNext()){
            if(iterator.next().equals("222")){
                iterator.remove();
                list.add("555");//此处会有异常,迭代过程中迭代器对ArrayList做了修改后在迭代器内再通过list对集合做修改会抛异常
                /**
                 * 抛异常的标准就是modCount != expectedModCount
                 * 在Itr类的成员变量里对expectedModCount初始化的赋值是int expectedModCount = modCount Itr 是在执行iterator方法是返回的迭代器
                 * 组若这个过程中list中有add remove等操作就会使modCount这两个值不同,就会会抛出异常
                 */
                
            }
        }
        list.add("4444");
        System.out.println(list);
    }
}

  例子:三个线程按顺序执行

方法一:

//转.......
package thread;
public class ABCThread extends Thread{
    private static Object o = new Object();
    private static int count = 0;
    private char ID;
    private int id;
    private int num = 0;
    public ABCThread(int id, char ID) {
        this.id = id;
        this.ID = ID;
    }

    public void run() {
        synchronized (o) {
            while (num < 10) {
                if (count % 3 == id) {//将count与线程id联系起来,count为0 3 6 9......时才会执行线程A,其它线程wait(),count为1,4,7...时执行线程1
                    System.out.println(ID);
                    ++count;
                    ++num;

                    o.notifyAll();

                }
                else {
                    try {
                        o.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        (new ABCThread(2, 'C')).start();
        (new ABCThread(0, 'A')).start();
        (new ABCThread(1, 'B')).start();
        
    }
}
 

方法二:

 

//转
package thread;

import java.text.DateFormat;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ABCMyThread extends Thread {

    int count = 10;
    String name;
    Semaphore current;
    Semaphore next;

    public ABCMyThread(String name, Semaphore current, Semaphore next) {
        this.name = name;
        this.next = next;
        this.current = current;

    }

    @Override
    public void run() {
        while (count > 0) {
            try {
                this.current.acquire();
            } catch (InterruptedException ex) {
                Logger.getLogger(ABCMyThread.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.name);
            count--;
            this.next.release();
        }
    }

    public static void main(String[] args) {
//        Semaphore a = new Semaphore(1);
//        Semaphore b = new Semaphore(0);
//        Semaphore c = new Semaphore(0);
//        Thread t1=new ABCMyThread("A",a,b);//t1进程用a信号量acquire可允许一个线程进入(即t1线程),如果t1循环执行一次后又到acquire,时不能往下执行,需等t3的a信号量执行release
//        Thread t2=new ABCMyThread("B",b,c);//t2线程用b信号量qcquire不允许任何线程进入,所以会等t1线程的b信号量执行release时才可运行
//        Thread t3=new ABCMyThread("C",c,a);//.....
//        t1.start();
//        t2.start();
//        t3.start();
        //DateFormat format=new DateFormat() {};
       

    }
}
 
分享到:
评论
1 楼 ak913 2012-09-12  
awaitTermination方法的使用 的 第46行有问题,awaitTermination的使用需要加一层while才能实现“阻塞,直到池中所有线程完成”。代码:
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;
                if (nanos <= 0)
                    return false;

                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

相关推荐

    java线程深入解析

    Java线程是Java编程语言中的核心概念,尤其在多任务处理和并发编程中扮演着重要角色。线程允许一个程序内部同时执行多个独立的控制流,使得程序能够更高效地利用处理器资源。本文将深入解析Java线程的相关知识点,...

    java 线程java 线程

    Java线程是Java编程语言中的一个核心概念,它...以上就是关于Java线程的一些基础知识,包括线程的创建、状态管理、同步控制、异常处理以及高级特性。理解并掌握这些知识点对于编写高效、稳定的多线程Java程序至关重要。

    Java线程(第三版)

    这本书详细介绍了Java线程的各个方面,包括基础知识、高级特性以及实战应用。 在Java中,线程是通过`Thread`类或实现`Runnable`接口来创建的。基础知识点包括如何启动线程、线程的状态(新建、就绪、运行、阻塞和...

    JAVA线程学习(源代码)

    总的来说,"JAVA线程学习(源代码)"涵盖了Java线程的基础知识和高级特性,包括线程的创建、管理、同步和通信。通过分析和实践这些源代码,你可以深入了解Java并发编程的精髓,提高你的多线程编程能力。

    java线程.rar

    Java线程是Java编程语言中...以上知识点覆盖了Java线程的基础概念、创建方式、状态管理、同步机制以及高级特性,是Java程序员必须掌握的重要内容。通过深入理解和熟练应用这些知识,可以编写出高效、安全的多线程程序。

    java多线程进度条

    在Java编程中,多线程是一项关键特性,它允许程序同时执行多个任务,提升系统效率。在处理耗时操作如大文件下载、数据处理或网络请求时,展示进度条能够提供用户友好的交互体验,让使用者了解任务的完成状态。本主题...

    Java线程培训资料

    ### Java线程培训资料知识点详解 #### 一、Java线程基本概念 ...通过以上知识点的学习,可以深入理解Java线程的基本概念、API使用以及Concurrent包中的高级特性,这对于开发高效、稳定的多线程应用程序至关重要。

    java线程详解

    Java线程:新特征-线程池 一、固定大小的线程池 二、单任务线程池 三、可变尺寸的线程池 四、延迟连接池 五、单任务延迟连接池 六、自定义线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上...

    Java多线程编程总结

    #### 十四、Java线程:新特征-锁(上)&(下) - `java.util.concurrent.locks` 包提供了更高级别的锁实现,如 `ReentrantLock`、`ReadWriteLock` 等。 - 这些锁提供了比内置锁更多的功能,如公平性选择、尝试锁、...

    JAVA线程第三版

    Java线程是Java语言的一个重要特性,它允许程序同时执行多个任务,从而实现并行处理。 本书涵盖了以下关键知识点: 1. **线程基础**:首先,书中介绍了线程的概念,包括线程的创建、启动、同步和通信。在Java中,...

    Java线程pdf

    Java线程,作为Java编程语言中的一项核心特性,为实现并发处理提供了强大的工具。线程,本质上是程序执行流的最小单位,是一种轻量级的过程,是操作系统能够进行运算调度的最小单位。在Java中,线程使得程序能够同时...

    Java线程PDF

    - **Java线程的独特之处**: Java是首个在语言层面直接支持线程特性的主流编程语言,这意味着开发者可以直接在Java代码中管理线程而无需依赖底层操作系统接口。 #### 三、Java线程的生命周期 - **新建**(New): 当...

    java线程 线程学习资料 java线程教程

    ### Java线程教程知识点梳理 #### 一、教程概述 - **目标读者**: 本教程主要面向具备丰富Java基础知识但缺乏多线程编程经验的学习者。 - **学习成果**: 学习者能够掌握编写简单的多线程程序的能力,并能够理解和...

Global site tag (gtag.js) - Google Analytics