`
海浪儿
  • 浏览: 274112 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty4源码分析-connect

阅读更多

本文为原创,转载请注明出处

netty4源码分析-connect

 

 

客户端向服务端发起connect请求由以下代码触发:

ChannelFuture f = b.connect(host, port).sync();

 调用Bootstrap的connect方法:

//Bootstrap
public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        validate();
        return doConnect(remoteAddress, localAddress());
    }

private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }

 1、 首先分析initAndRegister()方法

// AbstractBootstrap
final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regPromise = channel.newPromise();
        group().register(channel, regPromise);
        if (regPromise.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regPromise;
    }

 1.1 final Channel channel = channelFactory().newChannel()的执行逻辑与socket那篇文章中分析的差不多

//BootstrapChannelFactory       
public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

 区别在于:a、本次根据反射创建的channel的类型是SocketChannel

 

//BootstrapChannelFactory       
public NioSocketChannel() {
        this(newSocket());
}
private static SocketChannel newSocket() {
        try {
            return SocketChannel.open();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }
public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new DefaultSocketChannelConfig(this, socket.socket());
    }

 b、NioSocketChannel的父类是AbstractNioByteChannel,设置的interestOps为SelectionKey.OP_READ。(注:NioServerSocketChannel的父类是AbstractNioMessageChannel,设置的interestOps为SelectionKey.OP_ACCEPT)

 

//AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

 c、最后分析父类AbstractNioChannel的构造函数

//AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
}

// AbstractChannel
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }

 newUnsafe()调用AbstractNioByteChannel的实现,实例化一个NioByteUnsafe(注:在NioServerSocketChannel中实例化的是NioMessageUnsafe)。其他点都一样,将channel设置为非阻塞,然后为channel创建管道。

 

1.2 init(channel)

该方法由Bootstrap实现(注:在NioServerSocketChannel中,该方法由ServerBootstrap实现)

//Bootstrap
void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(handler());

        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
                try {
                    if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + channel, t);
                }
            }
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

  该方法主要做了两件事:

  a) 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegistered事件发生时将EchoClientHandler加入到管道中。

  b) 设置NioSocketChannel的options和attrs

 

1.3 group().register(channel, regPromise);

 

实际是调用MultithreadEventLoopGroup的register方法

//MultithreadEventLoopGroup
public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }

 next方法从group中选择一个EventExecutor(实际是一个SingleThreadEventLoop),然后执行register方法

//SingleThreadEventLoop
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }
        channel.unsafe().register(this, promise);
        return promise;
    }

 channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法

//AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    closeForcibly();
                    promise.setFailure(t);
                }
            }
        }

 此处开启了eventloop中的线程(即启动了netty客户端的一个线程),并将register0任务加入到线程的任务队列中。经过此步骤后,线程的任务队列仅含有一个任务,即register0任务,且正在被执行。

接着分析register0任务具体干了什么事情

//AbstractUnsafe
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                Runnable postRegisterTask = doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (postRegisterTask != null) {
                    postRegisterTask.run();
                }
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
                closeFuture.setClosed();
            }
        }

protected Runnable doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return null;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

 doRegister中的这行代码selectionKey = javaChannel().register(eventLoop().selector, 0, this)将SocketChannel、0、以及this注册到selector中并得到对应的selectionkey

接着promise.setSuccess()将promise设置为success,就会触发异步回调,回调之前main函数所在的线程中为ChannelPromise添加的listener,即Bootstrap的以下代码:

//Bootstrap.java
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

 经过此步骤后,线程的任务队列数量由原来的1个增加到了2个,即正在执行的register0任务以及本次新增的doConnect0任务。

再接着分析register0任务中的此行代码pipeline.fireChannelRegistered()

//DefaultChannelPipeline
public ChannelPipeline fireChannelRegistered() {
        head.fireChannelRegistered();
        return this;
    }

//DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelRegistered() {
        final DefaultChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
        return this;
}

private void invokeChannelRegistered() {
        try {
            ((ChannelInboundHandler) handler).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

 ChannelRegistered是一个Inbound事件,因此会按照head->tail的顺序执行所有的inbound处理器,目前有三个处理器:head-> ChannelInitializer ->tail,ChannelInitializer和tail都是inbound处理器,所以看一下ChannelInitializer的invokeChannelRegistered方法

//ChannelInitializer
public final void channelRegistered(ChannelHandlerContext ctx)
            throws Exception {
        boolean removed = false;
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            ctx.pipeline().remove(this);
            removed = true;
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (!removed) {
                ctx.pipeline().remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}

 该方法主要做了以下几件事:

   a) initChannel方法是在此处实例化内部类ChannelInitializer实现的

b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new EchoClientHandler(firstMessageSize));
                 }
             });

 其功能就是将EchoClientHandler加入到管道中,并将自己(ChannelInitializer)从管道中删除,此时的处理器链表为:

      Head->EchoClientHandler->tail

  b) 接着调用EchoClientHandler和tail的channelRegistered方法,都没有做啥实质性的事情,最后以tail的空实现结束。

再分析register0任务中的以下代码

 

//AbstractUnsafe
if (isActive()) 
{
   pipeline.fireChannelActive();
 }

// NioSocketChannel
public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }
 由于此时还没有执行connect操作,所以isActive返回false,不会执行pipeline.fireChannelActive()

 

执行完此代码后,register0任务就执行完了,boss线程中的任务队列中仅剩下doConnect0任务。

 

2、 接下来分析doConnect0方法

 

//Bootstrap
private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, promise);
                    } else {
                        channel.connect(remoteAddress, localAddress, promise);
                    }
                    promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
 channel.connect(remoteAddress, promise)调用AbstractChannel的方法,channel里会调用管道的方法

 

 

 

//AbstractChannel
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
}

// DefaultChannelPipeline
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }
 由于connect是一个Outbound事件,所以按照tail到head的顺序执行所有的outBound处理器。目前共有Head->EchoClientHandler->tail三个处理器,而只有Head是outbound处理器

 

 

 

//HeadHandler
public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
 看一下AbstractNioUnsafe的connect方法

 

 

//AbstractNioUnsafe
public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!ensureOpen(promise)) {
                return;
            }
            try {
                if (connectPromise != null) {
                    throw new IllegalStateException("connection attempt already made");
                }
                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    promise.setSuccess();
                    if (!wasActive && isActive()) {
                        pipeline().fireChannelActive();
                    }
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;
                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                if (t instanceof ConnectException) {
                    Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
                    newT.setStackTrace(t.getStackTrace());
                    t = newT;
                }
                closeIfClosed();
                promise.tryFailure(t);
            }
        }
 重点分析一下doConnect方法,由NioSocketChannel实现

 

//NioSocketChannel
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            javaChannel().socket().bind(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

 boolean connected = javaChannel().connect(remoteAddress)此处就向服务端发起了connect请求,准备三次握手。由于是非阻塞模式,所以该方法会立即返回。如果建立连接成功,则返回true,否则返回false,后续需要使用select来检测连接是否已建立成功。如果返回false,此种情况就需要将ops设置为SelectionKey.OP_CONNECT,等待connect的select事件通知,然后调用finishConnect方法。

    对于connect,会加一个超时调度任务,默认的超时时间是30s,超时后做什么操作,后续再分析吧。

 

3、 最后分析客户端线程NioEventLoop的select接收到connect事件后的处理逻辑

//NioEventLoop
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
        try {
            int readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

 当readyOps包含SelectionKey.OP_CONNECT时,必须首先将OP_CONNECT从ops中去掉,然后调用unsafe.finishConnect()。

 

//AbstractUnsafe
public void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();
            assert connectPromise != null;

            try {
                boolean wasActive = isActive();
                doFinishConnect();
                connectPromise.setSuccess();
                if (!wasActive && isActive()) {
                    pipeline().fireChannelActive();
                }
            } catch (Throwable t) {
                if (t instanceof ConnectException) {
                    Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
                    newT.setStackTrace(t.getStackTrace());
                    t = newT;
                }

                connectPromise.setFailure(t);
                closeIfClosed();
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }

 doFinishConnect方法通过调用SocketChannel的finishConnect方法完成连接的建立,在NioSocketChannel中实现

//NioSocketChannel
protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

 此时,isActive()返回true,所以触发ChannelActive事件,该事件是一个inbound事件,所以Inbound的处理器可以通过实现channelActive方法来进行相应的操作,netty的自带例子中,EchoClientHandler实现该方法来开始向服务端写数据。

 

总结:从发起connect请求到请求建立先后共经历了以下几件事情:

  1. 创建套接字SocketChannel
  2. 设置套接字为非阻塞
  3. 设置channel当前感兴趣的事件为SelectionKey.OP_READ
  4. 创建作用于SocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。
  5. 设置SocketChannel的options和attrs。
  6. 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegisgered事件发生时将EchoClientHandler加入到管道中。
  7. 启动客户端线程,并将register0任务加入到线程的任务队列中。而register0任务做的事情为:将SocketChannel、0、注册到selector中并得到对应的selectionkey。然后通过回调,将doConnect0任务加入到线程的任务队列中。线程从启动到现在这段时间内,任务队列的变化如下:register0任务->register0任务,doConnect0任务-> doConnect0任务
  8. 通过channelRegistered事件,将EchoClientHandler加入到管道中,并移除ChannelInitializer,经过此步骤后,管道中的处理器链表为:head(outbound)-> EchoClientHandler (inbound)->tail(inbound)。管道从创建到现在这段时间内,处理器链表的变化历史为:head->tail,head->ChannelInitializer(inbound)->tail,head-> EchoClientHandler (inbound)->tail
  9. doConnect0任务会触发connect事件,connect是一个Outbound事件,headHandler通过调用AbstractNioUnsafe的方法向服务端发起connect请求,并设置ops为SelectionKey.OP_CONNECT
  10. 客户端线程NioEventLoop中的select接收到connect事件后,将SelectionKey.OP_CONNECT从ops中移除,然后调用finishConnect方法完成连接的建立。到此,connect就正式建立了。
  11. 最后触发ChannelActive事件。

 

分享到:
评论
1 楼 萨琳娜啊 2018-07-10  
Java读源码之Netty深入剖析
网盘地址:https://pan.baidu.com/s/1SphPUT_jwpM4LCzod-2byw 密码: u2p4
备用地址(腾讯微云):https://share.weiyun.com/5Bs3HcR 密码:uu95be

JavaCoder如果没有研究过Netty,那么你对Java语言的使用和理解仅仅停留在表面水平,如果你要进阶,想了解Java服务器的深层高阶知识,Netty绝对是一个必须要过的门槛。

本课程带你从一个Socket例子入手,一步步深入探究Netty各个模块的源码,深入剖析Netty的工作流程和源码设计,让你不但“真懂”也要“会用”

相关推荐

    Netty权威指南-Netty源码

    总的来说,Netty 源码分析涉及了网络编程、并发处理、事件驱动、协议编解码等多个领域,对理解 Java 高性能网络应用开发有着重要的指导意义。通过阅读源码,我们可以更深入地了解 Netty 如何实现高效的网络通信,并...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    然后,通过分析RocketMQ的源码,观察它是如何利用Netty来实现网络通信的。可以尝试自己编写简单的Netty服务器和客户端,模拟RocketMQ的消息发布和消费过程,加深理解。 总的来说,Netty作为强大的网络通信框架,为...

    netty源码解析视频.txt

    通过对Netty源码的深入分析,我们不仅能够了解到其内部工作原理,还能学习到许多优秀的设计思想和技术实践,这对于提高个人的技术水平和解决实际问题有着非常重要的意义。希望本文能够帮助读者更好地理解和掌握Netty...

    高清Netty5.0架构剖析和源码解读

    NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...

    基于netty的nio使用demo源码

    Netty是一个高性能、异步事件驱动的网络应用框架,它为Java开发人员提供了构建服务器和客户端应用程序的强大...通过阅读和分析源码,我们可以深入理解Netty的事件驱动模型、NIO机制以及如何在实际项目中应用这些概念。

    Netty5.0架构剖析和源码解读

    Netty源码分析 ##### 3.1. 服务端创建 Netty是一个高性能的网络应用程序框架,其核心优势在于异步事件驱动的I/O处理机制。接下来我们将深入分析Netty服务端的创建过程。 ###### 3.1.1. 服务端启动辅助类...

    MQTT---HiveMQ源码详解(十三)Netty-MQTT消息、事件处理(源码举例解读).pdf

    在本篇 MQTT---HiveMQ 源码详解中,我们重点关注的是 Netty 在处理 MQTT 消息和事件方面的...对于深入理解 MQTT 服务端的工作原理以及如何利用 HiveMQ 构建安全、可扩展的 MQTT 服务,阅读和分析源码是非常有价值的。

    使用jboss netty 创建高性能webservice客户端及服务端

    不过,这只是基础,深入研究Netty源码可以帮助我们更好地理解和优化系统性能,例如了解其内部的事件调度机制、缓冲区管理以及线程模型等。 最后,标签中的"源码"提示我们要关注Netty的底层实现,通过阅读和分析源码...

    基于netty的websocket开发小结

    深入研究Netty源码,我们可以了解其高效、可扩展的设计,以及如何利用Java NIO和线程池优化性能。同时,熟悉WebSocket规范(RFC 6455)有助于更好地理解和调试问题。 最后,"工具"标签可能指的是使用Netty和其他...

    异步redis + java源码

    7. 源码分析 分析Lettuce的源码有助于理解其内部机制,如命令的序列化、网络通信过程、事件处理流程等。例如,`RedisAsyncCommands`中的方法是如何包装为`CompletableFuture`,以及Netty的`ChannelHandlerContext`...

    kafka源码解析与实战

    3. **网络通信**:分析Kafka的Netty为基础的网络层实现,探讨Request/Response模式,以及如何实现高效的I/O处理和连接管理。 4. **Producer工作原理**:详述生产者如何发送消息,包括Partitioner策略、批处理和延迟...

    Android-MQTT:Android MQTT通用服务

    通过分析源码,开发者可以更好地理解如何在实际应用中实现MQTT连接、发布和订阅功能,以及如何处理网络中断等情况。对于想要在Android应用中实现实时通信或物联网功能的开发者来说,这是一个非常有价值的资源。

Global site tag (gtag.js) - Google Analytics