- 浏览: 195913 次
- 性别:
- 来自: 杭州
博客专栏
-
Percolator与分布...
浏览量:5686
文章分类
最新评论
-
heglase:
好牛逼 竟然解决了我别的问题
使用jdk工具tools.jar引发的问题 -
wqcva:
在使用这个类的时候workerId应该怎么传
java时间有序id生成 -
沙漠绿树:
增加虚拟节点解决数据均衡的问题。我有个疑问:1.使用虚拟节点后 ...
一致性hash的实现 -
BucketLi:
wangjian95 写道tddl.....?不是
java唯一ID生成 -
wangjian95:
tddl.....?
java唯一ID生成
之前有几个需要用到定时器超时的场景,比如线程池大小有限,如何让task不死等,但又不至于一旦队列满就直接reject或者让提交线程来做,但后来还是用让提交线程做事的方式来做,也就是并行暂时退化成了串行。
定时器有几个关键部分,1.定时扫描机制和设定,2.超时处理callback,3.超时前成功返回cancel.定时器的实现有很多种。下面的代码主要是团队使用的改造过的HashedWheelTimer,内部有很多细节,但不妨碍理解机制。
业务使用代码:
定时器实现代码
定时器有几个关键部分,1.定时扫描机制和设定,2.超时处理callback,3.超时前成功返回cancel.定时器的实现有很多种。下面的代码主要是团队使用的改造过的HashedWheelTimer,内部有很多细节,但不妨碍理解机制。
业务使用代码:
Timeout timeout = this.timer.newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { // 如果连接没有推送元,则断开连接 if (!timeout.isCancelled() && conn.isConnected()) { if (conn.getAttribute(CONN_META_DATA_ATTR) == null) { log.error(LOGPREFX + "连接" + conn.getRemoteSocketAddress() + "没有推送元信息,主动关闭连接"); conn.close(false); } } } }, this.delay, TimeUnit.SECONDS); // 已经添加了,取消最新的 if (conn.setAttributeIfAbsent(CONN_METADATA_TIMEOUT_ATTR, timeout) != null) { timeout.cancel(); }
定时器实现代码
/** * A {@link Timer} optimized for approximated I/O timeout scheduling. * * <h3>Tick Duration</h3> * * As described with 'approximated', this timer does not execute the scheduled * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will * check if there are any {@link TimerTask}s behind the schedule and execute * them. * <p> * You can increase or decrease the accuracy of the execution timing by * specifying smaller or larger tick duration in the constructor. In most * network applications, I/O timeout does not need to be accurate. Therefore, * the default tick duration is 100 milliseconds and you will not need to try * different configurations in most cases. * * <h3>Ticks per Wheel (Wheel Size)</h3> * * {@link HashedWheelTimer} maintains a data structure called 'wheel'. To put * simply, a wheel is a hash table of {@link TimerTask}s whose hash function is * 'dead line of the task'. The default number of ticks per wheel (i.e. the size * of the wheel) is 512. You could specify a larger value if you are going to * schedule a lot of timeouts. * * <h3>Implementation Details</h3> * * {@link HashedWheelTimer} is based on <a * href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and Tony * Lauck's paper, <a * href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed and * Hierarchical Timing Wheels: data structures to efficiently implement a timer * facility'</a>. More comprehensive slides are located <a * href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt" * >here</a>. * * @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a> */ public class HashedWheelTimer implements Timer { static final Log logger = LogFactory.getLog(HashedWheelTimer.class); private static final AtomicInteger id = new AtomicInteger(); // I'd say 64 active timer threads are obvious misuse. private static final int MISUSE_WARNING_THRESHOLD = 64; private static final AtomicInteger activeInstances = new AtomicInteger(); private static final AtomicBoolean loggedMisuseWarning = new AtomicBoolean(); private final Worker worker = new Worker(); final Thread workerThread; final AtomicBoolean shutdown = new AtomicBoolean(); private final long roundDuration; final long tickDuration; final Set<HashedWheelTimeout>[] wheel; final ReusableIterator<HashedWheelTimeout>[] iterators; final int mask; final ReadWriteLock lock = new ReentrantReadWriteLock(); private volatile int wheelCursor; private final AtomicInteger size = new AtomicInteger(0); final int maxTimerCapacity; /** * Creates a new timer with the default thread factory ( * {@link Executors#defaultThreadFactory()}), default tick duration, and * default number of ticks per wheel. */ public HashedWheelTimer() { this(Executors.defaultThreadFactory()); } /** * Creates a new timer with the default thread factory ( * {@link Executors#defaultThreadFactory()}) and default number of ticks per * wheel. * * @param tickDuration * the duration between tick * @param unit * the time unit of the {@code tickDuration} */ public HashedWheelTimer(long tickDuration, TimeUnit unit) { this(Executors.defaultThreadFactory(), tickDuration, unit); } /** * Creates a new timer with the default thread factory ( * {@link Executors#defaultThreadFactory()}). * * @param tickDuration * the duration between tick * @param unit * the time unit of the {@code tickDuration} * @param ticksPerWheel * the size of the wheel */ public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel, int maxTimerCapacity) { this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel, maxTimerCapacity); } /** * Creates a new timer with the default tick duration and default number of * ticks per wheel. * * @param threadFactory * a {@link ThreadFactory} that creates a background * {@link Thread} which is dedicated to {@link TimerTask} * execution. */ public HashedWheelTimer(ThreadFactory threadFactory) { this(threadFactory, 100, TimeUnit.MILLISECONDS); } /** * 返回当前timer个数 * * @return */ public int size() { return this.size.get(); } /** * Creates a new timer with the default number of ticks per wheel. * * @param threadFactory * a {@link ThreadFactory} that creates a background * {@link Thread} which is dedicated to {@link TimerTask} * execution. * @param tickDuration * the duration between tick * @param unit * the time unit of the {@code tickDuration} */ public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) { this(threadFactory, tickDuration, unit, 512, 50000); } /** * Creates a new timer. * * @param threadFactory * a {@link ThreadFactory} that creates a background * {@link Thread} which is dedicated to {@link TimerTask} * execution. * @param tickDuration * the duration between tick * @param unit * the time unit of the {@code tickDuration} * @param ticksPerWheel * @param maxTimerCapacity * the size of the wheel */ public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, int maxTimerCapacity) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (maxTimerCapacity <= 0) { throw new IllegalArgumentException("maxTimerCapacity must be greater than 0: " + maxTimerCapacity); } // Normalize ticksPerWheel to power of two and initialize the wheel. this.wheel = createWheel(ticksPerWheel); this.iterators = createIterators(this.wheel); this.maxTimerCapacity = maxTimerCapacity; this.mask = this.wheel.length - 1; // Convert tickDuration to milliseconds. this.tickDuration = tickDuration = unit.toMillis(tickDuration); // Prevent overflow. if (tickDuration == Long.MAX_VALUE || tickDuration >= Long.MAX_VALUE / this.wheel.length) { throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit); } this.roundDuration = tickDuration * this.wheel.length; this.workerThread = threadFactory.newThread(new ThreadRenamingRunnable(this.worker, "Hashed wheel timer #" + id.incrementAndGet())); // Misuse check int activeInstances = HashedWheelTimer.activeInstances.incrementAndGet(); if (activeInstances >= MISUSE_WARNING_THRESHOLD && loggedMisuseWarning.compareAndSet(false, true)) { logger.debug("There are too many active " + HashedWheelTimer.class.getSimpleName() + " instances (" + activeInstances + ") - you should share the small number " + "of instances to avoid excessive resource consumption."); } } @SuppressWarnings("unchecked") private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new MapBackedSet<HashedWheelTimeout>(new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4)); } return wheel; } @SuppressWarnings("unchecked") private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) { ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length]; for (int i = 0; i < wheel.length; i++) { iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator(); } return iterators; } private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; } /** * Starts the background thread explicitly. The background thread will start * automatically on demand even if you did not call this method. * * @throws IllegalStateException * if this timer has been {@linkplain #stop() stopped} already */ public synchronized void start() { if (this.shutdown.get()) { throw new IllegalStateException("cannot be started once stopped"); } if (!this.workerThread.isAlive()) { this.workerThread.start(); } } public synchronized Set<Timeout> stop() { if (!this.shutdown.compareAndSet(false, true)) { return Collections.emptySet(); } boolean interrupted = false; while (this.workerThread.isAlive()) { this.workerThread.interrupt(); try { this.workerThread.join(100); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } activeInstances.decrementAndGet(); Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); for (Set<HashedWheelTimeout> bucket : this.wheel) { unprocessedTimeouts.addAll(bucket); bucket.clear(); } return Collections.unmodifiableSet(unprocessedTimeouts); } public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { final long currentTime = System.currentTimeMillis(); if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } delay = unit.toMillis(delay); if (delay < this.tickDuration) { delay = this.tickDuration; } if (!this.workerThread.isAlive()) { this.start(); } if (this.size.get() >= this.maxTimerCapacity) { throw new RejectedExecutionException("Timer size " + this.size + " is great than maxTimerCapacity " + this.maxTimerCapacity); } // Prepare the required parameters to create the timeout object. HashedWheelTimeout timeout; final long lastRoundDelay = delay % this.roundDuration; final long lastTickDelay = delay % this.tickDuration; final long relativeIndex = lastRoundDelay / this.tickDuration + (lastTickDelay != 0 ? 1 : 0); final long deadline = currentTime + delay; final long remainingRounds = delay / this.roundDuration - (delay % this.roundDuration == 0 ? 1 : 0); // Add the timeout to the wheel. this.lock.readLock().lock(); try { timeout = new HashedWheelTimeout(task, deadline, (int) (this.wheelCursor + relativeIndex & this.mask), remainingRounds); this.wheel[timeout.stopIndex].add(timeout); } finally { this.lock.readLock().unlock(); } this.size.incrementAndGet(); return timeout; } private final class Worker implements Runnable { private long startTime; private long tick; Worker() { super(); } public void run() { List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>(); this.startTime = System.currentTimeMillis(); this.tick = 1; while (!HashedWheelTimer.this.shutdown.get()) { this.waitForNextTick(); this.fetchExpiredTimeouts(expiredTimeouts); this.notifyExpiredTimeouts(expiredTimeouts); } } private void fetchExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts) { // Find the expired timeouts and decrease the round counter // if necessary. Note that we don't send the notification // immediately to make sure the listeners are called without // an exclusive lock. HashedWheelTimer.this.lock.writeLock().lock(); try { int oldBucketHead = HashedWheelTimer.this.wheelCursor; int newBucketHead = oldBucketHead + 1 & HashedWheelTimer.this.mask; HashedWheelTimer.this.wheelCursor = newBucketHead; ReusableIterator<HashedWheelTimeout> i = HashedWheelTimer.this.iterators[oldBucketHead]; this.fetchExpiredTimeouts(expiredTimeouts, i); } finally { HashedWheelTimer.this.lock.writeLock().unlock(); } } private void fetchExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts, ReusableIterator<HashedWheelTimeout> i) { long currentDeadline = System.currentTimeMillis() + HashedWheelTimer.this.tickDuration; i.rewind(); while (i.hasNext()) { HashedWheelTimeout timeout = i.next(); if (timeout.remainingRounds <= 0) { if (timeout.deadline < currentDeadline) { i.remove(); expiredTimeouts.add(timeout); } else { // A rare case where a timeout is put for the next // round: just wait for the next round. } } else { timeout.remainingRounds--; } } } private void notifyExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts) { // Notify the expired timeouts. for (int i = expiredTimeouts.size() - 1; i >= 0; i--) { expiredTimeouts.get(i).expire(); HashedWheelTimer.this.size.decrementAndGet(); } // Clean up the temporary list. expiredTimeouts.clear(); } private void waitForNextTick() { for (;;) { final long currentTime = System.currentTimeMillis(); final long sleepTime = HashedWheelTimer.this.tickDuration * this.tick - (currentTime - this.startTime); if (sleepTime <= 0) { break; } try { Thread.sleep(sleepTime); } catch (InterruptedException e) { if (HashedWheelTimer.this.shutdown.get()) { return; } } } // Reset the tick if overflow is expected. if (HashedWheelTimer.this.tickDuration * this.tick > Long.MAX_VALUE - HashedWheelTimer.this.tickDuration) { this.startTime = System.currentTimeMillis(); this.tick = 1; } else { // Increase the tick if overflow is not likely to happen. this.tick++; } } } private final class HashedWheelTimeout implements Timeout { private final TimerTask task; final int stopIndex; final long deadline; volatile long remainingRounds; private volatile boolean cancelled; HashedWheelTimeout(TimerTask task, long deadline, int stopIndex, long remainingRounds) { this.task = task; this.deadline = deadline; this.stopIndex = stopIndex; this.remainingRounds = remainingRounds; } public Timer getTimer() { return HashedWheelTimer.this; } public TimerTask getTask() { return this.task; } public void cancel() { if (this.isExpired()) { return; } this.cancelled = true; // Might be called more than once, but doesn't matter. if (HashedWheelTimer.this.wheel[this.stopIndex].remove(this)) { HashedWheelTimer.this.size.decrementAndGet(); } } public boolean isCancelled() { return this.cancelled; } public boolean isExpired() { return this.cancelled || System.currentTimeMillis() > this.deadline; } public void expire() { if (this.cancelled) { return; } try { this.task.run(this); } catch (Throwable t) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t); } } @Override public String toString() { long currentTime = System.currentTimeMillis(); long remaining = this.deadline - currentTime; StringBuilder buf = new StringBuilder(192); buf.append(this.getClass().getSimpleName()); buf.append('('); buf.append("deadline: "); if (remaining > 0) { buf.append(remaining); buf.append(" ms later, "); } else if (remaining < 0) { buf.append(-remaining); buf.append(" ms ago, "); } else { buf.append("now, "); } if (this.isCancelled()) { buf.append(", cancelled"); } return buf.append(')').toString(); } } }
评论
2 楼
hanmiao
2013-09-25
楼主的这個工具类对应的 JAR 包去哪里可以取到,按照代码中给出的注释只能找到 Netty 开源项目的代码,是从这個里面来的嘛?
1 楼
rxin2009
2012-11-27
lz能不能帮忙解释下时间轮算法的原理呢?
发表评论
-
Spring Validator 部分注解说明
2021-01-30 17:13 352@AssertFalse Boole ... -
Mac 安装 OpenJDK
2019-07-17 08:05 827现在 ORACLE 新版本 JDK 越发越快,新版本固然好,但 ... -
git fork 分支合并原分支
2019-06-27 10:35 11251. List the current configured ... -
Cobar内存快速检测tips
2017-11-07 17:20 426很长时间没有使用mat,技巧生疏,趁这次使用Cobar(htt ... -
ORACLE CDC增量同步初始化
2016-09-07 22:29 760// Step 1 Find the source tab ... -
一些文章
2015-09-04 14:38 0http://www.biaodianfu.com/herme ... -
java资源加载
2015-04-22 10:04 596tips下。 this.getClass().getReso ... -
使用jdk工具tools.jar引发的问题
2015-04-22 09:31 1727这里tips下这个问题 之前本地开发机使用jdk7进行开发和 ... -
eclipse for mac快捷键
2015-02-26 13:16 704Command + O:显示大纲 Command + D:删除 ... -
zookeeper client的一些操作
2014-11-07 12:30 7351.登陆 ./zkcli.sh -server 127.0.0 ... -
java获取类版本和检查重复代码
2014-10-13 21:59 1391public final class Version { ... -
java代码细节
2014-10-17 09:29 847看代码过程中一些细节记录,不断补充。质量可靠,开发高效的捷径在 ... -
java程序启动的一些设置
2014-09-19 11:14 01. 开启debug,suspend值设置成y会等待debug ... -
java_web开发tips
2014-07-21 09:44 01.这两天接手一个新的应用,打算在上面开发几个api,因为功能 ... -
信息安全基础
2014-07-21 09:46 743转自某微博,这边tips下,虽然很不完全,但是有一些思路 信 ... -
java 的一些排序方法(转)
2014-07-21 09:48 718一些java排序方法,记录下。 package com.ta ... -
Shift-And和Shift-Or ByteBuffer匹配器
2012-09-07 18:15 1520两个ByteBuffer的匹配算法java实现,原作者 庄大侠 ... -
一个简单的BufferPool
2012-08-31 10:15 958一个简单的buffer分配和收集代码,将一大段buffer分片 ... -
一个典型md5生成工具类
2012-08-23 09:27 1155import java.io.UnsupportedEnc ... -
Java程序员常用工具集(转)
2012-08-31 10:18 1030转自庄大侠(killme2008)的博客,我这边收藏下。 原 ...
相关推荐
这可能是一个结构体或类,每个实例代表一个虚拟定时器。 2. **时间管理**:为了比较和管理多个定时器的触发时间,需要一个数据结构(如优先队列)来按时间顺序排列所有定时器。这样可以在每次检查时快速找到即将...
Java实现时间轮定时器时,可以使用数组或链表来模拟环形结构,每个槽位代表一个时间间隔,任务则存储在相应的槽位中。时间轮的优点在于空间效率高,对于大量短期和重复的定时任务,它的性能优于最小堆。 实现基于...
在实际开发中,Java定时器常与其他框架结合使用,如Spring的`@Scheduled`注解或Quartz等高级定时任务库,以实现更复杂的定时任务需求。 通过以上的讲解,你应该对Java定时器的使用有了深入的理解。在实际项目中,...
总结来说,Java定时器是Java编程中的一个重要概念,它结合`Timer`和`TimerTask`类可以实现各种定时任务,而在`Swing`环境中,`Swing Timer`提供了更加便捷且线程安全的解决方案。对于Java初学者来说,理解和掌握...
Quartz框架是一个开源的任务调度器框架,提供了强大的任务调度功能。Quartz框架可以与Spring框架集成,实现自动执行任务的功能。在使用Quartz框架时,需要导入Quartz框架的JAR包,例如quartz-1.6.2.jar。 3. 定时器...
Java定时器(Timer)是Java Swing库中的一个类,它提供了调度任务在未来某个时间执行的能力。这个类主要用于在GUI应用程序中实现定时触发某些操作,比如更新界面、执行动画或者执行周期性的后台任务。Timer类结合了...
### Java定时器实现详解...综上所述,这个Java定时器实现通过组合`Timer`、`Clock`和`Task`等类,构建了一个灵活、可扩展的定时任务执行框架。开发者可以根据具体需求调整任务逻辑和执行频率,实现各种复杂的定时功能。
### Java定时器的实现 #### 一、概述 在Java编程中,定时执行特定任务是一种常见的需求。例如,定期清理缓存、定时发送邮件或执行数据同步等操作。Java提供了多种方式来实现定时任务,其中之一就是利用内置的`java...
`TimerTask`是一个抽象类,你需要创建它的子类并实现`run()`方法来定义要执行的任务。 然而,Java定时器存在一些限制,如任务调度不够灵活,无法处理任务间的依赖关系,以及当定时器被取消或销毁时,已经启动的任务...
`TimerTask`是`Timer`调度的任务,是一个抽象类,你需要继承并实现`run()`方法来定义要定时执行的逻辑。以下是如何使用`Timer`和`TimerTask`创建定时任务: ```java Timer timer = new Timer(); TimerTask task = ...
在应用开发中,经常需要一些周期性的操作,比如每5分钟执行某一操作等。 对于这样的操作最方便、高效的实现方式就是使用java.util.Timer工具类。
总结来说,Java定时器服务主要通过`Timer`和`TimerTask`实现,用于计划未来的任务执行。当需求更复杂时,可以转向`ScheduledExecutorService`。在实际编程中,理解这些类的工作原理和使用方式,能帮助我们更好地管理...
Timer 类用于创建一个新的定时器,而 TimerTask 类则用于定义要执行的任务。 使用 Timer 和 TimerTask 实现定时器时,需要继承 TimerTask 类,并重写 run 方法以定义要执行的任务。然后,使用 Timer 的 schedule ...
Java定时器框架Quartz是Java开发中用于任务调度的一个强大工具,它允许开发者精确地控制任务的执行时间,包括一次性任务和周期性任务。Quartz以其灵活性和稳定性,在企业级应用中广泛使用,尤其在需要定期执行后台...
Java Web定时器主要指的是在Java Web应用程序中实现定时任务的功能,这在许多业务场景中都是必要的,例如数据备份、报表生成、邮件发送等。在Java中,我们可以利用内置的`java.util.Timer`类或者Spring框架的`@...
这个“Java创建定时器.rar”压缩包中的代码示例,显然是一个展示了如何在Java中利用定时器实现特定功能的实例。这个例子中,程序会绘制数字的Canvas对象,并通过定时器更新显示的数字,同时使用了生成随机数的For...
下面将详细讲解如何利用Java定时器实现这一功能。 首先,我们需要了解Java中的定时器类`java.util.Timer`和`java.util.TimerTask`。`Timer`类用于创建和调度任务,而`TimerTask`是继承自`Runnable`接口的任务类,...
`java.util.Timer` 类是Java标准库中的一个基础定时器,它可以调度定时任务的执行。创建一个`Timer`对象后,你可以通过调用`schedule(TimerTask task, long delay)`或`schedule(TimerTask task, Date firstTime, ...
Quartz是一个功能强大的作业调度库,它提供了比Java自带的定时器更丰富的功能。通过Quartz,你可以安排作业在特定时间执行,也可以按照特定的频度来周期性执行。Quartz中有一个核心的组件Job,它是实际执行任务的类...