RabbitMQ - Routing

 提示:转载请注明原文链接

 本文永久链接:https://www.360us.net/article/62.html

接着上篇发布-订阅的日志记录,如果只订阅消息的子集,比如,只记录错误日志到硬盘,全部的消息打印出来,那么就需要用到这里要讲的内容了。

exchang的绑定操作可以传递一个叫routing_key的额外参数,这里把它称为binding key

$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);

binding key依赖于exchange的类型,fanout类型是会忽略这个值的。

在日志系统里面,如果需要根据消息级别过滤消息,就是不同的订阅者收到不同级别的错误日志消息会有不同的处理方式。

之前用的fanoutexchange类型就不行了,这个类型只能广播消息,而没有其他的灵活性了。

这里的需求需要用directexchange类型来代替,这种类型的算法是消息会推送到binding keyrouting key相同的队列。

这里binding key指的是$channel::queue_bindrouting_key参数,routing key指的是$channel::basic_publishrouting_key参数,用以区分这两个不同 的地方。

如果有多个binding key绑定到一个exchange,那么exchange会广播消息到对应binding key的所有队列。

最终代码:

生产者:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'admin', 'admin');
$channel = $connection->channel(); //创建channel

//声明一个direct类型exchange logs
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

//routing key从参数里面手动给出
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "info: Hello World!";
}

//往队列里面推消息
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent '$data'\n";

//关闭channel和连接
$channel->close();
$connection->close();

消费者:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.33.10', 5672, 'admin', 'admin');
$channel = $connection->channel();

//声明一个exchange logs
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

//创建一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

//绑定exchange和临时队列
$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}
foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

//声明一个匿名函数,用来处理接收到的消息
$callback = function ($msg) {
    sleep(3);
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//有回调函数的时候,代码会阻塞,当有消息到来的时候就会调用$callback来处理
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

分别运行两个消费者: php consumer.php warning infophp consumer.php info error

运行生产者推送消息php protucer.php info logs2php protucer.php error logs2php protucer.php warning logs2

观察消费者的输出,消息会推送到绑定了相同routing key的队列里,各个队列可以有选择的收到消息。

参考 https://www.rabbitmq.com/tutorials/tutorial-four-php.html

 评论
暂无评论