RabbitMQ 入门指南(四)

淡淡的烟草味﹌ 2022-12-25 06:00 217阅读 0赞

Routing

在这一章节,我们将介绍如何选择性地订阅消息的子集。

例如,我们要实现这个功能,一个消费者仅仅只需将错误的日志消息保存到磁盘文件;另一个消费者仍然可以打印所有的日志消息。

image

Binding

在上一节中,我们利用绑定建立了队列和交换机之间的绑定关系。

  1. $channel->queue_bind($queue_name, 'logs');

这种绑定关系,可以简单地理解为队列对交换机中的消息感兴趣。

binding_key

queue_bind() 方法的第三个参数是 routing_key,为了避免与 basic_publish() 方法的第三个参数 routing_key 搞混淆了。

这里,我们将 queue_bind() 方法的第三个参数称之为 binding_key。

  1. # 使用 binding_key 建立绑定关系
  2. $binding_key = 'black';
  3. $channel->queue_bind($queue_name, $exchange_name, $binding_key);

binding_key 的意义依赖于交换机的类型。fanout 类型的交换机,会直接忽略 binding_key 的值。

direct exchange

direct 类型的交换机的路由规则很简单,它会把消息路由给 binding_key 与 routing_key 完全匹配的队列。

我们可以使用 direct 类型的 exchange 来过滤路由到某个指定消费者的消息。而 fanout 类型的交换机无法实现这一点,因为它只是无脑地将所有消息广播给所有已绑定的消费者。

下面,我们来实现开头说到的功能。

发送日志消息

为了简化示例,我们将日志消息的严重性分为三种:info、warning、error。并且,将消息的严重性作为 routing_key,这样,消费者就可以自由选择接收何种严重性的消息。

这里,我们先修改好消息的生产者。

  1. $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  2. $channel->basic_publish($msg, 'direct_logs', $severity);

订阅消息

消费者只需订阅自己感兴趣的消息。

  1. foreach ($severities as $severity) {
  2. $channel->queue_bind($queue_name, 'direct_logs', $severity);
  3. }

最终的代码示例

emit_log_direct.php,作为生产者,内容如下:

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  6. $channel = $connection->channel();
  7. $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  8. $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
  9. $data = implode(' ', array_slice($argv, 2));
  10. if (empty($data)) {
  11. $data = "Hello World!";
  12. }
  13. $msg = new AMQPMessage($data);
  14. $channel->basic_publish($msg, 'direct_logs', $severity);
  15. echo ' [x] Sent ', $severity, ':', $data, "\n";
  16. $channel->close();
  17. $connection->close();

receive_logs_direct.php,作为消费者,内容如下:

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  5. $channel = $connection->channel();
  6. $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  7. list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  8. $severities = array_slice($argv, 1);
  9. if (empty($severities)) {
  10. file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
  11. exit(1);
  12. }
  13. foreach ($severities as $severity) {
  14. $channel->queue_bind($queue_name, 'direct_logs', $severity);
  15. }
  16. echo " [*] Waiting for logs. To exit press CTRL+C\n";
  17. $callback = function ($msg) {
  18. echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
  19. };
  20. $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  21. while ($channel->is_consuming()) {
  22. $channel->wait();
  23. }
  24. $channel->close();
  25. $connection->close();

运行两个消费者 C1 和 C2。

  1. # C1,只接收 error 日志消息,并将其保存到磁盘文件
  2. php receive_logs_direct.php error > logs_from_rabbit.log
  3. # C2,接收三种日志消息,用于打印查看
  4. php receive_logs_direct.php info warning error

运行一个生产者 P

  1. # P,发送 error 日志消息
  2. php emit_log_direct.php error "Run. Run. Or it will explode."
  3. # P,发送 info 日志消息
  4. php emit_log_direct.php info "it is normal."

下一节,我们将介绍如何基于 topic 来监听消息。

参考文献

[1] https://www.rabbitmq.com/tutorials/tutorial-four-php.html

发表评论

表情:
评论列表 (有 0 条评论,217人围观)

还没有评论,来说两句吧...

相关阅读