`
hongtoushizi
  • 浏览: 370540 次
  • 性别: Icon_minigender_1
  • 来自: 天津
社区版块
存档分类
最新评论

Cluster机制剖析1——进程复制

阅读更多

转载自: http://luckydrq.com/2014-10-14/cluster-analyse-one/

Fork

fork()是类UNIX系统父进程复制子进程的系统调用,在Node里通过libuv实现了对不同平台(unix,linux,windows)的封装。引用百度百科的一段话来描述fork的特性:

fork之后的子进程是父进程的副本,它将获得父进程数据空间、堆、栈等资源的副本。注意,子进程持有的是上述存储空间的“副本”,这意味着父子进程间不共享这些存储空间。

其实,在node的cluster模式里,worker进程的产生也是调用了require('child_process').fork

那直接这样不就行了?

  1. var fork =require('child_process').fork;
  2. var cpuNums =require('os').cpus().length;
  3. var workerPath =require('path').join(__dirname,'worker.js');
  4. for(var i =0; i < cpuNums; i++){
  5. fork(workerPath,{ env: process.env });
  6. }

这样的方式仅仅实现了多进程。多进程运行还涉及父子进程通信,子进程管理,以及负载均衡等问题,这些特性cluster帮你实现了,在后面的章节会一一道来。

两种逻辑

先看一个官网的例子:

  1. var cluster =require('cluster');
  2. var http =require('http');
  3. var numCPUs =require('os').cpus().length;
  4. if(cluster.isMaster){
  5. // Fork workers.
  6. for(var i =0; i < numCPUs; i++){
  7. cluster.fork();
  8. }
  9. cluster.on('exit',function(worker, code, signal){
  10. console.log('worker '+ worker.process.pid +' died');
  11. });
  12. }else{
  13. // Workers can share any TCP connection
  14. // In this case its a HTTP server
  15. http.createServer(function(req, res){
  16. res.writeHead(200);
  17. res.end("hello world\n");
  18. }).listen(8000);
  19. }

以上代码的意思是:如果是父进程(Master),就复制多个子进程,子进程的数目等于CPU核心数。如果是子进程(Worker),就创建一个http服务器并监听8000端口。

我们在前面提到,子进程是父进程的“副本”,将得到父进程的数据(代码)空间及堆栈,因此,父子进程会执行同一段代码逻辑。这样就需要一种机制,能够在代码里区分当前进程是父进程还是子进程。首先来看看神秘的cluster模块究竟是什么:

  1. // lib/cluster.js
  2. functionCluster(){
  3. EventEmitter.call(this);
  4. }
  5. util.inherits(Cluster,EventEmitter);
  6. var cluster =module.exports =newCluster();

它是一个Object,通过继承EventEmitter,使它有了事件驱动机制。

  1. // Define isWorker and isMaster
  2. cluster.isWorker ='NODE_UNIQUE_ID'in process.env;
  3. cluster.isMaster =! cluster.isWorker;

可以看到cluster下有两个flag来标识区分当前进程,依据是当前进程的环境变量(env)里是否包含NODE_UNIQUE_ID这个字段。为什么子进程的环境变量里有这个字段而父进程没有?

我们知道,在复制子进程的时候实际上是调用了require('child_process').fork,看一下这个方法的用法:

  1. child_process.fork(modulePath,[args],[options])
  2. modulePath StringThemodule to run in the child
  3. args ArrayList of string arguments
  4. options Object
  5. cwd StringCurrent working directory of the child process
  6. env ObjectEnvironment key-value pairs
  7. encoding String(Default:'utf8')
  8. execPath StringExecutable used to create the child process
  9. execArgv ArrayList of string arguments passed to the executable (Default: process.execArgv)
  10. silent BooleanIftrue, stdin, stdout,and stderr of the child will be piped to the parent, otherwise they will be inherited from the parent, see the "pipe"and"inherit" options for spawn()'s stdio for more details (default is false)
  11. Return: ChildProcess object

可以看到方法的第三个参数options是个对象,其中options.env可以设置子进程的环境变量,即process.env。因此,可以推测,应该是这里调用的时候,在env里面添加了NODE_UNIQUE_ID这个标识。接下来就来看下代码:

  1. functionWorker(customEnv){
  2. ...
  3. // Create or get process
  4. if(cluster.isMaster){
  5. // Create env object
  6. // first: copy and add id property
  7. var envCopy = util._extend({}, env);
  8. envCopy['NODE_UNIQUE_ID']=this.id;
  9. // second: extend envCopy with the env argument
  10. if(isObject(customEnv)){
  11. envCopy = util._extend(envCopy, customEnv);
  12. }
  13. // fork worker
  14. this.process = fork(settings.exec, settings.args,{
  15. 'env': envCopy,
  16. 'silent': settings.silent,
  17. 'execArgv': settings.execArgv
  18. });
  19. }else{
  20. this.process = process;
  21. }
  22. ...
  23. }

以上代码段印证了之前的推测,就不再赘述了。

还有个细节需要注意:

  1. // src/node.js
  2. if(process.env.NODE_UNIQUE_ID){
  3. var cluster =NativeModule.require('cluster');
  4. cluster._setupWorker();
  5. // Make sure it's not accidentally inherited by child processes.
  6. delete process.env.NODE_UNIQUE_ID;
  7. }

在Node进程启动的时候,子进程会执行以上代码,我们先不追究_setupWorker的细节。可以看到NODE_UNIQUE_ID从环境变量里剔除了,这使得子进程可以作为Master继续复制子进程。

进程复制

继续官网的示例,我们先看父进程的执行逻辑:

  1. if(cluster.isMaster){
  2. // Fork workers.
  3. for(var i =0; i < numCPUs; i++){
  4. cluster.fork();
  5. }
  6. cluster.on('exit',function(worker, code, signal){
  7. console.log('worker '+ worker.process.pid +' died');
  8. });
  9. }

很明显,主要就是执行fork调用。那我们就来看看cluster.fork做了什么?

  1. // Fork a new worker
  2. cluster.fork =function(env){
  3. // This can only be called from the master.
  4. assert(cluster.isMaster);
  5. // Make sure that the master has been initialized
  6. cluster.setupMaster();
  7. return(new cluster.Worker(env));
  8. };

setupMaster,再返回一个cluster.Worker实例。其实workder实例化的代码在前面已经贴过了,我们来个完整版:

  1. // Create a worker object, that works both for master and worker
  2. functionWorker(customEnv){
  3. if(!(thisinstanceofWorker))returnnewWorker();
  4. EventEmitter.call(this);
  5. varself=this;
  6. var env = process.env;
  7. // Assign a unique id, default null
  8. this.id = cluster.isMaster ?++ids : toDecInt(env.NODE_UNIQUE_ID);
  9. // XXX: Legacy. Remove in 0.9
  10. this.workerID =this.uniqueID =this.id;
  11. // Assign state
  12. this.state ='none';
  13. // Create or get process
  14. if(cluster.isMaster){
  15. // Create env object
  16. // first: copy and add id property
  17. var envCopy = util._extend({}, env);
  18. envCopy['NODE_UNIQUE_ID']=this.id;
  19. // second: extend envCopy with the env argument
  20. if(isObject(customEnv)){
  21. envCopy = util._extend(envCopy, customEnv);
  22. }
  23. // fork worker
  24. this.process = fork(settings.exec, settings.args,{
  25. 'env': envCopy,
  26. 'silent': settings.silent,
  27. 'execArgv': settings.execArgv
  28. });
  29. }else{
  30. this.process = process;
  31. }
  32. if(cluster.isMaster){
  33. // Save worker in the cluster.workers array
  34. cluster.workers[this.id]=this;
  35. // Emit a fork event, on next tick
  36. // There is no worker.fork event since this has no real purpose
  37. process.nextTick(function(){
  38. cluster.emit('fork',self);
  39. });
  40. }
  41. // handle internalMessage, exit and disconnect event
  42. this.process.on('internalMessage', handleMessage.bind(null,this));
  43. this.process.once('exit',function(exitCode, signalCode){
  44. prepareExit(self,'dead');
  45. self.emit('exit', exitCode, signalCode);
  46. cluster.emit('exit',self, exitCode, signalCode);
  47. });
  48. this.process.once('disconnect',function(){
  49. prepareExit(self,'disconnected');
  50. self.emit('disconnect');
  51. cluster.emit('disconnect',self);
  52. });
  53. // relay message and error
  54. this.process.on('message',this.emit.bind(this,'message'));
  55. this.process.on('error',this.emit.bind(this,'error'));
  56. }

每个worker分配了一个id,注册在cluster.workers里。父进程和子进程注册了一堆事件,这些事件涉及父子进程通讯,我们在下一篇文章里详细讨论。接下来,我们再看看setupMaster做了什么:

  1. cluster.setupMaster =function(options){
  2. // This can only be called from the master.
  3. assert(cluster.isMaster);
  4. // Don't allow this function to run more than once
  5. if(masterStarted)return;
  6. masterStarted =true;
  7. // Get filename and arguments
  8. options = options ||{};
  9. // By default, V8 writes the profile data of all processes to a single
  10. // v8.log.
  11. //
  12. // Running that log file through a tick processor produces bogus numbers
  13. // because many events won't match up with the recorded memory mappings
  14. // and you end up with graphs where 80+% of ticks is unaccounted for.
  15. //
  16. // Fixing the tick processor to deal with multi-process output is not very
  17. // useful because the processes may be running wildly disparate workloads.
  18. //
  19. // That's why we fix up the command line arguments to include
  20. // a "--logfile=v8-%p.log" argument (where %p is expanded to the PID)
  21. // unless it already contains a --logfile argument.
  22. var execArgv = options.execArgv || process.execArgv;
  23. if(execArgv.some(function(s){return/^--prof/.test(s);})&&
  24. !execArgv.some(function(s){return/^--logfile=/.test(s);}))
  25. {
  26. execArgv = execArgv.slice();
  27. execArgv.push('--logfile=v8-%p.log');
  28. }
  29. // Set settings object
  30. settings = cluster.settings ={
  31. exec: options.exec|| process.argv[1],
  32. execArgv: execArgv,
  33. args: options.args || process.argv.slice(2),
  34. silent: options.silent ||false
  35. };
  36. // emit setup event
  37. cluster.emit('setup');
  38. };

这里是复制子进程之前的一些准备工作。你可以显示调用这个方法并传入一些配置,主要就是指定子进程的执行入口options.exec,如果像官网例子那样不显式调用,则默认把当前文件作为入口。

 

 

 

 

分享到:
评论

相关推荐

    23拓展 2:无所不知 —— Info 指令(1).md

    1. Server块提供了关于Redis服务器环境的参数,比如操作系统、Redis版本、运行架构(32位/64位)等。 2. Clients块提供了客户端连接的相关信息,例如当前连接的客户端数(connected_clients)、正在等待命令的...

    cmd命令大全2010

    #### 1. gpedit.msc —— 组策略编辑器 - **功能**: 打开组策略编辑器,用于管理计算机或用户的安全设置。 - **应用场景**: 企业环境中,管理员常用此工具来配置系统安全策略。 #### 2. sndrec32 —— 录音机 - **...

    加米谷大数据——Spark核心技术原理透视Spark运行原理.docx

    1. **Spark 应用程序 (Application)**:Spark应用程序是由用户编写的,包含了Driver和Executor两部分。一个Application可以包含一个或多个Job,每个Job由一系列Stage组成。 2. **Driver (驱动程序)**:Driver运行...

    Spark源码分析3-The connect between driver,master and excutor

    《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...

    High Performance Cluster Tool Kit-开源

    4. **通信库**:如MPI(Message Passing Interface),是HPC程序间通信的基础,它提供了进程间的同步和通信机制,使得分布式计算成为可能。 5. **性能分析工具**:例如HPCToolkit、Perf等,它们帮助开发者理解和...

    系统运维面试题(AIXLINUXORACLE)试题.pdf

    - RAID1(镜像):数据在两个磁盘上完全复制,提供高冗余,但存储效率低。 - RAID5(带奇偶校验的条带化):数据和奇偶校验信息分布在多个磁盘上,提供冗余且速度较快。 9. **Oracle进程管理** - 使用`killall -...

    BES 快速入门手册

    - **创建集群**:`iastool create --cluster --passport admin --user admin --password admin Cluster1` #### 五、安装说明 手册中未提供详细的安装步骤,但通常安装过程包括以下环节: 1. **下载安装包**:从...

    最新版linux redis-6.2.3.tar.gz

    这里我们讨论的是最新版本的Redis——6.2.3,这是一个稳定版本,提供了许多改进和新特性。 Redis 6.2.3主要知识点包括: 1. **新特性和增强**: - **多线程执行**:Redis 6.0引入了多线程模式,允许在处理客户端I...

    另辟蹊径解决数据库运维技术难题.pdf

    通过对Lms进程日志的分析,发现了一个每六小时出现一次的异常现象,这与数据库卡顿的时间点相符。 这就像医院重症监护室的死亡率问题,看似无关的因素——清洁阿姨在每天早上8点拔掉呼吸机电源——实际上成为了问题...

    parallel_studio

    《并行工作室——探索高效计算的新境界》 “Parallel Studio”是英特尔公司推出的一款强大的并行编程工具套件,专为提升多核处理器和多处理器系统上的应用性能而设计。这款工具集旨在帮助开发者充分利用现代计算机...

    Python库 | dask-2021.6.2-py3-none-any.whl

    比如,`LocalCluster`用于在本地启动一个简单的多进程集群,而`Dask-Yarn`或`Dask-Kubernetes`则支持在Hadoop YARN或Kubernetes上部署Dask集群。 Dask的主要优势在于它的灵活性和可扩展性。它可以很好地适应各种...

    flink-1.9.2-bin-scala_2.12-hadoop_2.7.2.zip

    2. **启动集群**:通过执行 `bin/start-cluster.sh` 启动 Flink 集群,这将启动 JobManager 和多个 TaskManager 进程。 3. **提交作业**:将 Flink 程序打包成 JAR 文件,使用 `bin/flink run` 命令提交作业,例如 ...

    PHP调用R语言

    2. **Rserve**:这是一个R的守护进程,允许远程连接和执行R代码。安装Rserve后,可以使用PHP的Rserve客户端库(如`rphp`)来直接调用R函数。 ```php require_once 'RPHP/R.php'; $r = new R(); $r-&gt;source('...

    HP-UNIX redbook 红皮书

    3. **系统管理**:涵盖用户和组管理、文件系统管理、进程管理、系统日志分析、安全管理等方面,这些都是管理员日常运维中的核心工作。 4. **网络服务**:HP-UNIX支持各种网络协议和服务,如TCP/IP、DNS、DHCP、NFS...

Global site tag (gtag.js) - Google Analytics