`
BucketLi
  • 浏览: 195913 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
博客专栏
5a76a659-f8e6-3bf3-b39a-8ae8f7a0f9d9
Percolator与分布...
浏览量:5686
社区版块
存档分类
最新评论

一个java定时器实现

 
阅读更多
之前有几个需要用到定时器超时的场景,比如线程池大小有限,如何让task不死等,但又不至于一旦队列满就直接reject或者让提交线程来做,但后来还是用让提交线程做事的方式来做,也就是并行暂时退化成了串行。

定时器有几个关键部分,1.定时扫描机制和设定,2.超时处理callback,3.超时前成功返回cancel.定时器的实现有很多种。下面的代码主要是团队使用的改造过的HashedWheelTimer,内部有很多细节,但不妨碍理解机制。

业务使用代码:
      Timeout timeout = this.timer.newTimeout(new TimerTask() {

                public void run(Timeout timeout) throws Exception {
                    // 如果连接没有推送元,则断开连接
                    if (!timeout.isCancelled() && conn.isConnected()) {
                        if (conn.getAttribute(CONN_META_DATA_ATTR) == null) {
                            log.error(LOGPREFX + "连接" + conn.getRemoteSocketAddress() + "没有推送元信息,主动关闭连接");
                            conn.close(false);
                        }
                    }

                }
            }, this.delay, TimeUnit.SECONDS);
            // 已经添加了,取消最新的
            if (conn.setAttributeIfAbsent(CONN_METADATA_TIMEOUT_ATTR, timeout) != null) {
                timeout.cancel();
            }



定时器实现代码
/**
 * A {@link Timer} optimized for approximated I/O timeout scheduling.
 * 
 * <h3>Tick Duration</h3>
 * 
 * As described with 'approximated', this timer does not execute the scheduled
 * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
 * check if there are any {@link TimerTask}s behind the schedule and execute
 * them.
 * <p>
 * You can increase or decrease the accuracy of the execution timing by
 * specifying smaller or larger tick duration in the constructor. In most
 * network applications, I/O timeout does not need to be accurate. Therefore,
 * the default tick duration is 100 milliseconds and you will not need to try
 * different configurations in most cases.
 * 
 * <h3>Ticks per Wheel (Wheel Size)</h3>
 * 
 * {@link HashedWheelTimer} maintains a data structure called 'wheel'. To put
 * simply, a wheel is a hash table of {@link TimerTask}s whose hash function is
 * 'dead line of the task'. The default number of ticks per wheel (i.e. the size
 * of the wheel) is 512. You could specify a larger value if you are going to
 * schedule a lot of timeouts.
 * 
 * <h3>Implementation Details</h3>
 * 
 * {@link HashedWheelTimer} is based on <a
 * href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and Tony
 * Lauck's paper, <a
 * href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed and
 * Hierarchical Timing Wheels: data structures to efficiently implement a timer
 * facility'</a>. More comprehensive slides are located <a
 * href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt"
 * >here</a>.
 * 
 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
 */
public class HashedWheelTimer implements Timer {

    static final Log logger = LogFactory.getLog(HashedWheelTimer.class);
    private static final AtomicInteger id = new AtomicInteger();

    // I'd say 64 active timer threads are obvious misuse.
    private static final int MISUSE_WARNING_THRESHOLD = 64;
    private static final AtomicInteger activeInstances = new AtomicInteger();
    private static final AtomicBoolean loggedMisuseWarning = new AtomicBoolean();

    private final Worker worker = new Worker();
    final Thread workerThread;
    final AtomicBoolean shutdown = new AtomicBoolean();

    private final long roundDuration;
    final long tickDuration;
    final Set<HashedWheelTimeout>[] wheel;
    final ReusableIterator<HashedWheelTimeout>[] iterators;
    final int mask;
    final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile int wheelCursor;

    private final AtomicInteger size = new AtomicInteger(0);

    final int maxTimerCapacity;


    /**
     * Creates a new timer with the default thread factory (
     * {@link Executors#defaultThreadFactory()}), default tick duration, and
     * default number of ticks per wheel.
     */
    public HashedWheelTimer() {
        this(Executors.defaultThreadFactory());
    }


    /**
     * Creates a new timer with the default thread factory (
     * {@link Executors#defaultThreadFactory()}) and default number of ticks per
     * wheel.
     * 
     * @param tickDuration
     *            the duration between tick
     * @param unit
     *            the time unit of the {@code tickDuration}
     */
    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
        this(Executors.defaultThreadFactory(), tickDuration, unit);
    }


    /**
     * Creates a new timer with the default thread factory (
     * {@link Executors#defaultThreadFactory()}).
     * 
     * @param tickDuration
     *            the duration between tick
     * @param unit
     *            the time unit of the {@code tickDuration}
     * @param ticksPerWheel
     *            the size of the wheel
     */
    public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel, int maxTimerCapacity) {
        this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel, maxTimerCapacity);
    }


    /**
     * Creates a new timer with the default tick duration and default number of
     * ticks per wheel.
     * 
     * @param threadFactory
     *            a {@link ThreadFactory} that creates a background
     *            {@link Thread} which is dedicated to {@link TimerTask}
     *            execution.
     */
    public HashedWheelTimer(ThreadFactory threadFactory) {
        this(threadFactory, 100, TimeUnit.MILLISECONDS);
    }


    /**
     * 返回当前timer个数
     * 
     * @return
     */
    public int size() {
        return this.size.get();
    }


    /**
     * Creates a new timer with the default number of ticks per wheel.
     * 
     * @param threadFactory
     *            a {@link ThreadFactory} that creates a background
     *            {@link Thread} which is dedicated to {@link TimerTask}
     *            execution.
     * @param tickDuration
     *            the duration between tick
     * @param unit
     *            the time unit of the {@code tickDuration}
     */
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
        this(threadFactory, tickDuration, unit, 512, 50000);
    }


    /**
     * Creates a new timer.
     * 
     * @param threadFactory
     *            a {@link ThreadFactory} that creates a background
     *            {@link Thread} which is dedicated to {@link TimerTask}
     *            execution.
     * @param tickDuration
     *            the duration between tick
     * @param unit
     *            the time unit of the {@code tickDuration}
     * @param ticksPerWheel
     * @param maxTimerCapacity
     *            the size of the wheel
     */
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
            int maxTimerCapacity) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (maxTimerCapacity <= 0) {
            throw new IllegalArgumentException("maxTimerCapacity must be greater than 0: " + maxTimerCapacity);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
        this.wheel = createWheel(ticksPerWheel);
        this.iterators = createIterators(this.wheel);
        this.maxTimerCapacity = maxTimerCapacity;
        this.mask = this.wheel.length - 1;

        // Convert tickDuration to milliseconds.
        this.tickDuration = tickDuration = unit.toMillis(tickDuration);

        // Prevent overflow.
        if (tickDuration == Long.MAX_VALUE || tickDuration >= Long.MAX_VALUE / this.wheel.length) {
            throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit);
        }

        this.roundDuration = tickDuration * this.wheel.length;

        this.workerThread =
                threadFactory.newThread(new ThreadRenamingRunnable(this.worker, "Hashed wheel timer #"
                        + id.incrementAndGet()));

        // Misuse check
        int activeInstances = HashedWheelTimer.activeInstances.incrementAndGet();
        if (activeInstances >= MISUSE_WARNING_THRESHOLD && loggedMisuseWarning.compareAndSet(false, true)) {
            logger.debug("There are too many active " + HashedWheelTimer.class.getSimpleName() + " instances ("
                    + activeInstances + ") - you should share the small number "
                    + "of instances to avoid excessive resource consumption.");
        }
    }


    @SuppressWarnings("unchecked")
    private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }

        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] =
                    new MapBackedSet<HashedWheelTimeout>(new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16,
                        0.95f, 4));
        }
        return wheel;
    }


    @SuppressWarnings("unchecked")
    private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
        ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
        for (int i = 0; i < wheel.length; i++) {
            iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
        }
        return iterators;
    }


    private static int normalizeTicksPerWheel(int ticksPerWheel) {
        int normalizedTicksPerWheel = 1;
        while (normalizedTicksPerWheel < ticksPerWheel) {
            normalizedTicksPerWheel <<= 1;
        }
        return normalizedTicksPerWheel;
    }


    /**
     * Starts the background thread explicitly. The background thread will start
     * automatically on demand even if you did not call this method.
     * 
     * @throws IllegalStateException
     *             if this timer has been {@linkplain #stop() stopped} already
     */
    public synchronized void start() {
        if (this.shutdown.get()) {
            throw new IllegalStateException("cannot be started once stopped");
        }

        if (!this.workerThread.isAlive()) {
            this.workerThread.start();
        }
    }


    public synchronized Set<Timeout> stop() {
        if (!this.shutdown.compareAndSet(false, true)) {
            return Collections.emptySet();
        }

        boolean interrupted = false;
        while (this.workerThread.isAlive()) {
            this.workerThread.interrupt();
            try {
                this.workerThread.join(100);
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        activeInstances.decrementAndGet();

        Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
        for (Set<HashedWheelTimeout> bucket : this.wheel) {
            unprocessedTimeouts.addAll(bucket);
            bucket.clear();
        }

        return Collections.unmodifiableSet(unprocessedTimeouts);
    }


    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        final long currentTime = System.currentTimeMillis();

        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        delay = unit.toMillis(delay);
        if (delay < this.tickDuration) {
            delay = this.tickDuration;
        }

        if (!this.workerThread.isAlive()) {
            this.start();
        }

        if (this.size.get() >= this.maxTimerCapacity) {
            throw new RejectedExecutionException("Timer size " + this.size + " is great than maxTimerCapacity "
                    + this.maxTimerCapacity);
        }

        // Prepare the required parameters to create the timeout object.
        HashedWheelTimeout timeout;
        final long lastRoundDelay = delay % this.roundDuration;
        final long lastTickDelay = delay % this.tickDuration;

        final long relativeIndex = lastRoundDelay / this.tickDuration + (lastTickDelay != 0 ? 1 : 0);

        final long deadline = currentTime + delay;

        final long remainingRounds = delay / this.roundDuration - (delay % this.roundDuration == 0 ? 1 : 0);

        // Add the timeout to the wheel.
        this.lock.readLock().lock();
        try {
            timeout =
                    new HashedWheelTimeout(task, deadline, (int) (this.wheelCursor + relativeIndex & this.mask),
                        remainingRounds);

            this.wheel[timeout.stopIndex].add(timeout);
        }
        finally {
            this.lock.readLock().unlock();
        }
        this.size.incrementAndGet();

        return timeout;
    }

    private final class Worker implements Runnable {

        private long startTime;
        private long tick;


        Worker() {
            super();
        }


        public void run() {
            List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();

            this.startTime = System.currentTimeMillis();
            this.tick = 1;

            while (!HashedWheelTimer.this.shutdown.get()) {
                this.waitForNextTick();
                this.fetchExpiredTimeouts(expiredTimeouts);
                this.notifyExpiredTimeouts(expiredTimeouts);
            }
        }


        private void fetchExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts) {

            // Find the expired timeouts and decrease the round counter
            // if necessary. Note that we don't send the notification
            // immediately to make sure the listeners are called without
            // an exclusive lock.
            HashedWheelTimer.this.lock.writeLock().lock();
            try {
                int oldBucketHead = HashedWheelTimer.this.wheelCursor;

                int newBucketHead = oldBucketHead + 1 & HashedWheelTimer.this.mask;
                HashedWheelTimer.this.wheelCursor = newBucketHead;

                ReusableIterator<HashedWheelTimeout> i = HashedWheelTimer.this.iterators[oldBucketHead];
                this.fetchExpiredTimeouts(expiredTimeouts, i);
            }
            finally {
                HashedWheelTimer.this.lock.writeLock().unlock();
            }
        }


        private void fetchExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts,
                ReusableIterator<HashedWheelTimeout> i) {

            long currentDeadline = System.currentTimeMillis() + HashedWheelTimer.this.tickDuration;
            i.rewind();
            while (i.hasNext()) {
                HashedWheelTimeout timeout = i.next();
                if (timeout.remainingRounds <= 0) {
                    if (timeout.deadline < currentDeadline) {
                        i.remove();
                        expiredTimeouts.add(timeout);
                    }
                    else {
                        // A rare case where a timeout is put for the next
                        // round: just wait for the next round.
                    }
                }
                else {
                    timeout.remainingRounds--;
                }
            }
        }


        private void notifyExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts) {
            // Notify the expired timeouts.
            for (int i = expiredTimeouts.size() - 1; i >= 0; i--) {
                expiredTimeouts.get(i).expire();
                HashedWheelTimer.this.size.decrementAndGet();
            }

            // Clean up the temporary list.
            expiredTimeouts.clear();

        }


        private void waitForNextTick() {
            for (;;) {
                final long currentTime = System.currentTimeMillis();
                final long sleepTime = HashedWheelTimer.this.tickDuration * this.tick - (currentTime - this.startTime);

                if (sleepTime <= 0) {
                    break;
                }

                try {
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException e) {
                    if (HashedWheelTimer.this.shutdown.get()) {
                        return;
                    }
                }
            }

            // Reset the tick if overflow is expected.
            if (HashedWheelTimer.this.tickDuration * this.tick > Long.MAX_VALUE - HashedWheelTimer.this.tickDuration) {
                this.startTime = System.currentTimeMillis();
                this.tick = 1;
            }
            else {
                // Increase the tick if overflow is not likely to happen.
                this.tick++;
            }
        }
    }

    private final class HashedWheelTimeout implements Timeout {

        private final TimerTask task;
        final int stopIndex;
        final long deadline;
        volatile long remainingRounds;
        private volatile boolean cancelled;


        HashedWheelTimeout(TimerTask task, long deadline, int stopIndex, long remainingRounds) {
            this.task = task;
            this.deadline = deadline;
            this.stopIndex = stopIndex;
            this.remainingRounds = remainingRounds;
        }


        public Timer getTimer() {
            return HashedWheelTimer.this;
        }


        public TimerTask getTask() {
            return this.task;
        }


        public void cancel() {
            if (this.isExpired()) {
                return;
            }

            this.cancelled = true;
            // Might be called more than once, but doesn't matter.
            if (HashedWheelTimer.this.wheel[this.stopIndex].remove(this)) {
                HashedWheelTimer.this.size.decrementAndGet();
            }
        }


        public boolean isCancelled() {
            return this.cancelled;
        }


        public boolean isExpired() {
            return this.cancelled || System.currentTimeMillis() > this.deadline;
        }


        public void expire() {
            if (this.cancelled) {
                return;
            }

            try {
                this.task.run(this);
            }
            catch (Throwable t) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t);
            }
        }


        @Override
        public String toString() {
            long currentTime = System.currentTimeMillis();
            long remaining = this.deadline - currentTime;

            StringBuilder buf = new StringBuilder(192);
            buf.append(this.getClass().getSimpleName());
            buf.append('(');

            buf.append("deadline: ");
            if (remaining > 0) {
                buf.append(remaining);
                buf.append(" ms later, ");
            }
            else if (remaining < 0) {
                buf.append(-remaining);
                buf.append(" ms ago, ");
            }
            else {
                buf.append("now, ");
            }

            if (this.isCancelled()) {
                buf.append(", cancelled");
            }

            return buf.append(')').toString();
        }
    }
}
分享到:
评论
2 楼 hanmiao 2013-09-25  
楼主的这個工具类对应的 JAR 包去哪里可以取到,按照代码中给出的注释只能找到 Netty 开源项目的代码,是从这個里面来的嘛?
1 楼 rxin2009 2012-11-27  
lz能不能帮忙解释下时间轮算法的原理呢?

相关推荐

    一个定时器实现多个虚拟定时器具体代码实现

    这可能是一个结构体或类,每个实例代表一个虚拟定时器。 2. **时间管理**:为了比较和管理多个定时器的触发时间,需要一个数据结构(如优先队列)来按时间顺序排列所有定时器。这样可以在每次检查时快速找到即将...

    时间轮定时器java实现

    Java实现时间轮定时器时,可以使用数组或链表来模拟环形结构,每个槽位代表一个时间间隔,任务则存储在相应的槽位中。时间轮的优点在于空间效率高,对于大量短期和重复的定时任务,它的性能优于最小堆。 实现基于...

    Java定时器的使用 Java程序

    在实际开发中,Java定时器常与其他框架结合使用,如Spring的`@Scheduled`注解或Quartz等高级定时任务库,以实现更复杂的定时任务需求。 通过以上的讲解,你应该对Java定时器的使用有了深入的理解。在实际项目中,...

    Java定时器

    总结来说,Java定时器是Java编程中的一个重要概念,它结合`Timer`和`TimerTask`类可以实现各种定时任务,而在`Swing`环境中,`Swing Timer`提供了更加便捷且线程安全的解决方案。对于Java初学者来说,理解和掌握...

    JAVA定时器JAVA定时器.pdf

    Quartz框架是一个开源的任务调度器框架,提供了强大的任务调度功能。Quartz框架可以与Spring框架集成,实现自动执行任务的功能。在使用Quartz框架时,需要导入Quartz框架的JAR包,例如quartz-1.6.2.jar。 3. 定时器...

    java 定时器(Timer)

    Java定时器(Timer)是Java Swing库中的一个类,它提供了调度任务在未来某个时间执行的能力。这个类主要用于在GUI应用程序中实现定时触发某些操作,比如更新界面、执行动画或者执行周期性的后台任务。Timer类结合了...

    Java定时器简例

    ### Java定时器实现详解...综上所述,这个Java定时器实现通过组合`Timer`、`Clock`和`Task`等类,构建了一个灵活、可扩展的定时任务执行框架。开发者可以根据具体需求调整任务逻辑和执行频率,实现各种复杂的定时功能。

    java定时器的实现

    ### Java定时器的实现 #### 一、概述 在Java编程中,定时执行特定任务是一种常见的需求。例如,定期清理缓存、定时发送邮件或执行数据同步等操作。Java提供了多种方式来实现定时任务,其中之一就是利用内置的`java...

    java 定时器 spring 定时器

    `TimerTask`是一个抽象类,你需要创建它的子类并实现`run()`方法来定义要执行的任务。 然而,Java定时器存在一些限制,如任务调度不够灵活,无法处理任务间的依赖关系,以及当定时器被取消或销毁时,已经启动的任务...

    java定时器设置及停止的方法

    `TimerTask`是`Timer`调度的任务,是一个抽象类,你需要继承并实现`run()`方法来定义要定时执行的逻辑。以下是如何使用`Timer`和`TimerTask`创建定时任务: ```java Timer timer = new Timer(); TimerTask task = ...

    java定时器的使用

    在应用开发中,经常需要一些周期性的操作,比如每5分钟执行某一操作等。 对于这样的操作最方便、高效的实现方式就是使用java.util.Timer工具类。

    java定时器服务!!

    总结来说,Java定时器服务主要通过`Timer`和`TimerTask`实现,用于计划未来的任务执行。当需求更复杂时,可以转向`ScheduledExecutorService`。在实际编程中,理解这些类的工作原理和使用方式,能帮助我们更好地管理...

    JAVA中定时器的使用

    Timer 类用于创建一个新的定时器,而 TimerTask 类则用于定义要执行的任务。 使用 Timer 和 TimerTask 实现定时器时,需要继承 TimerTask 类,并重写 run 方法以定义要执行的任务。然后,使用 Timer 的 schedule ...

    Java定时器框架(Quartz)

    Java定时器框架Quartz是Java开发中用于任务调度的一个强大工具,它允许开发者精确地控制任务的执行时间,包括一次性任务和周期性任务。Quartz以其灵活性和稳定性,在企业级应用中广泛使用,尤其在需要定期执行后台...

    java web定时器例子

    Java Web定时器主要指的是在Java Web应用程序中实现定时任务的功能,这在许多业务场景中都是必要的,例如数据备份、报表生成、邮件发送等。在Java中,我们可以利用内置的`java.util.Timer`类或者Spring框架的`@...

    Java创建定时器.rar

    这个“Java创建定时器.rar”压缩包中的代码示例,显然是一个展示了如何在Java中利用定时器实现特定功能的实例。这个例子中,程序会绘制数字的Canvas对象,并通过定时器更新显示的数字,同时使用了生成随机数的For...

    java定时器实现实时访问数据库以免数据库链接超时

    下面将详细讲解如何利用Java定时器实现这一功能。 首先,我们需要了解Java中的定时器类`java.util.Timer`和`java.util.TimerTask`。`Timer`类用于创建和调度任务,而`TimerTask`是继承自`Runnable`接口的任务类,...

    Java后台定时器代码

    `java.util.Timer` 类是Java标准库中的一个基础定时器,它可以调度定时任务的执行。创建一个`Timer`对象后,你可以通过调用`schedule(TimerTask task, long delay)`或`schedule(TimerTask task, Date firstTime, ...

    使用java定时器的几种方式

    Quartz是一个功能强大的作业调度库,它提供了比Java自带的定时器更丰富的功能。通过Quartz,你可以安排作业在特定时间执行,也可以按照特定的频度来周期性执行。Quartz中有一个核心的组件Job,它是实际执行任务的类...

Global site tag (gtag.js) - Google Analytics