本文为原创,转载请注明出处
netty4源码分析-socket
服务端启动的第一步必须先创建一个监听套接字ServerSocketChannel,该过程是由ChannelFuture f = b.bind(port)中的bind触发。下面详细分析其过程:
Bind源码如下,代码位于ServerBootstrap的父类AbstractBootstrap
//AbstractBootstrap
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
validate()方法的作用为:校验:bossGroup、BootstrapChannelFactory、childHandler非空。如果childGroup为空,则复用bossGroup,将bossGroup赋值给childGroup。
接着来看doBind的逻辑:
//AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regPromise = initAndRegister();
final Channel channel = regPromise.channel();
final ChannelPromise promise = channel.newPromise();
if (regPromise.isDone()) {
doBind0(regPromise, channel, localAddress, promise);
} else {
regPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
}
});
}
return promise;
}
重点分析里面的initAndRegister()方法
//AbstractBootstrap
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
ChannelPromise regPromise = channel.newPromise();
group().register(channel, regPromise);
if (regPromise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regPromise;
}
a)首先分析以下代码:
final Channel channel = channelFactory().newChannel()
channelFactory()方法返回之前创建的BootstrapChannelFactory,里面的newChannel()方法会根据反射创建一个ServerSocketChannel
//BootstrapChannelFactory
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
注:clazz是在服务端启动的这段代码(b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)中设置的。
clazz.newInstance()会调用NioServerSocketChannel的默认构造函数
// NioServerSocketChannel
public NioServerSocketChannel() {
super(null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
private static ServerSocketChannel newSocket() {
try {
return ServerSocketChannel.open();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
注意newSocket中的这行代码:
return ServerSocketChannel.open();
此处就是服务端监听套接字ServerSocketChannel创建的地方。
既然是使用NIO,那么设置创建的ServerSocketChannel为非阻塞是在哪个地方发生的呢?看下这行代码
super(null, newSocket(), SelectionKey.OP_ACCEPT);
它会对NioServerSocketChannel的父类进行初始化:NioServerSocketChannel的父类是AbstractNioMessageChannel,其构造方法仅仅初始化其父类AbstractNioChannel,父类构造方法如下:
//AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
ch.configureBlocking(false)此处就将之前创建的ServerSocketChannel设置为非阻塞模式。
该方法里还有三点需要注意:
1、super(parent)会调用AbstractNioChannel的父类AbstractChannel的构造方法
// AbstractChannel.java
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
newUnsafe()是由子类AbstractNioMessageChannel实现的,里面实例化了一个内部类NioMessageUnsafe(注:该类很重要,里面定义了read方法,会触发accept的调用,后面对其重点分析)。
2、this.readInterestOp = readInterestOp:设置channel的ops为SelectionKey.OP_ACCEPT(值为16)
3、pipeline = new DefaultChannelPipeline(this),创建作用于ServerSocketChannel的管道Pipeline
// DefaultChannelPipeline
public DefaultChannelPipeline(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
TailHandler tailHandler = new TailHandler();
tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
HeadHandler headHandler = new HeadHandler(channel.unsafe());
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
head.next = tail;
tail.prev = head;
}
DefaultChannelPipeline维护了一个以DefaultChannelHandlerContext为元素的双向链表结构,Head是一个Outbound处理器,而tail是一个Inbound处理器。经过此步骤后,管道中的处理器链表为:Head->tail。
b)再来分析以下代码
init(channel)
该方法由子类ServerBootstrap实现
// ServerBootstrap.java
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
该方法主要做了两件事:
1、设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs。
2、为NioServerSocketChannel对应的管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegistered事件发生时将ServerBootstrapAcceptor加入到管道中。
c) 最后分析 以下代码:
group().register(channel, regPromise);
实际是调用MultithreadEventLoopGroup的register方法
//MultithreadEventLoopGroup
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
next方法从bossGroup中选择一个EventExecutor(它实际是一个SingleThreadEventLoop),然后执行register方法
//SingleThreadEventLoop
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
channel.unsafe().register(this, promise)这里会调用AbstractChannel的内部类AbstractUnsafe的register方法
//AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
closeForcibly();
promise.setFailure(t);
}
}
}
此处开启了eventloop中的线程(即启动了boss线程),并将register0任务加入到boss线程的队列中。经过此步骤后,boss线程的任务队列仅含有一个任务,即register0任务,且正在被执行。
接着分析register0任务具体干了什么事情
//AbstractUnsafe
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
Runnable postRegisterTask = doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (postRegisterTask != null) {
postRegisterTask.run();
}
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
closeFuture.setClosed();
}
}
protected Runnable doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return null;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
看一下doRegister代码,看到这行代码了吧
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
它会调用java.nio.channels.spi. AbstractSelectableChannel的register方法, 将ServerSocketChannel、0、以及this注册到selector中并得到对应的selectionkey。
接着再看register0方法中的promise.setSuccess(),将promise设置为success,就会触发异步回调,回调之前main函数所在的线程中为ChannelPromise添加的listner,即AbstractBootstrap的以下代码:
//AbstractBootstrap.java
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regPromise = initAndRegister();
final Channel channel = regPromise.channel();
final ChannelPromise promise = channel.newPromise();
if (regPromise.isDone()) {
doBind0(regPromise, channel, localAddress, promise);
} else {
regPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(future, channel, localAddress, promise);
}
});
}
return promise;
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
经过此步骤后,boss线程的任务队列数量由原来的1个增加到了2个,即正在执行的register0任务以及本次新增的bind任务。Bind任务在下一篇文章中进行分析。
再接着分析register0任务中的此行代码pipeline.fireChannelRegistered()
//DefaultChannelPipeline
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
//DefaultChannelHandlerContext
public ChannelHandlerContext fireChannelRegistered() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
ChannelRegistered是一个Inbound事件,因此会按照head->tail的顺序执行所有的inbound处理器,目前有三个处理器:head-> ChannelInitializer ->tail->,ChannelInitializer和tail都是inbound处理器,所以看一下ChannelInitializer的invokeChannelRegistered方法.
//ChannelInitializer
public final void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
boolean removed = false;
boolean success = false;
try {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
removed = true;
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (!removed) {
ctx.pipeline().remove(this);
}
if (!success) {
ctx.close();
}
}
}
}
该方法主要做了以下几件事:
1、initChannel方法是在ServerBootstrap中执行Init方法时,实例化内部类ChannelInitializer实现的
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
其功能就是将ServerBootstrapAcceptor作为一个inbound处理器加入到管道中,此时的处理器链表为: Head->ChannelInitializer->ServerBootstrapAcceptor->tail
2、 然后ChannelInitializer将自己从管道中删除,此时的处理器链表变为:Head->ServerBootstrapAcceptor->tail
3、 接着调用ServerBootstrapAcceptor和tail的channelRegistered方法,都没有做啥实质性的事情,最后以tail的空实现结束。
再分析register0任务中的以下代码
//AbstractUnsafe
if (isActive())
{
pipeline.fireChannelActive();
}
// NioServerSocketChannel
public boolean isActive() {
return javaChannel().socket().isBound();
}
由于对于监听套接字还没有执行bind操作,所以isActive返回false,不会执行pipeline.fireChannelActive()该行代码。执行完此代码后,register0任务就执行完了,boss线程中的任务队列中仅剩下bind任务。
总结:initAndRegister()方法主要做了以下几件事情:
1、 创建服务端监听套接字ServerSocketChannel
2、 设置监听套接字为非阻塞
3、 设置channel当前感兴趣的事件为SelectionKey.OP_ACCEPT(值为16)
4、 创建作用于ServerSocketChannel的管道Pipeline,该管道中此时的处理器链表为:Head(outbound)->tail(inbound)。
5、 设置NioServerSocketChannel的options和attrs,并存储之后用于SocketChannel的options和attrs
6、 为NioServerSocketChannel对应的管道增加一个Inbound处理器ChannelInitializer。经过此步骤后,管道中的处理器链表为:head(outbound)->ChannelInitializer(inbound)->tail(inbound)。注意ChannelInitializer的实现方法initChannel,里面会当channelRegisgered事件发生时将ServerBootstrapAcceptor加入到管道中。
7、 启动了boss线程,并将register0任务加入到boss线程的队列中。而register0做的事情为:将ServerSocketChannel、0、注册到selector中并得到对应的selectionkey。然后触发绑定端口的操作,将bind任务加入到boss线程的任务队列中,该内容在下一篇文章中分析。
8、通过channelRegistered事件,将ServerBootstrapAcceptor加入到管道中,并移除ChannelInitializer,经过此步骤后,管道中的处理器链表为:head(outbound)-> ServerBootstrapAcceptor (inbound)->tail(inbound)。
管道从创建到现在这段时间内,处理器链表的变化历史为:
head->tail
head->ChannelInitializer(inbound)->tail
head->ServerBootstrapAcceptor(inbound)->tail
分享到:
相关推荐
Netty 的源码分析主要包括以下几个方面: 1. **I/O 模型**:理解 Netty 如何利用 NIO 实现非阻塞 I/O,包括 Channel、Selector 和 Socket 的工作原理。 2. **Pipeline 实现**:分析 Handler 的添加、移除和调用链的...
通过分析和学习Netty 4.1的源码,我们可以深入理解其设计理念,如如何实现高效的并发处理,如何优雅地处理I/O事件,以及如何设计灵活的网络协议处理器。这对于开发高性能网络应用,特别是分布式系统和微服务架构,...
深入源码分析,我们可以看到Netty如何优雅地处理了线程安全、内存池管理、心跳机制、解码编码、零拷贝等高级特性。例如,Netty通过内部的DirectBufferPool和HeapBufferPool实现了内存池,减少了内存分配和释放的开销...
#### 五、Netty源码分析实战案例 1. **ChannelHandlerContext与ChannelHandlerAdaptor详解**: - 分析`ChannelHandlerContext`的生命周期及其与`ChannelHandler`之间的交互方式。 - 深入理解`...
java后端源码部署 ...源码和技术架构分析详情见 :backhand_index_pointing_right: 运行 eclipse/IDEA里 Step1 右键run as--java application-- FpushServerApp.java Step2 右键run as--java application-- FpushCl
提供的源码可能包含了Android客户端如何建立和维持与Netty服务器的Socket长连接,以及双方如何交换数据的示例。 在分析源码时,应关注以下几个关键点: 1. Android客户端如何创建Socket连接并保持连接。 2. 如何在...
Netty是一款高性能的网络应用程序框架,它使用Java编程语言开发,主要用于网络应用程序的快速和易于开发,支持TCP和UDP...通过对Netty源码的深入分析,可以更好地理解其工作机制,对开发高性能的网络应用有极大的帮助。
NIO客户端13 3.Netty源码分析16 3.1. 服务端创建16 3.1.1. 服务端启动辅助类ServerBootstrap16 3.1.2. NioServerSocketChannel 的注册21 3.1.3. 新的客户端接入25 3.2. 客户端创建28 3.2.1. 客户端连接辅助类...
由于Netty源码较庞大,所以文章建议通过分析几个关键组件来构建对整个框架的理解,从而避免在庞大的源码海洋中迷失方向。通过理解Netty的关键点,包括NIO的使用、事件处理机制、ChannelPipeline的运作以及Handler的...
在IT行业中,网络通信是至关重要的一个领域,特别是在分布式系统和高性能应用中。Netty作为一个高性能、异步事件驱动的网络应用...通过阅读和分析NettyDemo-master中的源码,开发者能够进一步提升自己的网络编程技能。
- 通过源码分析,解决实际开发中遇到的问题,提高故障排查能力。 此外,书中例子代码可以帮助你更好地理解和实践这些概念,而提供的 jar 包则可以直接用于你的项目,快速构建基于 Netty 的网络服务。 总之,...
- **源码分析**: - **初始化过程**:分析如何创建Channel、EventLoop等组件,并配置ChannelPipeline。 - **事件循环机制**:深入了解EventLoop的工作原理,包括如何调度任务、处理I/O事件等。 - **编解码器设计*...
47_Netty服务器与客户端编码模式回顾及源码分析准备 48_Netty与NIO系统总结及NIO与Netty之间的关联关系分析 49_零拷贝深入剖析及用户空间与内核空间切换方式 50_零拷贝实例深度剖析 51_NIO零拷贝彻底分析与Gather...
6)Netty源码分析 ByteBuf工作原理 Channel, Unsafe ChannelPipline, ChannelHandler EventLoop, EventLoopGroup Future, Promise 7) Netty逻辑架构 8)Netty中的多线程编程 9)Netty与RPC 10)Netty的可靠性 ...
88_ReplayingDecoder源码分析与特性解读;89_Netty常见且重要编解码器详解;90_TCP粘包与拆包实例演示及分析;91_Netty自定义协议与TCP粘包拆包问题解决之道;92_精通并发与Netty课程总结与展望