`

netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理

    博客分类:
  • java
阅读更多

        netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue

public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    if (e instanceof ChannelStateEvent) {
        ……
    } else if (e instanceof MessageEvent) {
        MessageEvent event = (MessageEvent) e;
        NioSocketChannel channel = (NioSocketChannel) event.getChannel();
        boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
        assert offered;
        channel.worker.writeFromUserCode(channel);
    }
}

        WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。

public boolean offer(MessageEvent e) {
    boolean success = queue.offer(e);
    assert success;

    int messageSize = getMessageSize(e);
    int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
    int highWaterMark =  getConfig().getWriteBufferHighWaterMark();

    if (newWriteBufferSize >= highWaterMark) {
        if (newWriteBufferSize - messageSize < highWaterMark) {
            highWaterMarkCounter.incrementAndGet();
            if (!notifying.get()) {
                notifying.set(Boolean.TRUE);
                fireChannelInterestChanged(AbstractNioChannel.this);
                notifying.set(Boolean.FALSE);
            }
        }
    }
    return true;
}

        fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K

public boolean setOption(String key, Object value) {
    if (super.setOption(key, value)) {
        return true;
    }
    if ("writeBufferHighWaterMark".equals(key)) {
        setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
    } else if ("writeBufferLowWaterMark".equals(key)) {
        setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
    } else if ("writeSpinCount".equals(key)) {
        setWriteSpinCount(ConversionUtil.toInt(value));
    } else if ("receiveBufferSizePredictorFactory".equals(key)) {
        setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
    } else if ("receiveBufferSizePredictor".equals(key)) {
        setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
    } else {
        return false;
    }
    return true;
}

        然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K

public MessageEvent poll() {
    MessageEvent e = queue.poll();
    if (e != null) {
        int messageSize = getMessageSize(e);
        int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
        int lowWaterMark = getConfig().getWriteBufferLowWaterMark();


        if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
            if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
                highWaterMarkCounter.decrementAndGet();
                if (isConnected() && !notifying.get()) {
                    notifying.set(Boolean.TRUE);
                    fireChannelInterestChanged(AbstractNioChannel.this);
                    notifying.set(Boolean.FALSE);
                }
            }
        }
    }
    return e;
}

         超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的

public boolean isWritable() {
    return (getInterestOps() & OP_WRITE) == 0;
}
public int getInterestOps() {
    if (!isOpen()) {
        return Channel.OP_WRITE;
    }

    int interestOps = getRawInterestOps();
    int writeBufferSize = this.writeBufferSize.get();
    if (writeBufferSize != 0) {
        if (highWaterMarkCounter.get() > 0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
            int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
            if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
                interestOps |= Channel.OP_WRITE;
            } else {
                interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
            }
        } else {//超过高水位counter<=0,意味着当前数据量小于高水位
            int highWaterMark = getConfig().getWriteBufferHighWaterMark();
            if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
                interestOps |= Channel.OP_WRITE;
            } else {
                interestOps &= ~Channel.OP_WRITE;
            }
        }
    } else {
        interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
    }

    return interestOps;
}

       即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写

        如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。

  • 大小: 52.3 KB
分享到:
评论

相关推荐

    Netty 3.6.2.Final 稳定版本 含源码

    Netty 3.6.2.Final 稳定版本 含源码

    netty-3.6.2.Final-API文档-中文版.zip

    包含翻译后的API文档:netty-3.6.2.Final-javadoc-API文档-中文(简体)版.zip; Maven坐标:io.netty:netty:3.6.2.Final; 标签:netty、jar包、java、中文文档; 使用方法:解压翻译后的API文档,用浏览器打开...

    netty-3.6.2.Final-API文档-中英对照版.zip

    包含翻译后的API文档:netty-3.6.2.Final-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:io.netty:netty:3.6.2.Final; 标签:netty、jar包、java、API文档、中英对照版; 使用方法:解压翻译后的API...

    Netty数据转发工具

    通过阅读和学习这些文件,你可以更深入地理解Netty如何处理数据转发,并且可以将这些知识应用到实际项目中。 总的来说,Netty数据转发工具的实现涉及到对Netty框架的深入理解和灵活运用,包括服务器的启动、通道的...

    JAVA netty 获取串口数据并且下发数据

    在Java中,处理串口通信并不像处理网络套接字那样常见,但Netty提供了一种方式来扩展其功能以支持这种通信模式。 首先,我们需要了解Netty中的Channel接口,它是所有I/O操作的核心。在串口通信中,我们需要创建一个...

    基于Netty的Java数据采集软件

    在这个基于Netty的Java数据采集软件中,"DataServer"模块扮演了核心角色,它是数据接收和处理的中心。Netty的ChannelHandler接口用于定义数据的接收和发送逻辑,使得服务器能够灵活地处理各种类型的数据输入。同时,...

    netty服务器解析16进制数据

    分析这个代码,我们可以看到Netty如何创建服务器、设置管道、以及如何定义和使用自定义的解码器和编码器来处理16进制数据。 通过上述步骤,Netty服务器可以轻松地解析16进制数据,从而支持各种网络协议,无论它们是...

    netty-3.6.2.Final.jar

    java运行依赖jar包

    netty websocket通讯接收数据不完整问题

    这是一个java web项目集成了netty websocket的...初始化握手对象时指定了maxFramePayloadLength 的长度、以及通过配置netty内置解码器处理数据半包等方法,均无效。以下是终极解决办法,供大家参考和解决这样的问题。

    java实现基于netty 的udp字节数据接收服务

    // 解析并处理数据 } finally { in.release(); } } ``` 以上是一个简化的示例,实际应用中可能需要考虑更多细节,例如错误处理、多线程同步、数据编码解码等。`udp`和`udpServer`可能是项目中包含的实际代码...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    标题"使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据"揭示了这个项目的核心内容:通过Netty接收TCP长连接的数据,并将这些数据存储到Kafka中,同时利用Kafka的批量消费功能对数据进行处理。下面我们将...

    Netty同步等待数据返回实例代码

    在本文中,我们将深入探讨 Netty 中的同步等待数据返回的实例代码,以及这一机制如何在实际应用中工作。 首先,我们要明白在多线程环境下,同步是为了确保多个线程对共享资源的访问是有序且安全的。在 Netty 中,...

    NettySocket同步数据获取实现

    当接收到客户端消息时,我们可以在ChannelInboundHandler的channelRead方法中处理数据,通过ChannelHandlerContext.writeAndFlush()将响应数据写回客户端。 2. **心跳检测**: 心跳检测是确保网络连接活性的重要...

    Netty进制转换乱码问题

    总结,Netty中的进制转换乱码问题通常源于字符编码不一致或处理不当。解决此类问题的关键在于确保数据的发送和接收端都采用相同的字符编码,正确使用ByteBuf的方法进行读写,并可能需要自定义解码逻辑。通过仔细的...

    netty的自己写的文档

    通过以上介绍,我们可以看到Netty的强大之处在于其对NIO的支持以及高度可定制化的事件处理机制。Netty的设计思想强调灵活性和性能,因此广泛应用于各种实时通信场景,如游戏服务器、即时通讯系统等。

    基于netty4 的udp字节数据接收服务

    在本文中,我们将深入探讨如何利用 Netty 4 构建基于 UDP(用户数据报协议)的数据接收服务,以及如何实现相应的发送服务。 首先,UDP 是一种无连接的传输层协议,它不保证数据的顺序或可靠性,但具有较低的开销,...

    Netty发送protoBuf格式数据

    总的来说,Netty发送protoBuf格式数据的过程包括定义数据结构、生成Java类、编写编码解码器、配置Netty管道以及在服务器和客户端之间发送和接收消息。这种结合提供了强大而高效的网络通信解决方案,适用于多种应用...

    Netty 实现scoket 主动推送数据到服务和服务端实现方式

    随着物联网的发展,随之出现了各种传感器监测数据的实时发送,需要和netty服务器通讯,netty和传感器之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息) 碰到的问题: netty作为服务器端如何...

    基于java netty的udp客户端声呐数据对接

    在"基于Java Netty的UDP客户端声呐数据对接"项目中,我们主要关注如何利用Netty处理UDP通信,以及如何解析和封装SCANFISH-II型声呐系统的数据。 UDP(User Datagram Protocol)是一种无连接的传输层协议,它不保证...

    netty接收串口数据代码,测试串口工具

    java netty接收串口数据 开启windows串口工具 发送串口数据调试助手

Global site tag (gtag.js) - Google Analytics