模式匹配
Table of Contents
话题
尽管direct exchange比起fanout来说多了一点灵活性,但是direct exchange无法对多个条件做路由,比如既要根据日志级别又要根据日志产生的程序做路由,因此需要topic exchange
topic exchange
topic exchange的binding key只能是一串由'.'分割的字符,比如"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",最长可以有255字节。其中的特殊符号:
- * (star) 必须替换一个单词
- # (hash) 可以替换零个或多个单词
根据对应binding key和队列的routing key做匹配,如果匹配成功,exchange就会推送相应的信息给队列
Figure 1: topic exchange
- quick.orange.rabbit, lazy.orange.elephant会被推送到2个队列
- quick.orange.fox只会被推送到第1个队列
- lazy.brown.fox只会被推送到第2个队列
- lazy.pink.rabbit也只会被推送到第二个队列一次,尽管匹配了2条规则
- quick.brown.fox无法匹配,不会推送
- orange,quick.orange.male.rabbit因为单数数量无法匹配,所以也不会推送
- lazy.orange.male.rabbit依旧会被推送到第2个队列
fanout exchange和使用'#'的topic exchange等效,而direct exchange和不使用'*'和'#'的topic exchange等效
实例
- 生产者声明topic exchange
channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
- 消费者代码其实和direct change一样,只是使用topic exchange而已
binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
测试
消费者接收所有的日志消息
$ python receive_logs_topic.py "#"
消费者只接收内核产生的日志消息
$ python receive_logs_topic.py "kern.*"
消费者只接收critical级别的日志消息
$ python receive_logs_topic.py "*.critical"
消费者即接受内核产生的日志消息,也接受error级别的日志消息
$ python receive_logs_topic.py "kern.*" "*.error"
生产者发送一条消息
$ python emit_log_topic.py "kern.critical" "A critical kernel error"