`

在用Netty 3.6.2发数据,发现内核缓冲区满的时候.....

    博客分类:
  • java
阅读更多

       用nettys收发网络数据的时候,一般不会注册SelectionKey.OP_WRITE事件。但是,如果在netty写数据的时候AbstractNioWorker.write0,发现写不进去了buf.finished()==false,可能是操作系统内核缓冲区满了,这个时候会把SelectionKey关联的Channel注册上OP_WRITE事件

    protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;
        boolean addOpWrite = false;
        boolean removeOpWrite = false;
        boolean iothread = isIoThread(channel);
        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();
        synchronized (channel.writeLock) {
            channel.inWriteNowLoop = true;
            for (;;) {
                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    ……
                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {//写不进去啦,内核缓冲区可能满了
                        // Not written fully - perhaps the kernel buffer is full.
                        addOpWrite = true;//这个参数在后边会导致注册OP_WRITE事件
                        channel.writeSuspended = true;//这个参数会影响到后续的写入操作
                        if (localWrittenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    ……
                }
            }
            channel.inWriteNowLoop = false;

            // Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        ……
    }

        同时,AbstractNioWorker.write0的时候,会把被写的Channel.writeSuspended设置为true,这个参数在Channels.write -> DefaultChannelPipeline.sendDownstream -> NioClientSocketPipelineSink.eventSunk -> AbstractNioWorker.writeFromUserCode时会控制后续写入操作。

    public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioClientSocketChannel channel =
                (NioClientSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                ……
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBufferQueue.offer(event);
            assert offered;
            channel.worker.writeFromUserCode(channel);//写入前的判断逻辑
        }
    }



   void writeFromUserCode(final AbstractNioChannel<?> channel) {
        ……
        if (channel.writeSuspended) {//如果这里为true,永远也不会调用真正的写入操作了
            return;
        }
        ……
        write0(channel);
    }

        但是上边提到,在写入不了的时候,注册了OP_WRITE事件,一旦内核缓冲有空间了,在selector.selectKeys()的时候,就会抓到这个注册了OP_WRITE事件的selectionKey(问题是,哪个线程会做这样的事情?channel和worker是绑定的?一个worker关联一个selector?)NioWorker.run -> AbstractNioWorker.run 

    public void run() {
        thread = Thread.currentThread();

        int selectReturnsImmediately = 0;
        Selector selector = this.selector;

        if (selector == null) {
            return;
        }
        // use 80% of the timeout for measure
        final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
        boolean wakenupFromLoop = false;
        for (;;) {
            wakenUp.set(false);

            try {
                long beforeSelect = System.nanoTime();
                int selected = select(selector);//select准备就绪的key
                if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
                    long timeBlocked = System.nanoTime() - beforeSelect;

                    if (timeBlocked < minSelectTimeout) {
                        boolean notConnected = false;
                        // loop over all keys as the selector may was unblocked because of a closed channel
                        for (SelectionKey key: selector.keys()) {
                            SelectableChannel ch = key.channel();
                            try {
                                if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
                                        ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
                                    notConnected = true;
                                    // cancel the key just to be on the safe side
                                    key.cancel();
                                }
                            } catch (CancelledKeyException e) {
                                // ignore
                            }
                        }
                        if (notConnected) {
                            selectReturnsImmediately = 0;
                        } else {
                            // returned before the minSelectTimeout elapsed with nothing select.
                            // this may be the cause of the jdk epoll(..) bug, so increment the counter
                            // which we use later to see if its really the jdk bug.
                            selectReturnsImmediately ++;
                        }
                    } else {
                        selectReturnsImmediately = 0;
                    }
                    ……
                } else {
                    // reset counter
                    selectReturnsImmediately = 0;
                }
                ……
                if (wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                cancelledKeys = 0;
                processTaskQueue();//处理task,真正的逻辑是在这里处理的?
                selector = this.selector; // processTaskQueue() can call rebuildSelector()

                if (shutdown) {
                    ……
                } else {
                    process(selector);//处理就绪的key
                }
            } catch (Throwable t) {
               ……
            }
        }
    }


    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
                //读事件
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {
                        // Connection already closed - no need to handle write.
                        continue;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);//注册了OP_WRITE事件!现在内核告知可以继续写啦!
                }
            } catch (CancelledKeyException e) {
                close(k);
            }
            ……
        }
    }

    void writeFromSelectorLoop(final SelectionKey k) {
        AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
        ch.writeSuspended = false;//写不再挂起,调用Channel.write走到writeFromUserCode()的时候,也可以顺利的走到write0了
        write0(ch);
    }
分享到:
评论

相关推荐

    Netty 3.6.2.Final 稳定版本 含源码

    Netty 3.6.2.Final 稳定版本 含源码

    netty-3.6.2.Final-API文档-中文版.zip

    赠送jar包:netty-3.6.2.Final.jar; 赠送原API文档:netty-3.6.2.Final-javadoc.jar; 赠送源代码:netty-3.6.2.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.6.2.Final.pom; 包含翻译后的API文档:netty-...

    netty-3.6.2.Final.jar

    java运行依赖jar包

    netty-3.6.2.Final-API文档-中英对照版.zip

    赠送jar包:netty-3.6.2.Final.jar; 赠送原API文档:netty-3.6.2.Final-javadoc.jar; 赠送源代码:netty-3.6.2.Final-sources.jar; 赠送Maven依赖信息文件:netty-3.6.2.Final.pom; 包含翻译后的API文档:netty-...

    netty-netty-4.1.69.Final.tar.gz

    8. **零拷贝**:Netty通过直接将数据从网络缓冲区传递到用户缓冲区,或者反之,实现了零拷贝,减少了不必要的内存复制,提高了效率。 9. **心跳与空闲检测**:Netty提供心跳机制和空闲检测,以保持连接的活跃状态并...

    netty-netty-4.1.79.Final.tar.gz

    5. **零拷贝**:Netty实现了操作系统层面的零拷贝,通过直接缓冲区和FileChannel的transferTo方法,减少了数据在用户空间和内核空间之间传输的次数,提高了性能。 6. **线程模型**:Netty的EventLoopGroup负责调度...

    netty-common-4.1.65.Final-API文档-中英对照版.zip

    赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....

    netty-all-4.1.32.Final-sources.jar 最新版netty源码全部包

    netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...

    netty-netty-4.1.36.Final.rar

    netty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-...

    聊聊Netty那些事儿之从内核角度看IO模型.doc

    如果接收缓冲区中没有数据,那么应用程序就会在系统调用上阻塞,直到Socket接收缓冲区有数据,然后CPU将内核空间(Socket接收缓冲区)的数据拷贝到用户空间,最后系统调用read返回,应用程序读取数据。

    netty-all-4.1.68.Final-API文档-中文版.zip

    赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...

    netty-netty-4.1.96.Final.tar.gz 官网最新版Netty Project

    Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见...

    Netty (netty-3.2.5.Final.jar,netty-3.2.5.Final-sources.jar)

    3. **灵活的管道(Pipeline)机制**:网络数据在Netty中通过一系列处理器(ChannelHandler)进行处理,形成了数据处理的管道,开发者可以根据需求自定义处理器,实现复杂的数据处理逻辑。 4. **丰富的协议支持**:...

    Netty (netty-netty-4.1.77.Final.tar.gz)

    Netty (netty-netty-4.1.77.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

    netty-netty-3.10.6.Final.tar.gz

    Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

    基于Netty的Java数据采集软件

    《基于Netty的Java数据采集软件详解》 在IT领域,高效、稳定的数据采集系统是许多业务场景中的关键环节。本文将深入探讨一个基于Netty的Java数据采集软件,它利用Netty强大的网络通信框架,实现了对大规模分布式...

    netty-all-4.1.28.Final.jar

    1. **ByteBuf**: Netty自定义的字节缓冲区,比Java的ByteBuffer更加强大和高效,支持读写分离,便于数据操作和内存管理。 2. **Channel**: 表示网络连接,可以是TCP连接、UDP连接或者本地套接字等。每个Channel对应...

Global site tag (gtag.js) - Google Analytics