`
jishuaige
  • 浏览: 10297 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

netty探索之旅六

 
阅读更多
netty中的管道--ChannelPipeline的事件传输机制

在AbstractChannelHandlerContext对象中有inbound和outbound两个boolean变量,用于标识 Context所对应的handler的类型。
inbound=ture,表示对应的ChannelHandler实现了 ChannelInboundHandler方法.
outbound=ture,表示对应的ChannelHandler实现了 ChannelOutboundHandler方法.

在netty中支持两种类型的事件: Inbound和Outbound事件。
看看以下的图:


inbound事件的流向是从下至上的,outbound事件的流向是从上至下的,inbound事件的传递方式是通过调用相应的ChannelHandlerContext.fire_EVT()方法。outbound事件的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext.fireChannelRegistered()的调用会发送一个ChannelRegistered的 inbound事件给下一个ChannelHandlerContext。ChannelHandlerContext.bind调用会发送一个 bind的outbound的事件给下一个ChannelHandlerContext。
Inbound事件传播方法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()


Oubound事件传输方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)


这些方法都在ChannelHandlerContext接口中定义。

Oubound:
Outbound事件都是请求事件(request event), 即请求某件事情的发生,然后通过Outbound事件进行通知。Outbound事件的传播方向是tail-->customContext-->head

接下来我们以connect事件,分析一下Outbound事件的传播机制。
在EchoClient实例中调用connect方法:
 ChannelFuture f = b.connect(HOST, PORT).sync();

调用链是:Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect
在AbstractChannel.connect中其实就是调用DefaultChannelPipeline.connect的方法:
@Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }

看看DefaultChannelPipeline的connect方法:
@Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

当connect事件(outbound事件)到达Pipeline后,就是以tail为起点开始传播的。tail.connect调用到的是:AbstractChannelHandlerContext.connect方法:
public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

findContextOutbound这段代码:
private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

它的作用是以当前Context为起点(tail),向Pipeline中的双向链表的前端寻找第一个outbound 属性为true的Context。当我们找到了一个outbound的Context时,就调用它的invokeConnect方法。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

方法里面会调用与之关联的handler的connect方法,就是handler()获取的handler,如果我们自己实现的handle没有重写ChannelHandler的connect方法,那么会调用 ChannelOutboundHandlerAdapter所实现的connect方法:
@Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

这个方法又回到了AbstractChannelHandlerContext的connect方法了。又会找到从此context开始的第一个outbound为true的context。
循环链为:
Context.connect-->Connect.findContextOutbound-->next.invokeConnect--> handler.connect-->Context.connect。
这样的循环会把事件传递到pipeline双向链表中的头节点(head),head实现了ChannelOutboundHandler接口,outbound的属性为true。head本身是一个ChannelHandlerContext也是一个ChannelOutboundHandler,因此在connect事件到达head后,因为HeadContext中覆盖了handler()方法,因此在invokeConnect方法中的handler()返回的就是HeadContext,这样就会调用的HeadContext.connect的方法:
 public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

这样的一个outbound事件就完成了。其他的outbound事件和connect事件都是遵循一个的传播规则。

Inbound:
Inbound事件是一个通知事件,即某件事情已经发生了,然后通过Inbound事件进行通知。Inbound事件通常发生在Channel的状态改变或IO事件就绪。
Inbound事件的的传播方向是:head-->customContext-->tail。

既然我们分析了Connect这个Outbound事件,那么接着分析Connect事件后会发生什么Inbound 事件,并最终找到Outbound和Inbound事件之间的联系。
上面的unsafe.connect调用的是AbstractNioUnsafe.connect:
...............
 if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
}
..............

doConnect就是进行实际的Socket连接。在NioSocketChannel中
 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

在此方法后,调用了
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                return;
            }
            boolean active = isActive();
            boolean promiseSet = promise.trySuccess();
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }
            if (!promiseSet) {
                close(voidPromise());
            }
        }

这个方法是在connect方法后调用的,它会调用pipeline的fireChannelActive方法,这个方法实际是把socket连接成功的事件发送出去,这个方法就是Inbound事件的起点。调用fireChannelActive方法后就产生了一个ChannelActive Inbound 事件。
看看fireChannelActive方法:
@Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

注意看看在这个方法中传入的对象是head对象,这就展示了Inbound事件在Pipeline中传输的起点是从head开始的。
AbstractChannelHandlerContext的invokeChannelActive:
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

调用head的invokeChannelActive方法:
private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    

因为HeadContext即使Context又是Handler,所以在HeadContext重写了handler()方法,因为这个的handler()返回的就是HeadContext。
看看HeadContext的channelActive
public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

ctx.fireChannelActive()这个方法很明显了,就是把ChannelActive的Inbound事件在Pipeline中传下去。这样会调用到AbstractChannelHandlerContext里面
public ChannelHandlerContext fireChannelActive() {
        invokeChannelActive(findContextInbound());
        return this;
    }

看到findContextInbound方法没!这里就是寻找第一个属性inbound为true的Context,
private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

然后调用invokeChannelActive方法,这样又会调到
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

在((ChannelInboundHandler) handler()).channelActive(this);中,如果用户没有重写 channelActive方法, 那么会调用ChannelInboundHandlerAdapter的channelActive方法。
 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

这里面也是仅仅调用了ctx.fireChannelActive方法。这样就是一个循环了。
Context.fireChannelActive-->Connect.findContextInbound--> nextContext.invokeChannelActive-->nextHandler.channelActive--> nextContext.fireChannelActive
在这里循环的最后到达了tail,它实现了ChannelInboundHandler和ChannelHandlerContext 口,当channelActive消息传递到tail后,handler()返回的就是tail本身,来看看tail中的channelActive方法:
@Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception { }

它什么都没有做。
如果是Inbound事件当用户没有实现自定义的处理器时,那么最后默认是不处理的。

总结:
Outbound:
Outbound事件是请求事件。
Outbound事件的发起者是Channel。
Outbound事件的处理者是unsafe。
Outbound事件在Pipeline中的传输方向是tail-->head。

Inbound:
Inbound事件是通知事件。
Inbound事件发起者是unsafe。
Inbound事件的处理者是Channel。
Inbound事件在Pipeline中传输方向是head-->tail。
  • 大小: 108.5 KB
分享到:
评论

相关推荐

    《Netty进阶之路 跟着案例学Netty》.rar

    Java进阶技术-netty进阶之路

    Netty进阶之路-跟着案例学Netty

    《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...

    《Netty进阶之路 跟着案例学Netty》_李林锋_

    Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty

    Netty进阶之路:跟着案例学Netty 完整版.pdf

    《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...

    高清_书签_Netty进阶之路 跟着案例学Netty.zip

    精选自1000多个一线业务实际案例,从原理到实践全景式讲解Netty项目实践,快速领悟Netty专家花大量时间积累的经验,提高编程水平及分析解决问题的能力,《Netty木又威指南》作者力作,众专家力荐 Netty将Java NIO...

    netty学习之ServerChannel

    Netty学习之ServerChannel Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本篇中,我们将深入探讨ServerChannel这一核心概念,它是Netty中用于接收客户端...

    读书笔记:Netty权威指南学习之旅.zip

    读书笔记:Netty权威指南学习之旅

    netty之hello world

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨Netty的基本概念,通过“Hello World”范例来理解其工作原理。 首先,让我们理解...

    Netty进阶之路--跟着案例学part2

    在《Netty进阶之路:跟着案例学Netty》中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析...

    Netty实战.epub_netty实战epub_netty实战epub_netty_

    《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    跟闪电侠学Netty:Netty即时聊天实战与底层原理-book-netty.zip

    6. **性能优化**:Netty的性能优化策略,如内存管理、线程模型调整、缓冲区复用等,以提高系统的吞吐量和响应速度。 7. **Netty底层原理**:探讨Netty如何利用Java NIO实现异步非阻塞I/O,以及零拷贝技术的工作机制...

    Netty实战 电子版.pdf_java_netty_服务器_

    6. **线程模型**:Netty提供了BossGroup和WorkerGroup的线程模型,BossGroup负责接收新的连接,WorkerGroup负责处理I/O事件和业务逻辑。 7. **心跳机制**:Netty提供了心跳包处理,用于检测连接的活跃状态,防止...

    Netty 教程 Netty权威指南

    6. **安全支持**:Netty 内置了 SSL/TLS 支持,可以轻松实现安全的网络通信。 **三、Netty 在实际项目中的应用** 1. **分布式系统**:Netty 的高性能和低延迟特性使其成为分布式系统间的通信首选。 2. **游戏...

    netty4-netty5.rar

    ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。

    深入浅出Netty_netty_

    在Netty中,最重要的概念之一是“Boss线程”和“Worker线程”的模型。Boss线程负责接收新的连接请求,而Worker线程则处理这些连接后的读写操作,这种模型能够有效提高系统的并发能力。Netty的NIO(非阻塞I/O)模型...

    网络编程之Netty一站式精讲.rar

    Netty是Java领域的一款高性能、异步事件驱动的网络应用框架,主要用于快速开发可维护的高性能协议服务器和客户端。在本精讲中,我们将深入探讨Netty的核心概念、设计模式以及实际应用场景,帮助你全面理解并掌握...

    整合netty实时通讯

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 实时通讯的原理与应用,以及如何利用它构建 WebSocket 服务。 WebSocket 是...

    netty4.0源码,netty例子,netty api文档

    6. **易用性与灵活性**:Netty提供了许多便捷的API和工具,使得开发者可以快速构建网络应用,同时它的设计允许灵活地调整和扩展。 接下来,我们关注下压缩包中的具体文件: - **javadoc** 文件夹包含了Netty 4.0.0...

    springboot整合 netty做心跳检测

    springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...

Global site tag (gtag.js) - Google Analytics