`
Donald_Draper
  • 浏览: 978839 次
社区版块
存档分类
最新评论

netty NioServerSocketChannel解析

阅读更多
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
引言
上一篇我们看了抽象nio消息通道,先来回顾一下:
抽象Nio消息通道AbstractNioMessageChannel,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

nio消息Unsafe(NioMessageUnsafe)读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件

今天终于到我们的目的了nio 服务端socket通道,NioServerSocketChannel,
package io.netty.channel.socket.nio;

import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.internal.SocketUtils;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.List;

/**
 * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
 * NIO selector based implementation to accept new connections.
 */
public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//通道元数据
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();//选择器提供者
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
    private final ServerSocketChannelConfig config;//通道配置
}

从上面来看,nio服务端socket通道内部有两个变量,一个为选择器提供者,一个为通道配置。
来看构造
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
 * Create a new instance using the given {@link SelectorProvider}.
 */
public NioServerSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    //创建通道配置
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}


来看创建socket通道,
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See [url=https://github.com/netty/netty/issues/2308]#2308[/url].
	 委托给选择器提供者,打开一个通道
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

关于通道配置NioServerSocketChannelConfig,我们nio服务端socket通道的内部方法看完,再来看
回头看通道配置,


来看其他方法:

//获取本地socket地址
@Override
public InetSocketAddress localAddress() {
    return (InetSocketAddress) super.localAddress();
}
//获取通道元数据
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
//获取通道配置
@Override
public ServerSocketChannelConfig config() {
    return config;
}
//判断通道,是否激活,主要通过通道关联socket的isBound方法判断
@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}
//获取关联socket通道
@Override
protected ServerSocketChannel javaChannel() {
    return (ServerSocketChannel) super.javaChannel();
}
//远端地址为空
@Override
public InetSocketAddress remoteAddress() {
    return null;
}
//安全获取本地socket地址
@Override
protected SocketAddress localAddress0() {
    return SocketUtils.localSocketAddress(javaChannel().socket());
}
//关闭通道
@Override
protected void doClose() throws Exception {
    javaChannel().close();
}

//绑定socket地址
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        //否则使用通道关联Socket的bind方法,绑定socket地址
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

从上面来看,通道实际绑定socket地址,首先判断jdk版本信息,如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址,
否则为通道关联Socket的bind方法。

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    //接受通道连接,并创建与客户端交互的socket通道
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
	    //将创建的与客户端交互的socket通道,添加到结果集
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
//SocketUtils
//安全接受socket连接
 public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
            @Override
            public SocketChannel run() throws IOException {
                return serverSocketChannel.accept();
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}


读取的消息委托给谁来处理呢,这要回到SeverBootStrap这篇文章,主要是ServerBootstrapAcceptor。
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572

下面我们来看简单说一下,从ServerBootStrap初始化通道开始:

下面我们来看初始化通道,这个是重点:
//SeverBootStrap
@Override  
void init(Channel channel) throws Exception {  
    final Map<ChannelOption<?>, Object> options = options0();  
    synchronized (options) {  
        //设置父Server通道选项  
        setChannelOptions(channel, options, logger);  
    }  
  
    final Map<AttributeKey<?>, Object> attrs = attrs0();  
    synchronized (attrs) {  
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {  
            @SuppressWarnings("unchecked")  
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();  
            //设置父Server通道属性  
            channel.attr(key).set(e.getValue());  
        }  
    }  
   //获取Server通道的Channel管道  
    ChannelPipeline p = channel.pipeline();  
    final EventLoopGroup currentChildGroup = childGroup;  
    final ChannelHandler currentChildHandler = childHandler;  
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;  
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;  
  
    synchronized (childOptions) {  
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));  
    }  
  
    synchronized (childAttrs) {  
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));  
    }  
  
    p.addLast(new ChannelInitializer<Channel>() {  
        @Override  
        public void initChannel(final Channel ch) throws Exception {  
            final ChannelPipeline pipeline = ch.pipeline();  
            ChannelHandler handler = config.handler();  
            if (handler != null) {  
            //将通道处理器添加到通道内部的Channel管道内  
                pipeline.addLast(handler);  
            }  
            ch.eventLoop().execute(new Runnable() {  
                @Override  
                public void run() {  
            //将Server引导配置监听器添加到通道内部的Channel管道内 ,这个是重点
                    pipeline.addLast(new ServerBootstrapAcceptor(  
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  
                }  
            });  
        }  
    });  
}  

我们来看引导配置监听器,实际为一个Inbound通道处理器

  
 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {  
        private final EventLoopGroup childGroup;//与客户端交互通道注册的事件循环组  
        private final ChannelHandler childHandler;//与客户端交互通道的通道处理器  
        private final Entry<ChannelOption<?>, Object>[] childOptions;//与客户端交互通道的选项配置  
        private final Entry<AttributeKey<?>, Object>[] childAttrs;//与客户端交互通道的属性  
        private final Runnable enableAutoReadTask;  
      
        ServerBootstrapAcceptor(  
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,  
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {  
            this.childGroup = childGroup;  
            this.childHandler = childHandler;  
            this.childOptions = childOptions;  
            this.childAttrs = childAttrs;  
      
            // Task which is scheduled to re-enable auto-read.  
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may  
            // not be able to load the class because of the file limit it already reached.  
            // 此任务用于开启通道自动读取配置,将会被所在的事件循环调度。  
            // See https://github.com/netty/netty/issues/1328  
            enableAutoReadTask = new Runnable() {  
                @Override  
                public void run() {  
             //开启通道自动读取配置  
                    channel.config().setAutoRead(true);  
                }  
            };  
        }  
       //通道读取操作
        @Override  
        @SuppressWarnings("unchecked")  
        public void channelRead(ChannelHandlerContext ctx, Object msg) {  
            //与客户端交互通道 ,这个就是在nio服务端socket通道中,doReadMessages方法接受客户端连接,
	    //创建的客户端交互socket通道
            final Channel child = (Channel) msg;  
            //配置与客户端交互通道的通道处理器  
            child.pipeline().addLast(childHandler);  
            //配置与客户端交互通道的选项  
            setChannelOptions(child, childOptions, logger);  
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
        //配置与客户端交互通道的属性  
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
            }  
      
            try {  
        //注册与客户端交互通道到childGroup事件循环组  
                childGroup.register(child).addListener(new ChannelFutureListener() {  
                    @Override  
                    public void operationComplete(ChannelFuture future) throws Exception {  
                        if (!future.isSuccess()) {  
                 //注册失败,则关闭通道  
                            forceClose(child, future.cause());  
                        }  
                    }  
                });  
            } catch (Throwable t) {  
                forceClose(child, t);  
            }  
        }  
         //关闭通道  
        private static void forceClose(Channel child, Throwable t) {  
            child.unsafe().closeForcibly();  
            logger.warn("Failed to register an accepted channel: {}", child, t);  
        }  
      
        @Override  
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
            final ChannelConfig config = ctx.channel().config();  
            if (config.isAutoRead()) {  
                // stop accept new connections for 1 second to allow the channel to recover  
         //发生异常,则停止接受连接请求1秒钟,允许通道恢复  
                // See https://github.com/netty/netty/issues/1328  
                config.setAutoRead(false);  
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);  
            }  
            // still let the exceptionCaught event flow through the pipeline to give the user  
            // a chance to do something with it  
     //触发异常  
            ctx.fireExceptionCaught(cause);  
        }  
    }  


从上面来看,doReadMessages方法,实际为当接受客户端的连接请求时,创建一个与客户端交互的socket通道,并添加到读操作结果集中,实际为socket通道集。并将socket通道集交给ServerBootStrap的引导配置监听器ServerBootstrapAcceptor处理,Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。 客户端连接服务端时,首先向服务端发送连接请求数据,服务端接受到连接请求时,创建一个与客户端交互的socket通道。

再来看其他方法
// Unnecessary stuff
//由于服务端通道用于接受客户端的请求,所有不支持连接,写消息,消息过滤等等操作
@Override
protected boolean doConnect(
        SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected void doFinishConnect() throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected SocketAddress remoteAddress0() {
    return null;
}

@Override
protected void doDisconnect() throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    throw new UnsupportedOperationException();
}

@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
    throw new UnsupportedOperationException();
}

我们再回到Nio服务端通道配置,
//Nio服务端通道配置, 为NioServerSocketChannel的内部类,这个我们单独列一篇文章来说
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
    private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel, javaSocket);
    }

    @Override
    protected void autoReadCleared() {
        clearReadPending();
    }
}


总结:

nio服务端socket通道NioServerSocketChannel内部有两个变量,一个为选择器提供者SelectorProvider,一个为通道配置ServerSocketChannelConfig。

通道实际绑定socket地址,首先判断jdk版本信息,如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址,否则为通道关联Socket的bind方法。

doReadMessages方法,实际为当接受客户端的连接请求时,创建一个与客户端交互的socket通道,并添加到读操作结果集中,实际为socket通道集。并将socket通道集交给ServerBootStrap的引导配置监听器ServerBootstrapAcceptor处理,Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。 客户端连接服务端时,首先向服务端发送连接请求数据,服务端接受到连接请求时,创建一个与客户端交互的socket通道。

由于服务端通道用于接受客户端的请求,所有不支持连接,写消息,消息过滤等等操作。

0
1
分享到:
评论

相关推荐

    netty源码解析视频

    - `Channel`有多种实现,如`NioServerSocketChannel`、`NioSocketChannel`等,选择合适的`Channel`类型对于性能至关重要。 3. **EventLoop**与**EventLoopGroup**: - `EventLoop`:负责处理一系列`Channel`的...

    java nio&netty系列之三netty网络模型代码以及简化版代码示例

    .channel(NioServerSocketChannel.class) // 选择NioServerSocketChannel作为服务器通道 .childHandler(new ChannelInitializer() { // 创建ChannelInitializer来设置每个新连接的管道 @Override protected void...

    基于netty编写的socket服务端

    这里的`NioServerSocketChannel`是Netty提供的用于监听客户端连接的服务器套接字通道,`MyBusinessHandler`是我们自定义的业务处理器,它会处理每个新建立的连接。 在`MyBusinessHandler`中,我们可以实现对客户端...

    Netty源码解析-服务启动过程.pdf

    - **`.channel(NioServerSocketChannel.class)`**:指定使用`NioServerSocketChannel`作为服务器通道类型。 - **`.childHandler()`**:设置`ChannelInitializer`实例,用于初始化每个客户端连接的`ChannelPipeline`...

    笔记,4、深入Netty1

    本文将深入探讨Netty中的几个关键组件,包括`NioServerSocketChannel`、`NioSocketChannel`、`NioEventLoop`、`NioEventLoopGroup`以及`Unsafe`系列类,并通过类关系图来解析它们的作用和相互关系。 1. **Nio...

    Netty原理架构解析

    本文来自于csdn,本文主要介绍了关于Netty的原理架构解析,介绍的NettyReactor模型以及服务端Netty的工作架构,希望对您的学习有所帮助。Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能...

    Netty5 AIO

    Netty5中,AIO模型主要由NioEventLoop和NioServerSocketChannel实现。NioEventLoop是事件循环,负责处理I/O事件和执行用户任务;NioServerSocketChannel是服务器端的通道,用于监听新的连接。 3. **Channel和...

    Netty权威指南-Netty源码

    ServerBootstrap 配置了 EventLoopGroup(包含多个 EventLoop)和 Channel 实例,如 NioServerSocketChannel。然后通过绑定端口启动服务器,这会触发一系列的事件,如 BIND、CONNECT 和 ACCEPT。 接着,我们深入到 ...

    高清Netty5.0架构剖析和源码解读

    NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类Bootstrap28 3.2.2. 服务端返回ACK应答,客户端连接成功32 3.3. 读操作33 3.3.1. 异步读取消息33 3.4. 写操作...

    netty学习文件,实现http,websocket,protobuf

    1. **配置Bootstrap**:为服务器和客户端设置Bootstrap,指定EventLoopGroup(负责事件循环)和Channel(如NioServerSocketChannel或NioSocketChannel)。 2. **添加处理器**:根据需求添加HttpServerCodec、...

    netty http client & server

    1. 创建 Bootstrap:Bootstrap 是 Netty 中用于配置和启动服务器的核心类,我们需要配置 ServerBootstrap 并设置其 Channel 类型为 NioServerSocketChannel。 2. 绑定端口:通过调用 bind() 方法,将服务器绑定到...

    netty源码深入分析

    .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ...

    Springboot项目Netty服务并自定义Gson配置类解析数据包

    import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); ...

    java应用netty服务端和客户端

    3. **ServerChannel**:服务器端的通道接口,如NioServerSocketChannel,用于监听和接受新的连接。 4. **ChannelInitializer**:用于添加服务器端的处理器链,比如解码器、编码器、业务逻辑处理器等。 5. **...

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    Netty提供了多种Transport实现,如NioServerSocketChannel用于监听TCP连接,NioDatagramChannel用于UDP通信。这些Transport实现了底层的I/O操作,如创建socket、发送和接收数据,将复杂的网络编程简化为对Channel的...

    spring boot 整合的netty 实现的socket的服务端和客户端

    .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandler()); ...

Global site tag (gtag.js) - Google Analytics