RabbitMQ 入门指南(四)
Routing
在这一章节,我们将介绍如何选择性地订阅消息的子集。
例如,我们要实现这个功能,一个消费者仅仅只需将错误的日志消息保存到磁盘文件;另一个消费者仍然可以打印所有的日志消息。
Binding
在上一节中,我们利用绑定建立了队列和交换机之间的绑定关系。
$channel->queue_bind($queue_name, 'logs');
这种绑定关系,可以简单地理解为队列对交换机中的消息感兴趣。
binding_key
queue_bind() 方法的第三个参数是 routing_key,为了避免与 basic_publish() 方法的第三个参数 routing_key 搞混淆了。
这里,我们将 queue_bind() 方法的第三个参数称之为 binding_key。
# 使用 binding_key 建立绑定关系
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
binding_key 的意义依赖于交换机的类型。fanout 类型的交换机,会直接忽略 binding_key 的值。
direct exchange
direct 类型的交换机的路由规则很简单,它会把消息路由给 binding_key 与 routing_key 完全匹配的队列。
我们可以使用 direct 类型的 exchange 来过滤路由到某个指定消费者的消息。而 fanout 类型的交换机无法实现这一点,因为它只是无脑地将所有消息广播给所有已绑定的消费者。
下面,我们来实现开头说到的功能。
发送日志消息
为了简化示例,我们将日志消息的严重性分为三种:info、warning、error。并且,将消息的严重性作为 routing_key,这样,消费者就可以自由选择接收何种严重性的消息。
这里,我们先修改好消息的生产者。
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
订阅消息
消费者只需订阅自己感兴趣的消息。
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
最终的代码示例
emit_log_direct.php,作为生产者,内容如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo ' [x] Sent ', $severity, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs_direct.php,作为消费者,内容如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
运行两个消费者 C1 和 C2。
# C1,只接收 error 日志消息,并将其保存到磁盘文件
php receive_logs_direct.php error > logs_from_rabbit.log
# C2,接收三种日志消息,用于打印查看
php receive_logs_direct.php info warning error
运行一个生产者 P
# P,发送 error 日志消息
php emit_log_direct.php error "Run. Run. Or it will explode."
# P,发送 info 日志消息
php emit_log_direct.php info "it is normal."
下一节,我们将介绍如何基于 topic 来监听消息。
参考文献
[1] https://www.rabbitmq.com/tutorials/tutorial-four-php.html
还没有评论,来说两句吧...