Nodejs进程间通信 蔚落 2022-11-15 12:57 179阅读 0赞 ## 一.场景 ## Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势 事实上,Node最初从设计上就考虑了分布式网络场景: > Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The “nodes” need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks. P.S.关于Node之所以叫Node,见[Why is Node.js named Node.js?][Why is Node.js named Node.js] ## 二.创建进程 ## 通信方式与进程产生方式有关,而Node有4种创建进程的方式:`spawn()`,`exec()`,`execFile()`和`fork()` ### spawn ### const { spawn } = require('child_process'); const child = spawn('pwd'); // 带参数的形式 // const child = spawn('find', ['.', '-type', 'f']); `spawn()`返回`ChildProcess`实例,`ChildProcess`同样基于事件机制(EventEmitter API),提供了一些事件: * `exit`:子进程退出时触发,可以得知进程退出状态(`code`和`signal`) * `disconnect`:父进程调用`child.disconnect()`时触发 * `error`:子进程创建失败,或被`kill`时触发 * `close`:子进程的`stdio`流(标准输入输出流)关闭时触发 * `message`:子进程通过`process.send()`发送消息时触发,父子进程之间可以通过这种*内置的消息机制通信* 可以通过`child.stdin`,`child.stdout`和`child.stderr`访问子进程的`stdio`流,这些流被关闭的时,子进程会触发`close`事件 P.S.`close`与`exit`的区别主要体现在多进程共享同一`stdio`流的场景,某个进程退出了并不意味着`stdio`流被关闭了 在子进程中,`stdout/stderr`具有Readable特性,而`stdin`具有Writable特性,*与主进程的情况正好相反*: child.stdout.on('data', (data) => { console.log(`child stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`child stderr:\n${data}`); }); 利用进程`stdio`流的管道特性,就可以完成更复杂的事情,例如: const { spawn } = require('child_process'); const find = spawn('find', ['.', '-type', 'f']); const wc = spawn('wc', ['-l']); find.stdout.pipe(wc.stdin); wc.stdout.on('data', (data) => { console.log(`Number of files ${data}`); }); 作用等价于`find . -type f | wc -l`,递归统计当前目录文件数量 IPC选项 另外,通过`spawn()`方法的`stdio`选项可以建立IPC机制: const { spawn } = require('child_process'); const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] }); child.on('message', (m) => { console.log(m); }); child.send('Here Here'); // ./ipc-child.js process.on('message', (m) => { process.send(`< ${m}`); process.send('> 不要回答x3'); }); 关于`spawn()`的IPC选项的详细信息,请查看[options.stdio][] ### exec ### `spawn()`方法默认不会创建shell去执行传入的命令(所以*性能上稍微好一点*),而`exec()`方法会创建一个shell。另外,`exec()`不是基于stream的,而是把传入命令的执行结果暂存到buffer中,再整个传递给回调函数 `exec()`方法的特点是*完全支持shell语法*,可以直接传入任意shell脚本,例如: const { exec } = require('child_process'); exec('find . -type f | wc -l', (err, stdout, stderr) => { if (err) { console.error(`exec error: ${err}`); return; } console.log(`Number of files ${stdout}`); }); 但`exec()`方法也因此存在[命令注入][Link 1]的安全风险,在含有用户输入等动态内容的场景要特别注意。所以,`exec()`方法的适用场景是:希望直接使用shell语法,并且预期输出数据量不大(不存在内存压力) 那么,有没有既支持shell语法,还具有stream IO优势的方式? 有。*两全其美的方式*如下: const { spawn } = require('child_process'); const child = spawn('find . -type f | wc -l', { shell: true }); child.stdout.pipe(process.stdout); 开启`spawn()`的`shell`选项,并通过`pipe()`方法把子进程的标准输出简单地接到当前进程的标准输入上,以便看到命令执行结果。实际上还有更容易的方式: const { spawn } = require('child_process'); process.stdout.on('data', (data) => { console.log(data); }); const child = spawn('find . -type f | wc -l', { shell: true, stdio: 'inherit' }); `stdio: 'inherit'`允许子进程继承当前进程的标准输入输出(共享`stdin`,`stdout`和`stderr`),所以上例能够通过监听当前进程`process.stdout`的`data`事件拿到子进程的输出结果 另外,除了`stdio`和`shell`选项,`spawn()`还支持一些其它选项,如: const child = spawn('find . -type f | wc -l', { stdio: 'inherit', shell: true, // 修改环境变量,默认process.env env: { HOME: '/tmp/xxx' }, // 改变当前工作目录 cwd: '/tmp', // 作为独立进程存在 detached: true }); *注意*,`env`选项除了以环境变量形式向子进程传递数据外,还可以用来实现沙箱式的环境变量隔离,默认把`process.env`作为子进程的环境变量集,子进程与当前进程一样能够访问所有环境变量,如果像上例中指定自定义对象作为子进程的环境变量集,子进程就无法访问其它环境变量 所以,想要增/删环境变量的话,需要这样做: var spawn_env = JSON.parse(JSON.stringify(process.env)); // remove those env vars delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE; delete spawn_env.ELECTRON_RUN_AS_NODE; var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env}); `detached`选项更有意思: const { spawn } = require('child_process'); const child = spawn('node', ['stuff.js'], { detached: true, stdio: 'ignore' }); child.unref(); 以这种方式创建的独立进程行为取决于操作系统,Windows上detached子进程将拥有自己的console窗口,而Linux上该进程会*创建新的process group*(这个特性可以用来管理子进程族,实现类似于[tree-kill][]的特性) `unref()`方法用来断绝关系,这样“父”进程可以独立退出(不会导致子进程跟着退出),但要注意这时子进程的`stdio`也应该独立于“父”进程,否则“父”进程退出后子进程仍会受到影响 ### execFile ### const { execFile } = require('child_process'); const child = execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { throw error; } console.log(stdout); }); 与`exec()`方法类似,但不通过shell来执行(所以性能稍好一点),所以要求传入*可执行文件*。Windows下某些文件无法直接执行,比如`.bat`和`.cmd`,这些文件就不能用`execFile()`来执行,只能借助`exec()`或开启了`shell`选项的`spawn()` P.S.与`exec()`一样也*不是基于stream的*,同样存在输出数据量风险 xxxSync `spawn`,`exec`和`execFile`都有对应的同步阻塞版本,一直等到子进程退出 const { spawnSync, execSync, execFileSync, } = require('child_process'); 同步方法用来简化脚本任务,比如启动流程,其它时候应该避免使用这些方法 ### fork ### `fork()`是`spawn()`的变体,用来创建Node进程,最大的特点是父子进程自带通信机制(IPC管道): > The child\_process.fork() method is a special case of child\_process.spawn() used specifically to spawn new Node.js processes. Like child\_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details. 例如: var n = child_process.fork('./child.js'); n.on('message', function(m) { console.log('PARENT got message:', m); }); n.send({ hello: 'world' }); // ./child.js process.on('message', function(m) { console.log('CHILD got message:', m); }); process.send({ foo: 'bar' }); 因为`fork()`自带通信机制的优势,尤其适合用来拆分耗时逻辑,例如: const http = require('http'); const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const sum = longComputation(); return res.end(`Sum is ${sum}`); } else { res.end('Ok') } }); server.listen(3000); 这样做的致命问题是一旦有人访问`/compute`,后续请求都无法及时处理,因为事件循环还被`longComputation`阻塞着,直到耗时计算结束才能恢复服务能力 为了避免耗时操作阻塞主进程的事件循环,可以把`longComputation()`拆分到子进程中: // compute.js const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; // 开关,收到消息才开始做 process.on('message', (msg) => { const sum = longComputation(); process.send(sum); }); 主进程开启子进程执行`longComputation`: const http = require('http'); const { fork } = require('child_process'); const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const compute = fork('compute.js'); compute.send('start'); compute.on('message', sum => { res.end(`Sum is ${sum}`); }); } else { res.end('Ok') } }); server.listen(3000); 主进程的事件循环不会再被耗时计算阻塞,但进程数量还需要进一步限制,否则资源被进程消耗殆尽时服务能力仍会受到影响 P.S.实际上,`cluster`模块就是对多进程服务能力的封装,*思路与这个简单示例类似* ## 三.通信方式 ## ### 1.通过stdin/stdout传递json ### > stdin/stdout and a JSON payload 最直接的通信方式,拿到子进程的handle后,可以访问其`stdio`流,然后约定一种`message`格式开始愉快地通信: const { spawn } = require('child_process'); child = spawn('node', ['./stdio-child.js']); child.stdout.setEncoding('utf8'); // 父进程-发 child.stdin.write(JSON.stringify({ type: 'handshake', payload: '你好吖' })); // 父进程-收 child.stdout.on('data', function (chunk) { let data = chunk.toString(); let message = JSON.parse(data); console.log(`${message.type} ${message.payload}`); }); 子进程与之类似: // ./stdio-child.js // 子进程-收 process.stdin.on('data', (chunk) => { let data = chunk.toString(); let message = JSON.parse(data); switch (message.type) { case 'handshake': // 子进程-发 process.stdout.write(JSON.stringify({ type: 'message', payload: message.payload + ' : hoho' })); break; default: break; } }); P.S.VS Code进程间通信就采用了这种方式,具体见[access electron API from vscode extension][] 明显的*限制*是需要拿到“子”进程的handle,两个完全独立的进程之间无法通过这种方式来通信(比如跨应用,甚至跨机器的场景) P.S.关于stream及pipe的详细信息,请查看[Node中的流][Node] ### 2.原生IPC支持 ### 如`spawn()`及`fork()`的例子,进程之间可以借助内置的IPC机制通信 父进程: * `process.on('message')`收 * `child.send()`发 子进程: * `process.on('message')`收 * `process.send()`发 限制同上,同样要有一方能够拿到另一方的handle才行 ### 3.sockets ### 借助网络来完成进程间通信,*不仅能跨进程,还能跨机器* [node-ipc][]就采用这种方案,例如: // server const ipc=require('../../../node-ipc'); = 'world'; ipc.config.retry= 1500; ipc.config.maxConnections=1; ipc.serveNet( function(){ ipc.server.on( 'message', function(data,socket){ ipc.log('got a message : ', data); ipc.server.emit( socket, 'message', data+' world!' ); } ); ipc.server.on( 'socket.disconnected', function(data,socket){ console.log('DISCONNECTED\n\n',arguments); } ); } ); ipc.server.on( 'error', function(err){ ipc.log('Got an ERROR!',err); } ); ipc.server.start(); // client const ipc=require('node-ipc'); = 'hello'; ipc.config.retry= 1500; ipc.connectToNet( 'world', function(){ 'connect', function(){ ipc.log('## connected to world ##', ipc.config.delay); 'message', 'hello' ); } ); 'disconnect', function(){ ipc.log('disconnected from world'); } ); 'message', function(data){ ipc.log('got a message from world : ', data); } ); } ); P.S.更多示例见[RIAEvangelist/node-ipc][RIAEvangelist_node-ipc] 当然,单机场景下通过网络来完成进程间通信有些浪费性能,但网络通信的*优势*在于跨环境的兼容性与更进一步的RPC场景 ### 4.message queue ### 父子进程都通过外部消息机制来通信,跨进程的能力取决于MQ支持 即进程间不直接通信,而是通过中间层(MQ),*加一个控制层*就能获得更多灵活性和优势: * 稳定性:消息机制提供了强大的稳定性保证,比如确认送达(消息回执ACK),失败重发/防止多发等等 * 优先级控制:允许调整消息响应次序 * 离线能力:消息可以被缓存 * 事务性消息处理:把关联消息组合成事务,保证其送达顺序及完整性 P.S.不好实现?包一层能解决吗,不行就包两层…… 比较受欢迎的有[smrchy/rsmq][smrchy_rsmq],例如: // init RedisSMQ = require("rsmq"); rsmq = new RedisSMQ( {host: "", port: 6379, ns: "rsmq"} ); // create queue rsmq.createQueue({qname:"myqueue"}, function (err, resp) { if (resp===1) { console.log("queue created") } }); // send message rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) { if (resp) { console.log("Message sent. ID:", resp); } }); // receive message rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) { if ( { console.log("Message received.", resp) } else { console.log("No messages for me...") } }); 会起一个Redis server,基本原理如下: > Using a shared Redis server multiple Node.js processes can send / receive messages. 消息的收/发/缓存/持久化依靠Redis提供的能力,在此基础上实现完整的队列机制 ### 5.Redis ### 基本思路与message queue类似: > Use Redis as a message bus/broker. Redis自带[Pub/Sub机制][Pub_Sub](即发布-订阅模式),适用于简单的通信场景,比如一对一或一对多并且*不关注消息可靠性*的场景 另外,Redis有list结构,可以用作消息队列,以此提高消息可靠性。一般做法是生产者[LPUSH][]消息,消费者[BRPOP][]消息。适用于要求消息可靠性的简单通信场景,但缺点是消息不具状态,且没有ACK机制,无法满足复杂的通信需求 P.S.Redis的Pub/Sub示例见[What’s the most efficient node.js inter-process communication library/method?][What_s the most efficient node.js inter-process communication library_method] ## 四.总结 ## Node进程间通信有4种方式: * 通过stdin/stdout传递json:最直接的方式,适用于能够拿到“子”进程handle的场景,适用于关联进程之间通信,无法跨机器 * Node原生IPC支持:最native(地道?)的方式,比上一种“正规”一些,具有同样的局限性 * 通过sockets:最通用的方式,有良好的跨环境能力,但存在网络的性能损耗 * 借助message queue:最强大的方式,既然要通信,场景还复杂,不妨扩展出一层消息中间件,漂亮地解决各种通信问题 ### 参考资料 ### * [Node.js Child Processes: Everything you need to know][Node.js Child Processes_ Everything you need to know] [Why is Node.js named Node.js]: [options.stdio]: [Link 1]: [tree-kill]: [access electron API from vscode extension]: [Node]: [node-ipc]: [RIAEvangelist_node-ipc]: [smrchy_rsmq]: [Pub_Sub]: [LPUSH]: [BRPOP]: [What_s the most efficient node.js inter-process communication library_method]: [Node.js Child Processes_ Everything you need to know]:
