您好,欢迎来到飒榕旅游知识分享网。
搜索
您的当前位置:首页解决python3pika之连接断开的问题

解决python3pika之连接断开的问题

来源:飒榕旅游知识分享网
解决python3pika之连接断开的问题

问题描述

在消费rabbitMQ队列时, 每次进⼊回调函数内需要进⾏⼀些⽐较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。

问题就发⽣在发送ack操作时, 程序提⽰链接已被断开或socket error。源码⽰例

#!/usr/bin#coding: utf-8import pikaimport time

USER = 'guest'PWD = 'guest'

TEST_QUEUE = 'just4test'

def callback(ch, method, properties, body): print(body)

time.sleep(600)

ch.basic_publish('', routing_key=TEST_QUEUE, body=\"fortest\") ch.basic_ack(delivery_tag = method.delivery_tag)def test_main():

s_conn = pika.BlockingConnection(

pika.ConnectionParameters('127.0.0.1',

credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel()

chan.queue_declare(queue=TEST_QUEUE)

chan.basic_publish('', routing_key=TEST_QUEUE, body=\"fortest\") chan.basic_consume(callback, queue=TEST_QUEUE) chan.start_consuming()if __name__ == \"__main__\": test_main()

运⾏⼀段时间后, 就会报错:

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None

[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

问题排查

猜测:pika客户端没有及时发送⼼跳,连接被server断开⼀开始修改了heartbeat_interval参数值, ⽰例如下:

def test_main():

s_conn = pika.BlockingConnection(

pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5,

credentials=pika.PlainCredentials(USER, PWD))) # ....

修改后运⾏依然报错,后来想想应该单线程被⼀直占⽤,pika⽆法发送⼼跳;于是⼜加了个⼼跳线程, ⽰例如下:

#!/usr/bin#coding: utf-8import pikaimport time

import loggingimport threading

USER = 'guest'PWD = 'guest'

TEST_QUEUE = 'just4test'class Heartbeat(threading.Thread): def __init__(self, connection): super(Heartbeat, self).__init__() self.lock = threading.Lock() self.connection = connection self.quitflag = False self.stopflag = True self.setDaemon(True)

def run(self):

while not self.quitflag: time.sleep(10) self.lock.acquire() if self.stopflag : self.lock.release() continue try:

self.connection.process_data_events() except Exception as ex:

logging.warn(\"Error format: %s\"%(str(ex))) self.lock.release() return

self.lock.release() def startHeartbeat(self): self.lock.acquire() if self.quitflag==True: self.lock.release() return

self.stopflag=False self.lock.release()

def callback(ch, method, properties, body): logging.info(\"recv_body:%s\" % body) time.sleep(600)

ch.basic_ack(delivery_tag = method.delivery_tag)def test_main():

s_conn = pika.BlockingConnection(

pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5,

credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel()

chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback, queue=TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start() #开启⼼跳线程 heartbeat.startHeartbeat() chan.start_consuming()if __name__ == \"__main__\": test_main()

尝试运⾏,结果还是不⾏,不得不安静下来思考⾃⼰是不是想错了。去看它的api,看到heartbeat_interval的解析:

:param int heartbeat_interval: How often to send heartbeats. Min between this value and server's proposal will be used. Use 0 to deactivate heartbeats and None to accept server's proposal.

按这样说法,应该还是没有把⼼跳值给设置好。上⾯的程序期望是10秒发⼀次⼼跳,但是理论上发送⼼跳的间隔会⽐10秒多⼀点。所以艾玛,我应该是把heartbeat_interval的作⽤搞错了, 它是指超过这个时间间隔不发⼼跳或不给server任何信

息,server就会断开连接, ⽽不是说pika会按这个间隔来发⼼跳。 结果我把heartbeat_interval值设置⾼⼀点(⽐实际发送⼼跳/信

息的间隔更长),⽐如上⾯设置成60秒,就正常运⾏了。

如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测⼼跳是否正常。如果设置heartbeat_interval=0, 意味着不检测⼼跳,server端将不会主动断开连接。

以上这篇解决python3 pika之连接断开的问题就是⼩编分享给⼤家的全部内容了,希望能给⼤家⼀个参考,也希望⼤家多多⽀持。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- sarr.cn 版权所有 赣ICP备2024042794号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务