`
Donald_Draper
  • 浏览: 985134 次
社区版块
存档分类
最新评论

netty 抽象通道后续

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
引言:
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。

添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。

添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。

移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件

从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。

将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。

看完了抽象Unsafe定义和通道Outbound缓冲区,今天我们回到抽象通道,接着抽象通道初始化,继续看抽象通道的其他方法

先把抽象通道的变量声明贴出来及构造,以便理解:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {  
  
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);  
  
    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道关闭异常  
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");//确保通道打开方法调用时,通道关闭异常  
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");//close方法调用时,通道关闭异常  
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");//write方法调用时,通道关闭异常  
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(  
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道还未连接异常  
  
    private final Channel parent;//所属通道  
    private final ChannelId id;//通道id  
    private final Unsafe unsafe;//硬件底层操作类  
    private final DefaultChannelPipeline pipeline;//Channel管道  
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);//空异步任务  
    private final CloseFuture closeFuture = new CloseFuture(this);//异步关闭任务  
  
    private volatile SocketAddress localAddress;//本地socket地址  
    private volatile SocketAddress remoteAddress;//远端socket地址  
    private volatile EventLoop eventLoop;//通道注册的事件循环  
    private volatile boolean registered;//是否注册  
    /** Cache for the string representation of this channel */  
    private boolean strValActive;  
    private String strVal;  
    ...  
        /** 
     * Creates a new instance. 
     * 
     * @param parent 
     *        the parent of this channel. {@code null} if there's no parent. 
     */  
    protected AbstractChannel(Channel parent) {  
        this.parent = parent;  
        //创建通道id  
        id = newId();  
        //创建底层操作类unsafe  
        unsafe = newUnsafe();  
        //新建Channel管道  
        pipeline = newChannelPipeline();  
    }  
      
    /** 
     * Creates a new instance. 
     * 
     * @param parent 
     *        the parent of this channel. {@code null} if there's no parent. 
     */  
    protected AbstractChannel(Channel parent, ChannelId id) {  
        this.parent = parent;  
        this.id = id;  
        unsafe = newUnsafe();  
        pipeline = newChannelPipeline();  
    }  
}  

上面抽象通道的变量和构造有单独的文章讲述,这里不再说:
来看其他方法
//判断通道是否可写
@Override
public boolean isWritable() {
    //如果unsafe关联的通道Outbound 缓冲区不为空,且可写返回true
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    return buf != null && buf.isWritable();
}

//直到通道可写前,通道Outbound 缓冲区的字节数。如果通道不可写,则返回0 
@Override
public long bytesBeforeUnwritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
    // We should be consistent with that here.
    return buf != null ? buf.bytesBeforeUnwritable() : 0;
}
//获取直到通道可写,通道底层buf有多少字节数据需要发送。如果可写返回0
@Override
public long bytesBeforeWritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
    // We should be consistent with that here.
    return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
}
//获取所属通道
@Override
public Channel parent() {
    return parent;
}
//获取通道内部Channel管道
@Override
public ChannelPipeline pipeline() {
    return pipeline;
}
//获取通道字节buf分配器
@Override
public ByteBufAllocator alloc() {
    return config().getAllocator();
}
//获取通道所在的事件循环
@Override
public EventLoop eventLoop() {
    EventLoop eventLoop = this.eventLoop;
    if (eventLoop == null) {
        throw new IllegalStateException("channel not registered to an event loop");
    }
    return eventLoop;
}
//获取通道本地地址
@Override
public SocketAddress localAddress() {
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        try {
            this.localAddress = localAddress = unsafe().localAddress();
        } catch (Throwable t) {
            // Sometimes fails on a closed socket in Windows.
            return null;
        }
    }
    return localAddress;
}

/**
 * @deprecated no use-case for this.
 */
@Deprecated
protected void invalidateLocalAddress() {
    localAddress = null;
}
//获取远端socket地址
@Override
public SocketAddress remoteAddress() {
    SocketAddress remoteAddress = this.remoteAddress;
    if (remoteAddress == null) {
        try {
            this.remoteAddress = remoteAddress = unsafe().remoteAddress();
        } catch (Throwable t) {
            // Sometimes fails on a closed socket in Windows.
            return null;
        }
    }
    return remoteAddress;
}

/**
 * @deprecated no use-case for this.
 */
@Deprecated
protected void invalidateRemoteAddress() {
    remoteAddress = null;
}
//判断通道是否注册到事件循环
@Override
public boolean isRegistered() {
    return registered;
}
//绑定本地socket地址
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
}
//连接远端socket地址
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline.connect(remoteAddress);
}
//上面两个方法的合体
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return pipeline.connect(remoteAddress, localAddress);
}
//断开通道连接
@Override
public ChannelFuture disconnect() {
    return pipeline.disconnect();
}
//关闭通道
@Override
public ChannelFuture close() {
    return pipeline.close();
}
//从事件循环反注册
@Override
public ChannelFuture deregister() {
    return pipeline.deregister();
}
//刷新通道
@Override
public Channel flush() {
    pipeline.flush();
    return this;
}
下面几个方法与上面不同是,带了一个异步任务结果,在操作完成时,通道任务结果
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, localAddress, promise);
}

@Override
public ChannelFuture disconnect(ChannelPromise promise) {
    return pipeline.disconnect(promise);
}

@Override
public ChannelFuture close(ChannelPromise promise) {
    return pipeline.close(promise);
}

@Override
public ChannelFuture deregister(ChannelPromise promise) {
    return pipeline.deregister(promise);
}
//通道读操作
@Override
public Channel read() {
    pipeline.read();
    return this;
}
//写消息
@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
    return pipeline.write(msg, promise);
}
//写并刷新消息
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return pipeline.writeAndFlush(msg, promise);
}
//创建异步可写任务
@Override
public ChannelPromise newPromise() {
    return pipeline.newPromise();
}
//创建异步可写进度任务
@Override
public ChannelProgressivePromise newProgressivePromise() {
    return pipeline.newProgressivePromise();
}
//创建已经成功的任务
@Override
public ChannelFuture newSucceededFuture() {
    return pipeline.newSucceededFuture();
}
//创建已经失败的任务
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
    return pipeline.newFailedFuture(cause);
}
//获取关闭异步任务
@Override
public ChannelFuture closeFuture() {
    return closeFuture;
}
//获取通道unsafe
@Override
public Unsafe unsafe() {
    return unsafe;
}
//创建空异步任务
 @Override
public final ChannelPromise voidPromise() {
    return pipeline.voidPromise();
}

在上面我们看到好多方法时委托给通道的Channel管道,这是因为:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {


public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {


即通道实际上是一个ChannelOutboundInvoker,当通道相关操作发生时,触发通道相关操作事件。

从上面可以看出,通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。


总结:

通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。

0
0
分享到:
评论

相关推荐

    netty框架 jar包

    在Netty中,通道(Channel)是连接的抽象,它代表了与远程实体的一个连接,可以进行读写操作。而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 ...

    Netty实战 电子版.pdf_java_netty_服务器_

    Pipeline(管道)则是一系列处理通道事件的处理器链,每个处理器可以对数据进行解码、编码或者执行业务逻辑。 3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持...

    跟闪电侠学Netty:Netty即时聊天实战与底层原理-book-netty.zip

    2. **Netty架构**:Netty采用了反应器模式,包含Bootstrap(引导类)、ServerBootstrap(服务器引导类)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)等组件,构建了高效的事件驱动模型。...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据传输的基础。ChannelHandler(通道处理器)是业务逻辑的载体,通过ChannelPipeline(通道管道)组织成处理链,...

    netty实战教程、netty代码demo

    Netty 是由 JBoss 组织开源的一个网络通信框架,基于 Java NIO(非阻塞I/O)构建,提供了一套高度抽象和优化的 API,使得开发者可以轻松地处理网络连接的各种复杂情况。Netty 提供了多种传输类型,如 TCP、UDP 以及...

    深入浅出Netty_netty_

    Channel是网络连接的抽象,它可以是TCP、UDP或者任何其他类型的I/O连接。Pipeline则是一个处理链,每个连接都有一个自己的Pipeline,其中包含多个处理器(ChannelHandler),这些处理器按顺序处理进出的数据。 ...

    Netty实战.epub_netty实战epub_netty实战epub_netty_

    《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...

    netty4.0源码,netty例子,netty api文档

    2. **Channel**:在Netty中,Channel是连接的抽象,它代表了到另一个实体(例如,另一个网络节点)的连接。每个Channel都有自己的生命周期,可以进行读写操作,并且可以注册监听器以响应各种网络事件。 3. **...

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    netty所需要得jar包

    - Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...

    netty-4.1.17.Final.jar

    通道是Netty中的基本I/O抽象,它可以代表任何类型的连接,如TCP、UDP或者本地进程间通信。事件处理器则负责处理各种I/O事件,如连接建立、数据接收和发送等,通过非阻塞I/O(NIO)实现高效率的数据传输。 在"Netty-...

    Android-netty和socket通信的demo

    Netty的核心组件包括Bootstrap(启动器)、ServerBootstrap(服务器启动器)、Channel(通道)、EventLoop(事件循环)和ChannelHandler(处理程序)。通过这些组件,开发者可以方便地创建出可伸缩、低延迟的网络...

    netty官网学习手册中文版

    - **Channel与Pipeline**:Channel是网络连接的抽象,负责读写数据;Pipeline则是一系列处理I/O事件的处理器链,每个处理器执行特定的处理任务。 - **EventLoop与EventLoopGroup**:EventLoop是执行I/O操作和处理...

    netty-netty-4.1.69.Final.tar.gz

    1. **异步事件驱动**:Netty基于Java NIO(非阻塞I/O)实现,利用了其事件循环(EventLoop)和通道(Channel)的概念,可以处理大量并发连接,提高了系统的吞吐量。 2. **高效性**:Netty通过减少对象创建和内存...

    netty框架图及netty面试知识点解析

    Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环(EventLoop)是Netty处理事件的关键组件,...

    Netty进阶之路-跟着案例学Netty

    《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...

    netty-netty-4.1.96.Final.tar.gz 官网最新版Netty Project

    它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。 Netty is a NIO ...

    Netty权威指南-Netty源码

    Netty 的核心组件包括:ByteBuf(字节缓冲区)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)以及Handler(处理器)。ByteBuf 提供了一种高效的方式来处理网络数据,避免了 Java 原生字节数组操作中...

    netty4-netty5.rar

    ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。

Global site tag (gtag.js) - Google Analytics