`
g21121
  • 浏览: 696096 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Worker

 
阅读更多

        Worker是线程池中实际工作的线程,Worker是 ThreadPoolExecutor的私有内部类,它实现了 Runnable接口,所以可以作为一个线程来使用。

private final class Worker implements Runnable {
        //锁
	private final ReentrantLock runLock = new ReentrantLock();
        //任务
	private Runnable firstTask;
        //完成任务数
	volatile long completedTasks;
        //Worker运行的线程
	Thread thread;
        //构造方法,firstTask为要执行的任务
	Worker(Runnable firstTask) {
		this.firstTask = firstTask;
	}

	boolean isActive() {
		return runLock.isLocked();
	}
        //如果没有任务就中断线程
	void interruptIfIdle() {
		final ReentrantLock runLock = this.runLock;
		if (runLock.tryLock()) {
			try {
				if (thread != Thread.currentThread())
					thread.interrupt();
			} finally {
				runLock.unlock();
			}
		}
	}
        //立即中断线程,即使正在运行任务
	void interruptNow() {
		thread.interrupt();
	}
        //执行任务,在beforeExecute方法之后,afterExecute方法之前执行
	private void runTask(Runnable task) {
		final ReentrantLock runLock = this.runLock;
		runLock.lock();
		try {
			if (runState < STOP && Thread.interrupted() && runState >= STOP)
				thread.interrupt();
			boolean ran = false;
			beforeExecute(thread, task);
			try {
				task.run();
				ran = true;
				afterExecute(task, null);
				++completedTasks;
			} catch (RuntimeException ex) {
				if (!ran)
					afterExecute(task, ex);
				throw ex;
			}
		} finally {
			runLock.unlock();
		}
	}

	/**
	 * 任务执行主循环方法
	 */
	public void run() {
		try {
			Runnable task = firstTask;
			firstTask = null;
			while (task != null || (task = getTask()) != null) {
				runTask(task);
				task = null;
			}
		} finally {
			workerDone(this);
		}
	}
}

 

        1.成员变量

        Worker有四个成员变量:

/**
 * 可重入的互斥锁
 */
private final ReentrantLock runLock = new ReentrantLock();

/**
 * 初始任务在进入run循环之前运行,可能为空的
 */
private Runnable firstTask;

/**
 * 每个线程完成任务的计数器
 */
volatile long completedTasks;

/**
 * Worker运行时的线程
 */
Thread thread;

 

        2.run方法

        其中最为核心的部分就是 Worker的run方法:

	public void run() {
		try {
			Runnable task = firstTask;
			firstTask = null;
			while (task != null || (task = getTask()) != null) {
				runTask(task);
				task = null;
			}
		} finally {
			workerDone(this);
		}
	}

        从run方法的实现可以看出,它首先从成员变量中获取 firstTask(new时传进来的),然后利用while循环不断执行 runTask()方法,在 runTask()执行完之后,循环依然不断通过 getTask()去取新的任务来执行,其中 getTask方法是从任务缓存队列中获取任务。

 

        3.runTask方法

        run方法中获取到新任务后实际上调用的是 runTask方法来执行任务,以下就是 runTask方法的源代码:

private void runTask(Runnable task) {
	
	final ReentrantLock runLock = this.runLock;
	//获取锁
	runLock.lock();
	try {
		if (runState < STOP && Thread.interrupted() && runState >= STOP)
			thread.interrupt();
		boolean ran = false;
		//前置操作
		beforeExecute(thread, task);
		try {
			task.run();
			ran = true;
			//后置操作
			afterExecute(task, null);
			++completedTasks;
		} catch (RuntimeException ex) {
			if (!ran)
				afterExecute(task, ex);
			throw ex;
		}
	} finally {
		//释放锁
		runLock.unlock();
	}
}

        1)首先获得锁。

        2)获得锁后判断线程池的状态如果是 RUNNING或 SHUTDOWN则终止该线程;然后调用Thread.interrupted() 方法,Thread.interrupted() 内部实现为:

public static boolean interrupted() {
	return currentThread().isInterrupted(true);
}

        interrupted方法的作用是清除该线程的中断标志位。当线程处于中断状态时,其标志位为true。

        举一个例子来说明将更加清晰:

public class ThreadB implements Runnable {

	public void run() {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().isInterrupted());
	}

	public static void main(String[] args) {
		Thread t = new Thread(new ThreadB());
		t.start();
		t.interrupt();
	}
}
//结果:
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at ThreadB.run(ThreadB.java:5)
	at java.lang.Thread.run(Thread.java:662)
false

        发现当线程处于中断状态时,再调用sleep,wait等方法会抛出 InterruptedException异常,InterruptedException异常的抛出就是根据这个中断标志位来判断的。我们修改一下代码,利用 interrupted方法人为的清除中断标志位:

public class ThreadB implements Runnable {

	public void run() {
		System.out.println(Thread.currentThread().isInterrupted());
		System.out.println(Thread.currentThread().interrupted());
		System.out.println(Thread.currentThread().interrupted());
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		Thread t = new Thread(new ThreadB());
		t.start();
		t.interrupt();
	}
}
//结果:
true
true
false

         发现已经没有异常抛出了。但是再调用两次 interrupted方法后返回的结果就都是false了,是因为标志位已经被清除,所以再次调用就无效了。

        最后再判断线程池的状态,终止线程。

        3)声明ran变量,用于标志任务是否执行完成。

        4)执行 beforeExecute方法,在任务开始执行之前会首先执行 beforeExecute方法:

 

protected void beforeExecute(Thread t, Runnable r) { }
        beforeExecute是 ThreadPoolExecutor的方法,从上面源代码中发现 ThreadPoolExecutor类并没有实现任何代码,因为 ThreadPoolExecutor时可以扩展的,所以 beforeExecute与 afterExecute一样可在子类中定制,如:
@Override
protected void beforeExecute(Thread t, Runnable r) {
	super.beforeExecute(t, r);
	// 其他操作,如日志等
}
        beforeExecute与 afterExecute可以在任务执行的前后添加日志、计时、监控或统计信息收集等功能。

 

        5)task.run();执行任务,执行成功后ran标志会赋值为true,证明任务已经完成,如果任务执行过程中抛出异常则会跳转到 catch代码中,ran的值仍为false。

        6)完成任务执行后调用 afterExecute方法,并增加完成任务数。

        7)释放锁。

 

        4.interruptNow方法

        interruptNow用于终止正在运行的任务,其源代码为:

void interruptNow() {
	thread.interrupt();
}

        interruptNow中调用的是 interrupt方法。

 

        5.interruptIfIdle方法

        interruptIfIdle方法用于中断空闲的线程。

/**
 * 如果没有运行任务则中止线程
 */
void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {
        try {
    if (thread != Thread.currentThread())
	thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

 

        除了 Worker类中的这些方法外 ThreadPoolExecutor有几个与 Worker相关的实用方法。

 

        6.getTask方法

        getTask 是 ThreadPoolExecutor类中的方法,并不是 Worker类中的方法,下面是 getTask方法的实现:

Runnable getTask() {
	for (;;) {
		try {
			int state = runState;
			if (state > SHUTDOWN)
				return null;
			Runnable r;
			if (state == SHUTDOWN) // Help drain queue
				r = workQueue.poll();
			else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
				r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
			else
				r = workQueue.take();
			if (r != null)
				return r;
			if (workerCanExit()) {
				if (runState >= SHUTDOWN) // Wake up others
					interruptIdleWorkers();
				return null;
			}
			// Else retry
		} catch (InterruptedException ie) {
			// On interruption, re-check runState
		}
	}
}

        在getTask中,首先判断当前线程池状态,如果 runState大于SHUTDOWN(STOP或者TERMINATED),则直接返回null。

   如果 runState为 SHUTDOWN或者 RUNNING,则从任务缓存队列取任务。

   如果当前线程池的线程数大于核心池大小 corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用 poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

        然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出。

        也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许 worker退出。如果允许 worker退出,则调用 interruptIdleWorkers()中断处于空闲状态的 worker。实际上 interruptIdleWorkers方法调用的是 Worker的 interruptIfIdle()方法。

 

        7.workerCanExit方法

        workerCanExit方法判断Worker是否可以退出。

/**
 * 判断Worker是否可以退出
 */
private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

        其中判断语句是关键。

        首先判断运行状态为 STOP或 TERMINATED;

        然后判断worker队列workQueue如果为空;

        之后判断 allowCoreThreadTimeOut属性是否为true;

        最后判断poolSize(线程数)大于1或核心线程数。满足其中任何一个条件都说明该 Worker可以退出。

 

        8.interruptIdleWorkers

        interruptIdleWorkers方法利用循环调用每个 Worker的 interruptIfIdle方法。

/**
 * 唤醒所有可能正在等待任务的线程,这样他们就可以检查终止。注:这种方法也被称为 scheduledthreadpoolexecutor。
 */
void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

 

分享到:
评论

相关推荐

    nextjs-worker-example:这是Webpack使用Next.js加载Web Worker的示例

    NextJS Worker示例这是使Web Worker在NextJS项目中运行的示例。 要使用worker-loader将Web Worker加载到NextJS站点上,并允许在其worker上运行babel等webpack加载器,必须覆盖构建输出路径。 感谢。 // next.config....

    Web Worker版调用Face-Api.js

    "Web Worker版调用Face-Api.js"是一个解决方案,它利用Web Worker技术来封装并异步执行Face-API.js库,从而避免阻塞主线程,提供高性能的用户体验。以下是关于这个主题的详细知识点: 1. **Web Worker**:Web ...

    向Webpack添加原生WebWorker捆绑支持

    然而,默认情况下,Webpack 不直接支持原生Web Worker的捆绑和编译。Web Worker是HTML5引入的一个特性,用于在后台线程中执行脚本,以实现多线程处理,避免阻塞主线程。由于Web Worker的特殊性,其加载和运行机制与...

    ThinkPHP5集成GatewayWorker

    其设计模式为Broker-Worker架构,Broker负责调度,Worker负责业务处理,这种分离使得系统扩展性极强。 在ThinkPHP5框架中集成GatewayWorker,首先需要确保你的开发环境已经安装了PHP、Composer以及ThinkPHP5框架。...

    Cloudflare worker脚本代理OpenAI对Azure OpenAI服务的请求.zip

    这个压缩包文件"Cloudflare worker脚本代理OpenAI对Azure OpenAI服务的请求.zip"显然是为了实现一个中间人代理,通过Cloudflare Worker来转发OpenAI API的请求到Azure的OpenAI服务。 首先,我们需要理解OpenAI API...

    laravel+gatewayworker(workerman)+vue实现用户聊天功能代码

    本示例通过集成Laravel、GatewayWorker(基于Workerman)和Vue.js技术栈,提供了一个完整的用户聊天功能解决方案。下面将详细介绍这些技术以及如何结合它们实现聊天功能。 **Laravel** Laravel是一款基于PHP的开源...

    pdf.js和pdf.worker.js html打开pdf所需要的js

    PDF.js和PDF.Worker.js是Mozilla开发的开源库,用于在Web浏览器中渲染PDF文档,无需依赖任何插件。这两个JavaScript文件是实现HTML页面加载和显示PDF的关键组件,提供了纯JavaScript解决方案来处理PDF文档。 PDF.js...

    前端开源库-web-worker-manager

    Web Worker Manager作为一个开源库,它的主要目标是简化Web Worker的创建、管理和通信流程,提供更高级别的抽象,使得开发者能够更加方便地利用Web Worker的多线程能力。它通常包含以下功能: 1. **Worker实例化和...

    pdf.js&pdf;.worker.js

    `pdf.worker.js`与`pdf.js`通过消息传递机制进行通信,当需要解析PDF时,`pdf.js`会创建一个新的Web Worker并加载`pdf.worker.js`,然后在Worker中执行实际的解析操作。 在实际使用PDF.js时,首先需要确保浏览器...

    Web-Worker在webgl中的应用(百度地图技术团队)1

    1. 主线程创建 Worker 实例:通过 `new Worker('worker.js')` 创建一个新的 Worker 线程,其中 'worker.js' 是工作线程的脚本文件。 2. 监听消息:使用 `worker.onmessage` 事件处理器来接收 Worker 线程发送的消息...

    Thinkphp6 redis队列 消息事件 gatewayworker聊天打通版

    在IT行业中,构建高效、实时的在线应用是关键任务之一,而`Thinkphp6`、`Redis`、`GatewayWorker`以及`queue队列`这些技术的结合则为实现这一目标提供了强大的支持。本文将深入探讨如何利用这些技术构建一个聊天系统...

    thinkphp5.1完美集成gatewayworker

    标题中的“thinkphp5.1完美集成gatewayworker”是指在ThinkPHP5.1这个流行的PHP框架基础上,成功地整合了GatewayWorker,一个专为实时通信设计的高性能、高并发的PHP服务框架。这个集成允许开发者在Web应用中无缝地...

    GatewayWorker框架源码

    GatewayWorker采用Socket进行进程间的通信,如Gateway与BusinessWorker之间的数据传递,Manager对Worker的管理命令等。通过socket通信,可以实现跨进程的数据交换,确保系统的扩展性和稳定性。 6. 心跳机制: 为了...

    GatewayWorker与ThinkPHP3整合

    **标题:“GatewayWorker与ThinkPHP3整合”** 在现代Web开发中,实时通信已经成为不可或缺的一部分,这通常是通过WebSocket协议实现的。GatewayWorker是一款高性能、易用的分布式长连接服务器,适用于聊天、推送、...

    简单的定时任务 .NETCore3.1 WorkerService.zip

    .NET Core 3.1 Worker Service 是一个用于创建后台任务或长时间运行服务的框架,它在 .NET Core 平台上提供了一种简单的方式来实现定时任务。这个压缩包 "简单的定时任务 .NETCore3.1 WorkerService.zip" 包含了一个...

    Nginx worker_connections配置太低导致500错误案例

    最近一次安全培训,需要用到安全攻防平台,结果30几个人登录上去直接爆出500错误。不知道什么原因,后来找来SSH登录用户,密码,逐步排查,发现了Nginx... 您可能感兴趣的文章:Nginx中worker connections问题的解决方法

    tp5.1+GatewayWorker 修复GatewayWorker在windows启动问题

    命令php think worker:gateway在windows下运行...根据GatewayWorker-for-win提供的demo修改的 本资源依赖GatewayWorker扩展,请先安装扩展。 使用方法,把解压后的文件夹放到项目根目录,双击start_for_win.bat,启动

    基于ThinkPHP5和Layui的GatewayWorker开源多客服聊天系统设计源码

    本资源提供了一套基于ThinkPHP5、Layui和GatewayWorker的开源多客服聊天系统的设计源码,包含448个文件,其中包括237个PHP源代码文件,45个JavaScript脚本文件,30个HTML页面文件,以及20个CSS样式文件。此外,还...

    WorkerDOM实现运行在一个WebWorker中的DOMAPI和框架

    WorkerDOM 是一个JavaScript库,它的主要目标是将DOM API和框架功能带入Web Worker环境。在Web Worker中执行DOM操作可以显著提升Web应用的性能,因为它允许我们在后台线程处理复杂的计算任务,而不阻塞主线程,从而...

    pdf.js和pdf.worker.js

    2. **创建Worker**:接着,PDF.js会创建一个pdf.worker.js实例,这个worker将在后台运行,负责处理PDF数据的解析和解码。 3. **数据获取**:浏览器通过Ajax或Fetch API请求PDF文档的二进制数据。一旦数据加载完成,...

Global site tag (gtag.js) - Google Analytics