V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
molvqingtai
V2EX  ›  程序员

如何实现分布式消息同步?

  •  
  •   molvqingtai ·
    molvqingtai · 22 天前 · 2385 次点击

    前段时间写了一个浏览器扩展,详见: https://www.v2ex.com/t/1076581?p=1#reply25

    有一个痛点,因为所有消息都储存在本地,导致无法接收离线消息,例如:

    现在有 UserA 、UserB 、UserC 、UserD 4 个用户,A 、B 、C 在线,D 离线

    时间点 1:UserA 、UserB 、UserC 三个用户聊天,产生 3 条信息 “message-1-A, message-1-B, message-1-C”

    时间点 2:UserC 离线退出聊天,UserA 与 UserB 继续聊天,产生 2 条信息:“message-2-A, message-2-B”

    时间点 3:UserC 、UserD 上线加入聊天,那么此时,时间点 1 和时间点 2 的聊天信息需要同步给 UserC 和 User D

    UserA 、UserB 拥有所有信息记录 5 条, 无需同步

    UserC 本地记录 3 条,需要同步时间点 2 ,同步 2 条

    UserD 本地无记录,需要同步时间点 1+2 同步 5 条

    如上,我了解一些分布式同步的解决方案,etcd 、raft 等,奈何太菜看得一脸懵逼,上手太复杂 目前想到的解决方案就是,A 、B 、C 、D 广播自己所有的消息记录,然后各自接收,通过消息的时间戳 Diff, 然后同步时根据 Diff 出的差值,追加或插入。

    1. 性能问题:是否可以避免广播所有的消息记录,能否做到只广播其他用户需要同步的记录?
    2. 同步信息过多:因为插件的消息时储存在浏览器 IndexDB 中,一方面是有大小限制,感觉没有必要同步所有的历史消息,比如可以设置一个只同步 7 天 30 天..的聊天记录,或许有更好的方案?
    3. 消息数据结构:我现在只是简单的使用一个带时间戳的 List 来存储消息,要实现上面的功能是否有更好的存储方案,比如链表等?
    第 1 条附言  ·  22 天前
    解答纯 P2P 不能同步的疑惑,其实目前已经实现了在线用户数量显示

    大概原理:

    A 、B 、C 、D ,4 个用户,D 离线,当 D 上线时,会给所有在线的 Peer 节点发送 join 事件,然后 A 、B 、C 接收到某个 D 的 join 事件后,将自己的 userId 发送给 D ,就是 D 就能知道 ABC 在线,同时 ABC 也知道 D 在线,离开同理,以上逻辑只适用于本地自定义 userId 不变的情况下

    每加入一个用户会和所有的用户交换信息,这样是虽然实现了在线数量的同步,同理,如果这种方式用在消息记录同步,虽然可行,网络成本太高了,毕竟在线数量只需要交换 userId ,如果全量交换消息记录数据量太大了
    38 条回复    2024-11-03 07:40:31 +08:00
    cpstar
        1
    cpstar  
       22 天前   ❤️ 1
    我就想知道,如果没有远端服务器,如果 ABCD 全都离线状态,那么其中一者上线后,从哪获取消息?比如,CD 提前离线,而 AB 聊的火热但之后离线,CD 再上线时必然无法获取 AB 的消息,又聊的火热,再等 AB 其中一者上线时,CD 才能获取到 AB 最后聊天的内容,然后 CD 怎么插数据?
    于是必然需要远端存储,那么就与单纯本地存储相违背了。那么,没了,南山无敌说号称“不存”,么?说根本,OP 就在讨论一个 IM 的设计原则。单纯的本地存储只能解决点对点消息,而且一旦一点离线,无法发送,这么说当年抠抠差不多这个逻辑。一旦开群聊,那势必。。。
    iintothewind
        2
    iintothewind  
       22 天前
    部署一个 mq 服务, 支持每个用户创建可存储 mq 消费的 offset 的状态的 session,
    然后每个用户根据 id 创建 session 就好了.
    可选方案 mqtt, redis-mq 都可以, 没必要上 kafka.
    单用户多个客户端消息同步, 这个说真的比较难办, web 的话你没有权限拿到每个客户端的唯一 id, 很难区分, 即便能拿到, 你还要根据客户端不同, 从消息总线里面 replay+filter, 重新同步, 比较麻烦.

    带时间戳的 list, 你为啥不用 timer series 数据库, redis 本身就是了, 何必自己设计?
    qping
        3
    qping  
       22 天前 via Android
    暂时想到两个方案
    1 要么选举出一个中心节点,如果中心节点下线,那要重新选举。

    2 所有客户端保留的都是最近 n 天的聊天记录,也就是说它们存储的历史消息都是一样的,有其他人上线时,就需要就近从任意一人那里获得历史消息

    方案一,实现复杂,如果中心节点下线那所有人都会卡顿


    方案二,如果有人改动了消息,比如在聊天时清空了本地的消息,正好有人上线和他同步,那就同步不到消息了
    qping
        4
    qping  
       22 天前 via Android
    @iintothewind 他要的是分布式消息同步,只有客户端没有服务端
    iintothewind
        5
    iintothewind  
       22 天前
    @qping 没有中心节点, 那消息有没有送达, 送达的顺序都没办法保证啊, 因为没有一个一致的版本, 难道要自己做分布式服务, 搞仲裁啊? nb, 我倒想看看, 这咋搞?
    mayli
        6
    mayli  
       22 天前
    常见的分布式问题,CAP ,节点不够就是会丢消息脑裂
    niubee1
        7
    niubee1  
       22 天前
    纯粹的 P2P 消息,是无法做到多端同步的,能够实现多端同步的,必然不是纯粹的 P2P 。
    molvqingtai
        8
    molvqingtai  
    OP
       22 天前
    @iintothewind #5 送达顺序,默认以消息发送事件为准,假设消息时及时的,延迟 0
    molvqingtai
        9
    molvqingtai  
    OP
       22 天前
    @molvqingtai 发送时间
    molvqingtai
        10
    molvqingtai  
    OP
       22 天前
    @cpstar 如果没人任何用户在线,就没有同步呀
    luckyrayyy
        11
    luckyrayyy  
       22 天前
    同步这个我感觉还好,你服务器得存了所有消息吧。每个群的消息 ID 都是递增的吧,本地登陆的时候对比一下本地消息最大值和服务端消息最大值,拉中间缺失的部分就行了?
    luckyrayyy
        12
    luckyrayyy  
       22 天前
    哦没服务端,告辞。那没人在线的时候就没有数据可以同步呀
    iintothewind
        13
    iintothewind  
       22 天前
    @molvqingtai #8 你说的是 payload 里面的时间戳吧, 我说的是实际送达每个客户端的消息的顺序和时间.
    q958951326
        14
    q958951326  
       22 天前
    外行来说个思路,我怎么感觉这个消息同步,有点类似于动态路由协议中的路由信息的交换?
    qping
        15
    qping  
       22 天前
    @molvqingtai 客户端时间是不可靠得,我把本机时间调到一年后咋整
    DsuineGP
        16
    DsuineGP  
       22 天前   ❤️ 2
    TL;DR 用 CRDT 算法

    原因:
    - 常用的 raft 算法无法容忍脑裂, 即 A-B / C-D 上线然后还要实现日志同步, 即使要解决脑裂也要用非常扭曲的方法;
    - CTDT 是 p2p base 的分布式一致性算法, 且你的业务场景比较简单, 只需要需要处理 retain+insert 的场景, 不涉及到 remove;
    - https://github.com/vlcn-io/cr-sqlite 这个用 crdt 做 sqllite 代理的项目应该比较适合你的业务场景
    zsxzy
        17
    zsxzy  
       22 天前
    类似区块链的共识算法 , 解决双花问题
    SilentRhythm
        18
    SilentRhythm  
       22 天前
    外行插个嘴不知道能否给 op 一些思路:
    1. 把广播推送消息内容改成广播新消息通知,如“user1 在 XXX 时间戳发送了一条新消息”,在线的用户收到通知去 user1 拉取,拉取时带上本地最新消息时间戳作为 offset 。
    2. 不按日期存,按最大消息条数存,比如最后 1024 条;
    molvqingtai
        19
    molvqingtai  
    OP
       22 天前
    @qping #15 这个好解决,从一些公开的时间服务获取时间
    codegenerator
        20
    codegenerator  
       22 天前
    无服务的话只能采用类似 gossip 协议可以实现
    但是只能最终一致,勉强能达到需求
    jimmy2024
        21
    jimmy2024  
       22 天前
    又来一个套免费方案的
    xichuhanguguan
        22
    xichuhanguguan  
       22 天前
    我有个疑问,像 p2p 用的 DHT 网络。他在加入网络的时候是需要一个知道一个在网络中的节点,向他去获取网络中其他节点信息。新用户怎么加入这个网络,邀请制吗?
    molvqingtai
        23
    molvqingtai  
    OP
       22 天前
    @jimmy2024 我的应用本来就是开源免费的,单纯的技术交流,何来免费套方案
    molvqingtai
        24
    molvqingtai  
    OP
       22 天前
    @xichuhanguguan 同一个域名就是节点,一个域名一个聊天室
    Dynesshely
        25
    Dynesshely  
       22 天前
    不引入服务器的话这个问题无解
    提前下线的用户,在无人在线的情况下登录,无法获取既有的聊天数据
    XiLingHost
        26
    XiLingHost  
       22 天前
    @xichuhanguguan 要么通过 tracker ,要么有 boostrap 节点
    molvqingtai
        27
    molvqingtai  
    OP
       22 天前
    @Dynesshely 没人在线的情况下,获取聊天记录也没意义
    seedhk
        28
    seedhk  
       22 天前
    想到一个点:
    没有服务器的情况下,如果新节点上线,如何保证他获取到的消息是未经过篡改的?
    molvqingtai
        29
    molvqingtai  
    OP
       22 天前
    @seedhk 不能保证,能做到的只能验证消息的数据格式,不符合格式就过滤掉
    molvqingtai
        30
    molvqingtai  
    OP
       22 天前
    @seedhk 我一直没有实现文件传输功能,这也是考虑的点,怕大家电脑中毒了
    Dynesshely
        31
    Dynesshely  
       22 天前
    @molvqingtai 你要解决的不就是接收不了离线消息的问题嘛
    molvqingtai
        32
    molvqingtai  
    OP
       22 天前
    @Dynesshely #31 可能我描述不太准确,应该是可以同步其他客户端本地存在的消息,但自己本地不存在的消息,要实现这个操作当然需要至少两个用户在线
    bli22ard
        33
    bli22ard  
       22 天前   ❤️ 1
    看过电报的群组聊天协议, 每条消息有一条 id ,这个 id 就是从 0 开始递增,每次+1 , 发消息,服务端每次+1 就可以了。对于客户端来说,它只要关注,群组 id 和 最后一条消息的 id 。客户端启动,建立 websocket 连接,带上自己的 last message id , 然后服务端,从这个 last id 开是这个客户端推送,这个同步要考虑,消息差了非常多,你需要设定一个阈值,超过这个阈值,就只取最后多少条消息。这块有个细节就是,要处理好,websocket 建立这段时间,出现新的消息,而导致的一些倒霉客户端没同步到这些最新消息的问题。你说的 etcd 这些用来实现存储一致性的,不适合你这种场景,你这种场景 mysql 就可以了,性能不够分库,就可以了。
    fano
        34
    fano  
       22 天前
    @DsuineGP 正解
    2Nfree
        35
    2Nfree  
       22 天前
    直接引入区块链吧,内容寻址、分块存储、P2P 传输、DHT 索引
    2Nfree
        36
    2Nfree  
       22 天前
    @2Nfree 但是这样还是解决不了全部节点下线消息不可用的问题
    molvqingtai
        37
    molvqingtai  
    OP
       21 天前
    @2Nfree #35 准备综合楼上的思路手撸了
    molvqingtai
        38
    molvqingtai  
    OP
       20 天前
    @all 感谢各位,目前已实现,逻辑如下:
    同步最大消息为 30 天内的历史记录,使用最后一条消息作为判断依据
    同步信息数量不一定是 30 天内所有的消息, 如果在同步之前,产生了新的时间点的信息,则不会同步


    A,B,C,D,E 5 个用户,A ,B 在线,C,D,E 离线

    A-B 聊天,产生信息两条 messageA, messageB

    A-B 离线

    C-D 上线,产生数据两条 messageC, messageD

    A-B 上线,C-D 将会给 A-B 推送两条消息 messageC 和 messageD ,但是 A-B 不会给 C-D 推送 messageA 和 messageB ,因为 C-D 的最新消息时间点比 A-B 早

    E 上线,A-B-C-D 均会给 E 推送消息 messageA, messageB, messageC, messageD

    最终结果:
    A-B 显示 4 条消息 messageC, messageD, messageA, messageB

    C-D 显示 2 条消息 messageA, messageB

    E 显示 4 条消息 messageA, messageB, messageC, messageD


    如上:
    C-D 没有同步到早于自己的消息
    一方面是,如果要全量同步 30 天,必然需要根据 30 天内的消息时间点 Diff ,然后插入,现在的实现只是增量追加,而且消息随着时间累积会越来越多

    先暂时这样,后续看看是否有必要将 30 天内的数据全量同步
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2727 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 09:49 · PVG 17:49 · LAX 01:49 · JFK 04:49
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.