V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
xcold
V2EX  ›  Node.js

更优雅轻量地用 JS 进行 “IPC” 调用,我写了 event-invoke 库

  •  
  •   xcold · 2022-02-28 12:07:29 +08:00 · 9255 次点击
    这是一个创建于 1035 天前的主题,其中的信息可能已经有所发展或是发生改变。

    背景

    团队最近有一个 Node.js 全新的模块需要开发,涉及多进程的管理和通讯,简化模型可以理解为需要频繁从 master 进程调用 worker 进程的某些方法,简单设计实现了一个 event-invoke 的库,可以简单优雅进行调用。

    Node.js 提供了 child_process 模块,在 master 进程通过 fork / spawn 等方法调用可以创建 worker 进程并获取其对象(简称 cp )。父子进程会建立 IPC 通道,在 master 进程中可以使用 cp.send() 给 worker 进程发送 IPC 消息,而在 worker 进程中也可以通过 process.send() 给父进程发送 IPC 消息,达到双工通信的目的。(进程管理涉及更复杂的工作,本文暂不涉及)

    最小实现

    基于以上前提,借助 IPC 通道和进程对象,我们可以通过事件驱动的方式实现进程间的通信,只需要简单的几行代码,就能实现基本调用逻辑,例如:

    // master.js
    const child_process = require('child_process');
    const cp = child_process.fork('./worker.js');
    
    function invoke() {
    	cp.send({ name: 'methodA', args: [] });
      cp.on('message', (packet) => {
      	console.log('result: %j', packet.payload);
      });
    }
    
    invoke();
    
    // worker.js
    const methodMap = {
      methodA() {}
    }
    
    cp.on('message', async (packet) => {
      const { name, args } = packet;
      const result = await methodMap[name)(...args);
      process.send({ name, payload: result });
    });
    

    仔细分析上述代码实现,直观感受 invoke 调用并不优雅,并且当调用量较大时,会创建很多的 message 监听器,并且要保证请求和响应是一一对应,需要做很多额外的设计。希望设计一个简单理想的方式,只需提供 invoke 方法,传入方法名和参数,返回一个 Promise ,像调用本地方法那样进行 IPC 调用,而不用考虑消息通信的细节。

    // 假想中的 IPC 调用
    const res1 = await invoker.invoke('sleep', 1000);
    console.log('sleep 1000ms:', res1);
    const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
    console.log('max(1, 2, 3):', res2);
    

    流程设计

    从调用的模型看,可以将角色抽象为 Invoker 和 Callee ,分别对应服务调用方和提供方,将消息通讯的细节可以封装在内部。parent_process 和 child_process 的通信桥梁是操作系统提供的 IPC 通道,单纯从 API 的视角看,可以简化为两个 Event 对象(主进程为 cp ,子进程为 process )。Event 对象作为中间的双工通道两端,暂且命名为 InvokerChannel 和 CalleeChannel 。

    关键实体和流程如下: 825cb24fe4bb2cc7f9713606d8594a77.png

    • Callee 中注册可被调用的所有方法,并保存在 functionMap
    • 用户调用 Invoker.invoke() 时:
      • 创建一个 promise 对象,返回给用户,同时将其保存在 promiseMap 中
      • 每次调用生成一个 id ,保证调用和执行结果是一一对应的
      • 进行超时控制,超时的任务直接执行 reject 该 promise
    • Invoker 通过 Channel 把调用方法消息发送给 Callee
    • Callee 解析收到的消息,通过 name 执行对应方法,并将结果和完成状态(成功 or 异常)通过 Channel 发送消息给 Invoker
    • Invoker 解析消息,通过 id+name 找到对应的 promise 对象,成功则 resolve ,失败则 reject

    实际上,这个设计不仅适用 IPC 调用,在浏览器的场景下也能直接得到很好的应用,比如说跨 iframe 的调用可以包装 window.postMessage(),跨标签页调用可以使用 storage 事件,以及 Web worker 中可借助 worker.postMessage() 作为通信的桥梁。

    快速开始

    基于以上设计,实现编码必然不在话下,趁着非工作时间迅速完成开发和文档的工作,源代码:https://github.com/x-cold/event-invoke

    安装依赖

    npm i -S event-invoke
    

    父子进程通信实例

    示例代码:Example code

    // parent.js
    const cp = require('child_process');
    const { Invoker } = require('event-invoke');
    
    const invokerChannel = cp.fork('./child.js');
    
    const invoker = new Invoker(invokerChannel);
    
    async function main() {
      const res1 = await invoker.invoke('sleep', 1000);
      console.log('sleep 1000ms:', res1);
      const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
      console.log('max(1, 2, 3):', res2);
      invoker.destroy();
    }
    
    main();
    
    // child.js
    const { Callee } = require('event-invoke');
    
    const calleeChannel = process;
    
    const callee = new Callee(calleeChannel);
    
    // async method
    callee.register(async function sleep(ms) {
      return new Promise((resolve) => {
        setTimeout(resolve, ms);
      });
    });
    
    // sync method
    callee.register(function max(...args) {
      return Math.max(...args);
    });
    
    callee.listen();
    

    自定义 Channel 实现 PM2 进程间调用

    示例代码:Example code

    // pm2.config.cjs
    module.exports = {
      apps: [
        {
          script: 'invoker.js',
          name: 'invoker',
          exec_mode: 'fork',
        },
        {
          script: 'callee.js',
          name: 'callee',
          exec_mode: 'fork',
        }
      ],
    };
    
    // callee.js
    import net from 'net';
    import pm2 from 'pm2';
    import {
      Callee,
      BaseCalleeChannel
    } from 'event-invoke';
    
    const messageType = 'event-invoke';
    const messageTopic = 'some topic';
    
    class CalleeChannel extends BaseCalleeChannel {
      constructor() {
        super();
        this._onProcessMessage = this.onProcessMessage.bind(this);
        process.on('message', this._onProcessMessage);
      }
    
      onProcessMessage(packet) {
        if (packet.type !== messageType) {
          return;
        }
        this.emit('message', packet.data);
      }
    
      send(data) {
        pm2.list((err, processes) => {
          if (err) { throw err; }
          const list = processes.filter(p => p.name === 'invoker');
          const pmId = list[0].pm2_env.pm_id;
          pm2.sendDataToProcessId({
            id: pmId,
            type: messageType,
            topic: messageTopic,
            data,
          }, function (err, res) {
            if (err) { throw err; }
          });
        });
      }
    
      destory() {
        process.off('message', this._onProcessMessage);
      }
    }
    
    const channel = new CalleeChannel();
    const callee = new Callee(channel);
    
    // async method
    callee.register(async function sleep(ms) {
      return new Promise((resolve) => {
        setTimeout(resolve, ms);
      });
    });
    
    // sync method
    callee.register(function max(...args) {
      return Math.max(...args);
    });
    
    callee.listen();
    
    // keep your process alive
    net.createServer().listen();
    
    // invoker.js
    import pm2 from 'pm2';
    import {
      Invoker,
      BaseInvokerChannel
    } from 'event-invoke';
    
    const messageType = 'event-invoke';
    const messageTopic = 'some topic';
    
    class InvokerChannel extends BaseInvokerChannel {
      constructor() {
        super();
        this._onProcessMessage = this.onProcessMessage.bind(this);
        process.on('message', this._onProcessMessage);
      }
    
      onProcessMessage(packet) {
        if (packet.type !== messageType) {
          return;
        }
        this.emit('message', packet.data);
      }
    
      send(data) {
        pm2.list((err, processes) => {
          if (err) { throw err; }
          const list = processes.filter(p => p.name === 'callee');
          const pmId = list[0].pm2_env.pm_id;
          pm2.sendDataToProcessId({
            id: pmId,
            type: messageType,
            topic: messageTopic,
            data,
          }, function (err, res) {
            if (err) { throw err; }
          });
        });
      }
    
      connect() {
        this.connected = true;
      }
    
      disconnect() {
        this.connected = false;
      }
    
      destory() {
        process.off('message', this._onProcessMessage);
      }
    }
    
    const channel = new InvokerChannel();
    channel.connect();
    
    const invoker = new Invoker(channel);
    
    setInterval(async () => {
      const res1 = await invoker.invoke('sleep', 1000);
      console.log('sleep 1000ms:', res1);
      const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
      console.log('max(1, 2, 3):', res2);
    }, 5 * 1000);
    

    下一步

    目前 event-invoke 具备了优雅调用“IPC”调用的基本能力,代码覆盖率 100%,同时提供了相对完善的类型描述。感兴趣的同学可以直接使用,有任何问题可以直接提 Issue

    另外一些后续仍要持续完善的部分:

    • 更丰富的示例,覆盖跨 Iframe ,跨标签页,Web worker 等使用场景
    • 提供开箱即用通用 Channel
    • 更友好的异常处理
    11 条回复    2022-03-06 23:48:15 +08:00
    codehz
        1
    codehz  
       2022-02-28 12:21:57 +08:00 via Android   ❤️ 2
    我觉得这个接口设计不够友好,js 完全可以用 Proxy 的方法让远程调用表现的好像本地调用一样,还可以支持迭代器和异步迭代器


    https://github.com/Jack-Works/async-call-rpc
    kinglisky
        2
    kinglisky  
       2022-02-28 12:56:19 +08:00
    好巧呀,这两天我也在搞 PRC 工具库,楼上刚发帖,楼下就看到楼主发的帖子,哈哈哈卷起来~

    留个脚印,一起学习 https://github.com/kinglisky/rpc-shooter
    EPr2hh6LADQWqRVH
        3
    EPr2hh6LADQWqRVH  
       2022-02-28 13:23:37 +08:00
    之前我也给 Electron 写了一个,现在的话意义不大了,MessageChannel 的形式标准化程度更高抽象得也更好,浏览器也能用,完爆自己造轮子
    zhuangzhuang1988
        4
    zhuangzhuang1988  
       2022-02-28 13:32:03 +08:00
    ruoxie
        5
    ruoxie  
       2022-02-28 13:43:41 +08:00
    websocket ,postMessage ,vscode 插件消息通讯我都是这么封装了,从 socket.io 那学来的
    xcold
        6
    xcold  
    OP
       2022-02-28 14:47:06 +08:00
    @codehz 👍🏻 很好的设计思路。
    watcher
        7
    watcher  
       2022-02-28 15:25:35 +08:00
    更优雅轻量地用 Nodejs 进行...
    dany813
        8
    dany813  
       2022-02-28 15:27:21 +08:00
    学习了
    xcold
        9
    xcold  
    OP
       2022-03-01 10:14:55 +08:00
    @watcher 浏览器也是可以支持的,只是写代码比较仓促,还没加上例子
    yazoox
        10
    yazoox  
       2022-03-01 13:57:55 +08:00
    不错,学习一下。
    楼主多写一些在不同场景下的应用示例。
    himself65
        11
    himself65  
       2022-03-06 23:48:15 +08:00 via iPhone
    支持 1L ,我们项目也在用那个库
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   978 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 53ms · UTC 19:01 · PVG 03:01 · LAX 11:01 · JFK 14:01
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.