该工具类适用于以下场合:
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
d)心跳任务
package com.yuan.common.async;
import java.util.Iterator;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yuan.common.collection.DelayItem;
public abstract class TimeoutTask<T> implements Runnable {
private static final Logger log = LoggerFactory.getLogger(TimeoutTask.class);
private DelayQueue<DelayItem<T>> timeoutQueue = new DelayQueue<DelayItem<T>>(); //超时队列
private Thread daemonThread;
private AtomicBoolean running = new AtomicBoolean(true);
private String name;
private Object lock = new Object();
public TimeoutTask(String name){
this.name = name;
}
public void start(){
if(daemonThread == null){
daemonThread = new Thread(this);
daemonThread.setDaemon(true);
daemonThread.setName(name);
daemonThread.start();
}
}
public void stop(){
if(daemonThread != null){
running.compareAndSet(true, false);
daemonThread.interrupt();
}
}
public void join() throws InterruptedException{
if(daemonThread != null){
daemonThread.join();
}
}
@Override
public void run() {
while(running.get()) {
try {
if(timeoutQueue.isEmpty()){//如果超时队列是空的就阻塞守护线程
lock();
}
DelayItem<T> delayItem = timeoutQueue.take();
if(delayItem == null){
continue;
}
process(delayItem.getItem());
//重新开始延迟
delayItem.resetTime();
timeoutQueue.put(delayItem);
} catch (InterruptedException e) {
log.warn(e.getMessage(), e);
}
}
}
protected void lock() throws InterruptedException{
synchronized(lock){
lock.wait();
}
}
protected void unlock(){
synchronized(lock){
lock.notifyAll();
}
}
protected abstract void process(T submit);
public void put(T submit, long timeout, TimeUnit unit){
int minutes = (int)TimeUnit.MINUTES.convert(timeout, unit);
timeoutQueue.put(new DelayItem<T>(submit, minutes));
unlock();
}
public void remove(T submit){
DelayItem<T> delayItem = getDelayItem(submit);
if(delayItem != null){
timeoutQueue.remove(delayItem);
}
}
public void resetTime(T submit){
DelayItem<T> delayItem = getDelayItem(submit);
if(delayItem != null){
delayItem.resetTime();
}
}
protected DelayItem<T> getDelayItem(T submit){
Iterator<DelayItem<T>> it = timeoutQueue.iterator();
while(it.hasNext()){
DelayItem<T> delayItem = it.next();
if(delayItem.getItem() == submit){
return delayItem;
}
}
return null;
}
}
package com.yuan.common.collection;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class DelayItem<T> implements Delayed {
private static final long NANO_ORIGIN = System.nanoTime(); //基准时间
private static final AtomicLong sequencer = new AtomicLong(0); //
private final long sequenceNumber; //FIFO, 保证不会有两个以上的延迟项同时被执行, 先开始延迟的优先执行
private long time;
private final T item;
private final long timeout;
public DelayItem(T submit, int minutes){
this(submit, TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES));
}
public DelayItem(T submit, long timeout) {
this.timeout = timeout;
this.time = now() + timeout;
this.item = submit;
this.sequenceNumber = sequencer.getAndIncrement();
}
public void resetTime(){
this.time = now() + timeout;
}
final static long now() {
return System.nanoTime() - NANO_ORIGIN;
}
public T getItem() {
return this.item;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof DelayItem) {
DelayItem<?> x = (DelayItem<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other
.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
用法:
TimeoutTask<String> timeoutTask = new TimeoutTask<String>("test task") {
protected void process(String submit) {
System.out.println("process : " + submit);
}
};
timeoutTask.put("test", 1, TimeUnit.MINUTES);
timeoutTask.start();
分享到:
相关推荐
在这个项目中,我们关注的是如何利用Netty内置的时间轮(TimeWheel)工具来处理大批量的定时或超时任务。 时间轮是一种高效的数据结构,常用于实现定时器和延迟队列。它是由一系列的槽(Bucket)组成,每个槽代表一...
// 如果是超时任务完成,取消函数任务并抛出异常 else { cancellationTokenSource.Cancel(); throw new TimeoutException($"函数执行超时,已取消。"); } } } ``` 在这个示例中,`ExecuteWithTimeout`方法...
它创建了一个新的`Socket`对象,然后同时启动连接任务和超时任务。当任一任务完成时,检查哪个任务先完成。如果连接任务先完成,那么创建一个新的`TcpClient`实例并返回;如果超时任务先完成,则抛出一个`...
在超时后,我们可以调用`cancel`方法尝试取消任务,但要注意的是,只有当任务尚未开始执行或者正在执行的任务可以被中断时,`cancel`方法才能成功。 以下是一个简单的示例: ```java ExecutorService executor = ...
Java实现任务超时处理方法是指在Java中对超时任务的处理,主要有两种方式:基于异步任务结果的超时获取和使用延时任务来终止超时操作。 基于异步任务结果的超时获取是通过使用Future对象来实现的。在提交任务时,会...
"vCenter任务、事件清除"是指在vCenter Server中对积累的任务记录和事件通知进行清理的过程。这通常是为了优化系统性能,减少存储占用,以及保持管理界面的清晰整洁。 vCenter 6.x 版本引入了许多改进和增强,包括...
// 如果是超时任务完成,取消其他任务 if (completedTask == Task.Delay(timeoutMilliseconds)) { cts.Cancel(); throw new TimeoutException("请求超时"); } // 收集已完成的任务的结果 var results = new...
2. **及时检测的意义:** 及时检测和处理超时任务有助于防止系统资源的过度消耗,确保关键任务能够平稳可靠地执行。这对于保障边缘计算网络的整体稳定性和可靠性至关重要。 #### 二、超时控制算法在边缘网络中的...
当我们提交一个任务到`ExecutorService`时,它会返回一个`Future`对象,我们可以利用这个对象来检查任务是否完成或者设置超时。 ```java ExecutorService executor = Executors.newSingleThreadExecutor(); Future...
- 当检测到超时任务时,可以从队列中移除并重新放入,或者根据策略放入重试队列。 4. **故障恢复与幂等性**: - 在分布式环境中,任务可能会因网络问题或服务器故障而丢失,所以KMQueue可能包含一种方法来确保...
一直用线程池,但有些时候发现线程池无法控制结束某个超时等待的线程任务,所以就勉为其难的自己写了个线程管理~。作用:模仿线程池操作,管理多线程任务,超时,以及完成任务的回调。如果有bug自行处理,服务器挂机...
在`testovertime.sh`这个脚本中,我们可以推测其可能涉及到的工作流程是检查或处理超时任务。例如,它可能遍历日志文件,查找运行时间超过预定阈值的进程,然后进行相应的操作,如发送警告邮件或强制终止进程。 ...
首先,`OutTimeClass.cs`可能包含了一个自定义的超时类,它可能提供了设置超时时间、执行任务以及在超时时抛出异常等功能。这样的类通常会使用`System.Threading.Tasks.Task`或`System.Threading.Timer`来监控任务的...
为了设置超时,代码创建了一个新的线程,并在其中调用了`get()`方法,这个方法会阻塞直到任务完成或达到指定的超时时间。如果任务在10秒内未完成,`get()`方法会抛出`TimeoutException`。 当`TimeoutException`捕获...
不用担心task-running-overtime,因为task框架已经实现了对超时任务的监控,一旦任务超时,任务监控器会将其从线程池中移除,还能帮你自动结束超时任务。 3.任务调度框架性能优良,如果任务队列为空,框架会自动...
这使得我们可以同时跟踪异步操作和超时任务,实现超时控制。 测试`TryRunWithTimeoutAsync`方法时,我们可能会创建一些模拟的异步操作,如模拟长时间运行的任务或者快速完成的任务,然后调用`...
创建和管理重复任务入门[sudo] npm install timeouter --save例子//Start a basic repeating logvar func = timeouter . add ( {func : function ( ) {console . log ( "Repeating event" )} ,timeout : 1000} )//...
通过`curl_multi_perform`可以并行执行这些请求,同时利用`curl_easy_setopt`设置超时参数,如`CURLOPT_TIMEOUT`来指定单个请求的超时时间。 压缩包中的`paw-master`可能是一个开源项目,它可能包含了实现上述功能...
在Java编程中,控制程序执行超时是一项重要的任务,特别是在多线程环境下,我们可能需要确保某个任务不会无限制地运行下去,导致资源耗尽。本文将深入探讨如何使用Java的线程机制来实现程序执行的超时控制,同时也会...
C#利用Task实现任务超时多任务一起执行的方法 本文主要介绍了C#利用Task实现任务超时,多任务一起执行的相关知识点。Task是C#中的一个异步编程模型,能够帮助开发者更方便地编写异步代码。下面是本文中所涉及的知识...