`
Donald_Draper
  • 浏览: 978844 次
社区版块
存档分类
最新评论

netty 抽象nio字节通道

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
引言
前一篇文章,我们看了抽象nio通道,先来回顾一下:
抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。

注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集

抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。

今天我们来看一下抽象nio字节通道AbstractNioByteChannel
package io.netty.channel.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
 */
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//通道元数据
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(FileRegion.class) + ')';

    private Runnable flushTask;//刷新任务
     /**
     * Create a new instance
     *创建nio字节通道
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        //读操作事件为SelectionKey.OP_READ
        super(parent, ch, SelectionKey.OP_READ);
    }
}

来看实际写操作:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;//写操作自旋此数计数器

    boolean setOpWrite = false;
    for (;;) {
        //获取当前通道Outbound缓冲区中的当前待刷新消息
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
	    //清除通道选择key兴趣事件集中写操作事件
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        if (msg instanceof ByteBuf) {//如果为字节buf
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
	        //可读字节数为0,则从通道Outbound缓存区中移除写请求
                in.remove();
                continue;
            }

            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
	        //获取通道配置的写请求自旋次数
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
	        //实际发送字节数据
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
		    //没有字节要发送
                    setOpWrite = true;
                    break;
                }
                //更新刷新字节计数器
                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
		    //如果buf不可读,则数据发送完毕
                    done = true;
                    break;
                }
            }
            //报告字节数据发送进度
            in.progress(flushedAmount);

            if (done) {
	        //发送完,则移除写请求
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else if (msg instanceof FileRegion) {
	    //如果是文件Region消息
            FileRegion region = (FileRegion) msg;
            boolean done = region.transferred() >= region.count();
            if (!done) {
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i--) {
		   //完成实际写操作
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }
                    //更新刷新字节计数器
                    flushedAmount += localFlushedAmount;
                    if (region.transferred() >= region.count()) {
                        done = true;
                        break;
                    }
                }
                //更新数据发送进度
                in.progress(flushedAmount);
            }

            if (done) {
	        //发送完,则移除写请求
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
    }
    //完成写后继任务
    incompleteWrite(setOpWrite);
}

/**
 * Write a {@link FileRegion}
 *写一个文件region,待子类扩展
 * @param region        the {@link FileRegion} from which the bytes should be written
 * @return amount       the amount of written bytes
 */
protected abstract long doWriteFileRegion(FileRegion region) throws Exception;

/**
 * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
 写一个字节buf,待子类扩展
 * @param buf           the {@link ByteBuf} from which the bytes should be written
 * @return amount       the amount of written bytes
 */
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;


//完成写后继任务
protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        //发送数据完成,则重新添加写操作事件到选择key兴趣事件集
        setOpWrite();
    } else {
        //否则,继续刷新通道Outbound缓冲区中的写请求
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}
//添加写操作事件到选择key兴趣事件集
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
//清除通道选择key兴趣事件集中写操作事件
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}

从上面可以看出,写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为
字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

//过滤消息
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
	//如果消息为direct buf,直接返回消息
        if (buf.isDirect()) {
            return msg;
        }
        //否则包装buf为direct buf,这个newDirectBuffer方法在上一篇文章中以说
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        //如果是文件Region直接返回
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}


再看其他方法:
//获取通道元数据
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
/**
 * Shutdown the input side of the channel.
 关闭通道输入流
 */
protected abstract ChannelFuture shutdownInput();
//通道是否关闭输入流
protected boolean isInputShutdown0() {
    return false;
}
//创建底层通道操作类Unsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioByteUnsafe();
}

从newUnsafe方法,返回的为NioByteUnsafe
我们来看nio字节Unsafe:
protected class NioByteUnsafe extends AbstractNioUnsafe {
    //读操作
    @Override
    public final void read() {
        //获取通道配置,Channel管道,字节buf分配器,接受字节buf分配器Handle
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
	        //分配一个字节buf
                byteBuf = allocHandle.allocate(allocator);
		//读取通道接收缓冲区数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
		    //没有数据可读,则释放buf
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                //更新读取消息计数器
                allocHandle.incMessagesRead(1);
                readPending = false;
		//通知通道处理读取数据,触发Channel管道线的fireChannelRead事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
            //读取操作完毕
            allocHandle.readComplete();
	    //触发Channel管道线的fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (close) {
	        //如果通道关闭,关闭读操作
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
	    //处理读操作异常
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
	        //读操作完毕,且没有配置自动读,则从选择key兴趣集中移除读操作事件
                removeReadOp();
            }
        }
    }
     //关闭读操作
    private void closeOnRead(ChannelPipeline pipeline) {
        if (!isInputShutdown0()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
	        //关闭通道输入流
                shutdownInput();
		//触发通道输入关闭事件
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());
            }
        } else {
	    //触发通道输入关闭没有数据可读取事件
            pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        }
    }
    //处理读操作异常
     private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            RecvByteBufAllocator.Handle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                readPending = false;
		//字节buf中还有数据没处理,则继续处理
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
	//读取操作完毕,触发Channel管道线的fireChannelReadComplete事件及fireExceptionCaught事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) { 
	   //如果需要,关闭读操作
            closeOnRead(pipeline);
        }
    } 
}

从上面可以看出,nio字节Unsafe读操作,从通道接收缓冲区读取数据,
通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,
触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则
触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则
读取缓存区中没有读完的数据,并通道通道处理剩余数据。

总结:

写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。


附:

//ChannelMetadata
/**
 * Represents the properties of a {@link Channel} implementation.
 */
public final class ChannelMetadata {

    private final boolean hasDisconnect;//是否基于无连接的通信,fasle-》TCP,true-》UDP
    private final int defaultMaxMessagesPerRead;//每次读取,允许读取的最大消息数
    /**
     * Create a new instance
     *
     * @param hasDisconnect     {@code true} if and only if the channel has the {@code disconnect()} operation
     *                          that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
     *                          again, such as UDP/IP.
     * @param defaultMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this value will be
     * set for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
     */
    public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
        if (defaultMaxMessagesPerRead <= 0) {
            throw new IllegalArgumentException("defaultMaxMessagesPerRead: " + defaultMaxMessagesPerRead +
                                               " (expected > 0)");
        }
        this.hasDisconnect = hasDisconnect;
        this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
    }
    ...
}

//ChannelInputShutdownReadComplete
package io.netty.channel.socket;

/**
 * User event that signifies the channel's input side is shutdown, 
 and we tried to shut it down again. This typically
 * indicates that there is no more data to read.
 */
public final class ChannelInputShutdownReadComplete {
    public static final ChannelInputShutdownReadComplete INSTANCE = new ChannelInputShutdownReadComplete();

    private ChannelInputShutdownReadComplete() {
    }
}


//ChannelInputShutdownEvent
package io.netty.channel.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;

/**
 * Special event which will be fired and passed to the
 * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)} methods once the input of
 * a {@link SocketChannel} was shutdown and the {@link SocketChannelConfig#isAllowHalfClosure()} method returns
 * {@code true}.
 */
public final class ChannelInputShutdownEvent {

    /**
     * Instance to use
     */
    @SuppressWarnings("InstantiationOfUtilityClass")
    public static final ChannelInputShutdownEvent INSTANCE = new ChannelInputShutdownEvent();

    private ChannelInputShutdownEvent() { }
}
0
0
分享到:
评论

相关推荐

    Java_NIO框架Netty教程.pdf

    通过上述简单的服务端和客户端示例,我们可以看出Netty框架在服务器和客户端的代码结构上保持了一致性,都利用了ChannelHandler来处理事件,并且Netty通过一系列的接口和抽象类来提供灵活的编程模型。 Netty还提供...

    netty-dome保证可以运行

    1. **Channel**: 在 Netty 中,Channel 是连接到某个网络端点(如套接字)的抽象表示。它是网络 I/O 操作的基础,你可以通过 Channel 进行读写操作。 2. **EventLoop 和 EventLoopGroup**: EventLoop 是执行任务和...

    netty4中文用户手册

    Netty的ByteBuf是一个灵活的可扩展的缓冲区,其抽象了字节序列并支持自动扩容以及内存回收等优化操作,显著降低了内存泄漏的风险。 Netty4用户手册还介绍了如何使用Netty实现基础的网络应用,例如抛弃服务器、应答...

    netty官网学习手册中文版

    - **NIO(Non-blocking I/O)**:Netty基于Java NIO构建,实现了非阻塞的I/O模型,使得在高并发场景下性能优异。 - **Channel与Pipeline**:Channel是网络连接的抽象,负责读写数据;Pipeline则是一系列处理I/O...

    netty实战教程、netty代码demo

    Netty 是由 JBoss 组织开源的一个网络通信框架,基于 Java NIO(非阻塞I/O)构建,提供了一套高度抽象和优化的 API,使得开发者可以轻松地处理网络连接的各种复杂情况。Netty 提供了多种传输类型,如 TCP、UDP 以及...

    netty开发工具包

    Netty 的核心组件包括EventLoop(事件循环)、Channel(通道)、Handler(处理器)等,它们协同工作,实现了事件驱动的网络通信。 1. **EventLoop**:Netty 的事件循环是线程模型的基础,它负责接收和处理I/O事件。...

    Netty权威指南-Netty源码

    Netty 的核心组件包括:ByteBuf(字节缓冲区)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)以及Handler(处理器)。ByteBuf 提供了一种高效的方式来处理网络数据,避免了 Java 原生字节数组操作中...

    Netty使用初步

    通道是网络连接的抽象,可以代表TCP连接、UDP套接字或者本地进程间通信。事件处理器则是处理与通道相关的事件,如读取、写入、连接和关闭等。Netty通过责任链模式来组织这些处理器,形成了所谓的“处理器管道...

    netty4.0源码,netty例子,netty api文档

    1. **异步事件驱动**:Netty基于非阻塞I/O模型,利用Java NIO库,实现了高效的网络通信。通过事件循环(EventLoop)和事件处理器(ChannelHandler),Netty能够处理大量并发连接,显著提高系统的吞吐量。 2. **...

    基于JAVA IO, NIO, Netty, 多线程并发实战源码.zip

    Netty提供了一种更高级别的抽象,使得网络编程变得更加简单和灵活。 **多线程并发**: 多线程并发是Java的核心特性之一,允许程序同时执行多个任务。通过使用Thread类或实现Runnable接口,可以创建并启动新的线程。...

    BIO,NIO,AIO,Netty面试题 35道1

    Netty是一个基于NIO的网络编程框架,提供了高度优化的异步事件驱动网络应用程序框架,用于快速开发可维护的高性能服务器和客户端。Netty简化了NIO的复杂性,使得开发者可以更容易地编写出高效、可靠的网络应用。 ...

    Netty权威指南源代码

    3. **ByteBuf**:Netty自定义的字节缓冲区,相比于Java的`ByteBuffer`,它提供了更高效且方便的操作方式,如预读、预写等,避免了不必要的内存复制。 4. **编码与解码**:Netty提供了丰富的编解码器,如...

    netty-4.1_javaNetty_netty_服务器_

    2. **Channel**:在 Netty 中,Channel 是连接到某个网络端点的抽象,它可以读写数据。例如,Socket 或者 UDP 的 DatagramChannel。 3. **Pipeline**:ChannelHandler 组成的链路,用于处理入站和出站事件。这使得 ...

    netty-4.1.4-jar包

    4. **ByteBuf**:Netty提供的高效字节缓冲区,提供了比Java的ByteBuffer更易用和高效的API,支持零拷贝和内存池管理。 5. **Decoder** 和 **Encoder**:用于在网络数据和应用对象之间进行编码和解码,例如将HTTP请求...

    Netty分享课件PPT

    Netty基于NIO进行了优化和封装,提供了更高级别的抽象,使得网络编程变得更加简单。Netty中的关键概念包括: 1. **Channel**:Netty的网络操作抽象类,代表一个到另一端点的连接。它支持多种I/O操作,如绑定、连接...

    netty4.1.source_code.zip

    Netty 提供了一种简化网络编程的方式,通过提供高级抽象层,如 ByteBuf(用于高效的数据处理)、Channel(表示网络连接)和 ChannelHandlerContext(作为处理事件和调用处理程序的方法)。这些组件使得开发者可以...

    netty 4.1中文.CHM

    1. **ByteBuf**:这是Netty的核心数据结构,用于高效地处理网络I/O中的字节流。它提供了一种灵活且高性能的方式来存储和操作二进制数据,支持零拷贝,减少了不必要的内存复制,提高了性能。 2. **Zero-Copy**:...

    netty学习教程

    - ByteBuf:Netty自定义的字节缓冲区,比Java NIO的ByteBuffer更易用且性能优秀。 2. **Netty的Channel和Pipeline** - Channel:Netty中的网络连接抽象,负责读写数据。 - ChannelHandlerContext:在Pipeline中...

    netty技术文档,socket技术第七章

    Netty 在 Socket 基础上提供了更高级别的抽象,使得开发者可以更高效地处理网络数据流。 在 Netty 中,"第七章:编解码器Codec" 是一个关键部分,主要讨论了如何处理网络通信中的数据编码与解码问题。编解码器是 ...

    netty4 api 中文

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...对于希望学习Netty的开发者,建议先熟悉Java NIO基础,然后结合这个中文API文档,逐步探索Netty的高级特性。

Global site tag (gtag.js) - Google Analytics