UP | HOME

模式匹配

Table of Contents

话题

尽管direct exchange比起fanout来说多了一点灵活性,但是direct exchange无法对多个条件做路由,比如既要根据日志级别又要根据日志产生的程序做路由,因此需要topic exchange

topic exchange

topic exchange的binding key只能是一串由'.'分割的字符,比如"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",最长可以有255字节。其中的特殊符号:

  • * (star) 必须替换一个单词
  • # (hash) 可以替换零个或多个单词

根据对应binding key和队列的routing key做匹配,如果匹配成功,exchange就会推送相应的信息给队列

topic.png

Figure 1: topic exchange

  • quick.orange.rabbit, lazy.orange.elephant会被推送到2个队列
  • quick.orange.fox只会被推送到第1个队列
  • lazy.brown.fox只会被推送到第2个队列
  • lazy.pink.rabbit也只会被推送到第二个队列一次,尽管匹配了2条规则
  • quick.brown.fox无法匹配,不会推送
  • orange,quick.orange.male.rabbit因为单数数量无法匹配,所以也不会推送
  • lazy.orange.male.rabbit依旧会被推送到第2个队列

fanout exchange和使用'#'的topic exchange等效,而direct exchange和不使用'*'和'#'的topic exchange等效

实例

  • 生产者声明topic exchange
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    
  • 消费者代码其实和direct change一样,只是使用topic exchange而已
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    

测试

消费者接收所有的日志消息

$ python receive_logs_topic.py "#"

消费者只接收内核产生的日志消息

$ python receive_logs_topic.py "kern.*"

消费者只接收critical级别的日志消息

$ python receive_logs_topic.py "*.critical"

消费者即接受内核产生的日志消息,也接受error级别的日志消息

$ python receive_logs_topic.py "kern.*" "*.error"

生产者发送一条消息

$ python emit_log_topic.py "kern.critical" "A critical kernel error"

Next:远程调用

Previous:消息路由

Home:目录