Server bind之后,就可以对外提供服务了。Netty使用了reactor模式来提升服务的并发处理能力。boss线程负责监听新的连接请求,当有新的连接进来时,将对应的channel指派一个worker线程来处理。Worker线程负责对该Channel的读写操作。
一.Boss线程
1.阻塞Select
for (;;) {
try {
// Boss线程专门负责监听新入连接,所以阻塞select
selector.select();
// 如果有新连接,先把key清掉
selector.selectedKeys().clear();
// 循环请求队列,处理连接
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
registerAcceptedChannel(acceptedSocket, currentThread);
}
......
}
2.注册新连接
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
......
//根据用户自定义的的PipelineFactory创建pipeline
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
//hash分配worker线程,默认使用递增循环worker数组方式
NioWorker worker = nextWorker();
//将新的连接注册到worker线程,让worker线程负责后续读写
//新的channel是主channel的子channel,而PipelineSink和主channel是同一个
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
......
}
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
synchronized (startStopLock) {
......
//创建注册通道的任务
Runnable registerTask = createRegisterTask(channel, future);
//提交任务到阻塞队列
boolean offered = registerTaskQueue.offer(registerTask);
//唤醒selector
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
3.创建注册任务
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
return new RegisterTask((NioSocketChannel) channel, future, server);
}
二.worker线程
worker线程负责对应channel的读写操作,一个worker对应一个selector,会同时处理多个channel的读写。
1.主循环
for (;;) {
wakenUp.set(false);
......
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys = 0;
//处理注册通道的任务
processRegisterTaskQueue();
//处理异步事件,比如writeComplete事件
processEventQueue();
//处理写数据任务,如果业务线程有异步写的时候,会有WriteTask放入队列
processWriteTaskQueue();
//处理IO准备好的那些channel
processSelectedKeys(selector.selectedKeys());
......
}
2.RegisterTask执行
public void run() {
......
//如果是server,则使用异步模式
if (server) {
channel.channel.configureBlocking(false);
}
//将新的channel注册到worker线程的selector上,默认监听READ事件
synchronized (channel.interestOpsLock) {
channel.channel.register(
selector, channel.getRawInterestOps(), channel);
}
......
//触发BOUND的upstream事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件
if (server || !((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
//触发CONNECTED的upsteam事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件
fireChannelConnected(channel, remoteAddress);
......
}
3.处理读写准备好的那些channel
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
//如果某个channel写就位,则读数据
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
//如果写就位,则写数据
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}
......
}
4. 读取
//从channel中读取数据到内部的buffer,转换成内部的ChannelBuffer,触发messageReceived事件
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
//预测下次读将读取的buffer大小,默认使用自适应的预测算法,如果上次读取把buffer读满,则增大该值,如果连续2次都没读满,则减小该值
//如果以上都不满足,则保持不变,默认长度1024
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
//默认BufferFactory为HeapChannelBufferFactory,默认使用Big Endian字节序
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
int ret = 0;
int readBytes = 0;
boolean failure = true;
//从共享pool中拿配额,从channel中读取对应数据
ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
}
......
//有数据读入,则转换成自己的ChannelBuffer,并触发messageReceived事件,该事件将在用户自定义的Pipeline中执行
if (readBytes > 0) {
bb.flip();
//构造一个ChannelBuffer,默认使用堆内的数组实现
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
//复制数据到channelBuffer
buffer.setBytes(0, bb);
//写游标
buffer.writerIndex(readBytes);
// 修改预测器的下次读取buffer大小
predictor.previousReceiveBufferSize(readBytes);
// 触发messageReceived事件
fireMessageReceived(channel, buffer);
}
......
}
5.EchoServerHandler接受消息
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
//通过channel将数据写回
e.getChannel().write(e.getMessage());
}
6.数据写回,write方法其实是触发一个Downsteam事件
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(
new DownstreamMessageEvent(channel, future, message, remoteAddress));
return future;
}
7.ChannelPipeline中的处理
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
//如果handler已经处理完了,则转发到ChannelSink处理,对于nioserver来说就是NioServerSocketPipelineSink
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
//否则,继续调用其他handler
sendDownstream(tail, e);
}
8.NioServerSocketPipelineSink中处理channel事件
else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
//先放入写任务队列
boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
//最后还是要通过work来写回数据
channel.worker.writeFromUserCode(channel);
}
9.worker线程的处理
void writeFromUserCode(final AbstractNioChannel<?> channel) {
......
//如果业务方使用了业务线程异步写,则直接往worker线程的写队列扔一个WriteTask任务
if (scheduleWriteIfNecessary(channel)) {
return;
}
......
//如果业务方没有使用业务线程异步写,说明现在还在netty的Worker线程中,直接写
write0(channel);
}
10.Worker线程直接写
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
//循环写入,如果都写成功了,则将去掉该channel在selector中注册的WRITE事件监听
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf = null;
ChannelFuture future = null;
try {
if (evt == null) {
//从队列中拿需要写回的数据内容,如果没有了,则认为写成功了
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
future = evt.getFuture();
//将ChannelBuffer转换成ByteBuffer,此处使用PooledSendBuffer
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
future = evt.getFuture();
buf = channel.currentWriteBuffer;
}
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
//将Buffer里的数据写出,因为是异步channel,如果socket的write队列满,会导致写处返回0,则重试
localWrittenBytes = buf.transferTo(ch);
//有数据写出就返回,不管是否全部写出
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
//如果全部写出,则通知调用方
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
}
//如果还没写完,则需要让selector也关心这个channel的write事件,让write就位时,继续写
else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
......
}
}
......
}
channel.inWriteNowLoop = false;
//让selector监听write事件
if (addOpWrite) {
setOpWrite(channel);
}
//写成功后,把write监听去掉
else if (removeOpWrite) {
clearOpWrite(channel);
}
}
//如果worker线程直接写,直接触发writeComplete upstream事件,让handler处理
if (iothread) {
fireWriteComplete(channel, writtenBytes);
}
//如果是业务线程异步写,将通过worker线程的eventQueue实现异步延时触发writeComplete事件
else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
11.Worker线程异步写,当业务方使用多线程处理时,写回的动作对worker来说是异步的
12.业务线程放入写任务队列
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered;
}
13.worker线程执行写任务
private void processWriteTaskQueue() throws IOException {
for (;;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
14.WriteTask执行
private final class WriteTask implements Runnable {
WriteTask() {
}
public void run() {
writeTaskInTaskQueue.set(false);
worker.writeFromTaskLoop(AbstractNioChannel.this);
}
}
15.worker线程执行数据写入
void writeFromSelectorLoop(final SelectionKey k) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
分享到:
相关推荐
《深入浅出Netty》是针对Java网络编程框架Netty的一份详细讲解材料,由淘宝工程师精心制作,特别适合初学者。Netty是一款高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...
资源名称:深入浅出Netty资源截图: 资源太大,传百度网盘了,链接在附件中,有需要的同学自取。
《深入浅出Netty》是一本专注于讲解Netty框架的编程指南,非常适合初学者入门。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这本书通过详实的代码案例,...
《深入浅出Netty_netty5.0_》是针对Netty 5.0版本的一本详细教程,旨在帮助读者理解并熟练运用这一强大的网络编程框架。Netty是一个开源的Java框架,它为开发高效、稳定、可扩展的网络应用提供了全面的支持。在本文...
最近几年,Netty社区的发展如火如荼,无论是大数据领域,还是微服务架构,底层都需要一个高效的分布式通信框架作为基础组件。 Netty凭借优异的性能、灵活的可扩展新得到了广泛的应用。短短几年间,Netty已经成为...
这个“深入浅出Netty文档”压缩包包含了一份详细的PDF文档,旨在帮助开发者深入理解Netty的工作原理及其在实际应用中的使用。 Netty 的核心是它的事件驱动模型,基于Java NIO(非阻塞I/O)实现。它提供了一种优雅的...
《深入浅出Netty》是针对Java网络编程框架Netty的一份详细讲解文档,由阿里巴巴的专家编写。Netty是一个高性能、异步事件驱动的网络应用程序框架,它为开发可维护的高性能协议服务器和客户端提供了丰富的组件和API。...
Netty通过利用Java的NIO类库,提供了高效的网络处理能力,并且具有很高的可定制性和扩展性,它支持快速开发可维护的高性能协议服务器和客户端。 Netty的设计目标是让网络应用开发变得更加简单,它抽象了网络编程中...
在本文中,我们将深入探讨如何使用 Netty 实现 WebSocket 服务器,以及与 Java 客户端进行交互的过程。 WebSocket 协议是一种在浏览器和服务器之间建立长连接的协议,它允许双向通信,即服务器可以主动向客户端推送...
李林锋,2007 年毕业...软件的设计和开发工作,有7 年NIO 设计和开发经验,精通Netty、Mina 等 NIO 框架和平台中间件,现任华为软件平台开放实验室架构师,《Netty 权威 指南》、《分布式服务框架原理与实践》作者。
Netty 是一个高性能、异步事件驱动的网络应用程序框架,专为 Java 平台...《深入浅出Netty.pdf》这本书很可能会详细讲解这些知识点,包括理论基础、实践示例以及高级特性的使用,对于理解和掌握 Netty 有着极大的帮助。
本项目“rtmpServer-master_nettyrtmp_rtmp推流_netty开发rtmp_rtmpServer-master”是针对RTMP协议开发的一个服务器端实现,它基于强大的Java网络库Netty。以下将详细介绍该服务器的开发背景、功能、核心技术和应用...