`
chenchangqun
  • 浏览: 55400 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

netty源码分析-server启动-initAndRegister

 
阅读更多
本文包括以下内容
  1. initAndRegister-createChannel分析
  2. initAndRegister-createChannel分析
  3. ChannelInitializer分析

创建channel-server端bind()调用链如下

  
sever端启动主要处理都在bind()处理中,其中主要代码如下
AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise;
        if (regFuture.isDone()) {
            promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }

        return promise;
    }
其中最重要的两个部分已经标出,先来分析一下initAndRegister()

分析initAndRegister方法
时序图如下

    final ChannelFuture initAndRegister() {
        Channel channel;
        try {
//创建channel,并配置其主要属性
            channel = createChannel();
        } catch (Throwable t) {
            return VoidChannel.INSTANCE.newFailedFuture(t);
        }

        try {
//初始化channel
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }
//创建ChannelPromise
        ChannelPromise regFuture = channel.newPromise();
        channel.unsafe().register(regFuture);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
1.createChannel分析
ServerBootstrap:
    Channel createChannel() {
        EventLoop eventLoop = group().next();
        return channelFactory().newChannel(eventLoop, childGroup);
    }
ServerBootstrapChannelFactory
        @Override
        public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
            try {
//class=NioSocketChannel.class,调用它的构造方法。
                Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
                return constructor.newInstance(eventLoop, childGroup);
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
//调用NioServerSocketChannel构造方法创建实例
    public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
//调用父类构造方法
        super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
//创建默认配置
        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
    }

    protected AbstractNioMessageServerChannel(
            Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
//此时parent=null 。ch=ServerSocketChannelImp(java原生的)
        super(parent, eventLoop, ch, readInterestOp);
//childGroup赋值
        this.childGroup = childGroup;
    }
    protected AbstractNioMessageChannel(
            Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
        super(parent, eventLoop, ch, readInterestOp);
    }

protected AbstractNioChannel(Channel parent, EventLoop eventLoop,     SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
        super(parent, eventLoop);
// 赋值 java SocketChannelImpl
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
//配置java.nio.ServerSocketChannel为非阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
//异常处理暂时不关注
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
最后调用
    protected AbstractChannel(Channel parent, EventLoop eventLoop) {
        this.parent = parent;
// 非空和兼容校验
        this.eventLoop = validate(eventLoop);
        unsafe = newUnsafe();//初始化unsafe××重要×××
        pipeline = new DefaultChannelPipeline(this);//初始化pipeline××重要×××
    }

AbstractNioChannel
//兼容校验
    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;
    }
小结:createChannel创建了 unsafe,pipeline,java.nio.ServerSocketChannel,然后为其属性eventLoop,childGroup,readInterestOp,config 赋值。
eventLoop是什么时候创建的  ?
创建 EventLoopGroup时创建的

2.init分析
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
//options value= {SO_BACKLOG=128}
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
//attrs={}
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final ChannelHandler currentChildHandler = childHandler;
//获取
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//childOptions={SO_KEEPALIVE=true}

        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
//childAttrs={}
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
//ChannelInitializer的作用:定义initChannel方法,便于添加handler。
//ServerBootstrapAcceptor的主要方法channelRead获取了ServerBootstrap的参数,貌似ServerBootstrapAcceptor作用只是获取ServerBootstrap的参数
//详细作用以后分析
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                        currentChildAttrs));
            }
        });
    }

在一个标准的netty server childHandler 对应下面部分


3.ChannelInitializer分析

//ChannelInitializer:用于添加handler,在chnnel执行register()时调用
public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter {

//子类实现 通常用于添加handler
    protected abstract void initChannel(C ch) throws Exception;

//在chnnel执行register()时调用
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
//调用子类实现方法, 通常用于添加handler
            initChannel((C) ctx.channel());
//使用完毕 移除当前handler
            pipeline.remove(this);
    //传递ChannelRegistered事件,调用其他处理ChannelRegistered事件的handler
//实现事件传递关键的步骤,如果不写,则事件终止
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}

在server启动-initAndRegister-init()中添加ServerBootstrapAcceptor和用户添加自定义handler时用到.

小结:init()将ServerBootstrap传递给channel(),并添加了一个访问ServerBootstrap属性的handler
最后看一下这两行代码
//创建ChannelPromise,ChannelPromise用途详见其分析篇
        ChannelPromise regFuture = channel.newPromise();
//注册channel到EventLoop上
        channel.unsafe().register(regFuture);
OK initAndRegister 分析完毕
总结:initAndRegister 创建了NioServerSocketChannel,初始化了ServerBootstrapAcceptor,注册channel到parent EventLoop上 。
  • 大小: 37.1 KB
  • 大小: 52.8 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics