190707Python-RabbitMQ 怼烎@ 2021-11-14 13:04 210阅读 0赞 ### 一、简单的RabbitMQ示例 ### * 生产者 # Author:Li Dongfei import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) #建立一个连接 channel = connection.channel() #声明一个管道 channel.queue_declare(queue='hello') #生成一个queue channel.basic_publish(exchange='', routing_key='hello', #queue名字 body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close() * 消费者 # Author:Li Dongfei import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("-->", ch, method, properties) print(" [x] Received %r" %body) channel.basic_consume(callback, #开始消费消息,如果收到消息就调用callback函数来处理消息 queue='hello', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始收消息 ### 二、RabbitMQ命令行工具 ### C:\>cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.12\sbin C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.12\sbin>rabbitmqctl.bat list_queues #列出当前的queue ### 三、rabbitmq持久化 ### * 队列持久化 channel.queue_declare(queue='hello', durable=True) #durable=True声明队列持久化 * 消息持久化 channel.basic_publish(exchange='', routing_key='hello', #queue名字 body='Hello World!', properties=pika.BasicProperties( delivery_mode=2 #消息持久化 )) ### 四、消息调度 ### * 在消费者中定义 channel.basic_qos(prefetch_count=1) #只要当前有一条消息在处理则不要给我发消息 ### 五、广播模式 ### * fanout:所有bind到此exchange的queue都可以接受消息 * 订阅/发布 生成者 # Author:Li Dongfei import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" %message) connection.close() 消费者 # Author:Li Dongfei import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) #exclusive 唯一的 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" %body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() * direct:通过routingKey和exchange决定的哪个唯一的queue可以接受消息 生产者 # Author:Li Dongfei import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') serverity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=serverity, body=message) print(" [x] Sent %r:%r" %(serverity, message)) connection.close() 消费者 # Author:Li Dongfei import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue serverities = sys.argv[1:] if not serverities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0]) sys.exit(1) for serverity in serverities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=serverity) print(' [*] Wating for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" %(method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() * topic:所有符合routingKey的routingKey所bind的queue可以接受消息 ### 六、rabbitMQ rpc ### * client # Author:Li Dongfei import pika, uuid class FibonacciTpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciTpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" %response) * server # Author:Li Dongfei import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" %n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awatiing RPC requests") channel.start_consuming() 转载于:https://www.cnblogs.com/L-dongf/p/11145708.html
相关 190707Python-RabbitMQ 一、简单的RabbitMQ示例 生产者 Author:Li Dongfei import pika connection = pika. 怼烎@/ 2021年11月19日 12:20/ 0 赞/ 116 阅读
相关 190707select和selector模块 一、select模块 Python select socket server代码示例 Author:Li Dongfei import sele 比眉伴天荒/ 2021年11月14日 13:04/ 0 赞/ 316 阅读
相关 190707Python-RabbitMQ 一、简单的RabbitMQ示例 生产者 Author:Li Dongfei import pika connection = pika. 怼烎@/ 2021年11月14日 13:04/ 0 赞/ 211 阅读
还没有评论,来说两句吧...