转载自: http://luckydrq.com/2014-10-14/cluster-analyse-one/
Fork
fork()是类UNIX系统父进程复制子进程的系统调用,在Node里通过libuv实现了对不同平台(unix,linux,windows)的封装。引用百度百科的一段话来描述fork
的特性:
fork之后的子进程是父进程的副本,它将获得父进程数据空间、堆、栈等资源的副本。注意,子进程持有的是上述存储空间的“副本”,这意味着父子进程间不共享这些存储空间。
其实,在node的cluster
模式里,worker进程的产生也是调用了require('child_process').fork
。
那直接这样不就行了?
var fork =require('child_process').fork;
var cpuNums =require('os').cpus().length;
var workerPath =require('path').join(__dirname,'worker.js');
for(var i =0; i < cpuNums; i++){
fork(workerPath,{ env: process.env });
}
这样的方式仅仅实现了多进程。多进程运行还涉及父子进程通信,子进程管理,以及负载均衡等问题,这些特性cluster
帮你实现了,在后面的章节会一一道来。
两种逻辑
先看一个官网的例子:
var cluster =require('cluster');
var http =require('http');
var numCPUs =require('os').cpus().length;
if(cluster.isMaster){
// Fork workers.
for(var i =0; i < numCPUs; i++){
cluster.fork();
}
cluster.on('exit',function(worker, code, signal){
console.log('worker '+ worker.process.pid +' died');
});
}else{
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function(req, res){
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
以上代码的意思是:如果是父进程(Master),就复制多个子进程,子进程的数目等于CPU核心数。如果是子进程(Worker),就创建一个http服务器并监听8000端口。
我们在前面提到,子进程是父进程的“副本”,将得到父进程的数据(代码)空间及堆栈,因此,父子进程会执行同一段代码逻辑。这样就需要一种机制,能够在代码里区分当前进程是父进程还是子进程。首先来看看神秘的cluster
模块究竟是什么:
// lib/cluster.js
functionCluster(){
EventEmitter.call(this);
}
util.inherits(Cluster,EventEmitter);
var cluster =module.exports =newCluster();
它是一个Object
,通过继承EventEmitter
,使它有了事件驱动机制。
// Define isWorker and isMaster
cluster.isWorker ='NODE_UNIQUE_ID'in process.env;
cluster.isMaster =! cluster.isWorker;
可以看到cluster
下有两个flag来标识区分当前进程,依据是当前进程的环境变量(env
)里是否包含NODE_UNIQUE_ID
这个字段。为什么子进程的环境变量里有这个字段而父进程没有?
我们知道,在复制子进程的时候实际上是调用了require('child_process').fork
,看一下这个方法的用法:
child_process.fork(modulePath,[args],[options])
modulePath StringThemodule to run in the child
args ArrayList of string arguments
options Object
cwd StringCurrent working directory of the child process
env ObjectEnvironment key-value pairs
encoding String(Default:'utf8')
execPath StringExecutable used to create the child process
execArgv ArrayList of string arguments passed to the executable (Default: process.execArgv)
silent BooleanIftrue, stdin, stdout,and stderr of the child will be piped to the parent, otherwise they will be inherited from the parent, see the "pipe"and"inherit" options for spawn()'s stdio for more details (default is false)
Return: ChildProcess object
可以看到方法的第三个参数options
是个对象,其中options.env
可以设置子进程的环境变量,即process.env
。因此,可以推测,应该是这里调用的时候,在env里面添加了NODE_UNIQUE_ID
这个标识。接下来就来看下代码:
functionWorker(customEnv){
...
// Create or get process
if(cluster.isMaster){
// Create env object
// first: copy and add id property
var envCopy = util._extend({}, env);
envCopy['NODE_UNIQUE_ID']=this.id;
// second: extend envCopy with the env argument
if(isObject(customEnv)){
envCopy = util._extend(envCopy, customEnv);
}
// fork worker
this.process = fork(settings.exec, settings.args,{
'env': envCopy,
'silent': settings.silent,
'execArgv': settings.execArgv
});
}else{
this.process = process;
}
...
}
以上代码段印证了之前的推测,就不再赘述了。
还有个细节需要注意:
// src/node.js
if(process.env.NODE_UNIQUE_ID){
var cluster =NativeModule.require('cluster');
cluster._setupWorker();
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_UNIQUE_ID;
}
在Node进程启动的时候,子进程会执行以上代码,我们先不追究_setupWorker
的细节。可以看到NODE_UNIQUE_ID
从环境变量里剔除了,这使得子进程可以作为Master继续复制子进程。
进程复制
继续官网的示例,我们先看父进程的执行逻辑:
if(cluster.isMaster){
// Fork workers.
for(var i =0; i < numCPUs; i++){
cluster.fork();
}
cluster.on('exit',function(worker, code, signal){
console.log('worker '+ worker.process.pid +' died');
});
}
很明显,主要就是执行fork
调用。那我们就来看看cluster.fork
做了什么?
// Fork a new worker
cluster.fork =function(env){
// This can only be called from the master.
assert(cluster.isMaster);
// Make sure that the master has been initialized
cluster.setupMaster();
return(new cluster.Worker(env));
};
先setupMaster
,再返回一个cluster.Worker
实例。其实workder
实例化的代码在前面已经贴过了,我们来个完整版:
// Create a worker object, that works both for master and worker
functionWorker(customEnv){
if(!(thisinstanceofWorker))returnnewWorker();
EventEmitter.call(this);
varself=this;
var env = process.env;
// Assign a unique id, default null
this.id = cluster.isMaster ?++ids : toDecInt(env.NODE_UNIQUE_ID);
// XXX: Legacy. Remove in 0.9
this.workerID =this.uniqueID =this.id;
// Assign state
this.state ='none';
// Create or get process
if(cluster.isMaster){
// Create env object
// first: copy and add id property
var envCopy = util._extend({}, env);
envCopy['NODE_UNIQUE_ID']=this.id;
// second: extend envCopy with the env argument
if(isObject(customEnv)){
envCopy = util._extend(envCopy, customEnv);
}
// fork worker
this.process = fork(settings.exec, settings.args,{
'env': envCopy,
'silent': settings.silent,
'execArgv': settings.execArgv
});
}else{
this.process = process;
}
if(cluster.isMaster){
// Save worker in the cluster.workers array
cluster.workers[this.id]=this;
// Emit a fork event, on next tick
// There is no worker.fork event since this has no real purpose
process.nextTick(function(){
cluster.emit('fork',self);
});
}
// handle internalMessage, exit and disconnect event
this.process.on('internalMessage', handleMessage.bind(null,this));
this.process.once('exit',function(exitCode, signalCode){
prepareExit(self,'dead');
self.emit('exit', exitCode, signalCode);
cluster.emit('exit',self, exitCode, signalCode);
});
this.process.once('disconnect',function(){
prepareExit(self,'disconnected');
self.emit('disconnect');
cluster.emit('disconnect',self);
});
// relay message and error
this.process.on('message',this.emit.bind(this,'message'));
this.process.on('error',this.emit.bind(this,'error'));
}
每个worker分配了一个id
,注册在cluster.workers
里。父进程和子进程注册了一堆事件,这些事件涉及父子进程通讯,我们在下一篇文章里详细讨论。接下来,我们再看看setupMaster
做了什么:
cluster.setupMaster =function(options){
// This can only be called from the master.
assert(cluster.isMaster);
// Don't allow this function to run more than once
if(masterStarted)return;
masterStarted =true;
// Get filename and arguments
options = options ||{};
// By default, V8 writes the profile data of all processes to a single
// v8.log.
//
// Running that log file through a tick processor produces bogus numbers
// because many events won't match up with the recorded memory mappings
// and you end up with graphs where 80+% of ticks is unaccounted for.
//
// Fixing the tick processor to deal with multi-process output is not very
// useful because the processes may be running wildly disparate workloads.
//
// That's why we fix up the command line arguments to include
// a "--logfile=v8-%p.log" argument (where %p is expanded to the PID)
// unless it already contains a --logfile argument.
var execArgv = options.execArgv || process.execArgv;
if(execArgv.some(function(s){return/^--prof/.test(s);})&&
!execArgv.some(function(s){return/^--logfile=/.test(s);}))
{
execArgv = execArgv.slice();
execArgv.push('--logfile=v8-%p.log');
}
// Set settings object
settings = cluster.settings ={
exec: options.exec|| process.argv[1],
execArgv: execArgv,
args: options.args || process.argv.slice(2),
silent: options.silent ||false
};
// emit setup event
cluster.emit('setup');
};
这里是复制子进程之前的一些准备工作。你可以显示调用这个方法并传入一些配置,主要就是指定子进程的执行入口options.exec
,如果像官网例子那样不显式调用,则默认把当前文件作为入口。
相关推荐
**知识点:MySQL Cluster Manager——自动化、监控与高可用性** 在IT行业中,数据库管理一直是一项技术密集型且复杂的任务,尤其是在处理大规模数据集群时。**MySQL Cluster Manager**(MCM)作为一款专为简化和...
1. Server块提供了关于Redis服务器环境的参数,比如操作系统、Redis版本、运行架构(32位/64位)等。 2. Clients块提供了客户端连接的相关信息,例如当前连接的客户端数(connected_clients)、正在等待命令的...
#### 1. gpedit.msc —— 组策略编辑器 - **功能**: 打开组策略编辑器,用于管理计算机或用户的安全设置。 - **应用场景**: 企业环境中,管理员常用此工具来配置系统安全策略。 #### 2. sndrec32 —— 录音机 - **...
1. **Spark 应用程序 (Application)**:Spark应用程序是由用户编写的,包含了Driver和Executor两部分。一个Application可以包含一个或多个Job,每个Job由一系列Stage组成。 2. **Driver (驱动程序)**:Driver运行...
《Spark源码分析3——驱动器、主节点与执行器之间的连接》 在Spark的分布式计算框架中,驱动器(Driver)、主节点(Master)和执行器(Executor)是核心组件,它们之间的通信和协作构成了Spark作业执行的基础。本文将深入...
Hadoop作为开源领域的重要成员,其设计理念深受Google三大核心技术——GFS(Google File System)、MapReduce以及BigTable的影响。Google通过一系列论文公开了其在分布式计算领域的核心技术,包括: - **Google ...
借鉴了Google的五大技术——GoogleCluster、Chubby、GFS、BigTable和MapReduce,Apache社区发展出了相应的开源实现:ZooKeeper、HDFS、HBase和Hadoop MapReduce。其中,HDFS作为分布式文件系统,为其他组件提供了...
4. **通信库**:如MPI(Message Passing Interface),是HPC程序间通信的基础,它提供了进程间的同步和通信机制,使得分布式计算成为可能。 5. **性能分析工具**:例如HPCToolkit、Perf等,它们帮助开发者理解和...
自Oracle 10G起,Oracle引入了一套全面的集群管理解决方案——Cluster-Ready Services (CRS),这套解决方案提供了包括集群通信、消息传递、锁定机制和负载均衡等功能在内的框架,极大地简化了RAC(Real Application ...
- RAID1(镜像):数据在两个磁盘上完全复制,提供高冗余,但存储效率低。 - RAID5(带奇偶校验的条带化):数据和奇偶校验信息分布在多个磁盘上,提供冗余且速度较快。 9. **Oracle进程管理** - 使用`killall -...
BigTable还引入了分区和复制机制,确保数据的高可用性和一致性,即使在部分节点故障的情况下也能正常运行。 #### 五、分布式锁服务系统Chubby **知识点概览:** Chubby论文,发表于2006年,描述了Google开发的...
- **创建集群**:`iastool create --cluster --passport admin --user admin --password admin Cluster1` #### 五、安装说明 手册中未提供详细的安装步骤,但通常安装过程包括以下环节: 1. **下载安装包**:从...
这里我们讨论的是最新版本的Redis——6.2.3,这是一个稳定版本,提供了许多改进和新特性。 Redis 6.2.3主要知识点包括: 1. **新特性和增强**: - **多线程执行**:Redis 6.0引入了多线程模式,允许在处理客户端I...
通过对Lms进程日志的分析,发现了一个每六小时出现一次的异常现象,这与数据库卡顿的时间点相符。 这就像医院重症监护室的死亡率问题,看似无关的因素——清洁阿姨在每天早上8点拔掉呼吸机电源——实际上成为了问题...
《并行工作室——探索高效计算的新境界》 “Parallel Studio”是英特尔公司推出的一款强大的并行编程工具套件,专为提升多核处理器和多处理器系统上的应用性能而设计。这款工具集旨在帮助开发者充分利用现代计算机...
比如,`LocalCluster`用于在本地启动一个简单的多进程集群,而`Dask-Yarn`或`Dask-Kubernetes`则支持在Hadoop YARN或Kubernetes上部署Dask集群。 Dask的主要优势在于它的灵活性和可扩展性。它可以很好地适应各种...
2. **启动集群**:通过执行 `bin/start-cluster.sh` 启动 Flink 集群,这将启动 JobManager 和多个 TaskManager 进程。 3. **提交作业**:将 Flink 程序打包成 JAR 文件,使用 `bin/flink run` 命令提交作业,例如 ...
- **CRS(Cluster Ready Services)**:自Oracle 10G起,Oracle引入了一整套用于管理集群的解决方案——Cluster Ready Services (CRS)。CRS不仅提供了集群内部的通信、消息传递和锁定机制,还实现了负载管理和高...
2. **Rserve**:这是一个R的守护进程,允许远程连接和执行R代码。安装Rserve后,可以使用PHP的Rserve客户端库(如`rphp`)来直接调用R函数。 ```php require_once 'RPHP/R.php'; $r = new R(); $r->source('...
3. **系统管理**:涵盖用户和组管理、文件系统管理、进程管理、系统日志分析、安全管理等方面,这些都是管理员日常运维中的核心工作。 4. **网络服务**:HP-UNIX支持各种网络协议和服务,如TCP/IP、DNS、DHCP、NFS...