`
ahua186186
  • 浏览: 563134 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

netty 3.10.4.Final 源码学习总结

 
阅读更多
提示:个人理解并且只针对NIO模式


总的来说,包括1个“核心接口”和2个“核心接口帮助类”:

(1)ChannelFactory(NioClientSocketChannelFactory和 NioServerSocketChannelFactory)

作用:初始化boss,work线程

(2)Channels,DefaultChannelPipeline

作用:统一发送底层IO thread的 ChannelEvent事件给handler(上行+下行)或ChannelSink(下行),通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果

(3)ChannelSink(NioClientSocketChannelSlink和NioServerSocketChannelSlink)负责初始化客户端ClientBootstrap 的connect操作和服务端ServertBootstrap 的bind操作,以及下行通道消息的write。

细节:

1. 服务端读数据:采用轮询channel的方式读取数据。

关键代码:

  while ((ret = ch.read(bb)) > 0) { 
                readBytes += ret; 
                if (!bb.hasRemaining()) { 
                    break; 
                } 
            } 


@Override
    protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();

        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;

        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
        try {
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (readBytes > 0) {
            bb.flip();

            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(0, bb);
            buffer.writerIndex(readBytes);

            // Update the predictor.
            predictor.previousReceiveBufferSize(readBytes);

            // Fire the event.
            fireMessageReceived(channel, buffer);
        }

        if (ret < 0 || failure) {
            k.cancel(); // Some JDK implementations run into an infinite loop without this.
            close(channel, succeededFuture(channel));
            return false;
        }

        return true;
    }




2.服务端写数据:先保存数据到缓存队列,然后同步写入数据到计算机内核:

关键代码:

(1)synchronized (channel.writeLock)

(2)localWrittenBytes = buf.transferTo(ch);


 protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;
        boolean addOpWrite = false;
        boolean removeOpWrite = false;
        boolean iothread = isIoThread(channel);

        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        List<Throwable> causes = null;

        synchronized (channel.writeLock) {
            channel.inWriteNowLoop = true;
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();

                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;
                    }

                    long localWrittenBytes = 0;
                    for (int i = writeSpinCount; i > 0; i --) {
                        localWrittenBytes = buf.transferTo(ch);
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (writtenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    buf = null;
                    //noinspection UnusedAssignment
                    evt = null;
                    if (future != null) {
                        future.setFailure(t);
                    }
                    if (iothread) {
                        if (causes == null) {
                            causes = new ArrayList<Throwable>(1);
                        }
                        causes.add(t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        open = false;
                    }
                }
            }
            channel.inWriteNowLoop = false;
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        if (causes != null) {
            for (Throwable cause: causes) {
                // notify about cause now as it was triggered in the write loop
                fireExceptionCaught(channel, cause);
            }
        }
        if (!open) {
            // close the channel now
            close(channel, succeededFuture(channel));
        }
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }


3. 服务端Handlers都是共享,读的时候没有并发问题,因为都是在IO线程中轮询读取数据,直到写入业务线程的队列中才返回,但是写的时候存在并发问题,因为写操作由业务线程执行,通常业务会使用线程池并行处理业务消息,这就意味着在某一个时刻会有多个业务线程同时操作ChannelHandler,我们需要对ChannelHandler进行并发保护,通常需要加锁。如果同步块的范围不当,可能会导致严重的性能瓶颈,这对开发者的技能要求非常高,降低了开发效率;

4. netty 3.10 写操作的并发问题指的是ChannelHandler的成员变量,因为:
{局部变量的数据存在于栈内存中。栈内存中的局部变量随着方法的消失而消失。
成员变量存储在堆中的对象里面,由垃圾回收器负责回收},我发现58同城那个RPC框架中httpHandler就存在这样的问题,估计他们都没有用到上传附件的功能所以一直没有修改这个bug。

5.引用李林峰的一句话:在Netty 3的时候,upstream是在I/O线程里执行的,而downstream是在业务线程里执行。当Netty从网络读取一个数据报投递给业务handler的时候,handler是在I/O线程里执行;而当我们在业务线程中调用write和writeAndFlush向网络发送消息的时候,handler是在业务线程里执行,直到最后一个Header handler将消息写入到发送队列中,业务线程才返回。


6. 业务线程只需要持有channel对象既可写入响应数据到channel的writeBufferQueue,这样在上层应用几乎只需要关注业务即可。
分享到:
评论

相关推荐

    Netty (netty-3.2.5.Final.jar,netty-3.2.5.Final-sources.jar)

    `netty-3.2.5.Final-sources.jar` 提供了Netty框架的源代码,这对于开发者来说是非常有价值的,因为它允许他们深入理解框架的工作原理,方便调试、学习和定制。通过查看源码,开发者可以更好地掌握如何利用Netty的...

    netty-netty-4.1.36.Final.rar

    netty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-...

    netty-netty-4.1.79.Final.tar.gz

    这个“netty-netty-4.1.79.Final.tar.gz”文件是一个包含Netty 4.1.79.Final版本的压缩包,通常用于Java开发环境。解压后,我们可以得到Netty的源代码、库文件和其他相关资源。 Netty的核心特性包括: 1. **异步...

    Netty 3.6.2.Final 稳定版本 含源码

    Netty 3.6.2.Final 稳定版本 含源码

    Netty-4.1.97.Final源码

    总之,Netty-4.1.97.Final源码提供了丰富的学习资源,涵盖了网络编程的各个方面,对于提升Java程序员的专业技能具有重要作用。通过深入研究源码,你将能够更好地掌握Netty的工作原理,为你的项目带来更高效、更稳定...

    netty-3.7.0.Final-API文档-中文版.zip

    赠送jar包:netty-3.7.0.Final.jar; 赠送原API文档:netty-3.7.0.Final-javadoc.jar; 赠送源代码:netty-3.7.0.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.7.0.Final.pom; 包含翻译后的API文档:netty-...

    netty-3.10.5.Final-API文档-中文版.zip

    赠送jar包:netty-3.10.5.Final.jar; 赠送原API文档:netty-3.10.5.Final-javadoc.jar; 赠送源代码:netty-3.10.5.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.10.5.Final.pom; 包含翻译后的API文档:...

    netty-3.2.7.final JAR包

    netty-3.2.7.final JAR包,netty-3.2.7.final JAR包,netty-3.2.7.final JAR包

    netty-netty-3.10.6.Final.tar.gz

    Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

    netty-all-4.1.32.Final-sources.jar 最新版netty源码全部包

    netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...

    netty-netty-4.1.69.Final.tar.gz

    这个“netty-netty-4.1.69.Final.tar.gz”文件是Netty的最新稳定版本,版本号为4.1.69.Final,它是一个压缩包文件,通常包含源码、编译后的类库、文档和其他相关资源。 Netty的核心特点包括: 1. **异步事件驱动**...

    netty-3.9.9.Final-API文档-中文版.zip

    赠送jar包:netty-3.9.9.Final.jar; 赠送原API文档:netty-3.9.9.Final-javadoc.jar; 赠送源代码:netty-3.9.9.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.9.9.Final.pom; 包含翻译后的API文档:netty-...

    netty-all-4.1.6.Final.jar

    netty4.x开发所需jar包,版本号为:4.1.6-Final,亲测可用。

    netty-4.1.16.Final.jar netty全部jar及源码

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"netty-4.1.16.Final.jar"是Netty框架的一个...总的来说,这个压缩包为使用和学习Netty提供了完整的资源。

    netty-netty-4.1.32.final-remark.zip

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨Netty的知识点...通过学习,开发者可以深入理解 Netty 的工作原理,提高网络应用的开发效率和性能。

    Netty.4.1.29.Final

    Netty.4.1.29.Final 包含Jar包支持、Javadoc、Source。。

    Netty (netty-netty-4.1.74.Final.tar.gz)

    Netty (netty-netty-4.1.74.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

Global site tag (gtag.js) - Google Analytics