0%

试玩NodeJS多进程

NodeJS 的 JavaScript 运行在单个进程的单个线程上,一个 JavaScript 执行进程只能利用一个 CPU 核心,而如今大多数 CPU 均为多核 CPU,为了充分利用 CPU 资源,Node 提供了 child_process 和 cluster 模块来实现多进程以及进程管理。本文将根据 Master-Worker 模式,搭建一个简单的服务器集群来充分利用多核 CPU 资源,探索进程间通信、负载均衡、进程重启等知识。

下图是 Master-Worker 模式,进程分为 master 进程和 worker 进程,master 进程负责调度或管理 worker 进程,worker 进程则负责具体的业务处理。在服务器层面,worker 可以是一个服务进程,负责处理来自客户端的请求,多个 worker 便相当于多个服务器,从而构成一个服务器集群。master 则是负责创建 worker,将来自客户端的请求分配到各个服务器上去处理,并监控 worker 的运行状态以及进行管理等操作。

image

本文将从 child_process 模块开始,熟悉该模块的基本用法。后面再继续进入 cluster 模块的学习。本文所用的代码示例可以从该仓库中找到–【multi-process】。

一、child_process

1.1、Hello world

child_process 模块提供了 spawn()、exec()、execFile()、fork()这 4 个方法用于创建子进程,本文将使用 fork()方法来创建子进程,fork()方法只需指定要执行的 JavaScript 文件模块,即可创建 Node 的子进程。下面是简单的 HelloWorld 示例,master 进程根据 CPU 数量创建出相应数量的 worker 进程,worker 进程中利用进程 ID 来标记自己。

以下是 master 进程代码,文件名为 master.js。

1
2
3
4
5
6
7
8
const childProcess = require('child_process')
const cpuNum = require('os').cpus().length

for (let i = 0; i < cpuNum; ++i) {
childProcess.fork('./worker.js')
}

console.log('Master: Hello world.')

以下是 worker 进程的代码,文件名为 worker.js。

1
console.log('Worker-' + process.pid + ': Hello world.')

执行node master.js,得到如下结果,master 创建 4 个 worker 后输出 HelloWorld 信息,每个 worker 也分别输出自己的 HelloWorld 信息。

image

1.2、父子进程间的通信

创建 worker 之后,接下来实现 master 和 worker 之间的通信。Node 父子进程之间可以通过on('message')send()来实现通信,on('message')其实是监听 message 事件,当该进程收到其他进程发送的消息时,便会触发 message 事件。send()方法则是用于向其他进程发送信息。master 进程中调用child_processfork()方法后会得到一个子进程的实例,通过这个实例可以监听来自子进程的消息或者向子进程发送消息。worker 进程则通过 process 对象接口监听来自父进程的消息或者向父进程发送消息。

image

下面是简单示例,master 创建 worker 之后,向 worker 发送信息,worker 在收到 master 的信息后将信息输出,并回复 master。master 收到回复后输出信息。

master.js

1
2
3
4
5
6
7
8
const childProcess = require('child_process')
const worker = childProcess.fork('./worker.js')

worker.send('Hello world.')

worker.on('message', (msg) => {
console.log('[Master] Received message from worker: ' + msg)
})

worker.js

1
2
3
4
process.on('message', (msg) => {
console.log('[Worker] Received message from master: ' + msg)
process.send('Hi master.')
})

执行node master.js,结果如下,master 和 worker 可以正常通信。

image

1.3、Master 分发请求给 Worker 处理

进程通信时使用到的send()方法,除了发送普通的对象之外,还可以用于发送句柄。句柄是一种引用,可以用来标识资源,例如通过句柄可以标识一个 socket 对象、一个 server 对象等。利用句柄传递,可以实现请求的分发。master 进程创建一个 TCP 服务器监听特定端口,收到客户端的请求后,会得到一个 socket 对象,通过这个 socket 对象可以跟客户端进行通信从而处理客户端的请求。master 进程可以通过句柄传递将该 socket 对象发送给 worker 进程,让 worker 进程去处理请求。该模式的结构图如下,在 master 上还可以通过特定的算法实现负载均衡,将客户端的请求均衡地分发给 worker 去处理。

image

下面是一个简单示例。master 创建 TCP 服务器并监听 8080 端口,收到请求后将请求分发给 worker 处理。worker 收到 master 发来的 socket 以后,通过 socket 对客户端进行响应。为方便看到请求的处理情况,worker 给出的响应内容会说明该请求是被哪个 worker 处理。

master.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const childProcess = require('child_process')
const net = require('net')
const cpuNum = require('os').cpus().length

// 创建工作进程
let workers = []
let cur = 0
for (let i = 0; i < cpuNum; ++i) {
workers.push(childProcess.fork('./worker.js'))
console.log('Create worker-' + workers[i].pid)
}

// 创建TCP服务器
const server = net.createServer()

