`
Donald_Draper
  • 浏览: 980964 次
社区版块
存档分类
最新评论

netty 抽象nio消息通道

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
引言
上一篇文章我们看了抽象nio字节通道,先来回顾一下:
写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。
抽象nio字节通道是面向字节的通道,为Socket通道的父类,
今天我们来看ServerSocket通道的父类AbstractNioMessageChannel,面向消息的通道:
package io.netty.channel.nio;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;

import java.io.IOException;
import java.net.PortUnreachableException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
 */
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    boolean inputShutdown;//是否关闭输入流

    /**
     * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
     */
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}

来看实际写操作:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();

    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            if ((interestOps & SelectionKey.OP_WRITE) != 0) {
	       //消息已发送完,从选择key兴趣集中移除写操作事件
                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
            }
            break;
        }
        try {
            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
	        //写消息
                if (doWriteMessage(msg, in)) {
                    done = true;
                    break;
                }
            }

            if (done) {//如果读完消息,则从通道刷新链上移除写请求
                in.remove();
            } else {
                // Did not write all messages.
                if ((interestOps & SelectionKey.OP_WRITE) == 0) {
		    //消息没发送完,如果需要添加写事件到选择key的兴趣事件集
                    key.interestOps(interestOps | SelectionKey.OP_WRITE);
                }
                break;
            }
        } catch (IOException e) {
            if (continueOnWriteError()) {
	        //如果写异常时需要移除写请求,则移除
                in.remove(e);
            } else {
                throw e;
            }
        }
    }
}

/**
 * Write a message to the underlying {@link java.nio.channels.Channel}.
 *写一个消息到底层通道,待子类扩展
 * @return {@code true} if and only if the message has been written
 */
protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
/**

 * Returns {@code true} if we should continue the write loop on a write error.
 */
protected boolean continueOnWriteError() {
    return false;
}

从上面可以看出 抽象Nio消息通道,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

再来看其他方法:
//开始读操作
@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    super.doBeginRead();
}
//创建与底层通道交流的Unsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}


从上面的方法可以看出,实际返回的为nio消息Unsafe,我们来看NioMessageUnsafe,
private final class NioMessageUnsafe extends AbstractNioUnsafe {
    private final List<Object> readBuf = new ArrayList<Object>();
    @Override
    public void read() {
        assert eventLoop().inEventLoop();
	//获取通道配置,Channel管道,接受字节buf分配器Handle
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
		    //从通道缓冲区读取数据
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
		       //没有数据可读取
                        break;
                    }
                    if (localRead < 0) {
		       //通道已关闭
                        closed = true;
                        break;
                    }
                    //更新读取消息计数器
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            
            int size = readBuf.size();
	    //遍历读取的消息集,通知通道处理消息,即触发管道fireChannelRead事件
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
	    //读取完毕,触发管道fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (exception != null) {
	        //根据异常判断是否需要,关闭读任务
                closed = closeOnReadError(exception);
                //触发通道fireExceptionCaught事件
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
	        //关闭读任务
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
	        //如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件
                removeReadOp();
            }
        }
    }
}

//AbstractNioMessageChannel

/**
 * Read messages into the given array and return the amount which was read.
 从通道缓冲区读取消息,方法指定的buf集合中,并返回读取的消息数量,待子类扩展
 */
protected abstract int doReadMessages(List<Object> buf) throws Exception;


//判断异常发生时,是否需要关闭读任务
protected boolean closeOnReadError(Throwable cause) {
    // ServerChannel should not be closed even on IOException because it can often continue
    // accepting incoming connections. (e.g. too many open files)
    return cause instanceof IOException &&
            !(cause instanceof PortUnreachableException) &&
            !(this instanceof ServerChannel);
}

从上面可以看出:nio消息Unsafe读操作,从通道接收缓冲区读取数据,
通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,
触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则
触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,
则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,
则从选择key兴趣事件集移除读操作事件



总结:
抽象Nio消息通道AbstractNioMessageChannel,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

nio消息Unsafe(NioMessageUnsafe)读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件

0
0
分享到:
评论

相关推荐

    基于netty的nio使用demo源码

    在Netty中,有多种类型的缓冲区,如ByteBuffer、CharBuffer、DoubleBuffer等,它们都是抽象类AbstractBuffer的子类。 3. **选择器(Selector)**:选择器允许单线程检查多个通道上的I/O事件,这样可以高效地处理多...

    Netty实现简单的聊天消息群发功能

    Bootstrap和ServerBootstrap分别用于创建客户端和服务端的连接,Channel是Netty中数据传输的抽象,而EventLoop则负责处理I/O事件。 在实现聊天功能时,我们需要定义两个关键组件:服务器端的处理器和客户端的处理器...

    Socket 之 BIO、NIO、Netty 简单实现

    BIO适合简单的服务,NIO适合处理大量并发连接,而Netty则提供了更高级的抽象,适用于复杂、高性能的应用。通过阅读《Socket 之 BIO、NIO、Netty 简单实现》的博客,你可以了解如何在Java中实现这些通信模型,从而...

    Java-NIO-Netty框架学习

    Java NIO (Non-blocking Input/Output) 是Java平台中用于高效处理I/O操作的一种机制,它与传统的IO模型( Blocking I/O)相比,提供了更高级别的抽象,允许应用程序以非阻塞的方式读写数据,提高了并发性能。Netty是...

    自己手写nio和netty,不建议下载

    NIO(Non-blocking Input/Output)是Java提供的一种...总之,NIO是一种改进的I/O模型,它提高了处理并发连接的能力,而Netty是建立在NIO之上的网络通信框架,提供了更高级别的抽象和优化,使得网络编程更为便捷高效。

    Java_NIO框架Netty教程.pdf

    通过上述简单的服务端和客户端示例,我们可以看出Netty框架在服务器和客户端的代码结构上保持了一致性,都利用了ChannelHandler来处理事件,并且Netty通过一系列的接口和抽象类来提供灵活的编程模型。 Netty还提供...

    深入浅出Netty_netty_

    Netty的NIO(非阻塞I/O)模型使得它在处理高并发场景下表现出色。 Netty的Channel和Pipeline是两个核心组件。Channel是网络连接的抽象,它可以是TCP、UDP或者任何其他类型的I/O连接。Pipeline则是一个处理链,每个...

    NIO框架netty

    Netty源于Java NIO(Non-blocking I/O)技术,它提供了更高级别的抽象,使得开发者可以更加便捷地处理网络通信。在Java世界里,Netty因其高效、稳定和丰富的功能而备受青睐,被广泛应用于各种分布式系统、云计算平台...

    Netty应用说明笔记

    2. 技能要求降低:Netty抽象了底层的NIO细节,使得开发者不需要深入理解操作系统层面的多路复用IO和Java NIO的复杂性。 3. 高效可靠:Netty提供了心跳机制、断线重连、半包读写等高级特性,增强了网络应用的稳定性和...

    nio+bio+netty+fx.zip

    Netty基于NIO,但提供了更高级别的抽象,如Channel、EventLoopGroup和ByteBuf等,使得编写高性能、可扩展的网络应用变得更加容易。Netty支持多种编解码器,可以方便地处理各种协议,如HTTP、FTP、WebSocket等。 **4...

    BIO,NIO,AIO,Netty面试题

    Netty基于NIO,但提供了更高层次的抽象,使得编写网络应用变得简单。它提供了丰富的组件,如编码解码器、心跳机制、线程模型等,减少了开发者的编程负担,提高了系统的稳定性。 面试中可能会涉及到的问题包括: 1. ...

    netty-all-4.1.29.Final-sources.jar 最新版netty源码

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programm ...

    netty框架图及netty面试知识点解析

    Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环(EventLoop)是Netty处理事件的关键组件,...

    netty in action 中文版 高清带目录 来个最便宜的

    Netty的网络编程模型是建立在NIO基础上的,NIO是一种通过选择器(Selectors)来实现单线程处理多个通道的机制,它与传统的IO模型(BIO)形成对比。BIO模型使用一个单独的线程来处理每个连接,当连接数量增多时,会...

    Netty-讲义.zip

    NIO 相比传统的 IO,提供了选择器(Selector)和通道(Channel)等机制,允许在一个线程中处理多个连接,从而提高了服务器的并发能力。文件可能会介绍如何在 Netty 中使用 Selectors 监听多个 Channel 的事件,以及...

    nio、bio、netty的一些案例

    Netty是一个高性能、异步事件驱动的网络应用框架,它基于NIO构建,但提供了更高级别的抽象,简化了网络编程。Netty的反应器模式(Reactor)设计使得它能有效地处理并发连接,同时提供了易于使用的API,降低了开发...

    netty-netty-4.1.96.Final.tar.gz 官网最新版Netty Project

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programm ing

    Netty权威指南第二版源代码

    Channel是Netty中的基本I/O抽象,负责读写数据;Pipeline则是一系列处理器的链,用于处理进出的数据;EventLoop是执行事件循环的地方,负责调度和处理事件;Bootstrap则是用来创建和初始化服务器或客户端连接的工具...

Global site tag (gtag.js) - Google Analytics