V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
princelai
V2EX  ›  Python

请教这段 Python 协程代码还能如何优化?

  •  
  •   princelai · 2019-12-18 15:48:49 +08:00 · 5138 次点击
    这是一个创建于 1828 天前的主题,其中的信息可能已经有所发展或是发生改变。

    这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。

    import asyncio
    import pandas as pd
    import random
    import re
    
    
    async def crawler(u):
        """
        模拟爬虫返回结果
        :param u: fake url
        :return:
        """
        i = int(re.search(r"\d+", u).group(0))
        await asyncio.sleep(random.random())
        return {'url': u, 'result': i}
    
    
    async def worker(qin, qout, w):
        """
        消费者异步函数
        :param qin: Queue1,用于生产者写入和消费者读出
        :param qout: Queue2,用于让消费者回写结果
        :param w: worker id
        :return:
        """
        while True:
            if qin.empty():
                break
            u = await qin.get()
            print(f"Worker-{w} crawling {u}")
            resp = await crawler(u)
            await qout.put(resp)
            qout.task_done()
    
    
    async def generate_url(url, qin):
        """
        生产者异步函数
        :param url:
        :param qin: Queue1,写出生产者产出的(这里为传入的) url
        :return:
        """
        await qin.put(url)
        print(f"Queue size = {qin.qsize()}")
        # qin.task_done()
    
    
    async def main(qmax=20):
        q_in = asyncio.Queue(qmax)
        q_out = asyncio.Queue()
        urls = [f"url{i}" for i in range(100)]
    
        producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls]
        # producers = [await q_in.put(u) for u in urls]
        consumers = [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)]
        await asyncio.gather(*consumers)
        await asyncio.gather(*producers)
        # await q_in.join()
        await q_out.join()
        for c in consumers:
            c.cancel()
        return [await q_out.get() for _ in range(q_out.qsize())]
    
    
    if __name__ == "__main__":
        result = asyncio.run(main(30))
        df = pd.DataFrame(result).set_index('url')
    
    

    目前有三个问题,

    1. 被注释掉的 qin.join,qin.task_done 是需要?还是让第二个 queue 阻塞就好了?
    2. main 函数中的 producers 是我写的异步列表推导式,想用于替代 generate_url 函数,但是不能正常运行
    3. 是否还有更优雅的实现方式?
    19 条回复    2019-12-22 11:41:26 +08:00
    princelai
        1
    princelai  
    OP
       2019-12-18 17:09:45 +08:00
    没有大佬来指点下吗?
    GoLand
        2
    GoLand  
       2019-12-18 17:34:22 +08:00
    queue 是多余的。直接:

    urls = [f"url{i}" for i in range(100)]
    tasks = [crawler(url) for url in urls]
    results = await asyncio.gather(*tasks)

    就可以。
    princelai
        3
    princelai  
    OP
       2019-12-18 17:43:34 +08:00
    @GoLand #2 谢谢,不过你说这个我知道可以,我这么设计的目的我忘说了,因为 url 是本地生成的,所以会很快,如果一次性把 url 全部创建为 task,那么 gather 后会一次性创建非常多的链接链接目标网站,我怕网站受不了,也怕自己 IP 被封,所以才不得已使用生产 /消费者,用输入的 Queue 的最大容量限制爬取速度。
    ClericPy
        4
    ClericPy  
       2019-12-18 17:53:42 +08:00
    限频可以用 semphore 和 sleep, Queue 这么用有点怪
    princelai
        5
    princelai  
    OP
       2019-12-18 18:00:53 +08:00
    @ClericPy #4 那就如 2 楼的代码,不需要用 Queue,请问信号量应该在哪里写啊?
    superrichman
        6
    superrichman  
       2019-12-18 18:20:41 +08:00 via iPhone   ❤️ 2
    @princelai 搜一下 aiohttp 和 async with semaphore 就知道怎么写了
    ClericPy
        7
    ClericPy  
       2019-12-18 18:29:55 +08:00   ❤️ 1
    @princelai #5 信号量可以粗暴的理解成并发锁, 类似 golang 里 channel 的那个数字, 也就是同时并发的个数, 只有申请到的人才有资格执行, 其他人等待, 楼上已经给你看了
    你那个 producers 是想用 async for 么
    xiaozizayang
        8
    xiaozizayang  
       2019-12-18 21:40:21 +08:00
    楼主,不知道我写的异步爬虫框架能不能帮到你:

    https://github.com/howie6879/ruia

    楼上说的信号量也支持~
    gwy15
        9
    gwy15  
       2019-12-18 22:48:59 +08:00   ❤️ 1
    楼主你要的这个功能能抽象出来啊,没必要写这么复杂。

    我写的库(不推荐,当时没发现成熟第三方库)

    https://github.com/gwy15/async_pool

    调用方式:
    ```
    with Pool(4) as pool:
    results = pool.map(fetch, urls)
    ```

    更好更全的第三方库:

    https://github.com/h2non/paco

    调用方式:
    ```
    responses = await paco.map(fetch, urls, limit=3)
    ```
    princelai
        10
    princelai  
    OP
       2019-12-19 11:39:24 +08:00
    @gwy15 #9 感谢,这个库试了下,写出来很简洁,就是可能是我的 py 版本太高,在 pycharm 里有错误提示,但是稍微修改下可以正常运行。

    ```python
    import asyncio
    import random
    import re

    import paco


    async def crawler(u):
    i = int(re.search(r"\d+", u).group(0))
    await asyncio.sleep(random.random() * 3)
    print(f"crawled {u}")
    return i


    async def main():
    urls = [f"url{i}" for i in range(100)]
    gather = await paco.map(crawler, urls, limit=20)
    return gather


    if __name__ == "__main__":
    result = asyncio.run(main())

    ```
    princelai
        11
    princelai  
    OP
       2019-12-19 11:41:06 +08:00
    @ClericPy #7
    @superrichman #6

    感谢二位,用信号量的代码写出来了,比原来好很多

    ```python
    import asyncio
    import random
    import re


    async def crawler(u, sem):
    async with sem:
    i = int(re.search(r"\d+", u).group(0))
    await asyncio.sleep(random.random() * 5)
    print(f"crawled {u}")
    return i


    async def main():
    sem = asyncio.Semaphore(20)
    urls = [f"url{i}" for i in range(100)]
    tasks = [crawler(u, sem) for u in urls]
    gather = await asyncio.gather(*tasks)
    return gather


    if __name__ == "__main__":
    result = asyncio.run(main())

    ```
    princelai
        12
    princelai  
    OP
       2019-12-19 11:41:51 +08:00
    不支持 markdown 吗,格式全乱了
    yedashuai
        13
    yedashuai  
       2019-12-19 12:59:15 +08:00
    @princelai queue 的使用是不是也有一点好处,可以限制内存的使用量,如果数据量很大,所有的 tasks 都存在 list 里
    sxd96
        14
    sxd96  
       2019-12-20 23:59:20 +08:00
    @yedashuai 这些都相当于是 generator 吧?不会把所有数据都直接放进内存的吧
    sxd96
        15
    sxd96  
       2019-12-21 10:01:40 +08:00 via iPhone
    @princelai 想问下如果爬虫 async 的话,requests 支持嘛?好像是要换用 httpx 或者 aiohttp ?这俩哪个比较好用?
    princelai
        16
    princelai  
    OP
       2019-12-21 19:49:04 +08:00 via Android
    @sxd96 如果所有都并发开始了在那就不是生成器,就已经在内存中运行了,我一般都用官方的 aiohttp,没用过另一个
    sxd96
        17
    sxd96  
       2019-12-21 21:17:59 +08:00
    @princelai 哦哦是这样啊。那我的需求如果是从数据库里拿 url 出来给 crawler,也就是说那边 coroutine 在跑,然后生产者在产生新的 url,是不是还是得用 asyncio.Queue ?
    princelai
        18
    princelai  
    OP
       2019-12-21 22:37:29 +08:00 via Android
    @sxd96 应该是,生产者 aiomysql 取数据存入 queue,我这么写主要是因为我的 URL 是本地生成但又想控制速度,和你的不一样
    sxd96
        19
    sxd96  
       2019-12-22 11:41:26 +08:00
    @princelai 明白了,感谢啦。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1010 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 21:06 · PVG 05:06 · LAX 13:06 · JFK 16:06
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.