曾彪彪的个人网站
首页
文章列表
>>
文章详情
Python pika消费Rabbit MQ数据,慢消费引起的connection reset问题
作者:
曾彪彪
日期:
2025-02-05 05:53:16
阅读(200)
分类:
消息中间件
问题记录
### 问题描述 使用python pika框架,从Rabbit MQ消费数据时,遇到了connection reset的错误,错误内容如下: ```python Traceback (most recent call last): File "/app/utils/rabbit.py", line 27, in message_callback channel.basic_ack(delivery_tag=method.delivery_tag) File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2130, in basic_ack self._flush_output() File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1353, in _flush_output self._connection._flush_output(lambda: self.is_closed, *waiters) File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output raise self._closed_result.value.error pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer') During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/app/app.py", line 14, in main rabbit.start_listen() File "/app/utils/rabbit.py", line 52, in start_listen channel.start_consuming() File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming self._process_data_events(time_limit=None) File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events self.connection.process_data_events(time_limit=time_limit) File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events self._dispatch_channel_events() File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events impl_channel._get_cookie()._dispatch_events() File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events consumer_info.on_message_callback(self, evt.method, File "/app/utils/rabbit.py", line 34, in message_callback channel.basic_nack(delivery_tag=method.delivery_tag) File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2151, in basic_nack self._impl.basic_nack( File "/usr/local/lib/python3.11/site-packages/pika/channel.py", line 401, in basic_nack self._raise_if_not_open() File "/usr/local/lib/python3.11/site-packages/pika/channel.py", line 1403, in _raise_if_not_open raise exceptions.ChannelWrongStateError('Channel is closed.') pika.exceptions.ChannelWrongStateError: Channel is closed. ``` 我的代码如下: ```python import pika from utils.logger import logger import traceback import json from utils.config import get_rabbit_config from utils.mail import send_mail import time EXCHANGE = 'GDCSV3_EXCHANGE' ROUTING_KEY = 'purchasing.system.contract' QUEUE_NAME = 'PURCHASING_CONTRACT_CONSUMER' class Rabbit(): def __init__(self, callback=None) -> None: self.callback = callback def message_callback(self, channel, method, properties, body): try: logger.info(f'receive message: {body}') message = json.loads(body) if self.callback: result = self.callback(message) # time.sleep(5) # result = False else: logger.warn('self.callback is None') if result: channel.basic_ack(delivery_tag=method.delivery_tag) else: channel.basic_nack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(traceback.format_exc()) send_mail('purchasing-contract-consumer error', traceback.format_exc()) channel.basic_nack(delivery_tag=method.delivery_tag) time.sleep(3) def start_listen(self): config = get_rabbit_config() credentials = pika.PlainCredentials( username=config['username'], password=config['password']) parameters = pika.ConnectionParameters( # host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300) host=config['host'], port=config['port'], credentials=credentials, heartbeat=120, socket_timeout=60) connection = pika.BlockingConnection(parameters=parameters) channel = connection.channel() # channel.exchange_declare(exchange=EXCHANGE,exchange_type='topic',durable=True) channel.queue_declare(queue=QUEUE_NAME, durable=True) channel.queue_bind(queue=QUEUE_NAME, exchange=EXCHANGE, routing_key=ROUTING_KEY) channel.basic_consume( queue=QUEUE_NAME, on_message_callback=self.message_callback, auto_ack=False) logger.info('start listening...') channel.start_consuming() logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') ``` 我的应用场景是:从rabbit 消费数据,并通过restful API调用web服务(self.callback是一个web API调用),上报消费的数据。但是web服务响应非常慢,每次调用花费1min多钟,并且rabbit MQ中积累了大量消息。 ### 原因分析 刚开始以为是rabbit MQ connection的超时问题引起的,于是设置连接超时时间为300s,代码如下: host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300) connection reset问题依然存在,于是又怀疑是result = self.callback(message)调用时间过长,超过了1min,于是将其注释掉,改成 ```python result = self.callback(message) time.sleep(5) result = False ``` 错误依然存在,也排除了不是self.callback(message)长时间阻塞引起的问题。 查阅相关资料,发现是因为未设置prefetch_count,如果不设置这个参数,框架默认设置为0,意味着无限消费数据,此时,如果消费端处理消息的速度非常慢,并且Rabbit MQ中有大量消息堆积,那么socket的缓存区就会塞满,此时客户端socket就会告诉服务端socket,将滑动窗口设置为0。由于客户端socket的缓存区一直被沾满,服务端长时间无法发送数据,甚至连socket的心跳消息也无法发出,就会导致connection reset异常;之后如果客户端调用restful API完成数据上传,尝试调用channel.basic_ack(delivery_tag=method.delivery_tag),此时,连接已经被重置,那么就抛出connection reset异常。 ### 解决方案 在代码中增加prefetch_count,代码如下: ```python import pika from utils.logger import logger import traceback import json from utils.config import get_rabbit_config from utils.mail import send_mail import time EXCHANGE = 'GDCSV3_EXCHANGE' ROUTING_KEY = 'purchasing.system.contract' QUEUE_NAME = 'PURCHASING_CONTRACT_CONSUMER' class Rabbit(): def __init__(self, callback=None) -> None: self.callback = callback def message_callback(self, channel, method, properties, body): try: logger.info(f'receive message: {body}') message = json.loads(body) if self.callback: result = self.callback(message) # time.sleep(5) # result = False else: logger.warn('self.callback is None') if result: channel.basic_ack(delivery_tag=method.delivery_tag) else: channel.basic_nack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(traceback.format_exc()) send_mail('purchasing-contract-consumer error', traceback.format_exc()) channel.basic_nack(delivery_tag=method.delivery_tag) time.sleep(3) def start_listen(self): config = get_rabbit_config() credentials = pika.PlainCredentials( username=config['username'], password=config['password']) parameters = pika.ConnectionParameters( # host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300) host=config['host'], port=config['port'], credentials=credentials, heartbeat=120, socket_timeout=60) connection = pika.BlockingConnection(parameters=parameters) channel = connection.channel() channel.basic_qos(prefetch_count=1) # 限制消费端消费的数据,防止缓存区被占满。 # channel.exchange_declare(exchange=EXCHANGE,exchange_type='topic',durable=True) channel.queue_declare(queue=QUEUE_NAME, durable=True) channel.queue_bind(queue=QUEUE_NAME, exchange=EXCHANGE, routing_key=ROUTING_KEY) channel.basic_consume( queue=QUEUE_NAME, on_message_callback=self.message_callback, auto_ack=False) logger.info('start listening...') channel.start_consuming() logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') logger.warning('warning...') ``` 设置完成后,可以正常消费数据,connection reset的异常消息。
评论(0)
评论(必填)
名称(必填)
联系方式(可选)
验证码(必填)
提交
评论(必填)
名称(必填)
联系方式(可选)
验证码(必填)