Server bind之后,就可以对外提供服务了。Netty使用了reactor模式来提升服务的并发处理能力。boss线程负责监听新的连接请求,当有新的连接进来时,将对应的channel指派一个worker线程来处理。Worker线程负责对该Channel的读写操作。
一.Boss线程
1.阻塞Select
for (;;) {
try {
// Boss线程专门负责监听新入连接,所以阻塞select
selector.select();
// 如果有新连接,先把key清掉
selector.selectedKeys().clear();
// 循环请求队列,处理连接
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
registerAcceptedChannel(acceptedSocket, currentThread);
}
......
}
2.注册新连接
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
......
//根据用户自定义的的PipelineFactory创建pipeline
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
//hash分配worker线程,默认使用递增循环worker数组方式
NioWorker worker = nextWorker();
//将新的连接注册到worker线程,让worker线程负责后续读写
//新的channel是主channel的子channel,而PipelineSink和主channel是同一个
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
......
}
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
synchronized (startStopLock) {
......
//创建注册通道的任务
Runnable registerTask = createRegisterTask(channel, future);
//提交任务到阻塞队列
boolean offered = registerTaskQueue.offer(registerTask);
//唤醒selector
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
3.创建注册任务
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
return new RegisterTask((NioSocketChannel) channel, future, server);
}
二.worker线程
worker线程负责对应channel的读写操作,一个worker对应一个selector,会同时处理多个channel的读写。
1.主循环
for (;;) {
wakenUp.set(false);
......
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys = 0;
//处理注册通道的任务
processRegisterTaskQueue();
//处理异步事件,比如writeComplete事件
processEventQueue();
//处理写数据任务,如果业务线程有异步写的时候,会有WriteTask放入队列
processWriteTaskQueue();
//处理IO准备好的那些channel
processSelectedKeys(selector.selectedKeys());
......
}
2.RegisterTask执行
public void run() {
......
//如果是server,则使用异步模式
if (server) {
channel.channel.configureBlocking(false);
}
//将新的channel注册到worker线程的selector上,默认监听READ事件
synchronized (channel.interestOpsLock) {
channel.channel.register(
selector, channel.getRawInterestOps(), channel);
}
......
//触发BOUND的upstream事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件
if (server || !((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
//触发CONNECTED的upsteam事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件
fireChannelConnected(channel, remoteAddress);
......
}
3.处理读写准备好的那些channel
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
//如果某个channel写就位,则读数据
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
//如果写就位,则写数据
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}
......
}
4. 读取
//从channel中读取数据到内部的buffer,转换成内部的ChannelBuffer,触发messageReceived事件
protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
//预测下次读将读取的buffer大小,默认使用自适应的预测算法,如果上次读取把buffer读满,则增大该值,如果连续2次都没读满,则减小该值
//如果以上都不满足,则保持不变,默认长度1024
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
//默认BufferFactory为HeapChannelBufferFactory,默认使用Big Endian字节序
final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
int ret = 0;
int readBytes = 0;
boolean failure = true;
//从共享pool中拿配额,从channel中读取对应数据
ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
}
......
//有数据读入,则转换成自己的ChannelBuffer,并触发messageReceived事件,该事件将在用户自定义的Pipeline中执行
if (readBytes > 0) {
bb.flip();
//构造一个ChannelBuffer,默认使用堆内的数组实现
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
//复制数据到channelBuffer
buffer.setBytes(0, bb);
//写游标
buffer.writerIndex(readBytes);
// 修改预测器的下次读取buffer大小
predictor.previousReceiveBufferSize(readBytes);
// 触发messageReceived事件
fireMessageReceived(channel, buffer);
}
......
}
5.EchoServerHandler接受消息
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
//通过channel将数据写回
e.getChannel().write(e.getMessage());
}
6.数据写回,write方法其实是触发一个Downsteam事件
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(
new DownstreamMessageEvent(channel, future, message, remoteAddress));
return future;
}
7.ChannelPipeline中的处理
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
//如果handler已经处理完了,则转发到ChannelSink处理,对于nioserver来说就是NioServerSocketPipelineSink
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
//否则,继续调用其他handler
sendDownstream(tail, e);
}
8.NioServerSocketPipelineSink中处理channel事件
else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
//先放入写任务队列
boolean offered = channel.writeBufferQueue.offer(event);
assert offered;
//最后还是要通过work来写回数据
channel.worker.writeFromUserCode(channel);
}
9.worker线程的处理
void writeFromUserCode(final AbstractNioChannel<?> channel) {
......
//如果业务方使用了业务线程异步写,则直接往worker线程的写队列扔一个WriteTask任务
if (scheduleWriteIfNecessary(channel)) {
return;
}
......
//如果业务方没有使用业务线程异步写,说明现在还在netty的Worker线程中,直接写
write0(channel);
}
10.Worker线程直接写
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
//循环写入,如果都写成功了,则将去掉该channel在selector中注册的WRITE事件监听
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf = null;
ChannelFuture future = null;
try {
if (evt == null) {
//从队列中拿需要写回的数据内容,如果没有了,则认为写成功了
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
future = evt.getFuture();
//将ChannelBuffer转换成ByteBuffer,此处使用PooledSendBuffer
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
future = evt.getFuture();
buf = channel.currentWriteBuffer;
}
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
//将Buffer里的数据写出,因为是异步channel,如果socket的write队列满,会导致写处返回0,则重试
localWrittenBytes = buf.transferTo(ch);
//有数据写出就返回,不管是否全部写出
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
//如果全部写出,则通知调用方
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
}
//如果还没写完,则需要让selector也关心这个channel的write事件,让write就位时,继续写
else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
......
}
}
......
}
channel.inWriteNowLoop = false;
//让selector监听write事件
if (addOpWrite) {
setOpWrite(channel);
}
//写成功后,把write监听去掉
else if (removeOpWrite) {
clearOpWrite(channel);
}
}
//如果worker线程直接写,直接触发writeComplete upstream事件,让handler处理
if (iothread) {
fireWriteComplete(channel, writtenBytes);
}
//如果是业务线程异步写,将通过worker线程的eventQueue实现异步延时触发writeComplete事件
else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
11.Worker线程异步写,当业务方使用多线程处理时,写回的动作对worker来说是异步的
12.业务线程放入写任务队列
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered;
}
13.worker线程执行写任务
private void processWriteTaskQueue() throws IOException {
for (;;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
14.WriteTask执行
private final class WriteTask implements Runnable {
WriteTask() {
}
public void run() {
writeTaskInTaskQueue.set(false);
worker.writeFromTaskLoop(AbstractNioChannel.this);
}
}
15.worker线程执行数据写入
void writeFromSelectorLoop(final SelectionKey k) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
分享到:
相关推荐
项目资源包含:可运行源码+数据集+文档 python + numpy, pandas, matplotlib, pyecharts, wordcloud 适用人群:学习不同技术领域的小白或进阶学习者;可作为课程设计、大作业、工程实训或初期项目立项。 数据来源:数据集taxis.csv从网络下载 数据清洗:异常值与缺失值的处理:有一些数据distance(乘车距离)为零而且上下车地点为空,还有些一些数据的payment(支付方式)为空。 数据预处理:将列名更改成中文 标准化与归一化: 数据分析: 数据可视化:
TypeScript 入门教程
人脸识别项目实战
本资源汇总了 历届全国电子设计竞赛(电赛)真题+模拟题,涵盖 电路设计、嵌入式系统、信号处理、自动控制等核心考点,并提供详细解析及综合测评,帮助参赛者高效备赛、查漏补缺、提升实战能力。 适用人群: 适合 准备参加电子设计竞赛的大学生、电赛爱好者、电子信息类相关专业的学生,以及希望提高电子设计和电路分析能力的工程师。 能学到什么: 电赛考察重点:熟悉往届竞赛的命题方向及考核重点。 电路设计与仿真:提升模拟电路、数字电路、单片机等核心技能。 问题分析与解决能力:通过综合测评找到薄弱点并针对性提升。 实战经验:掌握竞赛策略,提高应试效率和设计能力。 阅读建议: 建议先 通读真题,了解题型与解题思路,然后 结合模拟题实战演练,查找不足并通过测评强化练习,逐步提升竞赛能力。
2024人工智能如何塑造未来产业:AI对各行业组织带来的的变革研究研究报告.pdf
人脸识别项目源码实战
给大家分享一套课程——Vulkan原理与实战课程
c语言学习
海豚鲸鱼数据集 5435张图 正确识别率可达92.6% 可识别:海豚 虎鲸 蜥蜴 海豹 鲨鱼 龟 支持yolov8格式标注
答谢中书书教学设计.docx
人脸识别项目源码实战
c语言学习
人脸识别项目源码实战
人脸识别项目实战
本美发门店管理系统有管理员和用户两个角色。用户功能有项目预定管理,产品购买管理,会员充值管理,余额查询管理。管理员功能有个人中心,用户管理,美容项目管理,项目类型管理,项目预定管理,产品库存管理,产品购买管理,产品入库管理,会员卡管理,会员充值管理,余额查询管理,产品类型管理,系统管理等。因而具有一定的实用性。 本站是一个B/S模式系统,采用SSM框架,MYSQL数据库设计开发,充分保证系统的稳定性。系统具有界面清晰、操作简单,功能齐全的特点,使得美发门店管理系统管理工作系统化、规范化。本系统的使用使管理人员从繁重的工作中解脱出来,实现无纸化办公,能够有效的提高美发门店管理系统管理效率。 关键词:美发门店管理系统;SSM框架;MYSQL数据库;Spring Boot 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想 1 2相关技术 2 2.1 MYSQL数据库 2 2.2 B/S结构 3 2.3 Spring Boot框架简介 4 3系统分析 4 3.1可行性分析 4 3.1.1技术可行性 4 3.1.2经济可行性 5 3.1.3操作可行性 5 3.2系
内容概要:本文档介绍了基于SSA-CNN-GRU麻雀算法优化卷积门控循环单元数据分类预测的详细项目实例,重点讲述了该项目的背景、目标、挑战与解决方案、技术特点、应用领域等方面的内容。文档详细记录了从项目启动、数据预处理、算法设计(SSA优化CNN-GRU模型)、构建与评估模型到实现美观的GUI界面整个过程,并讨论了防止过拟合的技术如正则化、早停和超参数优化。另外还涵盖了项目扩展的可能性、部署和应用策略、需要注意的地方以及未来改进的方向。全文强调了模型的泛化能力和计算效率,展示了该混合算法模型在实际应用中的优越性能。 适合人群:具备一定的Python编程经验及机器学习基础知识的研究人员和技术人员;对深度学习、智能优化算法及实际应用感兴趣的学者和从业者;寻求提升数据分析和预测准确性的金融分析师、数据科学家等相关专业人士。 使用场景及目标:本文档非常适合用作学习和参考资料,以掌握如何将SSA、CNN与GRU三种先进技术结合起来进行复杂的分类和预测问题求解。具体应用场景包括但不限于以下几个方面:金融领域——股票价格预测;医疗保健领域——辅助诊断;工业制造——预防性维护;智能家居——个性化服务;以及其他涉及到时序数据分析和多模态数据处理的场合。文档既包含了理论知识又提供了完整的源代码示例,可以帮助读者理解算法原理并通过实践中加深对其的认识。 其他说明:该项目不仅仅是关于算法的设计实现,更是有关于系统的整体架构规划以及工程上的考量,比如环境准备(确保环境洁净、必要包的安装等)、数据准备、GPU配置支持等等。同时文中给出了详细的代码片段,方便开发者理解和复现实验成果。值得注意的是,虽然文中提供了一套通用解决方案,但在真实场景下还需要针对性的调整参数或修改网络结构来达到最好的性能效果。此外,对于追求更高的预测精度或解决更大规模的问题,作者建议进一步探索深度强化学习等高级技术和多任务学习策略,并且考虑使用增量学习让模型能够适应新数据而不必重新训练整个模型。最后提到安全性和隐私保护也是项目实施过程中的重要因素,要妥善保管用户的敏感信息并且做到合法合规地收集和使用数据。
人脸识别项目实战
人脸识别项目实战
水下垃圾检测数据集,基于voc和yolo标注的两种格式,共23,056个文件,已经划分了训练集和验证集、测试集。并且提供了真实水下的视频数据,可以用作视频推理
(参考GUI)MATLAB车辆检测.zip