Executors的一些高级特性
取消任务
当你把任务发送给 executor 后,你可以选择取消这个任务的执行。使用 submit() 方法发送一个 Runnable 对象给一个executor,submit() 方法将返回一个实现了 Future 这个接口类的对象。你可以通过该类的 cancel() 方法取消任务的执行。cancel() 方法接收一个 boolean 值作为参数。如果参数为 true,且executor正在执行这个任务,那么执行任务的线程将被中断。
cancel() 方法返回一个boolean值显示任务是否被取消。
任务调度
ThreadPoolExecutor类是接口 Executor 和 ExecutorService的基本实现类。同时Java提供了一个扩展类用以实现任务的调度 - ScheduledThreadPoolExecutor类,通过此类你可以:
- 延迟执行一个任务
- 周期性执行任务,包括以一定的频率执行任务和固定的延迟时间执行任务
重载 executor 的方法
你可以通过继承现有的类 (ThreadPoolExecutor 或者 ScheduledThreadPoolExecutor)来实现自定义的executor。如果你继承了 ThreadPoolExecutor 类,你可以重载以下方法:
- beforeExecute():这个方法在executor里的并行任务执行前被调用。此方法接收了两个参数:即将被执行的Runnable对象以及负责执行的Thread对象。注意这个方法接收的Runnable对象实际是类FutureTask的一个实例,并不是你通过 submit() 方法发送给 executor 的Runnable对象(该对象已经被包装成了FutureTask)。
- afterExecute():这个方法在executor里的并行任务执行完之后被调用。它接收两个参数:被执行完的Runnable对象以及一个保存可能从任务里抛出的异常。跟beforeExecute()方法很像,Runnable对象是FutureTask类的一个实例。
- newTaskFor():这个方法创建了一个任务,用来执行你通过submit()方法发送来的Runnable对象。它必须返回一个RunnableFuture接口的实现。默认的,OpenJDK 8 和 Oracle JDK 8 返回的是FutureTask类的实例,但是这在未来的JDK版本中可能不一样。
如果你继承了ScheduledThreadPoolExecutor类,你可以重载decorateTask()方法。这个方法就像上面的newTaskFor(),但它是针对调度任务的。它允许你重载被executor执行的任务。
修改一些初始化参数
你也可以修改一些参数来改变executor的行为。其中包括:
- BlockingQueue<Runnable>:每一个executor内部都维护一个BlockingQueue来保存即将被执行的任务。你可以传入任何一个该接口的实现类。例如,你可以改变executor执行任务的默认顺序。
- ThreadFactory:你可以传入一个实现了ThreadFactory接口的自定义类,executor会使用这个自定义线程工厂来创建用来执行任务的线程。例如,你的自定义ThredFactory类返回的自定义线程能够保存每个任务的执行时间到日志中。
- RejectedExecutionHandler:在你调用了shutdown()或shutdownNow()方法后,所有发送给executor的新任务都会被拒绝。你可以传入自定义的实现了RejectedExecutionHandler接口的类来处理这种情况。
第一个例子 - 服务器应用
在第二章中,我们实现了客户端 / 服务器端应用程序。在这个例子中我们将对那个应用程序做如下拓展:
- 引入一个新的请求用来取消已经发送给服务器的请求
- 每个请求允许传递一个新的参数代表请求的优先权。用来控制请求的执行顺序
- 服务器能够计算每个用户已经执行的请求总数以及执行的总耗时
// 这个类用来记录已经执行的任务总数以及这些任务的总耗时 // 两个变量都是原子变量,因为不同线程需要更新这两个参数的值 public class ExecutorStatistics { private AtomicLong executionTime = new AtomicLong(0L); private AtomicInteger numTasks = new AtomicInteger(0); public void addExecutionTime(long time) { executionTime.addAndGet(time); } public void addTask() { numTasks.incrementAndGet(); } @Override public String toString() { return "Executed Tasks: " + getNumTasks() + ". Execution Time: "+ getExecutionTime(); } public AtomicLong getExecutionTime() { return executionTime; } public AtomicInteger getNumTasks() { return numTasks; } }
// 当executor被调用shutdown()或shutdownNow()后,executor会拒绝新提交的任务 // 这个类用来处理这种情况,这里它所做的是往输出流里打印出错信息 public class RejectedTaskController implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { ConcurrentCommand command = (ConcurrentCommand) task; Socket clientSocket = command.getSocket(); try { PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); String message = "The server is shutting down." + " Your request can not be served." + " Shutting Down: " + executor.isShutdown() + ". Terminated: " + executor.isTerminated() + ". Terminating: " + executor.isTerminating(); out.println(message); out.close(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } }
/** * 当一个Runnable对象提交给executor后,executor不是直接执行这个Runnable对象 * 它创建了一个FutureTask类的实例对象,该对象被executor里的线程所执行 * 这里我们继承了FutureTask类并实现了Comparable接口,这样每个提交给executor * 的任务可以按照一定规则排序(如优先级) */ public class ServerTask<V> extends FutureTask<V> implements Comparable<ServerTask<V>> { private ConcurrentCommand command; public ServerTask(ConcurrentCommand command) { super(command, null); this.command=command; } public ConcurrentCommand getCommand() { return command; } public void setCommand(ConcurrentCommand command) { this.command = command; } @Override public int compareTo(ServerTask<V> other) { return command.compareTo(other.getCommand()); } }
// 重载executor,以根据我们的需要修改它的行为 public class ServerExecutor extends ThreadPoolExecutor { // 记录每个任务的执行时间,主键是ServerTask对象(即Runnable对象) // 键值是对应的日期 private ConcurrentHashMap<Runnable, Date> startTimes; // 这个变量记录每个用户的统计数据,主键是用户名 // 键值是ExecutorStatistics对象 private ConcurrentHashMap<String, ExecutorStatistics> executionStatistics; private static int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static long KEEP_ALIVE_TIME = 10; private static RejectedTaskController REJECTED_TASK_CONTROLLER = new RejectedTaskController(); public ServerExecutor() { super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), REJECTED_TASK_CONTROLLER); startTimes = new ConcurrentHashMap<>(); executionStatistics = new ConcurrentHashMap<>(); } // 每个任务被执行前调用,这里记录每个任务的开始时间 protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); startTimes.put(r, new Date()); } // 每个任务执行完后调用,这里我们计算执行当前任务的耗时, // 并更新用户已被执行的任务总数和总耗时 @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); ServerTask<?> task = (ServerTask<?>) r; ConcurrentCommand command = task.getCommand(); if (t == null) { if (!task.isCancelled()) { // 首先从startTimes中删除此任务的运行开始时间 Date startDate = startTimes.remove(r); Date endDate = new Date(); long executionTime = endDate.getTime() - startDate.getTime(); ExecutorStatistics statistics = executionStatistics.computeIfAbsent (command.getUsername(), n -> new ExecutorStatistics()); statistics.addExecutionTime(executionTime); statistics.addTask(); // 从ConcurrentServer中维护的任务列表中删除此任务,因为它已经完成 ConcurrentServer.finishTask(command.getUsername(), command); } else { String message = "The task" + command.hashCode() + "of user" + command.getUsername() + "has been cancelled. "; System.out.println(message); } } else { String message = "The exception " + t.getMessage() + " has been thrown."; System.out.println(message); } } // 把发送给executor的Runnable对象包装成ServerTask对象,该对象才真正是被线程执行的 @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ServerTask<T>(runnable); } public void writeStatistics() { for (Map.Entry<String, ExecutorStatistics> entry : executionStatistics.entrySet()) { String user = entry.getKey(); ExecutorStatistics stats = entry.getValue(); System.out.println(user + ":" + stats); } } }
// 指令的抽象类 public abstract class Command { protected String[] command; public Command (String [] command) { this.command=command; } public abstract String execute (); } /** * 这是所有指令类的基础类,它包括了一些所有指令的公共行为 * 1. 调用每个指令类的具体逻辑实现 * 2. 把指令结果写会给客户端 * 3. 关闭所有通信中使用的资源 */ public abstract class ConcurrentCommand extends Command implements Comparable<ConcurrentCommand>, Runnable { private String username; private byte priority; private Socket socket; public ConcurrentCommand(Socket socket, String[] command) { super(command); username=command[1]; priority=Byte.parseByte(command[2]); this.socket=socket; } @Override public abstract String execute(); @Override public void run() { String ret = execute(); try { PrintWriter out = new PrintWriter(socket.getOutputStream(),true); out.println(ret); socket.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public int compareTo(ConcurrentCommand o) { return Byte.compare(o.getPriority(), this.getPriority()); } public byte getPriority() { return priority; } public Socket getSocket() { return socket; } public String getUsername() { return username; } } // 对应Query请求的指令类 public class ConcurrentQueryCommand extends ConcurrentCommand { public ConcurrentQueryCommand(Socket socket, String [] command) { super(socket, command); } public String execute() { WDIDAO dao=WDIDAO.getDAO(); if (command.length==5) { return dao.query(command[3], command[4]); } else if (command.length==6) { try { return dao.query(command[3], command[4], Short.parseShort(command[5])); } catch (NumberFormatException e) { return "ERROR;Bad Command"; } } else { return "ERROR;Bad Command"; } } } //对应Report请求的指令类 public class ConcurrentReportCommand extends ConcurrentCommand { public ConcurrentReportCommand(Socket socket, String [] command) { super(socket, command); } public String execute() { WDIDAO dao=WDIDAO.getDAO(); return dao.report(command[3]); } } //对应Stop请求的指令类 public class ConcurrentStopCommand extends ConcurrentCommand { public ConcurrentStopCommand(Socket socket, String [] command) { super(socket, command); } public String execute() { ConcurrentServer.shutdown(); return "Server stopped"; } } //对应Cancel请求的指令类 public class ConcurrentCancelCommand extends ConcurrentCommand { public ConcurrentCancelCommand(Socket socket, String [] command) { super(socket, command); } public String execute() { ConcurrentServer.cancelTasks(getUsername()); return message; } } //此类处理一些服务器不支持的请求 public class ConcurrentErrorCommand extends ConcurrentCommand { public ConcurrentErrorCommand(Socket socket, String [] command) { super(socket, command); } public String execute() { return "Unknown command: " + command[0]; } } //对应status请求的指令 public class ConcurrentStatusCommand extends ConcurrentCommand { public ConcurrentStatusCommand (Socket socket, String[] command) { super(socket, command); } @Override public String execute() { StringBuilder sb=new StringBuilder(); ThreadPoolExecutor executor = ConcurrentServer.getExecutor(); sb.append("Server Status;"); sb.append("Actived Threads: "); sb.append(executor.getActiveCount()); sb.append(";"); sb.append("Maximum Pool Size: "); sb.append(executor.getMaximumPoolSize()); sb.append(";"); sb.append("Core Pool Size: "); sb.append(executor.getCorePoolSize()); sb.append(";"); sb.append("Pool Size: "); sb.append(executor.getPoolSize()); sb.append(";"); sb.append("Largest Pool Size: "); sb.append(executor.getLargestPoolSize()); sb.append(";"); sb.append("Completed Task Count: "); sb.append(executor.getCompletedTaskCount()); sb.append(";"); sb.append("Task Count: "); sb.append(executor.getTaskCount()); sb.append(";"); sb.append("Queue Size: "); sb.append(executor.getQueue().size()); sb.append(";"); return sb.toString(); } }
/** * 在这个类中,我们启动了RequestTask这个线程,该线程读取由ConcurrentServer保存的客户端socket, * 创建相应的指令并发给executor执行。 * 这样做的目的是让被线程执行的每个任务只包含和请求相关的代码,其它操作可以在executor外处理 */ public class ConcurrentServer { private static volatile boolean stopped = false; // 用来保存发送消息给服务器的客户的sockets private static LinkedBlockingQueue<Socket> pendingConnections; // 保存每一个在executor里执行的任务所关联的Future对象,主键是用户名, // 键值是另外一个ConcurrentMap (它的主键是ConcurrentCommand,键值是和任务关联的Future实例) private static ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController; // 执行RequestTask对象的Thread private static Thread requestThread; // 创建指令对象并发送给executor private static RequestTask task; private static ServerSocket serverSocket; public static void main(String[] args) { pendingConnections = new LinkedBlockingQueue<>(); taskController = new ConcurrentHashMap<String, ConcurrentHashMap<ConcurrentCommand, Future<?>>>(); // 启动RequestTask线程 task = new RequestTask(pendingConnections, taskController); requestThread = new Thread(task); requestThread.start(); System.out.println("Initialization completed."); serverSocket = new ServerSocket(Constants.CONCURRENT_PORT); do { try { Socket clientSocket = serverSocket.accept(); pendingConnections.put(clientSocket); } catch (Exception e) { e.printStackTrace(); } } while (!stopped); finishServer(); } // 该方法修改stopped变量为true并关闭serverSocket public static void shutdown() { stopped = true; try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } // 该方法停止executor并中断RequestTask线程 private static void finishServer() { System.out.println("Shutting down the server..."); task.shutdown(); System.out.println("Shutting down Request task"); requestThread.interrupt(); System.out.println("Request task ok"); System.out.println("Closing socket"); System.out.println("Shutting down logger"); System.out.println("Logger ok"); System.out.println("Main server thread ended"); } // 取消一个用户的请求 public static void cancelTasks(String username) { ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username); if (userTasks == null) { return; } int taskNumber = 0; Iterator<ServerTask<?>> it = userTasks.values().iterator(); while(it.hasNext()) { ServerTask<?> task = it.next(); ConcurrentCommand command = task.getCommand(); if(!(command instanceof ConcurrentCancelCommand) && task.cancel(true)) { taskNumber++; it.remove(); } } } // 当一个任务顺利完成后,我们需要把和这个任务关联的Future对象从保存的ConcurrentMap中删除 public static void finishTask(String username, ConcurrentCommand command) { ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username); userTasks.remove(command); } } public class RequestTask implements Runnable { // 保存客户端的sockets private LinkedBlockingQueue<Socket> pendingConnections; // 用来并行处理用户的请求指令 private ServerExecutor executor = new ServerExecutor(); // 保存和任务相关联的Future对象 private ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController; public RequestTask(LinkedBlockingQueue<Socket> pendingConnections, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>> taskController) { this.pendingConnections = pendingConnections; this.taskController = taskController; } public void run() { try { while (!Thread.currentThread().interrupted()) { try { Socket clientSocket = pendingConnections.take(); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); String line = in.readLine(); ConcurrentCommand command; String[] commandData = line.split(";"); System.out.println("Command: " + commandData[0]); switch (commandData[0]) { case "q": System.out.println("Query"); command = new ConcurrentQueryCommand(clientSocket, commandData); break; case "r": System.out.println("Report"); command = new ConcurrentReportCommand(clientSocket, commandData); break; case "s": System.out.println("Status"); command = new ConcurrentStatusCommand(executor, clientSocket, commandData); break; case "z": System.out.println("Stop"); command = new ConcurrentStopCommand(clientSocket, commandData); break; case "c": System.out.println("Cancel"); command = new ConcurrentCancelCommand(clientSocket, commandData); break; default: System.out.println("Error"); command = new ConcurrentErrorCommand(clientSocket, commandData); break; } ServerTask<?> controller = (ServerTask<?>) executor.submit(command); storeContoller(command.getUsername(), controller, command); } catch (IOException e) { e.printStackTrace(); } } } catch (InterruptedException e) { // No Action Required } } // 保存和用户请求关联的Future对象 private void storeContoller(String userName, ServerTask<?>controller, ConcurrentCommand command) { taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap<ConcurrentCommand, ServerTask<?>>()).put(command, controller); } // 关闭executor public void shutdown() { String message = "Request Task: " + pendingConnections.size() + " pending connections."; System.out.println(message); executor.shutdown(); } // 等待executor执行完所有正在执行的任务 public void terminate() { try { executor.awaitTermination(1, TimeUnit.DAYS); executor.writeStatistics(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Executor额外的一些方法
你也可以重载以下Executor的方法:
- shutdown():你必须调用这个方法来结束executor。你可以重载这个方法来释放额外的资源。这个方法等待executor处理完所有排队被执行的任务
- shutdownNow():和sutdown()方法不同的是,该方法不等待executor处理完正在排队的任务
- submit(), invokeall(), invokeany():调用这些方法发送并发任务给executor。如果你需要在任务添加到executor里的任务队列之前或之后做一些额外的操作,你可以重载它们。请注意在任务插入队列之前或之后执行的额外操作不同于任务执行前或执行后,如果想要在任务执行前后做一些额外操作,你必须重载beforeExecute()和afterExecute()方法。
ScheduledThreadPoolExecutor类有以下方法允许延迟执行任务,或周期性的任务:
- schedule():这个方法允许在指定的延迟之后执行任务。任务只被执行一次。
- scheduleAtFixedRate():这个方法允许在指定的延迟之后周期性地执行任务。它和方法scheduleWithFixedDelay()不同的是:对于scheduleWithFixedDelay()方法,两次执行的时间间隔是上一次执行结束到下一次执行开始之间的时间间隔。对于scheduleAtFixedRate(),两次执行的时间间隔是上一次执行开始到下一次执行开始之间的时间间隔。
相关推荐
java Executors 使用实例 concurrent.ExecutorService
Java中的线程池Executors java中的线程池Executors是Java并发编程中的一种重要概念,它提供了一种高效、灵活的线程管理机制。使用线程池可以降低资源消耗,提高响应速度,提高线程的可管理性。 线程池的优点 1. ...
《PyPI官网下载:more_executors-1.1.0-py2.py3-none-any.whl》 在Python的世界里,PyPI(Python Package Index)是官方的第三方库仓库,它为开发者提供了一个集中地来发布、搜索和下载Python软件包。本资源“more_...
NULL 博文链接:https://bijian1013.iteye.com/blog/2284676
Java多线程实现数据切割批量执行,实现限流操作。 java线程池Executors实现...线程池调用第三方接口限流实现逻辑。 案例适合: 1.批量处理大数据。 2.数据批量导出。 3任务数据异步执行。 4.多线程请求第三方接口限流。
《PyPI官网下载 | more_executors-2.5.1-py2.py3-none-any.whl》 在Python的世界里,PyPI(Python Package Index)是最重要的资源库,它为全球的开发者提供了一个集中地来发布、分享和安装Python软件包。这个资源...
运行使用 docker 的 Jenkins 构建 这是一个包含 Jenkins 的 docker 镜像,并且能够在 docker 内部运行 docker 本身! (疯狂,我知道)。 docker run -p 8080:8080 -privileged michaelneale/jenkins-docker-...
资源分类:Python库 所属语言:Python 资源全名:more-executors-1.19.1.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
本篇将深入探讨`Executors`类的使用方法及其背后的并发原理。 `Executors`类是Java并发包`java.util.concurrent`中的一个静态工厂类,它提供了多种方法用于创建不同类型的线程池。下面我们将逐一介绍这些方法以及...
【Executor、Executors和ExecutorService详解】 在Java并发编程中,`Executor`、`Executors`和`ExecutorService`是核心组件,它们帮助开发者高效管理线程资源,提高程序的并发性能。理解这三个概念的区别和用途是...
Java中Executors类中几种创建各类型线程池方法及简单实例
- 当内存不足以存储所有数据时,可以考虑使用序列化的方式来减少内存占用。 - 如果内存仍然不足,可以考虑采用内存+磁盘的存储方式。 - 在内存足够的情况下,还可以使用双副本机制来提高数据的可靠性。 #### 五...
Executors框架的结构包括任务、任务执行机制和异步计算的结果三个部分。 1. 背景 在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的...
Java并发编程中,`java.util.concurrent.Executors` 类是一个至关重要的工具类,它提供了一系列静态方法,用于创建和管理线程池以及相关的线程执行服务。`Executors` 类简化了线程池的创建和线程的调度,使得开发者...
本文主要探讨Spark的常规性能调优策略,包括资源分配优化和RDD优化,旨在帮助用户最大化利用资源,提升任务执行效率。 首先,我们要理解资源分配的重要性。Spark性能调优的核心在于合理分配资源,这涉及到Executor...
《深入解析Java Executors源码》 在Java多线程编程中,`java.util.concurrent.Executors`类扮演着至关重要的角色。...正确理解和使用`Executors`,能够让我们更好地驾驭Java并发编程,提升系统性能。