- 浏览: 143804 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
ltjxwxz:
例子很棒,正在学习中,感谢!
quartz spring 实现动态定时任务 -
CH1132813751:
感谢,刚好需要
quartz spring 实现动态定时任务 -
yonglongwang:
mark 正是需要找的
quartz spring 实现动态定时任务 -
xu5500509:
andyhu1a 写道snailxr 写道dongfengku ...
quartz spring 实现动态定时任务 -
wsfym1988:
Scheduler scheduler = scheduler ...
quartz spring 实现动态定时任务
ExecutorService
package thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class ExecutorsServiceThread { public static void main(String[] args) { //ExecutorService pool = Executors.newFixedThreadPool(2);//固定大小的线程池 //ExecutorService pool = Executors.newSingleThreadExecutor();//单个工作线程,以上两个线程池都是固定大小的,添加线程超过大小限制后就会自动增加到队列中等待,一旦有线程池中有完成的线程,则排队等待的线程就会入池执行 //ExecutorService pool = Executors.newCachedThreadPool(); ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); //ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();//单任务延迟线程池 //ThreadPoolExecutor pool2=new ThreadPoolExecutor(corePoolSize, //池中保存的线程数,包括空闲线程 //maximumPoolSize,//池中允许的最大线程数 //keepAliveTime, //当线程数大于核心时,此为终止前 多余的空闲线程等待新任务的最长时间。 //TimeUnit.DAYS, //时间的单位 //workQueue);//任务队列 BlockingQueue<Runnable> block = new ArrayBlockingQueue<Runnable>(20); ThreadPoolExecutor pool3 = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, block); Thread t1 = new ExecutorsThread("t1"); Thread t2 = new ExecutorsThread("t2"); Thread t3 = new ExecutorsThread("t3"); Thread t4 = new ExecutorsThread("t4"); Thread t5 = new ExecutorsThread("t5"); pool.execute(t1); pool.execute(t2); pool.execute(t1); pool.schedule(t5, 10, TimeUnit.SECONDS);//参数:要执行的任务,从现在开始延迟的时间,时间的单位 pool.shutdown(); pool3.execute(t1); pool3.execute(t2); pool3.execute(t3); pool3.execute(t4); pool3.shutdown(); // ThreadFactory factory=new ThreadFactory() { // // @Override // public Thread newThread(Runnable r) { // return new Thread(r); // } // }; // factory. } } class ExecutorsThread extends Thread { public String name; public ExecutorsThread(String name) { this.name = name; } @Override public void run() { //while (true) { System.out.println(name + " is running......"); try { sleep(1000); // } } catch (InterruptedException ex) { Logger.getLogger(ExecutorsThread.class.getName()).log(Level.SEVERE, null, ex); } } }
带返回结果的线程
//下面的executer 还有一个 List<Future<T>> invokeAll(Collection<Callable<T>> tasks)方法,执行完所有任务后可将结果保存致list中返回,比便后续处理 package thread; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableThread { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executer = Executors.newFixedThreadPool(2); CallThread callable1 = new CallThread("1111"); CallThread callable2 = new CallThread("2222"); Future<String>f1=executer.submit(callable1);//当执Callable任务后将返回结果保存到Future对象中 Future<String>f2=executer.submit(callable2); System.out.println(f1.get()); System.out.println(f2.get()); executer.shutdown(); } } /** * 有返回值的任务 */ class CallThread implements Callable<String> {//可返回值的任务必须实现Callble接口 private String name = ""; public CallThread(String name) { this.name = name; } @Override public String call() throws Exception { return name; } }
阻塞队列
package thread; // //import java.util.Random; import java.lang.reflect.Field; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; public class BlockingQueueThread { public static void main(String[] args) { BlockingQueue queue = new ArrayBlockingQueue(5); ExecutorService pool = Executors.newFixedThreadPool(15); PutThread[] blockThread = new PutThread[10]; TakeThread[] takeThread = new TakeThread[5]; Lock lock = new ReentrantLock(); Lock takeLock = new ReentrantLock(); for (int i = 0; i < takeThread.length; i++) { takeThread[i] = new TakeThread(queue, "thread" + i, takeLock); pool.execute(takeThread[i]); } for (int i = 0; i < blockThread.length; i++) { blockThread[i] = new PutThread(queue, "thread" + i, lock);//在此处takeThread和putThread内加不同锁不能保证打印时的正确性,因为执行put后 //可能已经有take线程从queue中取数据。因此打印的结果看似add都在一块,但数据不是递增的 //如果加同一个锁的话那就会阻塞:当put满或take空时queue会让其等待,但有拿着锁,而take //或put进不到queue中无法继续执行,一直等待。 //结论:用BlockQueue put 或take 还想同时拿到其内部数据多少时不可行的。 //以上结论是不对滴:以下注释中有个非主流的做法 pool.execute(blockThread[i]); } pool.shutdown(); } } class PutThread extends Thread { private BlockingQueue queue; private String name; private Lock lock; public PutThread(BlockingQueue queue, String name, Lock lock) { this.queue = queue; this.name = name; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); Integer i = new Random().nextInt(10) + 1; try { queue.put(i);//此处应用put take 。如果用add时,成功返回true,失败则抛出异常 } catch (InterruptedException ex) { Logger.getLogger(PutThread.class.getName()).log(Level.SEVERE, null, ex); } finally { lock.unlock(); } System.out.println(this.getName() + " add " + i + " ......" + queue.size()); } } } class TakeThread extends Thread { private BlockingQueue queue; private String name; private Lock lock; ; public TakeThread(BlockingQueue queue, String name, Lock lock) { this.queue = queue; this.name = name; this.lock = lock; } @Override public void run() { while (true) { lock.lock(); Integer i = 0; try { i = (Integer) queue.take(); System.out.println(this.getName() + " take " + i + " ......" + queue.size()); } catch (InterruptedException ex) { Logger.getLogger(PutThread.class.getName()).log(Level.SEVERE, null, ex); } finally { lock.unlock(); } } } } //public class BlockingQueueThread { // // public static ReentrantLock getLock(BlockingQueue queue) throws Exception { // Field fields[] = queue.getClass().getDeclaredFields(); // for (Field f : fields) { // f.setAccessible(true); // if (f.getName().equals("lock")) { // return (ReentrantLock) f.get(queue); // } // } // return null; // } // public static void main(String[] args) throws Exception { // BlockingQueue queue = new ArrayBlockingQueue(10); // ExecutorService pool = Executors.newFixedThreadPool(15); // PutThread[] blockThread = new PutThread[10]; // TakeThread[] takeThread = new TakeThread[5]; // Object lock = new Object(); // Object takeLock = new Object(); // try { // lock = getLock(queue); // takeLock = lock; // } catch (Exception e) { // e.printStackTrace(); // } // for (int i = 0; i < takeThread.length; i++) { // takeThread[i] = new TakeThread(queue, "thread" + i, takeLock); // pool.execute(takeThread[i]); // } // for (int i = 0; i < blockThread.length; i++) { // blockThread[i] = new PutThread(queue, "thread" + i, lock); // pool.execute(blockThread[i]); // } // pool.shutdown(); // } //} // //class PutThread extends Thread { // // private BlockingQueue queue; // private String name; // private Object lock; // // public PutThread(BlockingQueue queue, String name, Object lock) { // this.queue = queue; // this.name = name; // this.lock = lock; // } // // @Override // public void run() { // while (true) { // try { // Thread.sleep(1000); // } catch (Exception ex) { // ex.printStackTrace(); // } // // synchronized (lock) { // try { // ((ReentrantLock) lock).lock(); // Integer i = new Random().nextInt(10) + 1; // try { // queue.put(i);// 此处应用put take 。如果用add时,成功返回true,失败则抛出异常 // } catch (InterruptedException ex) { // Logger.getLogger(PutThread.class.getName()).log( // Level.SEVERE, null, ex); // } // System.out.println(this.getName() + " add " + i + " ......" // + queue.size()); // } catch (Exception e) { // e.printStackTrace(); // } finally { // ((ReentrantLock) lock).unlock(); // } // // } // } // // } //} // //class TakeThread extends Thread { // // private BlockingQueue queue; // private String name; // private Object lock; // // public TakeThread(BlockingQueue queue, String name, Object lock) { // this.queue = queue; // this.name = name; // this.lock = lock; // } // // @Override // public void run() { // while (true) { // try { // Thread.sleep(1000); // } catch (Exception ex) { // ex.printStackTrace(); // } // // synchronized (lock) { // try { // ((ReentrantLock) lock).lock(); // Integer i = 0; // try { // i = (Integer) queue.take(); // } catch (InterruptedException ex) { // Logger.getLogger(PutThread.class.getName()).log( // Level.SEVERE, null, ex); // } // System.out.println(this.getName() + " take " + i + " ......" // + queue.size()); // } catch (Exception e) { // e.printStackTrace(); // } finally { // ((ReentrantLock) lock).unlock(); // } // // } // } // } //}
每任务每线程
package thread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; public class CompletionServiceThread { public static void main(String[] args) { List<String> abc = new ArrayList<String>(); abc.add("11"); abc.add("233322"); abc.add("3rdddddr33"); abc.add("4rr44"); abc.add("54554545"); abc.add("5555544444444444"); ExecutorService executor = Executors.newFixedThreadPool(2); CompletionService service = new ExecutorCompletionService(executor); for (final String s : abc) {//每个字符串分别获得一个线程进行处理,假设处理字符串需要时间较长 service.submit(new Callable() {//提交一批希望得到结果的任务 @Override public Object call() throws Exception { Thread.sleep(5000); if(s.equals("11")){ throw new Exception(); } return Thread.currentThread().getName()+" "+s + " " + s.length() + ""; } }); } for(int i=0;i<abc.size();i++){ try { Future<String> future=service.take();//检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。 String a=future.get(); System.out.println(a);//处理已完成的任务 } catch (ExecutionException ex) { Logger.getLogger(CompletionServiceThread.class.getName()).log(Level.SEVERE, null, ex); } catch (InterruptedException ex) { Logger.getLogger(CompletionServiceThread.class.getName()).log(Level.SEVERE, null, ex); } } executor.shutdown(); } } /** * 某次运行结果: * pool-1-thread-1 11 2 pool-1-thread-2 233322 6 pool-1-thread-1 3rdddddr33 10 pool-1-thread-2 4rr44 5 pool-1-thread-1 54554545 8 pool-1-thread-2 5555544444444444 16 */
主线程等从线程
package thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class BarrierThread { public static void main(String[] args) { ThreadFactory a; CyclicBarrier barrier = new CyclicBarrier(3, new MainThread());//当有一个线程需要等待其他线程指向完毕后才能执行时可用此类 ExecutorService pool = Executors.newFixedThreadPool(3); pool.execute(new SubThread(barrier)); pool.execute(new SubThread(barrier)); pool.execute(new SubThread(barrier)); pool.shutdown(); } } class MainThread extends Thread { @Override public void run() { System.out.println("I'm here...."); } } class SubThread extends Thread { private CyclicBarrier barrier; public SubThread(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { for (int i = 0; i < 1000; i++) { System.out.println(this.getName() + " is running " + i); } try { barrier.await(); } catch (InterruptedException ex) { Logger.getLogger(SubThread.class.getName()).log(Level.SEVERE, null, ex); } catch (BrokenBarrierException ex) { Logger.getLogger(SubThread.class.getName()).log(Level.SEVERE, null, ex); } } }
awaitTermination方法的使用
package thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class AwaitTerminationThread { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(2); service.submit(new Runnable() { @Override public void run() { for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName()+"bbbbb " + i); try { Thread.sleep(1000); } catch (InterruptedException ex) { Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex); } } } }); service.submit(new Runnable() { // new Thread.UncaughtExceptionHandler(){ // // }; @Override public void run() { for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName()+" aaaaaa " + i); try { Thread.sleep(2000); } catch (InterruptedException ex) { Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex); } } } }); service.shutdown(); try { service.awaitTermination(10, TimeUnit.SECONDS);//开始10秒时如果任务未完成一直阻塞让service submit的任务先执行完 } catch (InterruptedException ex) { Logger.getLogger(AwaitTerminationThread.class.getName()).log(Level.SEVERE, null, ex); } while (true) { System.out.println("11111"); } } }
集合迭代时抛出的异常
package thread; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class ConcurrentModificationExceptionTest { public static void main(String[] args) { List<String> list=new ArrayList<String>(); list.add("1111"); list.add("222"); list.add("3333"); Iterator iterator=list.iterator(); while(iterator.hasNext()){ if(iterator.next().equals("222")){ iterator.remove(); list.add("555");//此处会有异常,迭代过程中迭代器对ArrayList做了修改后在迭代器内再通过list对集合做修改会抛异常 /** * 抛异常的标准就是modCount != expectedModCount * 在Itr类的成员变量里对expectedModCount初始化的赋值是int expectedModCount = modCount Itr 是在执行iterator方法是返回的迭代器 * 组若这个过程中list中有add remove等操作就会使modCount这两个值不同,就会会抛出异常 */ } } list.add("4444"); System.out.println(list); } }
例子:三个线程按顺序执行
方法一:
//转....... package thread; public class ABCThread extends Thread{ private static Object o = new Object(); private static int count = 0; private char ID; private int id; private int num = 0; public ABCThread(int id, char ID) { this.id = id; this.ID = ID; } public void run() { synchronized (o) { while (num < 10) { if (count % 3 == id) {//将count与线程id联系起来,count为0 3 6 9......时才会执行线程A,其它线程wait(),count为1,4,7...时执行线程1 System.out.println(ID); ++count; ++num; o.notifyAll(); } else { try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) { (new ABCThread(2, 'C')).start(); (new ABCThread(0, 'A')).start(); (new ABCThread(1, 'B')).start(); } }
方法二:
//转 package thread; import java.text.DateFormat; import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; public class ABCMyThread extends Thread { int count = 10; String name; Semaphore current; Semaphore next; public ABCMyThread(String name, Semaphore current, Semaphore next) { this.name = name; this.next = next; this.current = current; } @Override public void run() { while (count > 0) { try { this.current.acquire(); } catch (InterruptedException ex) { Logger.getLogger(ABCMyThread.class.getName()).log(Level.SEVERE, null, ex); } System.out.println(this.name); count--; this.next.release(); } } public static void main(String[] args) { // Semaphore a = new Semaphore(1); // Semaphore b = new Semaphore(0); // Semaphore c = new Semaphore(0); // Thread t1=new ABCMyThread("A",a,b);//t1进程用a信号量acquire可允许一个线程进入(即t1线程),如果t1循环执行一次后又到acquire,时不能往下执行,需等t3的a信号量执行release // Thread t2=new ABCMyThread("B",b,c);//t2线程用b信号量qcquire不允许任何线程进入,所以会等t1线程的b信号量执行release时才可运行 // Thread t3=new ABCMyThread("C",c,a);//..... // t1.start(); // t2.start(); // t3.start(); //DateFormat format=new DateFormat() {}; } }
评论
1 楼
ak913
2012-09-12
awaitTermination方法的使用 的 第46行有问题,awaitTermination的使用需要加一层while才能实现“阻塞,直到池中所有线程完成”。代码:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
相关推荐
Java线程是Java编程语言中的核心概念,尤其在多任务处理和并发编程中扮演着重要角色。线程允许一个程序内部同时执行多个独立的控制流,使得程序能够更高效地利用处理器资源。本文将深入解析Java线程的相关知识点,...
这本书详细介绍了Java线程的各个方面,包括基础知识、高级特性以及实战应用。 在Java中,线程是通过`Thread`类或实现`Runnable`接口来创建的。基础知识点包括如何启动线程、线程的状态(新建、就绪、运行、阻塞和...
总的来说,"JAVA线程学习(源代码)"涵盖了Java线程的基础知识和高级特性,包括线程的创建、管理、同步和通信。通过分析和实践这些源代码,你可以深入了解Java并发编程的精髓,提高你的多线程编程能力。
### Java线程入门知识点详解 #### 一、Java线程基础知识概述 **1.1 什么是线程?** 线程是程序执行流的最小单元,是操作系统...以上介绍的是Java线程的基础知识,更多高级特性和最佳实践将在后续的学习中逐渐深入。
Java线程是Java编程语言中...以上知识点覆盖了Java线程的基础概念、创建方式、状态管理、同步机制以及高级特性,是Java程序员必须掌握的重要内容。通过深入理解和熟练应用这些知识,可以编写出高效、安全的多线程程序。
在Java编程中,多线程是一项关键特性,它允许程序同时执行多个任务,提升系统效率。在处理耗时操作如大文件下载、数据处理或网络请求时,展示进度条能够提供用户友好的交互体验,让使用者了解任务的完成状态。本主题...
### Java线程培训资料知识点详解 #### 一、Java线程基本概念 ...通过以上知识点的学习,可以深入理解Java线程的基本概念、API使用以及Concurrent包中的高级特性,这对于开发高效、稳定的多线程应用程序至关重要。
Java线程:新特征-线程池 一、固定大小的线程池 二、单任务线程池 三、可变尺寸的线程池 四、延迟连接池 五、单任务延迟连接池 六、自定义线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上...
Java是首个在语言级别明确支持线程特性的主流编程语言之一。在Java中,线程具有独立的执行路径,拥有自己的堆栈和局部变量,但与其他线程共享同一个进程的内存资源。 - **与进程的区别**:与进程相比,线程之间的...
#### 十一、Java线程:新特征 - **线程池** `ExecutorService` 和 `Executors` 提供了一种高效地管理线程的方式,减少了线程创建和销毁的开销。 - **有返回值的线程** 通过 `Callable` 接口和 `Future` 类可以...
#### 十三、Java线程:新特征-有返回值的线程 Java 5 引入了 `java.util.concurrent` 包,提供了 `Callable` 和 `Future` 接口,允许线程执行后返回结果。 #### 十四、Java线程:新特征-锁(上) Java 5 还引入了...
#### 十四、Java线程:新特征-锁(上)&(下) - `java.util.concurrent.locks` 包提供了更高级别的锁实现,如 `ReentrantLock`、`ReadWriteLock` 等。 - 这些锁提供了比内置锁更多的功能,如公平性选择、尝试锁、...
Java线程是Java语言的一个重要特性,它允许程序同时执行多个任务,从而实现并行处理。 本书涵盖了以下关键知识点: 1. **线程基础**:首先,书中介绍了线程的概念,包括线程的创建、启动、同步和通信。在Java中,...
Java线程,作为Java编程语言中的一项核心特性,为实现并发处理提供了强大的工具。线程,本质上是程序执行流的最小单位,是一种轻量级的过程,是操作系统能够进行运算调度的最小单位。在Java中,线程使得程序能够同时...
- **Java线程的独特之处**: Java是首个在语言层面直接支持线程特性的主流编程语言,这意味着开发者可以直接在Java代码中管理线程而无需依赖底层操作系统接口。 #### 三、Java线程的生命周期 - **新建**(New): 当...
### Java线程教程知识点梳理 #### 一、教程概述 - **目标读者**: 本教程主要面向具备丰富Java基础知识但缺乏多线程编程经验的学习者。 - **学习成果**: 学习者能够掌握编写简单的多线程程序的能力,并能够理解和...
在Java中,线程的创建和管理被内置到语言本身,这标志着Java成为了首个明确包含线程特性的主流编程语言,将线程提升到了语言级别的高度。 #### 线程与进程的区别 线程与进程在概念上有本质的不同。进程是系统进行...