`

ExecutorService

阅读更多
ExecutorService

1.
线程池的意义:
维护一定数量的线程,
线程执行任务的开销,不在于任务执行的时间,而是线程的创建以及销毁
所以,创建线程后,在线程完成任务后,不让线程立即销毁,而是保持线程的活跃性
当再有任务来时,让活着的线程继续工作

ThreadPoolExecutor(
int corePoolSize, // 核心线程池大小 
int maximumPoolSize, // 线程池最大容量
long keepAliveTime, // 保持存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
RejectedExecutionHandler handler // 拒绝处理服务 )


  • 初始化,创建0个线程
  • 任务数量 < core ,创建 core 个线程,即使有线程空闲
  • core < 任务数量 < max ,queue 未满,新的任务在queue中排队
  • core < 任务数量 < max ,queue 已满,创建新的线程(此时若有新的任务出现,先处理新的任务,而不是处理队列中的任务,优先处理紧急的任务)
  • 任务数量 > max+queue.size 拒绝处理


拒绝任务处理:回调函数,

A想向B发送信息,但A不能直接调用B,所以A与B 之间约定 C 为中间人,A中实现C 即C1
B接受C作为参数,B(new C1);
A调用B,B调用C1,而C1是在A中进行实现的;
即回调函数     

2.创建

2.1
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolPrincipleDemo 

	public static void main(String[] args) {

		BlockingQueue<Runnable> bq = new ArrayBlockingQueue<Runnable>(5);
		// 1.coreSize = 5 
		// 2.maxSize = 10 
		// 3.keepTime = 10s
		// 4.queue.size = 5
		// 5.reject
		ExecutorService es = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, bq,new RejectedExecutionHandler() {
			
			@Override
			public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
				
			}
		});

		for(int index = 1 ; index <= 16 ; index++){
			es.execute(new RunnableDemo(index));
		}
	}

}

class RunnableDemo implements Runnable{

	private int index = 0 ;
	public RunnableDemo(int index){
		this.index = index ;
	}
	public RunnableDemo(){
		
	}
	
	public int getIndex() {
		return index;
	}
	public void setIndex(int index) {
		this.index = index;
	}
	@Override
	public void run() {
	
		System.out.println("编号:"+this.index+" "+" 线程创建");
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
}



输出:
编号:1  线程创建
编号:4  线程创建
编号:2  线程创建
编号:3  线程创建
处理不了com.thread.util.concurrent.RunnableDemo@63b9240e
编号:5  线程创建
编号:13  线程创建
编号:12  线程创建
编号:11  线程创建
编号:14  线程创建
编号:15  线程创建
编号:6  线程创建
编号:7  线程创建
编号:8  线程创建
编号:9  线程创建
编号:10  线程创建


解释:
1.i:1~5, 即任务数量小于 coreSize,创建新的线程
2.i:6~10,i>5&&queue.size < 5 ,任务数量大于5,且 阻塞队列未满
不创建新的线程,故 6 ~10 在阻塞队列中
3.i:11~15,阻塞队列已满,且任务数量未大于 MAXsize ,创建新的线程
4.i>15 = maxSize + queue.size ,拒绝处理

其他:
1.当 任务数量 大于 core ,而小于 max ,且 队列已满,此时创建新的线程
若,此时任务已处理完毕,那么
大于core 而小于 max,新创建的线程就是临时的线程,因为线程池只需维护core个数量的线程不销毁;其他的线程 在 活跃时间之后会销毁

注意:
创建时,core /max / keepTime 等参数的值需要谨慎考虑,以期达到最优,最大程度较少内存消耗

2.2
快速创建,无需考虑参数设置问题
		ExecutorService es0 = Executors.newCachedThreadPool();
		/**
		 * 源码实现:
		 * 1.核心线程数量 0 ,线程最大数量 MAX_VALUE ~ 无限大 ,
		 *   有效时间 1 分钟,队列 SynchronousQueue 阻塞队列
		 *   欲取出一个元素,必须等待另一个线程放入一个元素
		 * 2.即:无永久存活线程,所有的线程均为临时创建,处理完任务1分钟后,全部销毁
		 */
//	    public static ExecutorService newCachedThreadPool() {
//	        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
//	                                      60L, TimeUnit.SECONDS,
//	                                      new SynchronousQueue<Runnable>());
//	    }
		
		ExecutorService es1 = Executors.newFixedThreadPool(5);
		/**
		 * 1.核心线程数量 = 线程最大数量 
		 * 2.即所有的线程均是核心线程,无临时线程,所有的线程永久存在
		 */
//	    public static ExecutorService newFixedThreadPool(int nThreads) {
//	        return new ThreadPoolExecutor(nThreads, nThreads,
//	                                      0L, TimeUnit.MILLISECONDS,
//	                                      new LinkedBlockingQueue<Runnable>());
//	    }
		
		ExecutorService es2 = Executors.newSingleThreadExecutor();
		/**
		 * 上面的创建方式的特例:
		 * 核心线程与最大线程均为1,只有一个线程在工作,多余的工作全部放入队列中
		 */
//	    public static ExecutorService newSingleThreadExecutor() {
//	        return new FinalizableDelegatedExecutorService
//	            (new ThreadPoolExecutor(1, 1,
//	                                    0L, TimeUnit.MILLISECONDS,
//	                                    new LinkedBlockingQueue<Runnable>()));
//	    }


3.任务提交

3.1 void executor(Runnable)

3.2 Future submit(Runnable)


import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolSubmitDemo {

	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService es = Executors.newCachedThreadPool();
		@SuppressWarnings("unchecked")
		Future<String> future = (Future<String>) es.submit(new FutureSubmitDemo());
		String result = future.get();
		System.out.println(result == null ? "执行完毕" : "若未执行完毕,则阻塞 get方法");
	}
	
	
}

class FutureSubmitDemo implements Runnable{

	@Override
	public void run() {

		System.out.println("start");
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("end");
	}
	
}



运行结果:输出null

3.3 Future submit(callable)

callable 有返回值,实现callable,要求实现其call方法,并有返回值,通过future可接受返回值,可以在线程外获取线程内部的执行结果

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolSubmitCallableDemo {

	/**
	 * @param args
	 * @throws ExecutionException 
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException, ExecutionException {

		ExecutorService es = Executors.newCachedThreadPool();
		Future<String> future = es.submit(new ThreadSubmitCallableDemo());
		System.out.println(future.get());
	}
}

class ThreadSubmitCallableDemo implements Callable<String>{

	@Override
	public String call() throws Exception {
		System.out.println("start");
		Thread.sleep(5000);
		System.out.println("end");
		return "finish";
	}
	
}



3.4 invokeAny

package com.thread.util.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * invokeAny
 * 执行集合中的任意一个,不保证执行的是哪一个,但一定有一个执行了
 */
public class ThreadPoolInvokeDemo {

	/**
	 * @param args
	 * @throws ExecutionException 
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException, ExecutionException {

		Set<Callable<String>> set  = new HashSet<Callable<String>>(5);
		set.add(new Callable<String>() {

			@Override
			public String call() throws Exception {
				return "1";
			}
		});
		
		set.add(new Callable<String>() {

			@Override
			public String call() throws Exception {
				return "2";
			}
		});
		
		set.add(new Callable<String>() {

			@Override
			public String call() throws Exception {
				return "3";
			}
		});
		
		ExecutorService es = Executors.newCachedThreadPool();
		String result = es.invokeAny(set);
		System.out.println(result);
	}

}



3.5 invokeAll

// 调用集合中所有的,返回值为一个List<Future>
		ExecutorService es = Executors.newCachedThreadPool();
		List<Future<String>> result = es.invokeAll(set);
		System.out.println(result);


3.6 关闭

通过上面的构造方法得知,若 coreSize !=0 ,ExecutorService 始终维护 core 个线程保持存活 以及 一个 queue的存在,浪费资源
若此时全部的任务均已完成,则可关闭 ExecutorService

// 不是立即停止,而是停止接收新的任务,知道所有的线程均已完成任务后,停止
es.shutdown();


若欲立即停止执行所有线程;如:出现严重问题,不想再继续执行下去,则调用 shutDownNow方法,尝试立即停止所有线程,不保证线程的安全性,即有些线程可能已经执行完毕,有些可能刚刚开始执行。

二、ScheduleExecutorService

1.延迟执行


import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * 延迟一段时间后执行
 */
public class ScheduleExecutorServiceDemo {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		Long startTime = System.currentTimeMillis();
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
		ScheduledFuture<String> result = ses.schedule(new Callable<String>() {

			@Override
			public String call() throws Exception {
				System.out.println("执行体");
				return "执行完毕";
			}
		}, 5, TimeUnit.SECONDS);
		
		System.out.println(result);
		Long endTime = System.currentTimeMillis();
		System.out.println(endTime - startTime);
		ses.shutdown();
	}

}




2.定时执行


import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleExecutorServiceFixedDemo {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		ScheduledExecutorService ese = Executors.newScheduledThreadPool(5);
		/**
		 * 延迟执行时间后(5),定时执行任务,每隔5秒执行一次
		 */
		/**
		 * 1.间隔是时间指的是:两次任务开始的时间间隔
		 */
		ese.scheduleAtFixedRate(new Runnable(){

			@Override
			public void run() {
				System.out.println("run");
			}
			
		}, 5, 5, TimeUnit.SECONDS);
		
		/**
		 * 2.间隔的时间指的是:两次任务结束的时间间隔
		 */
		ese.scheduleWithFixedDelay(new Runnable(){

			@Override
			public void run() {
				System.out.println("run");
			}
			
		}, 5, 5, TimeUnit.SECONDS);
		
//		ese.shutdown();
	}

}

分享到:
评论

相关推荐

    在spring boot中使用java线程池ExecutorService的讲解

    在 Spring Boot 中使用 Java 线程池 ExecutorService 的讲解 Spring Boot 作为一个流行的 Java 框架,提供了许多便捷的功能来帮助开发者快速构建应用程序。其中之一就是使用 Java 线程池 ExecutorService 来管理...

    ExecutorService与CompletionService对比详解.docx

    ExecutorService 和 CompletionService 是Java并发处理中的两种工具,它们用于管理和执行多个任务。ExecutorService 是一个接口,它是java.util.concurrent.Executor 接口的扩展,提供了一组方法来管理和控制线程池...

    Executor,Executors,ExecutorService比较.docx

    【Executor、Executors和ExecutorService详解】 在Java并发编程中,`Executor`、`Executors`和`ExecutorService`是核心组件,它们帮助开发者高效管理线程资源,提高程序的并发性能。理解这三个概念的区别和用途是...

    ExecutorService的execute和submit方法

    在Java多线程编程中,`ExecutorService`是线程池的核心接口,它提供了一种管理线程的方式,包括创建线程、调度线程执行以及控制线程的生命周期。`ExecutorService`通过`execute()`和`submit()`这两个方法来提交任务...

    ExecutorService.shutdown()应该是在线程执行完毕后,才会去关闭

    在Java多线程编程中,`ExecutorService`是线程池的一个重要接口,它提供了管理和控制线程执行的能力。当我们创建一个`ExecutorService`实例并提交任务时,我们可以通过调用`shutdown()`方法来关闭线程池。然而,标题...

    ExecutorService方法案例文件.zip

    ExecutorService方法案例文件.zip

    ExecutorService用法详解.doc

    接口 java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。壹個 ExecutorService 实例因此特别像壹個线程池。事实上,在 java.util.concurrent 包中的 ExecutorService 的实现...

    ExecutorService线程池

    ExecutorService线程池是Java并发编程中的核心组件,它位于`java.util.concurrent`包下,是`java.util.concurrent.Executor`接口的一个实现。ExecutorService提供了一种管理线程的方式,允许我们创建、管理和控制...

    运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接JAVA语言

    运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接

    2_ExecutorService源码阅读1

    ExecutorService 是 Java 中用于管理和控制线程执行的核心接口,它是 java.util.concurrent 包的一部分。ExecutorService 扩展了 Executor 接口,提供了更丰富的功能,如任务的提交、关闭服务、检查服务状态等。这个...

    java并发编程:Executor、Executors、ExecutorService.docx

    Java并发编程中的Executor、Executors和ExecutorService是Java并发编程框架的重要组成部分,它们为开发者提供了高效管理和控制线程执行的工具。以下是对这些概念的详细解释: 1. Executor: Executor是一个接口,它...

    运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接.doc

    根据提供的文档标题、描述以及部分内容,我们可以总结出以下关于如何运用Java中的`concurrent.ExecutorService`线程池实现socket的TCP和UDP连接的关键知识点: ### 1. 理解Java中的`concurrent.ExecutorService` `...

    Java使用ExecutorService来停止线程服务

    Java 使用 ExecutorService 来停止线程服务 Java 中的 ExecutorService 是一个非常强大的线程池管理工具,它提供了多种方式来停止线程服务。今天,我们将详细介绍如何使用 ExecutorService 来停止线程服务。 首先...

    2011.08.30(2)——— java BlockingQueue ExecutorService

    标题 "2011.08.30(2)——— java BlockingQueue ExecutorService" 涉及到Java并发编程中的两个核心组件:BlockingQueue(阻塞队列)和ExecutorService。这篇博客可能深入探讨了如何使用这两个工具来优化多线程环境...

    详解Java利用ExecutorService实现同步执行大量线程

    Java中的ExecutorService是Java并发编程的重要组成部分,它提供了一种高效、灵活的方式来管理和控制线程的执行。在处理大量线程或并发操作时,ExecutorService能够确保系统的稳定性和资源的有效利用,避免线程间的不...

    java ExecutorService使用方法详解

    Java中的`ExecutorService`是Java并发编程的重要组成部分,它提供了线程池的管理,使得开发者可以更有效地控制并发任务的执行。在Java的`java.util.concurrent`包中,`ExecutorService`接口作为线程池的核心接口,...

    java线程池工具--ExecutorService,简单例子

    NULL 博文链接:https://x125858805.iteye.com/blog/2191873

    ExecutorService10个要诀和技巧编程开发技术

    ExecutorService10个要诀和技巧编程开发技术共9页.pdf.zip

    详解JDK中ExecutorService与Callable和Future对线程的支持

    Java并发编程中的ExecutorService、Callable和Future Java并发编程中,ExecutorService、Callable和Future是三大核心组件,它们之间紧密相连,共同实现了高效、安全的并发编程。下面我们将详细介绍这些组件的作用和...

Global site tag (gtag.js) - Google Analytics