解决的问题: kafka 不支持延迟队列
如何解决: 如果是延迟小时, push 之前先放到 redis 里, 然后 work 通过 lua 轮训拿到需要真的 push 到队列里的请求, 然后 push 到 kafka 里.
整个功能其实和 Python 的 celery 或者 Go 的 machinery 很像.但是前者需要单独部署项目太复杂, 后者不支持 kafka.
有搞头吗?
1
jaylee77 2019-05-29 18:08:16 +08:00
没有
|
2
Takamine 2019-05-29 20:13:15 +08:00 via Android
这样的感觉……还不如就只用 redis,从 list 里面取出来直接拿到 redis 订阅发布_(:з」∠)_。
|
3
Varobjs 2019-05-30 08:31:39 +08:00 via Android
beanstalk 了解下
|
5
petelin OP @Varobjs 多谢 跟 sqs 很像 不过看到基于内存的 可靠性可能要打折了 还没具体看 上班研究一下
|
6
ebingtel 2019-05-30 09:21:26 +08:00
有个疑问,真的用得上 kafka 的场景,为了延迟 N 分钟、塞进 redis,单机内存会爆的吧?
|
7
petelin OP @ebingtel 还好 Redis 也可以不是单机 分库分表嘛 具体的消息不要存进 Redis 存个 ID 就行 主要是必须解耦 容灾 可扩展 消息还最好只消费一次 后面这些要求任何一个消息队列都比现在的 Redis 实现要好
当然做玩具或者小公司不用考虑这些 |
8
airfling 2019-05-30 09:42:54 +08:00
延迟注定会有个问题是,如果并发足够大,你存储延迟时这些数据的地方会内存爆掉
|
10
nicoljiang 2019-05-30 15:14:59 +08:00
感觉是为了用新式的方案,出了一个坑,为了埋这个坑,又要用老的方案来弥补。
|
11
mooncakejs 2019-05-30 16:22:32 +08:00
kafka 的特点是高性能,搞了这玩意就没有高性能了,那为什么不用直接支持定时的队列系统呢。
其实高性能的队列中间件往往都不支持定时,或者有限支持。 kafka 也可以参考有限支持的,只要支持有限的定时时长,还是很简单的。 |
12
Feedline 2019-05-30 17:15:40 +08:00
为啥不用 rabbitmq ?
|
14
guagusi 2019-05-31 09:10:24 +08:00
时间轮了解一下
|
16
Damnever 2019-06-02 11:48:39 +08:00
用 Redis/MySQL 加个中间层就已经能做了一个延时队列了,仅仅是为了使用 kafka 的接口?
据说 kafka 已经支持基于 timestamp 的消费了;如果非要用困难模式也是可以的,在 topic 上做文章,对过期时间点根据需求进行分段,每个分段对应一个 topic,然后对 producer 和 consumer 搞层封装,并不觉得数据量大的情况下这么玩 kafka 能不出问题 |
19
petelin OP @Damnever 比方说只用 Redis 的话 有多少个 worker 就得 poll 多少次 这对 Redis 是一个挑战 不如只开很少的 worker 只进行分发消息 消息处理丢给 kafka
另外 kafka 等消息队列可以支持 只处理一次 或者最少处理一次 |
20
Damnever 2019-06-02 17:36:22 +08:00
@petelin 其实我不太明白你的具体场景和限制条件,但从你的描述来看我觉得这些其实都不是问题,再者又是基于什么理由判断对 Redis 是挑战的东西但是对 Kafka 就不是挑战了呢?当然不想重新造轮子觉得成本太高将各种系统的部分功能组合变成自己想要的功能也是可以的,但不管怎样都得造点东西.. (就我个人看来最简单干净的方法是使用支持延时队列的消息系统)
|
21
bthulu 2021-03-29 20:35:34 +08:00
可以做固定时间点的延时重发.
比如说, 1 秒, 10 秒, 30 秒, 1 分钟, 5 分钟, 1 小时, 8 小时, 24 小时延时等. 针对每个延时时间创建一个队列, 生产者按延时需求将数据(数据里包一层最终要去的队列名)发送到对应队列. 然后每个队列起一个消费者, 轮询数据, 到点发送到目标队列即可. ``` headers.put('finalTopic', topic); producer.send(new ProducerRecord(delayedTopic, key, value, headers)); ``` ``` // 60 秒延时队列 int delay = 60_000; while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { long timeLeft = record.timestamp() + delay - System.currentTimeMillis(); if (timeLeft > 0) { Thread.sleep(timeLeft); } var topic = record.headers.lastHeader('finalTopic') record.headers.remove('finalTopic'); producer.send(new ProducerRecord(topic, key, value, headers)); } } ``` |