工作队列
Table of Contents
工作队列
把耗时长的工作作为消息发送给队列,多个工作者监听这个队列,任意一个可以接受消息然后执行任务。工作队列特别适合互联网应用
Figure 1: work queue
round-robin
把消息轮流发送给各个工作者,理论上每个工作者获得的任务是一样多
消息确认
工作者处理完消息通知rabbitMQ服务器,消息已经接受,任务已经处理完毕。如果消费者没有成功通知rabbitMQ,rabbitMQ会认为消息没有被成功处理,并重新放入队列
消息默认是没有超时,这会导致rabbitMQ不停向同一个消费者发送消息,哪怕这个消费者已经无法连接
开启消息确认
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, no_ack=False queue='task_queue')
持久性
即使开启了消息确认,然而当rabbitMQ服务器宕机的时候,仍然会丢失消息。因此需要设置消息以及队列的持久性
即使开启了队列和消息的持久性,仍然有小概率会丢失消息。比如在rabbitMQ接受到消息,但还没有来得及保存到硬盘的这个间歇宕机。如果要更好的保障,请考虑使用订阅模式
队列持久性
channel.queue_declare(queue='task_queue', durable=True)
消息持久性
设置消息deliver mode为2
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
fair dispatch
如果一个消费者还没有发送处理消息完毕的确认信息,就不再给它分发消息任务
Figure 2: fair dispatch
预获取消息数量
channel.basic_qos(prefetch_count=1)
测试
在一个终端开启一个worker作为消费者
$ python worker.py [*] Waiting for messages. To exit press CTRL+C
在第二个终端再开启一个worker作为消费者
$ python worker.py [*] Waiting for messages. To exit press CTRL+C
在第三个终端向rabbitMQ服务器发送消息
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
终端1中的显示结果
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
终端2中的显示结果:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'