RT =-=小白想要提升一下现在的一个任务的性能
假设我们有一个 api 多次调用的需求在一个 celery 的 task 里面
最普通的方法可以是这样:
@task
def foo():
for i in range(20):
call_api_here(i)
do_something_else()
但是这个很耗时因为会卡在 api 调用上
然后如果我们用 multiprocessing (不是一个好的注意,但是粗暴), celery 有自己的 library 叫做 billiard.
然后我们可以这样做
@task
def foo():
with Pool(5) as p:
p.map(call_api_here, some_params)
do_something_else()
但是这样的问题是,每一个 task 都会开启一个 pool 然后结束后这个 pool 并没有被释放,最终导致内存被各种占用
所以我想了另外一个办法,用 multithreading
def foo()
threads = []
for i in range(20):
t = threading.Thread(target=call_api_here, args=(i,))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
但是试了下发现,虽然线程会随着进程结束而被销毁,但是貌似在每个 task 里面只有前几个线程执行了。。。后面的都 gg 了。。。
想问问各位大佬有什么好的方法嘛😂
1
fanhaipeng0403 2019-01-03 11:16:30 +08:00
from time import sleep
from concurrent.futures import ThreadPoolExecutor\ ProcessPoolExecutor def child_1(): sleep(9) print(1) def child_2(): sleep(2) print(2) def child_3(): sleep(3) print(3) def child_4(): sleep(1) print(4) def child_5(): sleep(2) print(5) with ThreadPoolExecutor\ProcessPoolExecutor(max_workers=5) as executor: executor.submit(child_1) executor.submit(child_2) executor.submit(child_3) executor.submit(child_4) executor.submit(child_5) t1= executor.submit(child_1) t2=executor.submit(child_2) t3=executor.submit(child_3) t4=executor.submit(child_4) t5=executor.submit(child_5) print(t1.result()) |
2
YuuuZeee OP @fanhaipeng0403 这样的话每个任务都会创建一个 pool 还是占着内存呀
|
3
fanhaipeng0403 2019-01-03 11:17:28 +08:00
y worker -A app.tasks.celery -l INFO -Q default -c 20 (每个队列多搞几个 worker ) -n default_worker.%%i
|
4
fanhaipeng0403 2019-01-03 11:20:17 +08:00
import asyncio
async def slow_operation(n): await asyncio.sleep(n) print('Slow operation {} complete'.format(n)) return n loop = asyncio.get_event_loop() done, _ = loop.run_until_complete( asyncio.wait([ slow_operation(1), slow_operation(2), slow_operation(9), slow_operation(2), slow_operation(1), slow_operation(2), slow_operation(3), ])) for fut in done: print("return value is {}".format(fut.result())) 然后用 uvloop |
5
Hstar 2019-01-03 11:20:48 +08:00
关键词 celery worker
|
6
ipwx 2019-01-03 11:21:43 +08:00
不能拆成多个 celery 任务,让 celery 去管嘛?
|
7
YuuuZeee OP |
8
wizardoz 2019-01-03 11:24:32 +08:00
同意楼上的拆成很多任务
|
9
ohyeah521 2019-01-11 20:34:46 +08:00
遇到同样的问题,一个 task 启动了,然后在 task 里面读取一个目录下面的所有文件,查找文件内容是否包含我要查找的字符串,如果不用 threading,就是一个文件一个文件的读取,效率真的很低,请问各位大佬该怎么办?
每个文件启动一个 worker 感觉也很 low 啊 |