`
zhang011003
  • 浏览: 5188 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

通过走读netty源码来学习netty(二)(基于netty-4.0.19.Final)

 
阅读更多

今天继续走读netty源码

 

 上次走读到NioEventLoop的register方法,进入NioEventLoop类中,发现register方法并不在该类中,而是在父类SingleThreadEventLoop中,进入父类的对应方法后,发现最终调用的是unsafe的register方法

channel.unsafe().register(this, promise);

 有两个参数的register方法是在AbstractChannel类中定义,继续进入AbstractChannel类中查看,发现调用的是unsafe的register方法

channel.unsafe().register(this, promise);

 而unsafe的register方法定义在AbstractUnsafe内部类中,进入到AbstractUnsafe类的对应方法中就会看到如下判断

if (eventLoop.inEventLoop()) {
	register0(promise);
} else {
	try {
		eventLoop.execute(new OneTimeTask() {
			@Override
			public void run() {
				register0(promise);
			}
		});
	} catch (Throwable t) {	
	}
}

先看inEventLoop方法干了什么事

inEventLoop方法是定义在AbstractEventExecutor类中的,而AbstractEventExecutor类中对应的方法又调用了子类SingleThreadEventExecutor的inEventLoop方法,如下是SingleThreadEventExecutor的inEventLoop方法

 public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

 其中参数thread是代表当前线程,所以该方法其实在判断当前线程是否是SingleThreadEventExecutor类中定义的thread。由于我们现在是从main方法中进入的,当然不是了,所以inEventLoop方法返回false

 

 程序应该进入else分支了,else分支调用的是execute方法,传入的是OneTimeTask类的实例。

从NioEventLoop方法中查找execute方法,就会发现execute方法在SingleThreadEventExecutor类中定义

 

boolean inEventLoop = inEventLoop();
if (inEventLoop) {
	addTask(task);
} else {
	startThread();
	addTask(task);
	if (isShutdown() && removeTask(task)) {
		reject();
	}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
	wakeup(inEventLoop);
}
 刚才查看inEventLoop方法,返回的是false,所以直接进入else分支执行

 

先看startThread方法

 

private void startThread() {
	if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
		if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
			delayedTaskQueue.add(new ScheduledFutureTask<Void>(
					this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
					ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
			thread.start();
		}
	}
}
先判断线程有没有启动,如果没有启动,现将STATE_UPDATER以CAS方式设置为启动,成功后在delayedTaskQueue中增加一个任务,然后启动thread(注意,现在程序除了主线程外,增加了thread线程

 

然后再看addTask方法,这个方法很简单,将传入的OneTimeTask增加到taskQueue队列中

最后看是否需要调用wakeup方法

  1、看addTaskWakesUp变量是否为true,还是需要回到NioEventLoopGroup类的newChild方法中去寻找,最后发现addTaskWakesUp变量为false,

    2、看wakesUpForTask方法返回是否为true,wakesUpForTask方法在SingleThreadEventExecutor中定义,但该方法是protected,所以还需要查找是否有覆盖(这个是走读代码经常会遇到的问题,eclipse中在调用的方法上直接按Ctrl+T),发现SingleThreadEventLoop中重写了该方法

protected boolean wakesUpForTask(Runnable task) {
	return !(task instanceof NonWakeupRunnable);
}

 task当然不是NonWakeupRunnable类型了,所以该方法返回true

 

进入wakeup方法,发现wakeup做的唯一一件事是taskQueue中增加了一个WAKEUP_TASK,而WAKEUP_TASK的run方法什么都不做。

 

终于register方法走读完了,这个应该算是netty中一个比较重要的方法吧

 

现在回到SingleThreadEventLoop的register方法,发现register方法返回的是DefaultChannelPromise的实例,而这个promise一直以参数方式传递到到AbstractUnsafe类中register0方法中,而register0方法是在thread线程中调用,所以promise是在线程中设置值的

 

现在回到了AbstractBootstrap类的doBind方法,查看doBind,发现返回的promise并不是register方法返回的promise,而是一个新的promise(同样是DefaultChannelPromise类的实例)

 

if (regFuture.isDone()) {
	promise = channel.newPromise();
	doBind0(regFuture, channel, localAddress, promise);
} else {
	// Registration future is almost always fulfilled already, but just in case it's not.
	promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
	regFuture.addListener(new ChannelFutureListener() {
		@Override
		public void operationComplete(ChannelFuture future) throws Exception {
			doBind0(regFuture, channel, localAddress, promise);
		}
	});
}
 由于返回的regFuture是在单独的线程中设置值的,所以我们可以先假定isDone方法返回的是false(线程还没有设置对应的值),所以进入else分支,else分支中主要是regFuture增加了一个监听器

 

 
终于又回到了EchoServer类的run方法,bind方法调用完成后开始调用sync方法
 ChannelFuture f = b.bind(port).sync();
 检查代码,发现sync调用最终会调用到DefaultPromise类的await方法,而await方法最终调用了Object的wait方法等待
synchronized (this) {
	while (!isDone()) {
		checkDeadLock();
		incWaiters();
		try {
			wait();
		} finally {
			decWaiters();
		}
	}
}
 
至此,主线程的调用进入了等待状态。
下次开始走读主线程调用register方法启动的SingleThreadEventExecutor类中thread线程代码
分享到:
评论

相关推荐

    Apache Spark源码走读之5 -- DStream处理的容错性分析

    ### Apache Spark源码走读之五:DStream处理的容错性分析 #### 环境搭建与背景 为了深入理解Apache Spark Streaming中DStream处理的容错机制,本文将从一个简单的Spark Streaming示例出发,逐步分析Spark如何确保...

    Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    ### Apache Spark源码走读之3 -- Task运行期之函数调用关系分析 #### 概述 Apache Spark作为一款高效的大数据处理框架,在其内部有着复杂的任务调度与执行机制。本文将深入探讨Spark中Task执行期间的具体流程以及...

    Apache Spark源码走读之2 -- Job的提交与运行

    搭建步骤主要包括下载Spark二进制包,安装Scala、sbt和Java,并通过配置文件修改环境变量,如将$SPARK_HOME/conf目录下的spark-env.sh.template重命名为spark-env.sh,并设置相应的环境变量,以便正确启动master和...

    Storm源码走读笔记

    本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...

    Apache Spark源码走读之4 -- DStream实时流数据处理

    ### Apache Spark源码走读之四:DStream实时流数据处理 #### 一、系统概述与流数据特性 本文档探讨了Apache Spark Streaming的核心概念之一——**DStream**(Discretized Stream)及其如何实现对实时流数据的有效...

    Apache_Spark源码走读

    ### Apache Spark 源码解析概述 #### 一、引言 ...通过对Spark源码的学习,不仅可以深入了解其内部机制,还能更好地利用其功能特性来优化大数据处理应用。希望本文能为您的学习之旅提供有价值的参考。

    二次雷达代码走读文档.docx

    "二次雷达代码走读文档" 二次雷达代码走读文档是关于二次雷达系统的详细代码分析报告,涵盖了二次雷达的准备工作、程序结构、函数检查等方面的知识点。 一、准备工作 二次雷达文件位于 peripherals 目录下,包括...

    学生走读管理暂行办法.docx

    《学生走读管理暂行办法》是针对在校大学生走读情况而制定的一项管理政策,旨在满足部分学生因特殊原因需要不住校的需求,同时确保学生在外的人身安全和学业不受影响。以下是该办法的主要内容和相关规定: 1. **...

    Apache Spark源码走读:如何进行代码跟读

    ### Apache Spark源码走读:如何进行代码跟读 #### 概述 本文旨在探讨如何有效地进行Apache Spark源码的阅读与理解。Apache Spark作为一款高性能的分布式计算框架,在大数据处理领域占据着重要地位。其核心由Scala...

    高二2部 走读申请.docx

    随着现代社会的发展,教育管理愈发关注于学生个性化需求的满足和家庭参与度的...通过合理而周全的走读申请流程,学校能够在确保学生安全的前提下,尊重并满足学生的个性化需求,同时也为家长提供了参与教育决策的机会。

    软件测试期末实验报告-.pdf

    **二、基于决策表的测试**: - 决策表测试是一种系统地构造测试用例的方法,它基于输入条件和预期输出的关系。在这个例子中,决策表包括不同的成绩范围(如90-100,80-89,70-79,60-69,0-59,以及x&gt;100)以及对应...

    nova-compute源码分析

    ### nova-compute源码分析 #### 一、Nova概述及工作职责 **1.1 Nova的角色与任务** Nova是OpenStack项目中一个至关重要的组成部分,它主要负责虚拟机实例的生命周期管理,包括创建、调度、运行和销毁等功能。具体...

    C++代码走读意见--开发注意事项

    本篇将基于给定的“C++代码走读意见--开发注意事项”文件中的内容,详细探讨其中提及的关键知识点:内存泄漏问题及解决方案。 ##### 内存泄漏问题分析 **问题场景描述**: 在项目开发过程中,由于未妥善处理动态...

    hadoop源码分析-HDFS部分

    对于深度运维和二次开发者来说,理解HDFS的源码至关重要。 HDFS的设计灵感来源于Google的GFS(Google File System),它将大型文件分割成块,并将这些块分布在多台服务器上,从而实现数据的高可用性和容错性。HDFS...

    mina源码走读与实例

    ### MINA源码走读与实例 #### 一、MINA概述 **MINA**(**M**ulti **I**nterface **N**etwork **A**pplication)是Apache组织下的一款开源网络通信框架,它主要针对TCP/IP、UDP/IP协议栈提供了高效的封装和扩展能力...

    java8源码-java8-source:java8源码走读

    java8 源码 IDEA走读Java源码坏境搭建 新建一个普通java项目(如:java8-source) 创建package(tech.sqlclub.java_source)存放...到此,你可以开始尽情地撸java源码了,记住尽量要多读源码,多读源码,多学习,共勉!

    托福口语翻译词汇.docx

    - **day student**:走读生(原意为“白天的学生”,但在中文中我们通常说“走读生”来表示不在学校住宿的学生) - **in the dark**:一无所知(原意为“在黑暗中”,比喻不知道某些事情) - **cash crops**:经济...

    Arraylist扩容原理走读.png

    Arraylist扩容原理走读

    代码走读检查列表[参考].pdf

    - 数据结构应尽可能本地化,仅通过定义良好的访问函数来访问和修改,这样可以降低出错风险,提高代码的封装性。 6. **接口设计**: - 对外部实体的接口策略需要明确定义,以保证通信的稳定性和安全性。 - 接口...

Global site tag (gtag.js) - Google Analytics