`
iaiai
  • 浏览: 2179882 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Netty笔记-GlobalEventExecutor

    博客分类:
  • J2EE
 
阅读更多
1.概念
/**
 * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
 * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
 * this executor; use a dedicated executor.
 */
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
}

GlobalEventExecutor是具备任务队列的单线程事件执行器,其适合用来实行时间短,碎片化的任务
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {

   //单例对象
    public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
   //维护的任务队列
    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
    //线程工厂
    final ThreadFactory threadFactory =
            new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
   //一个循环运行的执行器
    private final TaskRunner taskRunner = new TaskRunner();
   //标记当前线程是否启动
    private final AtomicBoolean started = new AtomicBoolean();
   //运行中的线程
    volatile Thread thread;
}

2.inEventLoop方法

判断当前执行代码是否在同一个线程,有两种情况
  • 第一次没有异步线程,则创建新线程
  • 若在非当前运行线程中添加任务,判断异步线程是否在执行,如果没有则创建新的线程(原线程已经执行完毕)

3.测试代码

3.1
        {
            for (int i=0;i<5;i++)
            {
                Future<Integer> future=GlobalEventExecutor.INSTANCE.submit(new Callable<Integer>() {

                    @Override
                    public Integer call() throws Exception {
                        System.out.println("eventExecutor threadId:"+Thread.currentThread().getId()+
" inEventLoop:"+GlobalEventExecutor.INSTANCE.inEventLoop());
                        return 1;
                    }
                });
                System.out.println("threadId:"+Thread.currentThread().getId()+
" inEventLoop:"+GlobalEventExecutor.INSTANCE.inEventLoop());

            }
        }

输出结果:
执行的线程是同一个线程,在执行线程中, inEventLoop为true
threadId:1 inEventLoop:false
eventExecutor threadId:12 inEventLoop:true
threadId:1 inEventLoop:false
threadId:1 inEventLoop:false
threadId:1 inEventLoop:false
threadId:1 inEventLoop:false
eventExecutor threadId:12 inEventLoop:true
eventExecutor threadId:12 inEventLoop:true
eventExecutor threadId:12 inEventLoop:true
eventExecutor threadId:12 inEventLoop:true


3.2
每隔1秒钟添加任务,将会看到线程都是新创建的,因为线程执行完毕后会自动退出
        {
            for (int i=0;i<5;i++)
            {
                Future<Integer> future=GlobalEventExecutor.INSTANCE.submit(new Callable<Integer>() {

                    @Override
                    public Integer call() throws Exception {

                        System.out.println("eventExecutor threadId:"+Thread.currentThread().getId()+
" inEventLoop:"+GlobalEventExecutor.INSTANCE.inEventLoop());
                        return 1;
                    }
                });
                Thread.sleep(1000);
                System.out.println("threadId:"+Thread.currentThread().getId()+
" inEventLoop:"+GlobalEventExecutor.INSTANCE.inEventLoop());

            }
        }

输出结果
eventExecutor threadId:12 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:13 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:14 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:15 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:16 inEventLoop:true
threadId:1 inEventLoop:false

4.源码分析

启动线程的逻辑:当前没有线程且线程没有在任务执行状态
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        addTask(task);
        if (!inEventLoop()) {
            startThread();
        }
    }

    private void startThread() {
        if (started.compareAndSet(false, true)) {
            Thread t = threadFactory.newThread(taskRunner);
            // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
            // an assert error.
            // See https://github.com/netty/netty/issues/4357
            thread = t;
            t.start();
        }
    }

   final class TaskRunner implements Runnable {
        @Override
        public void run() {
            for (;;) {
                Runnable task = takeTask();
                if (task != null) {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from the global event executor: ", t);
                    }

                    if (task != quietPeriodTask) {
                        continue;
                    }
                }

                Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
                // Terminate if there is no task in the queue (except the noop task).
                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                    // Mark the current thread as stopped.
                    // The following CAS must always success and must be uncontended,
                    // because only one thread should be running at the same time.
                    boolean stopped = started.compareAndSet(true, false);
                    assert stopped;

                    // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
                    if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                        // A) No new task was added and thus there's nothing to handle
                        //    -> safe to terminate because there's nothing left to do
                        // B) A new thread started and handled all the new tasks.
                        //    -> safe to terminate the new thread will take care the rest
                        break;
                    }

                    // There are pending tasks added again.
                    if (!started.compareAndSet(false, true)) {
                        // startThread() started a new thread and set 'started' to true.
                        // -> terminate this thread so that the new thread reads from taskQueue exclusively.
                        break;
                    }

                    // New tasks were added, but this worker was faster to set 'started' to true.
                    // i.e. a new worker thread was not started by startThread().
                    // -> keep this thread alive to handle the newly added entries.
                }
            }
        }
    }

5.DefaultPromise的实现

本质是使用EventExecutor的execute方法,将通知方法放到异步线程队列中,所以其回调也是在EventExecutor的线程中执行
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

回调调用堆栈

测试代码:
                Promise<Integer> promise=GlobalEventExecutor.INSTANCE.newPromise();
                promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
                    @Override
                    public void operationComplete(Future<? super Integer> future) throws Exception {

                        System.out.println("threadId:"+Thread.currentThread().getId());
                        System.out.println("operationComplete");
                    }
                });

                System.out.println("threadId:"+Thread.currentThread().getId());
                promise.trySuccess(2);

  • 大小: 87.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics