Nodejs进程间通信 蔚落 2022-11-15 12:57 167阅读 0赞 ## 一.场景 ## Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势 事实上,Node最初从设计上就考虑了分布式网络场景: > Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The “nodes” need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks. P.S.关于Node之所以叫Node,见[Why is Node.js named Node.js?][Why is Node.js named Node.js] ## 二.创建进程 ## 通信方式与进程产生方式有关,而Node有4种创建进程的方式:`spawn()`,`exec()`,`execFile()`和`fork()` ### spawn ### const { spawn } = require('child_process'); const child = spawn('pwd'); // 带参数的形式 // const child = spawn('find', ['.', '-type', 'f']); `spawn()`返回`ChildProcess`实例,`ChildProcess`同样基于事件机制(EventEmitter API),提供了一些事件: * `exit`:子进程退出时触发,可以得知进程退出状态(`code`和`signal`) * `disconnect`:父进程调用`child.disconnect()`时触发 * `error`:子进程创建失败,或被`kill`时触发 * `close`:子进程的`stdio`流(标准输入输出流)关闭时触发 * `message`:子进程通过`process.send()`发送消息时触发,父子进程之间可以通过这种*内置的消息机制通信* 可以通过`child.stdin`,`child.stdout`和`child.stderr`访问子进程的`stdio`流,这些流被关闭的时,子进程会触发`close`事件 P.S.`close`与`exit`的区别主要体现在多进程共享同一`stdio`流的场景,某个进程退出了并不意味着`stdio`流被关闭了 在子进程中,`stdout/stderr`具有Readable特性,而`stdin`具有Writable特性,*与主进程的情况正好相反*: child.stdout.on('data', (data) => { console.log(`child stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`child stderr:\n${data}`); }); 利用进程`stdio`流的管道特性,就可以完成更复杂的事情,例如: const { spawn } = require('child_process'); const find = spawn('find', ['.', '-type', 'f']); const wc = spawn('wc', ['-l']); find.stdout.pipe(wc.stdin); wc.stdout.on('data', (data) => { console.log(`Number of files ${data}`); }); 作用等价于`find . -type f | wc -l`,递归统计当前目录文件数量 IPC选项 另外,通过`spawn()`方法的`stdio`选项可以建立IPC机制: const { spawn } = require('child_process'); const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] }); child.on('message', (m) => { console.log(m); }); child.send('Here Here'); // ./ipc-child.js process.on('message', (m) => { process.send(`< ${m}`); process.send('> 不要回答x3'); }); 关于`spawn()`的IPC选项的详细信息,请查看[options.stdio][] ### exec ### `spawn()`方法默认不会创建shell去执行传入的命令(所以*性能上稍微好一点*),而`exec()`方法会创建一个shell。另外,`exec()`不是基于stream的,而是把传入命令的执行结果暂存到buffer中,再整个传递给回调函数 `exec()`方法的特点是*完全支持shell语法*,可以直接传入任意shell脚本,例如: const { exec } = require('child_process'); exec('find . -type f | wc -l', (err, stdout, stderr) => { if (err) { console.error(`exec error: ${err}`); return; } console.log(`Number of files ${stdout}`); }); 但`exec()`方法也因此存在[命令注入][Link 1]的安全风险,在含有用户输入等动态内容的场景要特别注意。所以,`exec()`方法的适用场景是:希望直接使用shell语法,并且预期输出数据量不大(不存在内存压力) 那么,有没有既支持shell语法,还具有stream IO优势的方式? 有。*两全其美的方式*如下: const { spawn } = require('child_process'); const child = spawn('find . -type f | wc -l', { shell: true }); child.stdout.pipe(process.stdout); 开启`spawn()`的`shell`选项,并通过`pipe()`方法把子进程的标准输出简单地接到当前进程的标准输入上,以便看到命令执行结果。实际上还有更容易的方式: const { spawn } = require('child_process'); process.stdout.on('data', (data) => { console.log(data); }); const child = spawn('find . -type f | wc -l', { shell: true, stdio: 'inherit' }); `stdio: 'inherit'`允许子进程继承当前进程的标准输入输出(共享`stdin`,`stdout`和`stderr`),所以上例能够通过监听当前进程`process.stdout`的`data`事件拿到子进程的输出结果 另外,除了`stdio`和`shell`选项,`spawn()`还支持一些其它选项,如: const child = spawn('find . -type f | wc -l', { stdio: 'inherit', shell: true, // 修改环境变量,默认process.env env: { HOME: '/tmp/xxx' }, // 改变当前工作目录 cwd: '/tmp', // 作为独立进程存在 detached: true }); *注意*,`env`选项除了以环境变量形式向子进程传递数据外,还可以用来实现沙箱式的环境变量隔离,默认把`process.env`作为子进程的环境变量集,子进程与当前进程一样能够访问所有环境变量,如果像上例中指定自定义对象作为子进程的环境变量集,子进程就无法访问其它环境变量 所以,想要增/删环境变量的话,需要这样做: var spawn_env = JSON.parse(JSON.stringify(process.env)); // remove those env vars delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE; delete spawn_env.ELECTRON_RUN_AS_NODE; var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env}); `detached`选项更有意思: const { spawn } = require('child_process'); const child = spawn('node', ['stuff.js'], { detached: true, stdio: 'ignore' }); child.unref(); 以这种方式创建的独立进程行为取决于操作系统,Windows上detached子进程将拥有自己的console窗口,而Linux上该进程会*创建新的process group*(这个特性可以用来管理子进程族,实现类似于[tree-kill][]的特性) `unref()`方法用来断绝关系,这样“父”进程可以独立退出(不会导致子进程跟着退出),但要注意这时子进程的`stdio`也应该独立于“父”进程,否则“父”进程退出后子进程仍会受到影响 ### execFile ### const { execFile } = require('child_process'); const child = execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { throw error; } console.log(stdout); }); 与`exec()`方法类似,但不通过shell来执行(所以性能稍好一点),所以要求传入*可执行文件*。Windows下某些文件无法直接执行,比如`.bat`和`.cmd`,这些文件就不能用`execFile()`来执行,只能借助`exec()`或开启了`shell`选项的`spawn()` P.S.与`exec()`一样也*不是基于stream的*,同样存在输出数据量风险 xxxSync `spawn`,`exec`和`execFile`都有对应的同步阻塞版本,一直等到子进程退出 const { spawnSync, execSync, execFileSync, } = require('child_process'); 同步方法用来简化脚本任务,比如启动流程,其它时候应该避免使用这些方法 ### fork ### `fork()`是`spawn()`的变体,用来创建Node进程,最大的特点是父子进程自带通信机制(IPC管道): > The child\_process.fork() method is a special case of child\_process.spawn() used specifically to spawn new Node.js processes. Like child\_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details. 例如: var n = child_process.fork('./child.js'); n.on('message', function(m) { console.log('PARENT got message:', m); }); n.send({ hello: 'world' }); // ./child.js process.on('message', function(m) { console.log('CHILD got message:', m); }); process.send({ foo: 'bar' }); 因为`fork()`自带通信机制的优势,尤其适合用来拆分耗时逻辑,例如: const http = require('http'); const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const sum = longComputation(); return res.end(`Sum is ${sum}`); } else { res.end('Ok') } }); server.listen(3000); 这样做的致命问题是一旦有人访问`/compute`,后续请求都无法及时处理,因为事件循环还被`longComputation`阻塞着,直到耗时计算结束才能恢复服务能力 为了避免耗时操作阻塞主进程的事件循环,可以把`longComputation()`拆分到子进程中: // compute.js const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; // 开关,收到消息才开始做 process.on('message', (msg) => { const sum = longComputation(); process.send(sum); }); 主进程开启子进程执行`longComputation`: const http = require('http'); const { fork } = require('child_process'); const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const compute = fork('compute.js'); compute.send('start'); compute.on('message', sum => { res.end(`Sum is ${sum}`); }); } else { res.end('Ok') } }); server.listen(3000); 主进程的事件循环不会再被耗时计算阻塞,但进程数量还需要进一步限制,否则资源被进程消耗殆尽时服务能力仍会受到影响 P.S.实际上,`cluster`模块就是对多进程服务能力的封装,*思路与这个简单示例类似* ## 三.通信方式 ## ### 1.通过stdin/stdout传递json ### > stdin/stdout and a JSON payload 最直接的通信方式,拿到子进程的handle后,可以访问其`stdio`流,然后约定一种`message`格式开始愉快地通信: const { spawn } = require('child_process'); child = spawn('node', ['./stdio-child.js']); child.stdout.setEncoding('utf8'); // 父进程-发 child.stdin.write(JSON.stringify({ type: 'handshake', payload: '你好吖' })); // 父进程-收 child.stdout.on('data', function (chunk) { let data = chunk.toString(); let message = JSON.parse(data); console.log(`${message.type} ${message.payload}`); }); 子进程与之类似: // ./stdio-child.js // 子进程-收 process.stdin.on('data', (chunk) => { let data = chunk.toString(); let message = JSON.parse(data); switch (message.type) { case 'handshake': // 子进程-发 process.stdout.write(JSON.stringify({ type: 'message', payload: message.payload + ' : hoho' })); break; default: break; } }); P.S.VS Code进程间通信就采用了这种方式,具体见[access electron API from vscode extension][] 明显的*限制*是需要拿到“子”进程的handle,两个完全独立的进程之间无法通过这种方式来通信(比如跨应用,甚至跨机器的场景) P.S.关于stream及pipe的详细信息,请查看[Node中的流][Node] ### 2.原生IPC支持 ### 如`spawn()`及`fork()`的例子,进程之间可以借助内置的IPC机制通信 父进程: * `process.on('message')`收 * `child.send()`发 子进程: * `process.on('message')`收 * `process.send()`发 限制同上,同样要有一方能够拿到另一方的handle才行 ### 3.sockets ### 借助网络来完成进程间通信,*不仅能跨进程,还能跨机器* [node-ipc][]就采用这种方案,例如: // server const ipc=require('../../../node-ipc'); ipc.config.id = 'world'; ipc.config.retry= 1500; ipc.config.maxConnections=1; ipc.serveNet( function(){ ipc.server.on( 'message', function(data,socket){ ipc.log('got a message : ', data); ipc.server.emit( socket, 'message', data+' world!' ); } ); ipc.server.on( 'socket.disconnected', function(data,socket){ console.log('DISCONNECTED\n\n',arguments); } ); } ); ipc.server.on( 'error', function(err){ ipc.log('Got an ERROR!',err); } ); ipc.server.start(); // client const ipc=require('node-ipc'); ipc.config.id = 'hello'; ipc.config.retry= 1500; ipc.connectToNet( 'world', function(){ ipc.of.world.on( 'connect', function(){ ipc.log('## connected to world ##', ipc.config.delay); ipc.of.world.emit( 'message', 'hello' ); } ); ipc.of.world.on( 'disconnect', function(){ ipc.log('disconnected from world'); } ); ipc.of.world.on( 'message', function(data){ ipc.log('got a message from world : ', data); } ); } ); P.S.更多示例见[RIAEvangelist/node-ipc][RIAEvangelist_node-ipc] 当然,单机场景下通过网络来完成进程间通信有些浪费性能,但网络通信的*优势*在于跨环境的兼容性与更进一步的RPC场景 ### 4.message queue ### 父子进程都通过外部消息机制来通信,跨进程的能力取决于MQ支持 即进程间不直接通信,而是通过中间层(MQ),*加一个控制层*就能获得更多灵活性和优势: * 稳定性:消息机制提供了强大的稳定性保证,比如确认送达(消息回执ACK),失败重发/防止多发等等 * 优先级控制:允许调整消息响应次序 * 离线能力:消息可以被缓存 * 事务性消息处理:把关联消息组合成事务,保证其送达顺序及完整性 P.S.不好实现?包一层能解决吗,不行就包两层…… 比较受欢迎的有[smrchy/rsmq][smrchy_rsmq],例如: // init RedisSMQ = require("rsmq"); rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} ); // create queue rsmq.createQueue({qname:"myqueue"}, function (err, resp) { if (resp===1) { console.log("queue created") } }); // send message rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) { if (resp) { console.log("Message sent. ID:", resp); } }); // receive message rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) { if (resp.id) { console.log("Message received.", resp) } else { console.log("No messages for me...") } }); 会起一个Redis server,基本原理如下: > Using a shared Redis server multiple Node.js processes can send / receive messages. 消息的收/发/缓存/持久化依靠Redis提供的能力,在此基础上实现完整的队列机制 ### 5.Redis ### 基本思路与message queue类似: > Use Redis as a message bus/broker. Redis自带[Pub/Sub机制][Pub_Sub](即发布-订阅模式),适用于简单的通信场景,比如一对一或一对多并且*不关注消息可靠性*的场景 另外,Redis有list结构,可以用作消息队列,以此提高消息可靠性。一般做法是生产者[LPUSH][]消息,消费者[BRPOP][]消息。适用于要求消息可靠性的简单通信场景,但缺点是消息不具状态,且没有ACK机制,无法满足复杂的通信需求 P.S.Redis的Pub/Sub示例见[What’s the most efficient node.js inter-process communication library/method?][What_s the most efficient node.js inter-process communication library_method] ## 四.总结 ## Node进程间通信有4种方式: * 通过stdin/stdout传递json:最直接的方式,适用于能够拿到“子”进程handle的场景,适用于关联进程之间通信,无法跨机器 * Node原生IPC支持:最native(地道?)的方式,比上一种“正规”一些,具有同样的局限性 * 通过sockets:最通用的方式,有良好的跨环境能力,但存在网络的性能损耗 * 借助message queue:最强大的方式,既然要通信,场景还复杂,不妨扩展出一层消息中间件,漂亮地解决各种通信问题 ### 参考资料 ### * [Node.js Child Processes: Everything you need to know][Node.js Child Processes_ Everything you need to know] [Why is Node.js named Node.js]: https://stackoverflow.com/questions/5621812/why-is-node-js-named-node-js [options.stdio]: https://nodejs.org/dist/latest-v8.x/docs/api/child_process.html#child_process_options_stdio [Link 1]: https://blog.liftsecurity.io/2014/08/19/Avoid-Command-Injection-Node.js/ [tree-kill]: http://www.ayqy.net/blog/nodejs%E8%BF%9B%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1/npmjs.com/package/tree-kill [access electron API from vscode extension]: https://github.com/Microsoft/vscode/issues/3011#issuecomment-196305696 [Node]: http://www.ayqy.net/blog/node-stream/ [node-ipc]: https://www.npmjs.com/package/node-ipc [RIAEvangelist_node-ipc]: https://github.com/RIAEvangelist/node-ipc/tree/master/example [smrchy_rsmq]: https://github.com/smrchy/rsmq [Pub_Sub]: https://redis.io/topics/pubsub [LPUSH]: https://redis.io/commands/lpush [BRPOP]: https://redis.io/commands/brpop [What_s the most efficient node.js inter-process communication library_method]: https://stackoverflow.com/questions/6463945/whats-the-most-efficient-node-js-inter-process-communication-library-method [Node.js Child Processes_ Everything you need to know]: https://medium.freecodecamp.org/node-js-child-processes-everything-you-need-to-know-e69498fe970a
相关 进程间通信 管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道“我们之前说进程间通信的本质是让不同的进程看到同一份资源,管道就是其中... 一时失言乱红尘/ 2024年04月25日 20:18/ 0 赞/ 148 阅读
相关 进程间通信 1 引言 \---- 进程间通信的主要目的是实现同一计算机系统内部的相互协作的进程之间的数据共享与信息交换,由于这些进程处于同一软件和硬件环境下,利用操作系统提供的的编程接口 迷南。/ 2024年02月18日 23:52/ 0 赞/ 109 阅读
相关 进程间通信 进程间通信(IPC,InterProcess Communication)是指在不同进程之间传播或交换信息。 IPC的方式通常有管道(包括无名管道和命名管道)、消息队列、信号 以你之姓@/ 2024年02月18日 20:08/ 0 赞/ 119 阅读
相关 进程间通信 进程间通信(IPC,Inter-Process Communication),是指两个或两个以上的进程之间传递数据或信号的一些技术或方法。进程是计算机系统分配资源的最小单位,每 ゞ 浴缸里的玫瑰/ 2023年01月02日 15:24/ 0 赞/ 230 阅读
相关 Nodejs进程间通信 一.场景 Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势 事实上,Node最初从设计上就考虑了分布式网络场景: > Node is a sin 蔚落/ 2022年11月15日 12:57/ 0 赞/ 168 阅读
相关 进程间通信 进程间通信的基本概念 进程间通信意味着两个不同进程间可以交换数据,操作系统中应提供两个进程可以同时访问的内存空间。 通过管道实现进程间通信 基于管道(P 港控/mmm°/ 2022年05月25日 09:42/ 0 赞/ 425 阅读
相关 进程间通信 程序员必须让拥有依赖关系的进程集协调,这样才能达到进程的共同目标。可以使用两种技术来达到协调。第一种技术在具有通信依赖关系的两个进程间传递信息。这种技术称做进程间通信(inte 谁践踏了优雅/ 2022年01月16日 14:09/ 0 赞/ 439 阅读
相关 进程间通信 转载自:[http://songlee24.github.io/2015/04/21/linux-IPC/][http_songlee24.github.io_2015_04_ 不念不忘少年蓝@/ 2021年09月23日 04:10/ 0 赞/ 572 阅读
相关 进程间通信 进程间通信 1. 前言 2. 使用文件实现进程间的通信 3. 使用管道实现进程间的通信 4. 共享内存 5. 以上三种通信方式的区别 青旅半醒/ 2021年08月30日 22:05/ 0 赞/ 582 阅读
还没有评论,来说两句吧...