一个典型的 rq 使用示例如下:
import requests
def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())
from redis import Redis
from rq import Queue
from worker import count_words_at_url
q = Queue(connection=Redis())
result = q.enqueue(count_words_at_url, 'http://nvie.com')
但是我希望生产者( main.py )和消费者( worker.py )代码是解耦的,就是不要在 main.py 中 from worker import count_words_at_url ,生产者只知道要调用函数名和参数,比如类似下面这样的形式:
result = q.send_task(task_name="count_words_at_url", args=["http://nvie.com",])
这样,我就可以把 worker 的代码项目单独独立出去,而 main.py 项目里完全不会有 count_words_at_url 函数的相关实现。
我知道 celery 是可以这样实现的,我的问题是 rq 、huey 、dramatiq 这三个库可以实现出这样吗?
1
DejaVud OP |
2
Zhuzhuchenyan 259 天前
Celery 可以实现类似的需求,参考 https://docs.celeryq.dev/en/latest/userguide/canvas.html#signatures
可以将 Worker 部分完全解耦到另一个项目,只要保持 Celery 配置一致即可 |
3
ospider 259 天前
redis lpush/rpop 解君愁,这些工具基本都是过度封装。
|
4
pollux 259 天前
借助 gRPC 与任何语言做 rpc 调用
|
5
alexsz 259 天前
rq 本身就支持:
result = q.send_task(task_name="worker.count_words_at_url", args=["http://nvie.com",]) |
9
kice 258 天前
本质上入队的时候都是把函数名转成字符串。但是把函数放在一起的话,按理是可以提供类型提示(例如 Tab 自动完成)。
目前看的话 taskiq 做得比较好,其他的任务队列一般般。┑( ̄Д  ̄)┍ |
10
DejaVud OP 结贴了,为了之后来到这里搜索答案的人:
|
11
DejaVud OP celery:
task = signature("xxx.task_name", options={"queue":"queue_name"}) task.apply_async((args,), expires=expires) rq: job = q.enqueue("module_xxx.task_fucname", args=[a, b,]) "module_xxx.task_fucname"是 rq worker 启动的项目目录包函数相对路径 dramatiq: broker = XXXBroker(...) msg = Message(queue_name="queue_aaa", actor_name="task_name”, args=...) broker.enqueue(msg) |