一、
消息的广播需要exchange:exchange是一个转发器,其实把消息发给RabbitMQ里的exchange
fanout: 所有bind到此exchange的queue都可以接收消息,广播
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
headers:通过headers来决定把消息发给哪些queue,用的比较少
原理:消息发布端分别发送INFO,WARNING,ERROR类型的消息,C1 C2 C3订阅了不同类型的消息
消息发送端:
'''发布者publisher'''import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct') # 1、改成type='direct'# 2、默认发送的消息级别为info,可以带参数,warning error等severity = sys.argv[1] if len(sys.argv) > 1 else "info"message = ' '.join(sys.argv[2:]) or " Hello World!"channel.basic_publish(exchange='direct_logs', routing_key=severity, # 3、把上面的消息发到这个queue中 body=message)print("send :", message)connection.close()
消息订阅者:
'''订阅者subscriber'''import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct') # 4、改exchange的类型result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 5、启动订阅端的时候,severities存放订阅端订阅了哪些级别# 然后用routing_key把这些级别绑定到queue上,这些queue就放这些级别的消息severities = sys.argv[1]if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)print("Wait for logs...")# 6、使用method.routing_key可以得到消息的级别def callback(ch, method, properties, body): print("received:", method.routing_key, body)channel.basic_consume(callback, queue=queue_name, no_ack=True)channel.start_consuming()
运行过程:
'''启动订阅者1: python subscriber.py info启动订阅者2:python subscriber.py info error启动发布者1:python publisher.py info hello启动发布者2:python publisher.py error servicesdown订阅者1收到消息:info b'hello'订阅者2收到消息:info b'hello' error b'servicesdown''''
posted on 2018-11-08 23:03 阅读( ...) 评论( ...)