`
youaremoon
  • 浏览: 32470 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

netty5笔记-线程模型1-Promise

阅读更多

冬天实在太冷了,习惯了广东的天气,突然换个地方还真有点不适应, 早就想写的学习笔记也一直拖到现在。下面进入正题,一起来学习下netty的线程池实现。 我们知道java本身实现了一套线程池,即我们常见的ExecutorService。那么netty为什么还要定义自己的线程模型,什么时候适合用netty线程池,什么时候适合用ExecutorService。相信你看了这几篇文章就会有眉目。

先来张大图,直观的看下netty线程模型和java自带线程池的区别:

需要注意,此图对线程模型进行了简化。图左netty中的executor指jdk中的Executor接口。

了解Promise前我们先看看Future。Future代表一个异步任务的结果,由于是异步任务,得到Future并不代表任务结束,你可以通过get来等待真正结果的返回,或者通过cancal来取消任务。netty对java.util.concurrent.Future进行了增强:

方法名 说明
isSuccess 任务是否执行成功
isCancellable 任务是否可以取消
cause 任务产生的异常
addListener 添加listener, 任务完成后执行listener,如果任务已经完成,则添加时立刻执行
removeListener 移除listener
sync 等待任务结束,如果任务产生异常或被中断则抛出异常,否则返回Future自身
syncUninterruptibly 等待任务结束,任务本身不可中断,如果产生异常则抛出异常,否则返回Future自身
await 等待任务结束,如果任务被中断则抛出中断异常,与sync不同的是只抛出中断异常,不抛出任务产生的异常
awaitUninterruptibly 等待任务结束,任务不可中断
getNow 任务未完成或者发生异常则返回null, 否则返回任务的结果

Future本身不能由实现者直接标记成功或失败,而是由调用的线程来标记。而Promise则在Future的基础上增加了让用户自己标记成功或失败的接口:

 

方法名 说明
setSuccess 通过设置结果的方式标记Future成功并通知所有listener, 如果已被标记过,则抛出异常
trySuccess 通过设置结果的方式标记Future成功并通知所有listener, 如果已被标记过,只是返回false
setFailure 通过设置异常的方式标记Future失败并通知所有listener, 如果已被标记过,则抛出异常
tryFailure 通过设置异常的方式标记Future失败并通知所有listener, 如果已被标记过,只是返回false

Promise这种特性使异步变成更加灵活,某些场景下效率更高(后面会介绍)。来看看Promise的默认实现DefaultPromise。
DefaultPromise的设计比较有特点,利用一个result属性表示了所有的状态:

 

值 说明
null 任务还未开始执行(初始值),此时任务可以被取消
UNCANCELLABLE 任务不可取消
CANCELLATION_CAUSE_HOLDER 任务取消
CauseHolder 执行完成,产生的异常
SUCCESS 执行成功,且结果为null
其他 执行成功,且结果为result

DefaultPromise实现不算复杂, 这里只选取其中几个方法来分析,先看看一个比较重要的方法,等待任务结束的await:

 

public Promise<V> await() throws InterruptedException {
        // 如果任务已经完成则直接返回
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        synchronized (this) {
            // 未完成则一直循环
            while (!isDone()) {
                // 检测是否产生死锁
                checkDeadLock();
                // 将waiter数加1
                incWaiters();
                try {
                    // 等待(被唤醒)
                    wait();
                } finally {
                    // 将waiter数减1
                    decWaiters();
                }
            }
        }
        return this;
    }

不复杂吧,这里有一个需要注意的方法checkDeadLock(),netty线程池的设计,在使用者不正确的使用的情况下会产生死锁,下面我们来看看死锁的检测方法,并分析什么情况下会死锁:

 

 

protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

代码够简单吧,executor线程池负责在任务完成的时候唤醒此Promise, 而e.inEventLoop()则表示当前线程和executor的执行线程是同一个(提前出线了,后面线程池分析会有),即该线程上的一个任务等待该线程上的其他任务唤醒自己。我们知道线程的执行是线性,即前面的代码执行完毕才能执行后面的代码,因此这里产生了一个死锁。那么什么样的代码会产生这种情况呢,下面给一个例子:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg);
        //这里的cf是一个DefaultPromise的子类的实例
        // 代码1
        ChannelFuture cf = ctx.write(msg);
        // 代码2
        ChannelFuture cf = ctx.writeAndFlush(msg);
        try {// 使用代码1会产生死锁(被检测到并抛出异常),而代码2是正常的
            cf.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

说明下这里的cf其实也是一个DefaultPromise的子类的实例。代码1和代码2比差异不大, 却导致了死锁,怎么回事呢?原来调用write方法只是把发送数据放入一个缓存,而不会真实的发送,而writeAndFlush则是将数据放入缓存然后发送数据,发送完成自然cf.isDone() == true了,所以await方法不会走到死锁检测的地方。而write方法调用后cf.isDone()仍然是false,最后死锁检测那一步就会报错了。那么如果我们去除死锁检测那段代码(即checkDeadLock()),会发生什么情况呢? 当前线程会进入wait(), 而真正分发送数据的代码永远不会执行(因为发送数据的方法也是在当前线程中执行,且在这段代码以后)。这样死锁就妥妥的产生了!
代码始终是枯燥的,下面我们举个比较易懂的例子说明下:你和朋友A都是程序员,都很老实,午餐时间到了你们去食堂排队吃东西,并且老老实实的排在一个队伍中。突然你发现忘了带卡,于是你决定等A付完钱再拿他的卡付钱。 此时如果A在你的前面他可以买完把卡给你,但是如果A在你的后面,就会出现一个局面, A在队伍后面等着买单,而你在前面等A买单后给你卡(还不愿意放弃现在的位置),最终整个队伍就卡死在你这了。
解决的方法有两个:

 

1、你转身让A先把卡给你;

2、把A放在其他队伍中,两个队伍互不干扰,这样A能够拿到饭,然后给你卡。
方法1在java里不可行,因为已经调用了wait(),除非有人notify或者interrupte,不然无法进行任何操作);方法2在java的fixed线程池模式下是可行的,在netty的线程池模型下是不行的,为啥? 再看看上面那个大图,一个任务进入一个队列后就和一个线程绑定死了,无法切换到其他线程,so,在netty中这样的代码千万不能写。

“这样的代码”指在netty线程池中调用await等阻塞方法。
即使是代码2也最好不要写,因为不是每个这种方法你都能hold住,都知道其实现,所以最保险的方法就是不用它! 

最后看看如果标记任务完成的:

public Promise<V> setSuccess(V result) {
        // 设置完后调用listener
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    private boolean setSuccess0(V result) {
        if (isDone()) {
            return false;
        }

        synchronized (this) {
            // Allow only once.
            if (isDone()) {
                return false;
            }
            if (result == null) {
                this.result = SUCCESS;
            } else {
                this.result = result;
            }
            // 如果有等待者,则发起通知
           if (hasWaiters()) {
                notifyAll();
            }
        }
        return true;
    }
    private boolean hasWaiters() {
        return waiters > 0;
    }

任务执行完成后会notifyListeners,这里需要注意的是,listener的方法默认情况下是使用io线程执行的,因此不要在里面有很耗时或阻塞的代码,如果确实有的话,可以在实例化Promise的时候传入非io线程的EventExecutor,或者保证listener的operationComplete方法中在其他线程池中执行。

Promise的实现还有很多,但多半是线程池实现类的内部类,这里不过多介绍了。有一个比较常见的ProgressivePromise可以关注下,在Promise的基础上增加了进度的跟踪, 适用如监控数据发送进度之类的场景。






 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics