V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
stoopuak197
V2EX  ›  Python

celery 框架消费者 从 rabbitmq 中获取消息,因为单条消息大小超过 40m 而产生的 Connection reset by peer 该怎么解决

  •  
  •   stoopuak197 · 2020-12-08 23:45:35 +08:00 · 1494 次点击
    这是一个创建于 1449 天前的主题,其中的信息可能已经有所发展或是发生改变。
    环境
    linux ( centos 7 )
    python3.8.6
    - celery == 5.0.1
    - rabbitmq == 3.8.9


    问题:
    我有一个使用了 celery 框架的应用,work2(消费者) 需要从 rabbitmq 中获取由 work1 (生产者)生产的数据。
    但是现在出现了一个错误。由于业务需求生产者生产的数据大小剧增,导致其单条消息大小超过 40m 或者更大 。
    但是部署在外网的 rabbitmq 服务器出口流量只有大约 600~700kb/s,我通过排查怀疑产生错误的原因可能是 work2 从 rabbitmq 读取消息时由于 rabbitmq 服务器出口流量限制,使 work2 不能在短时间内获取到完整的消息从而导致 socket.timeout 从而引发的错误。

    为了验证我的猜想我在 rabbitmq 服务器运行了以下测试脚本由于没有了网络限制它能够很好的获取到队列中的消息

    ====================================
    import pika

    auth = pika.PlainCredentials('test', 'test')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=auth, heartbeat=0))
    channel = connection.channel()

    channel.queue_declare(queue='b', durable=True)

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    exit()

    channel.basic_consume(on_message_callback=callback,
    queue='b',
    auto_ack=False)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    ====================================


    我尝试阅读了 transport.py 的源码 并在 connection.py 第 525 行中修改了 timeout 参数的数值,发现问题还是不能解决
    基于我 socket 编程基础烂到极致,可能产生错误的原因并不是我上面描述的那么回事。
    5 条回复    2020-12-09 19:57:21 +08:00
    stoopuak197
        1
    stoopuak197  
    OP
       2020-12-08 23:48:21 +08:00
    由于发文限制存在的一些报错如下
    while not self.blocking_read(timeout):
    File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 527, in blocking_read
    frame = self.transport.read_frame()
    File "/usr/local/lib/python3.8/dist-packages/amqp/transport.py", line 286, in read_frame
    payload = read(size)
    File "/usr/local/lib/python3.8/dist-packages/amqp/transport.py", line 457, in _read
    s = recv(n - len(rbuf))
    ConnectionResetError: [Errno 104] Connection reset by peer
    err1y
        2
    err1y  
       2020-12-09 08:33:17 +08:00 via iPhone
    如果你能控制生产者端的话,生产者在产生数据的时候大数据放到其他存储里面,队列里面保存个数据地址,消费者再去通过地址获取数据
    stoopuak197
        3
    stoopuak197  
    OP
       2020-12-09 09:10:49 +08:00
    @err1y 非常感谢,这是个非常棒的解决方法! 但是我还是想知道碰上这种问题该怎么解决
    knightdf
        4
    knightdf  
       2020-12-09 10:08:52 +08:00
    两边的 timeout 都得改吧,然后得改 read timeout
    stoopuak197
        5
    stoopuak197  
    OP
       2020-12-09 19:57:21 +08:00
    @knightdf 两边的 timeout 指的是? read timeout 我改过但是依旧无效
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3888 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 10:26 · PVG 18:26 · LAX 02:26 · JFK 05:26
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.