`
yuancihang
  • 浏览: 145401 次
  • 性别: Icon_minigender_1
  • 来自: 洛阳
社区版块
存档分类
最新评论

超时任务

阅读更多

该工具类适用于以下场合:

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

d)心跳任务

 

package com.yuan.common.async;

import java.util.Iterator;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yuan.common.collection.DelayItem;

public abstract class TimeoutTask<T> implements Runnable {
   
    private static final Logger log = LoggerFactory.getLogger(TimeoutTask.class);
   
    private DelayQueue<DelayItem<T>> timeoutQueue = new DelayQueue<DelayItem<T>>(); //超时队列
    private Thread daemonThread;
    private AtomicBoolean running = new AtomicBoolean(true);
    private String name;
    private Object lock = new Object();

    public TimeoutTask(String name){
        this.name = name;
    }
   
    public void start(){
        if(daemonThread == null){
            daemonThread = new Thread(this);
            daemonThread.setDaemon(true);
            daemonThread.setName(name);
            daemonThread.start();
        }
    }
   
    public void stop(){
        if(daemonThread != null){
            running.compareAndSet(true, false);
            daemonThread.interrupt();
        }
    }
   
    public void join() throws InterruptedException{
        if(daemonThread != null){
            daemonThread.join();
        }
    }
   
    @Override
    public void run() {
        while(running.get()) {
            try {
                if(timeoutQueue.isEmpty()){//如果超时队列是空的就阻塞守护线程
                    lock();
                }
                DelayItem<T> delayItem = timeoutQueue.take();
                if(delayItem == null){
                    continue;
                }
               
                process(delayItem.getItem());
               
                //重新开始延迟
                delayItem.resetTime();
                timeoutQueue.put(delayItem);
            } catch (InterruptedException e) {
                log.warn(e.getMessage(), e);
            }
        }

    }
    protected void lock() throws InterruptedException{
        synchronized(lock){
            lock.wait();
        }
    }
    protected void unlock(){
        synchronized(lock){
            lock.notifyAll();
        }
    }
   
    protected abstract void process(T submit);
   
    public void put(T submit, long timeout, TimeUnit unit){
       
        int minutes = (int)TimeUnit.MINUTES.convert(timeout, unit);
        timeoutQueue.put(new DelayItem<T>(submit, minutes));
        unlock();
    }
   
    public void remove(T submit){
        DelayItem<T> delayItem = getDelayItem(submit);
       
        if(delayItem != null){
            timeoutQueue.remove(delayItem);
        }
    }
   
    public void resetTime(T submit){
        DelayItem<T> delayItem = getDelayItem(submit);
       
        if(delayItem != null){
            delayItem.resetTime();
        }
       
    }
   
    protected DelayItem<T> getDelayItem(T submit){
       
        Iterator<DelayItem<T>> it = timeoutQueue.iterator();
        while(it.hasNext()){
            DelayItem<T> delayItem = it.next();
            if(delayItem.getItem() == submit){
               
                return delayItem;
            }
        }
       
        return null;
    }

}

 

 

package com.yuan.common.collection;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class DelayItem<T> implements Delayed {
   
    private static final long NANO_ORIGIN = System.nanoTime(); //基准时间
    private static final AtomicLong sequencer = new AtomicLong(0); //
    private final long sequenceNumber; //FIFO, 保证不会有两个以上的延迟项同时被执行, 先开始延迟的优先执行
    private long time;
    private final T item;
    private final long timeout;
   
    public DelayItem(T submit, int minutes){
        this(submit, TimeUnit.NANOSECONDS.convert(minutes, TimeUnit.MINUTES));
    }
   
    public DelayItem(T submit, long timeout) {
        this.timeout = timeout;
        this.time = now() + timeout;
        this.item = submit;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
   
    public void resetTime(){
        this.time = now() + timeout;
    }
   
    final static long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }
   
    public T getItem() {
        return this.item;
    }
   
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof DelayItem) {
            DelayItem<?> x = (DelayItem<?>) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other
                .getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

}

 

用法:

TimeoutTask<String> timeoutTask = new TimeoutTask<String>("test task") {
            protected void process(String submit) {
                System.out.println("process : " + submit);
               
            }
        };
        timeoutTask.put("test", 1, TimeUnit.MINUTES);
       
        timeoutTask.start();

 

分享到:
评论
1 楼 sp42 2015-08-09  
学习了 谢谢!

相关推荐

    基于Java+netty内置时间轮工具处理大批量定时或超时任务工具源码.zip

    在这个项目中,我们关注的是如何利用Netty内置的时间轮(TimeWheel)工具来处理大批量的定时或超时任务。 时间轮是一种高效的数据结构,常用于实现定时器和延迟队列。它是由一系列的槽(Bucket)组成,每个槽代表一...

    C#函数超时执行事例

    // 如果是超时任务完成,取消函数任务并抛出异常 else { cancellationTokenSource.Cancel(); throw new TimeoutException($"函数执行超时,已取消。"); } } } ``` 在这个示例中,`ExecuteWithTimeout`方法...

    Tcpclient连接服务器连接超时设置

    它创建了一个新的`Socket`对象,然后同时启动连接任务和超时任务。当任一任务完成时,检查哪个任务先完成。如果连接任务先完成,那么创建一个新的`TcpClient`实例并返回;如果超时任务先完成,则抛出一个`...

    BlockingQueue队列自定义超时时间取消线程池任务

    在超时后,我们可以调用`cancel`方法尝试取消任务,但要注意的是,只有当任务尚未开始执行或者正在执行的任务可以被中断时,`cancel`方法才能成功。 以下是一个简单的示例: ```java ExecutorService executor = ...

    Java实现任务超时处理方法

    Java实现任务超时处理方法是指在Java中对超时任务的处理,主要有两种方式:基于异步任务结果的超时获取和使用延时任务来终止超时操作。 基于异步任务结果的超时获取是通过使用Future对象来实现的。在提交任务时,会...

    vCenter任务、事件清除

    "vCenter任务、事件清除"是指在vCenter Server中对积累的任务记录和事件通知进行清理的过程。这通常是为了优化系统性能,减少存储占用,以及保持管理界面的清晰整洁。 vCenter 6.x 版本引入了许多改进和增强,包括...

    异步编程超时取消并返回结果

    // 如果是超时任务完成,取消其他任务 if (completedTask == Task.Delay(timeoutMilliseconds)) { cts.Cancel(); throw new TimeoutException("请求超时"); } // 收集已完成的任务的结果 var results = new...

    边缘计算促进的超时控制网络边缘智能.pptx

    2. **及时检测的意义:** 及时检测和处理超时任务有助于防止系统资源的过度消耗,确保关键任务能够平稳可靠地执行。这对于保障边缘计算网络的整体稳定性和可靠性至关重要。 #### 二、超时控制算法在边缘网络中的...

    Java线程超时监控

    当我们提交一个任务到`ExecutorService`时,它会返回一个`Future`对象,我们可以利用这个对象来检查任务是否完成或者设置超时。 ```java ExecutorService executor = Executors.newSingleThreadExecutor(); Future...

    该框架是基于redis实现的分布式队列

    - 当检测到超时任务时,可以从队列中移除并重新放入,或者根据策略放入重试队列。 4. **故障恢复与幂等性**: - 在分布式环境中,任务可能会因网络问题或服务器故障而丢失,所以KMQueue可能包含一种方法来确保...

    多线程管理框架支持超时退出任务回调处理

    一直用线程池,但有些时候发现线程池无法控制结束某个超时等待的线程任务,所以就勉为其难的自己写了个线程管理~。作用:模仿线程池操作,管理多线程任务,超时,以及完成任务的回调。如果有bug自行处理,服务器挂机...

    shell-任务

    在`testovertime.sh`这个脚本中,我们可以推测其可能涉及到的工作流程是检查或处理超时任务。例如,它可能遍历日志文件,查找运行时间超过预定阈值的进程,然后进行相应的操作,如发送警告邮件或强制终止进程。 ...

    C#超时方法 正则超时

    首先,`OutTimeClass.cs`可能包含了一个自定义的超时类,它可能提供了设置超时时间、执行任务以及在超时时抛出异常等功能。这样的类通常会使用`System.Threading.Tasks.Task`或`System.Threading.Timer`来监控任务的...

    Android 异步任务 设置 超时使用handler更新通知功能

    为了设置超时,代码创建了一个新的线程,并在其中调用了`get()`方法,这个方法会阻塞直到任务完成或达到指定的超时时间。如果任务在10秒内未完成,`get()`方法会抛出`TimeoutException`。 当`TimeoutException`捕获...

    task:高效的线程池任务调度框架-开源

    不用担心task-running-overtime,因为task框架已经实现了对超时任务的监控,一旦任务超时,任务监控器会将其从线程池中移除,还能帮你自动结束超时任务。 3.任务调度框架性能优良,如果任务队列为空,框架会自动...

    TryRunWithTimeoutAsync测试

    这使得我们可以同时跟踪异步操作和超时任务,实现超时控制。 测试`TryRunWithTimeoutAsync`方法时,我们可能会创建一些模拟的异步操作,如模拟长时间运行的任务或者快速完成的任务,然后调用`...

    timeouter:创建和管理重复的超时任务

    创建和管理重复任务入门[sudo] npm install timeouter --save例子//Start a basic repeating logvar func = timeouter . add ( {func : function ( ) {console . log ( "Repeating event" )} ,timeout : 1000} )//...

    cpp-并行执行http请求支持超时设置

    通过`curl_multi_perform`可以并行执行这些请求,同时利用`curl_easy_setopt`设置超时参数,如`CURLOPT_TIMEOUT`来指定单个请求的超时时间。 压缩包中的`paw-master`可能是一个开源项目,它可能包含了实现上述功能...

    java通过线程控制程序执行超时(新)

    在Java编程中,控制程序执行超时是一项重要的任务,特别是在多线程环境下,我们可能需要确保某个任务不会无限制地运行下去,导致资源耗尽。本文将深入探讨如何使用Java的线程机制来实现程序执行的超时控制,同时也会...

    C#利用Task实现任务超时多任务一起执行的方法

    C#利用Task实现任务超时多任务一起执行的方法 本文主要介绍了C#利用Task实现任务超时,多任务一起执行的相关知识点。Task是C#中的一个异步编程模型,能够帮助开发者更方便地编写异步代码。下面是本文中所涉及的知识...

Global site tag (gtag.js) - Google Analytics