// 服务器收到请求后分发给工作进程去处理
// 通过轮转方式实现工作进程的负载均衡
server.on('connection', (socket) => {
workers[cur].send('socket', socket)
cur = Number.parseInt((cur + 1) % cpuNum)
})

server.listen(8080, () => {
console.log('TCP server: 127.0.0.1:8080')
})

worker.js

1
2
3
4
5
6
7
8
process.on('message', (msg, socket) => {
if (msg === 'socket' && socket) {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
}
})

为了访问 TCP 服务器进行实验,这里需要写一个简单的 TCP 客户端,代码如下。该客户端会创建 10 个 TCP 连接,得到服务器响应之后将响应的内容进行输出。

tcp_client.js

1
2
3
4
5
6
7
8
9
10
11
const net = require('net')
const maxConnectCount = 10

for (let i = 0; i < maxConnectCount; ++i) {
net.createConnection({
port: 8080,
host: '127.0.0.1'
}).on('data', (data) => {
console.log(data.toString())
})
}

先执行node master.js启动服务器,然后执行 node tcp_client.js 启动客户端。得到的结果如下,10 个请求被分发到不同服务器上进行处理,并且可以看到 master 中的轮转分发请求起到了作用,实现了简单的负载均衡。

image

1.4、Worker 监听同一个端口

前面说过,send()方法可以传递句柄,通过传递句柄,我们除了发送 socket 对象之外,还可以直接发送一个 server 对象。我们可以在 master 进程中创建一个 TCP 服务器,将服务器对象直接发送给 worker 进程,让 worker 去监听端口并处理请求。这样的话,master 和 worker 进程都会监听相同端口,当客户端发起请求时,请求可能被 master 接收,也可能被 worker 接收。而 master 不负责处理业务,如果请求被 master 接收到,由于 master 上没有处理业务的逻辑,请求将无法得到处理。因此可以实现为如下图所示的模式,master 将 TCP 服务器发送给 worker 使得所有 worker 监听同一个端口以后,master 关闭对端口的监听。这样便只有 worker 在监听同一端口,请求将会都被 worker 进行处理,与 master 无关。

image

这种模式下,多个进程监听相同端口,当网络请求到来时,会进行抢占式调度,只有一个进程会抢到连接然后进行服务。因此,可以确保每个请求都会被特定的 worker 处理,而不是一个请求同时被多个 worker 处理。但由于是抢占式的调度,不能够保证每个 worker 的负载均衡。可能由于处理不同业务时 CPU 和 IO 繁忙度的不同导致进程抢到的请求数量不同,形成负载不均衡的情况。

下面是简单示例。

master.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const childProcess = require('child_process')
const net = require('net')
const cpuNum = require('os').cpus().length

// 创建工作进程
let workers = []
let cur = 0
for (let i = 0; i < cpuNum; ++i) {
workers.push(childProcess.fork('./worker.js'))
console.log('Create worker-' + workers[i].pid)
}

// 创建TCP服务器
const server = net.createServer()

server.listen(8080, () => {
console.log('TCP server: 127.0.0.1:8080')
// 监听端口后将服务器句柄发送给工作进程
for (let i = 0; i < cpuNum; ++i) {
workers[i].send('server', server)
}
// 关闭主线程服务器的端口监听
server.close()
})

worker.js

1
2
3
4
5
6
7
8
9
10
process.on('message', (msg, server) => {
if (msg === 'server' && server) {
server.on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
})
}
})

继续使用之前的tcp_client来进行实验,先执行node master.js启动服务器,然后执行node tcp_client.js启动客户端。得到结果如下,请求可以被不同的 worker 进程处理,但由于 worker 进程是抢占式地为请求进行服务,所以不一定能实现每个 worker 的负载均衡。

image

1.5、进程重启

worker 进程可能因为某些异常情况而退出,为了提高集群的稳定性,master 进程需要监听子进程的存活状态,当子进程退出之后,master 进程要及时重启新的子进程。在 Node 中,子进程退出时,会在父进程中触发 exit 事件。父进程只需通过监听该事件便可知道子进程是否退出,并在退出的时候做出相应的处理。下面是在之前的监听同一端口模式下,增加了进程重启功能。进程重启时,master 进程需要重新传递 server 对象给新的 worker 进程,因此不能关闭 master 进程上的 server,否则在进程重启时 server 被关闭,得到的句柄将为空,无法正常传递。master 进程的 server 不关闭,会导致 master 进程也监听端口,会有部分请求被 master 进程接收,为了让着部分请求能够得到处理,可以在 master 进程添加处理业务的代码。由于 master 也参与了业务处理,业务处理进程的数量增加 1 个,所以 worker 进程可以少创建 1 个。这也就是下面简单示例中的做法。

这种实现方式使得 master 既进行进程管理又参与了业务处理,如果要保持 master 只负责进程管理而不涉及业务处理,可以采取另外一种实现方式:master 接收到请求后,按照前面 1.3 节的做法将请求转发给 worker 进行处理,这样 master 将继续只负责对 worker 进程的管理。

