ChannelFuture f = b.connect(host, port).sync();
//Bootstrap public ChannelFuture connect(String inetHost, int inetPort) { return connect(new InetSocketAddress(inetHost, inetPort)); } public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doConnect(remoteAddress, localAddress()); } private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; }
1、 首先分析initAndRegister()方法
// AbstractBootstrap final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelPromise regPromise = channel.newPromise(); group().register(channel, regPromise); if (regPromise.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regPromise; }
1.1 final Channel channel = channelFactory().newChannel()的执行逻辑与socket那篇文章中分析的差不多
//BootstrapChannelFactory public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
//NioSocketChannel public NioSocketChannel() { this(newSocket()); } private static SocketChannel newSocket() { try { return SocketChannel.open(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new DefaultSocketChannelConfig(this, socket.socket()); }
//AbstractNioByteChannel protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
//AbstractNioChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } // AbstractChannel protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
1.2 init(channel)
//Bootstrap void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler()); final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { for (Entry<ChannelOption<?>, Object> e: options.entrySet()) { try { if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + channel, t); } } } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
a) 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的initChannel方法由用户实现:当channelRegistered事件发生时,ChannelInitializer会调用initChannel,将EchoClientHandler加入到管道中。
b) 设置NioSocketChannel的options和attrs
1.3 group().register(channel, regPromise);
//MultithreadEventLoopGroup public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); }
//SingleThreadEventLoop public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法
//AbstractUnsafe public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly(); promise.setFailure(t); } } }
//AbstractUnsafe private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } Runnable postRegisterTask = doRegister(); registered = true; promise.setSuccess(); pipeline.fireChannelRegistered(); if (postRegisterTask != null) { postRegisterTask.run(); } if (isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } closeFuture.setClosed(); } } protected Runnable doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return null; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
doRegister中的这行代码selectionKey = javaChannel().register(eventLoop().selector, 0, this)将SocketChannel注册到selector上:感兴趣的事件(ops)为0,即暂不关注任何事件;附件(attachment)为this,即当前的NioSocketChannel;返回值为对应的selectionKey。
//Bootstrap.java private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; }register0中的promise.setSuccess()会使regFuture完成:如果此时doConnect中的监听器已经注册,则回调operationComplete执行doConnect0;如果doConnect检查时regFuture已经完成,则直接调用doConnect0。两种情况下,doConnect0都会把connect任务提交到channel所属eventLoop的任务队列中。
//DefaultChannelPipeline public ChannelPipeline fireChannelRegistered() { head.fireChannelRegistered(); return this; } //DefaultChannelHandlerContext public ChannelHandlerContext fireChannelRegistered() { final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } return this; } private void invokeChannelRegistered() { try { ((ChannelInboundHandler) handler).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } }
ChannelRegistered是一个Inbound事件,因此会按照head->tail的顺序执行所有的inbound处理器,目前有三个处理器:head-> ChannelInitializer ->tail,ChannelInitializer和tail都是inbound处理器。invokeChannelRegistered会调用处理器的channelRegistered方法,所以看一下ChannelInitializer的channelRegistered方法
//ChannelInitializer public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { boolean removed = false; boolean success = false; try { initChannel((C) ctx.channel()); ctx.pipeline().remove(this); removed = true; ctx.fireChannelRegistered(); success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (!removed) { ctx.pipeline().remove(this); } if (!success) { ctx.close(); } } } }
a) initChannel方法是在下面这段代码中创建ChannelInitializer匿名内部类时实现的
b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //new LoggingHandler(LogLevel.INFO), new EchoClientHandler(firstMessageSize)); } });
b) 接着调用EchoClientHandler和tail的channelRegistered方法,都没有做啥实质性的事情,最后以tail的空实现结束。
//AbstractUnsafe if (isActive()) { pipeline.fireChannelActive(); } // NioSocketChannel public boolean isActive() { SocketChannel ch = javaChannel(); return ch.isOpen() && ch.isConnected(); }由于此时还没有执行connect操作,所以isActive返回false,不会执行pipeline.fireChannelActive()
2、 接下来分析doConnect0方法
//Bootstrap private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise); } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }channel.connect(remoteAddress, promise)调用AbstractChannel的方法,channel里会调用管道的方法
//AbstractChannel public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); } // DefaultChannelPipeline public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); }由于connect是一个Outbound事件,所以按照tail到head的顺序执行所有的outBound处理器。目前共有Head->EchoClientHandler->tail三个处理器,而只有Head是outbound处理器
//HeadHandler public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }看一下AbstractNioUnsafe的connect方法
//AbstractNioUnsafe public void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new IllegalStateException("connection attempt already made"); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { promise.setSuccess(); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } closeIfClosed(); promise.tryFailure(t); } }重点分析一下doConnect方法,由NioSocketChannel实现
//NioSocketChannel protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
boolean connected = javaChannel().connect(remoteAddress)此处就向服务端发起了connect请求,准备三次握手。由于是非阻塞模式,所以该方法会立即返回。如果连接立即建立成功(例如连接本机),则返回true;如果连接仍在进行中,则返回false(返回false并不表示连接失败),后续需要使用select来检测连接是否已建立成功。如果返回false,此种情况就需要将ops设置为SelectionKey.OP_CONNECT,等待connect的select事件通知,然后调用finishConnect方法。
3、 最后分析客户端线程NioEventLoop的select接收到connect事件后的处理逻辑
//NioEventLoop private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }
//AbstractNioUnsafe public void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); assert connectPromise != null; try { boolean wasActive = isActive(); doFinishConnect(); connectPromise.setSuccess(); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } connectPromise.setFailure(t); closeIfClosed(); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
//NioSocketChannel protected void doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) { throw new Error(); } }
- 创建套接字SocketChannel
- 设置套接字为非阻塞
- 记录channel的读事件类型readInterestOp为SelectionKey.OP_READ(此时只是保存该值,尚未注册到selector;后续注册时使用的ops为0)
- 创建作用于SocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。
- 设置SocketChannel的options和attrs。
- 为管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的initChannel方法由用户实现:当channelRegistered事件发生时,ChannelInitializer会调用initChannel,将EchoClientHandler加入到管道中。
- 启动客户端线程,并将register0任务加入到线程的任务队列中。而register0任务做的事情为:将SocketChannel以ops为0、附件为NioSocketChannel自身的方式注册到selector中,并得到对应的selectionkey。然后通过回调,将doConnect0任务加入到线程的任务队列中。线程从启动到现在这段时间内,任务队列的变化如下:[register0任务] -> [register0任务,doConnect0任务] -> [doConnect0任务]
- 通过channelRegistered事件,将EchoClientHandler加入到管道中,并移除ChannelInitializer,经过此步骤后,管道中的处理器链表为:head(outbound)-> EchoClientHandler (inbound)->tail(inbound)。管道从创建到现在这段时间内,处理器链表的变化历史为:head->tail,head->ChannelInitializer(inbound)->tail,head-> EchoClientHandler (inbound)->tail
- doConnect0任务会触发connect事件,connect是一个Outbound事件,headHandler通过调用AbstractNioUnsafe的方法向服务端发起connect请求;如果连接没有立即建立成功,则设置ops为SelectionKey.OP_CONNECT
- 客户端线程NioEventLoop中的select接收到connect事件后,将SelectionKey.OP_CONNECT从ops中移除,然后调用finishConnect方法完成连接的建立。到此,连接就正式建立了。
- 最后触发ChannelActive事件。
总的来说,Netty 源码分析涉及了网络编程、并发处理、事件驱动、协议编解码等多个领域,对理解 Java 高性能网络应用开发有着重要的指导意义。通过阅读源码,我们可以更深入地了解 Netty 如何实现高效的网络通信,并...
然后,通过分析RocketMQ的源码,观察它是如何利用Netty来实现网络通信的。可以尝试自己编写简单的Netty服务器和客户端,模拟RocketMQ的消息发布和消费过程,加深理解。 总的来说,Netty作为强大的网络通信框架,为...
NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...
Netty源码分析 ##### 3.1. 服务端创建 Netty是一个高性能的网络应用程序框架,其核心优势在于异步事件驱动的I/O处理机制。接下来我们将深入分析Netty服务端的创建过程。 ###### 3.1.1. 服务端启动辅助类...
在本篇 MQTT---HiveMQ 源码详解中,我们重点关注的是 Netty 在处理 MQTT 消息和事件方面的...对于深入理解 MQTT 服务端的工作原理以及如何利用 HiveMQ 构建安全、可扩展的 MQTT 服务,阅读和分析源码是非常有价值的。
不过,这只是基础,深入研究Netty源码可以帮助我们更好地理解和优化系统性能,例如了解其内部的事件调度机制、缓冲区管理以及线程模型等。 最后,标签中的"源码"提示我们要关注Netty的底层实现,通过阅读和分析源码...
深入研究Netty源码,我们可以了解其高效、可扩展的设计,以及如何利用Java NIO和线程池优化性能。同时,熟悉WebSocket规范(RFC 6455)有助于更好地理解和调试问题。 最后,"工具"标签可能指的是使用Netty和其他...
7. 源码分析 分析Lettuce的源码有助于理解其内部机制,如命令的序列化、网络通信过程、事件处理流程等。例如,`RedisAsyncCommands`中的方法是如何包装为`CompletableFuture`,以及Netty的`ChannelHandlerContext`...
3. **网络通信**:分析Kafka的Netty为基础的网络层实现,探讨Request/Response模式,以及如何实现高效的I/O处理和连接管理。 4. **Producer工作原理**:详述生产者如何发送消息,包括Partitioner策略、批处理和延迟...