V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
jiangxinlingdu
V2EX  ›  程序员

任意时间延时消息原理讲解:设计与实现

  •  
  •   jiangxinlingdu · 2019-05-17 21:42:33 +08:00 · 1999 次点击
    这是一个创建于 1799 天前的主题,其中的信息可能已经有所发展或是发生改变。

    场景

    ​ 延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者。比如我们现在发送一条延时 1 分钟的消息,消息发送后立即发送给服务器,但是服务器在 1 分钟后才将该消息交给消费者。

    ​ 这种延时消息有一些什么应用场景呢?比如在电商网站上我们购物的时候,下单后我们没有立即支付,这个时候界面上往往会提醒你如果 xx 分钟还未支付订单将被取消。对于这么一个功能如果不使用延时消息,那我们就需要使用类似定时任务的功能,比如每分钟我们跑一个定时任务对订单表进行扫描,将未支付订单扫出,如果从下单时间到现在已经超过了 45 分钟则将该订单取消。但是定时扫描有一个问题是效率不高,如果订单很多将会严重的影响 db 的性能。如果使用延时消息就没有这样的问题了,只需要发送一条延时 xx 分钟的的延时消息即可,在消息里携带有订单号,xx 分钟后消费者收到该消息检查对应订单状态做出对应处理,这种方式将大大减轻对 db 的压力,实现起来也更优雅。

    ​ 上面描述的是一种延时时间固定的场景,还有一些是要指定时间执行。比如买了一张一周后北京去东京的机票,那么在乘机时间到来之前可能要发送数次提醒的短信给用户,那么我们也可以在用户下单后发送一条延时消息,延时到乘机时间之前发送。

    需求

    ​ 有了场景,我们首先来分析一下需求:

    1. 延时时间是不固定的,比如我们无法预测用户订未来多久的机票,所以我们不能仅仅提供几种不同延时单位的延时功能。
    2. 延时时间精确在秒这个级别就可以了,不需要精确到 1 秒以内。
    3. 最大的延时时间应该有个度。比如最大延时 1 年或 2 年(可能有同学问难道不能提供任意最大延时时间么?任意最大延时时间会增加系统的实现的复杂度,而在实际中并没有什么用处,一般我们都尽量不推荐延时太久的消息,因为系统在不断地演变,比如当前设计的时候消息是延时两年,但是两年后系统早已大变样了,两年前的消息都不一定有人记得,更别人说兼容两年前的消息格式了)。

    有了上面的限定,我们来讨论一下延时消息的设计。

    设计

    ​ 延时说白了就是一个定时任务的功能,指定一个未来的时间执行消息投递的任务,时间到了再将消息投递出去。

    ​ 如果遇到定时任务的场景往往会有这么几个方案来考虑:

    1. 优先级队列(堆) 比如 JAVA 里的 ScheduledThreadPoolExecutor。定时任务都丢到一个优先级队列里,按照到期时间进行排序,线程池从队列里取任务出来执行,算法复杂度是 O(logN)。
    2. 扫描 所有任务都放到一个 List 里,然后一个死循环,比如每 100ms 执行一次,扫描 List 里所有任务,当某任务到期后取出执行。这种方式实现简单,算法复杂度是 O(N),如果任务太多的话效率会很低,适合任务比较少的场景。
    3. hash wheel 按照任务的到期时间将任务放到一个刻度盘里,比如未来 1 秒的放到位置 1,未来 2 秒的放到位置 2,依次类推。每次刻度盘转一个刻度,转到该可读则将该刻度上所有任务执行,算法复杂度是 O(1)。

    ​ 上面这三种方式都是基于内存的数据结构,也就是我们得将所有任务都放到内存里,如果用在延时消息上,显然是不现实的,实际上也是没有必要的。如果这个消息是几个小时后需要投递,我们为什么需要现在就将其加载进来一直占着内存呢?看起来我们只需要提前一段时间加载未来某段时间需要投递的消息即可。比如我们将消息按照一个小时为一个段,每次只加载一个段的消息到内存里。其实我们可以用一个很形象的比如来描述这种结构:两层时间轮(hash wheel)。第一层 hash wheel 位于磁盘上,精度较粗,每个小时为 1 个刻度。第二层 hash wheel 位于内存里,只包含第一层 hash wheel 一个刻度的数据,精度为 1 秒。

    ​ 但是我们怎么去加载这些需要的消息将其组织为第一层 hash wheel 呢?消息接收后存储到一个顺序的 log 文件,消息接收的顺序和消息的延时时间之间是没有任何关系的。比如现在收到了一条消息,是 1 个小时后需要投递,稍后收到一条消息可能是 5 分钟之后投递。我们加载时候是按照延时时间进行加载的,比如我们需要加载未来一个小时需要投递的消息:

    img

    ​ 比如上图所示,3 seconds 是最近要投递的消息,然后是 5minutes,而排在最头上的是 1 个小时后要投递的。我们不可能每次要预加载的时候都从头扫描一遍,然后将需要的消息加载。

    ​ 怎么办呢?对于需要快速查找,我们肯定会想到建立索引。那么我们只需要按照我们的预加载的时间段划分索引即可了,比如我们建立 2019021813, 2019021814...这样的索引文件,文件里每一个 entry 就是一个索引,该索引包含以下结构:

    index:

    ​ schedule time: int64

    ​ offset: int64

    ​ offset 是指向前面 log 的偏移,而 schedule time 是消息的到期时间。这样我们每次只需要加载一个段(比如 2019021813)的索引到内存就行了,内存中的 hashwheel 根据 schedule time 决定到期时间,到期后根据 offset 读取到消息内容将消息投递出去。

    ​ 这个存储结构到这里基本上就 ok 了,但是存在一个落地实施的问题(磁盘的空间是有限的):如果一开始收到一条消息是 6 个月之后投递的,后面收到了一些一个小时内投递的,实际上只要消息投递后我们就可以将消息删除了,这样可以大大的节约内存空间,但是因为 log 的头部有一条 6 个月之后的消息,所以我们还不能将该 log 删除掉,也就是至少 6 个月我们不能删除消息,除非我们按照消息来删除,也就是将 6 个月后的消息保留下来,而一个小时内已经投递了的消息删除掉(一种 compact 机制),但是这种实现就变得很复杂。

    ​ 其实换个方式就简单了,在前面我们按照每个时间段建立索引文件,那么如果我们不仅仅建立索引呢?也就是索引文件里不仅仅是索引,而是包括完整的消息:消息收到后先进入一个按照接收顺序的 log(qmq 里称之为 message log),然后回放该 log,按照 log 里每条消息的投递时间将消息放到对应的时间段上(qmq 里称之为 schedule log),这样只要回放完成后 message log 里的消息就可以删除了,message log 只需要保留很少的内容,而 schedule log 是按照投递时间段来组织的,已经投递过的时间段也可以立即删除了。通过这种变化我们顺利的解决了磁盘占用问题,另外还有一个副产品:读写分离。这种方式我们在如何用不到两千块大幅度提升 QMQ 性能里已经有过介绍,我们可以将延时消息里的 message log 放到小容量高性能的 SSD 里,提高消息发送的吞吐量和延时,而将 schedule log 放到大容量低成本的 HDD 里,可以支撑时间更久的延时消息(下图即延时消息的存储结构):

    img

    其他细节

    1. Server 重启如何发现未投递消息

    ​ 在这里还有一些具体实现细节需要处理。虽然我们按照每个时间单位重新组织了消息(schedule log),但是在该时间段内的消息并不是按照投递时间排序的。比如每个小时为一个时间段,那么可能第 59 分钟的消息排在最前面,而几秒内需要投递的排在最后面,那如果某个时间段内的消息正在投递时应用突然挂掉了,那么再次恢复的时候我们并不能准确的知道消息投递到哪儿了。所以我们增加了一个 dispatch log,dispatch log 在消息投递完成后写入,dispatch log 里每一个 entry 记录的是 schedule log 里的 offset,表示该 offset 的消息已经投递,当应用重启后我们会对比 schedule log 和 dispatch log,将未投递的消息找出来重新加载投递,dispatch log 相当于一个位图数据结构。

    1. 正在加载某个时间段内的消息过程中又来了属于该时间段内消息如何处理,会不会重复加载

    ​ 在我们决定加载某个时间段消息时(正在加载的时间段称之为 current loading segment),我们首先会取得该时间段文件的最大 offset,然后加载只会加载这个 offset 范围内的消息(qmq 内称之为 loading offset),而加载过程中如果又来了该时间段内消息,那这个消息的 offset 也是>loading offset:

    if( message.offset in current loading segment && message.offset > loading offset){

    ​ add to memory hash wheel

    }

    1. 加载一个时间段内的消息是不是需要占用太多的内存

    ​ 实际上我们并不会将 schedule log 里完整的消息加载到内存,只会加载索引到内存,根据前面的介绍,每个索引是 16 个字节(实际大小可以参照代码,略有出入)。假设我们使用 1G 内存加载一个小时索引的话,则可以装载 1G/16B = (1024M * 1024K * 1024B)/(16B) = 67108864 条消息索引。则每秒 qps 可以达到 18641(67108864 / 60 / 60)。如果我们想每秒达到 10 万 qps,每个小时一个刻度则需要 5493MB,如果觉得内存占用过高,则可以相应的缩小时间段大小,比如 10 分钟一个时间段,则 10 万 qps 只需要占用 915MB 内存。通过计算可知这种设计方式还是在合理的范围内的。

    qmq 示例代码-github,本人在基础上做了注释!

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2809 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 02:16 · PVG 10:16 · LAX 19:16 · JFK 22:16
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.