UP | HOME

工作队列

Table of Contents

工作队列

把耗时长的工作作为消息发送给队列,多个工作者监听这个队列,任意一个可以接受消息然后执行任务。工作队列特别适合互联网应用

work_queue.png

Figure 1: work queue

round-robin

把消息轮流发送给各个工作者,理论上每个工作者获得的任务是一样多

消息确认

工作者处理完消息通知rabbitMQ服务器,消息已经接受,任务已经处理完毕。如果消费者没有成功通知rabbitMQ,rabbitMQ会认为消息没有被成功处理,并重新放入队列

消息默认是没有超时,这会导致rabbitMQ不停向同一个消费者发送消息,哪怕这个消费者已经无法连接

开启消息确认

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      no_ack=False
                      queue='task_queue')

持久性

即使开启了消息确认,然而当rabbitMQ服务器宕机的时候,仍然会丢失消息。因此需要设置消息以及队列的持久性

即使开启了队列和消息的持久性,仍然有小概率会丢失消息。比如在rabbitMQ接受到消息,但还没有来得及保存到硬盘的这个间歇宕机。如果要更好的保障,请考虑使用订阅模式

队列持久性

channel.queue_declare(queue='task_queue', durable=True)

消息持久性

设置消息deliver mode为2

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))

fair dispatch

如果一个消费者还没有发送处理消息完毕的确认信息,就不再给它分发消息任务

prefetch-count.png

Figure 2: fair dispatch

预获取消息数量

channel.basic_qos(prefetch_count=1)

测试

在一个终端开启一个worker作为消费者

$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

在第二个终端再开启一个worker作为消费者

$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

在第三个终端向rabbitMQ服务器发送消息

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

终端1中的显示结果

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C

[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'

终端2中的显示结果:

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C

[x] Received 'Second message..'
[x] Received 'Fourth message....'

Next:发布和订阅

Previous:基础概念

Home:目录