master.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const childProcess = require('child_process')
const net = require('net')
const cpuNum = require('os').cpus().length - 1

// 创建工作进程
let workers = []
let cur = 0
for (let i = 0; i < cpuNum; ++i) {
workers.push(childProcess.fork('./worker.js'))
console.log('Create worker-' + workers[i].pid)
}

// 创建TCP服务器
const server = net.createServer()

// 由于master进程也会监听端口。因此需要对请求做出处理
server.on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by master')
}, 10)
})

server.listen(8080, () => {
console.log('TCP server: 127.0.0.1:8080')
// 监听端口后将服务器句柄发送给工作进程
for (let i = 0; i < cpuNum; ++i) {
workers[i].send('server', server)
// 工作进程退出后重启
workers[i].on('exit', ((i) => {
return () => {
console.log('Worker-' + workers[i].pid + ' exited')
workers[i] = childProcess.fork('./worker.js')
console.log('Create worker-' + workers[i].pid)
workers[i].send('server', server)
}
})(i))
}
// 关闭主线程服务器的端口监听
// server.close()
})

worker.js

1
2
3
4
5
6
7
8
9
10
process.on('message', (msg, server) => {
if (msg === 'server' && server) {
server.on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
})
}
})

执行node master.js启动服务器后,可以通过任务管理器直接杀掉进程来模拟进程异常退出。可以看到 worker 进程退出后,master 能够发现并及时创建新的 worker 进程。任务管理器中的 Node 进程数量恢复原样。

image

image

执行node tcp_client.js启动客户端,客户端发出的连接请求被处理的情况如下,同样地,由于监听同一端口,进程之间采取抢占式服务,不一定保障负载均衡。

image

1.6、处理 HTTP 服务

前面的示例所使用的是 TCP 服务器,如果要处理 HTTP 请求,需要使用 HTTP 服务器。而 HTTP 其实是基于 TCP 的,发送 HTTP 请求的时候同样也会发起 TCP 连接。只需要对前面的 TCP 服务器进行一点小改动便可以支持 HTTP 了。在进程中新增 HTTP 服务器,当 TCP 服务器收到请求时,把请求提交给 HTTP 服务器处理即可。下面是 worker 进程的改动示例。

worker.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const http = require('http')
const httpServer = http.createServer((req, res) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' })
res.end('Request handled by worker-' + process.pid)
}, 10)
})

process.on('message', (msg, server) => {
if (msg === 'server' && server) {
server.on('connection', (socket) => {
// 提交给HTTP服务器处理
httpServer.emit('connection', socket)
})
}
})

image

二、cluster

前面简单描述了使用 child_process 实现单机 Node 集群的做法,需要处理挺多的细节。Node 提供了 cluster 模块,该模块提供了更完善的 API,除了能够实现多进程充分利用 CPU 资源以外,还能够帮助我们更好地进行进程管理和处理进程的健壮性问题。下面是简单示例,if 条件语句判断当前进程是 master 还是 worker,master 进程会执行 if 语句块包含的代码,而 worker 进程则执行 else 语句块包含的代码。master 进程中,利用 cluster 模块创建了与 CPU 数量相应的 worker 进程,并通过监听 cluster 的 online 事件来判断 worker 的创建成功。在 worker 进程退出后,会触发 master 进程中 cluster 模块上的 exit 事件,通过监听该事件可以了解 worker 进程的退出情况并及时 fork 新的 worker。最后,worker 进程中只需创建服务器监听端口,对客户端请求做出处理即可。(这里设置相同端口 8080 之后,所有 worker 都将监听同一个端口)

server.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const cluster = require('cluster')

if (cluster.isMaster) {
const cpuNum = require('os').cpus().length

for (let i = 0; i < cpuNum; ++i) {
cluster.fork()
}

// 创建进程完成后输出提示信息
cluster.on('online', (worker) => {
console.log('Create worker-' + worker.process.pid)
})

// 子进程退出后重启
cluster.on('exit', (worker, code, signal) => {
console.log('[Master] worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal)
cluster.fork()
})
} else {
const net = require('net')
net.createServer().on('connection', (socket) => {
// 利用setTimeout模拟处理请求时的操作耗时
setTimeout(() => {
socket.end('Request handled by worker-' + process.pid)
}, 10)
}).listen(8080)
}

执行node server.js启动服务器,继续按照之前的做法,利用任务管理器杀死进程,可以看到在进程被杀后 master 能够及时启动新的 worker。

image

继续运行 tcp_client,可以看到服务器能够正常处理请求。

image

三、小结

利用child_processcluster模块能够很好地实现Master-Worker模式多进程架构,实现单机服务器集群,充分利用了多核 CPU 资源。通过进程通信能够实现进程管理、重启以及负载均衡,从而提高集群的稳定性和健壮性。