- 安装插件并重启RabbitMQ,参见:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
- 生产者实现(生产者不需要知道队列名称)。注意:使用的是官方库 "php-amqplib/php-amqplib": ">=3.0":
// Producer: publishes one JSON-encoded message to the delayed exchange.
// Inputs: $exchange (exchange name), $delay (value for the 'x-delay' header;
// milliseconds according to the delayed-message plugin README), $data (payload).
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
try {
    // Declare a durable exchange of the plugin-provided type; the plugin routes
    // with the semantics named in 'x-delayed-type' (fanout here).
    $channel->exchange_declare(
        $exchange,
        'x-delayed-message',
        false, // passive
        true,  // durable
        false, // auto_delete
        false, // internal
        false, // nowait
        new AMQPTable([
            'x-delayed-type' => AMQPExchangeType::FANOUT,
        ])
    );

    // json_encode() returns false on failure; fail loudly instead of publishing
    // a broken body.
    $body = json_encode($data);
    if ($body === false) {
        throw new \RuntimeException('Unable to JSON-encode message payload: ' . json_last_error_msg());
    }

    // delivery_mode 2 marks the message as persistent.
    $message = new AMQPMessage($body, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    // The delay is carried in the 'x-delay' application header.
    $message->set('application_headers', new AMQPTable(['x-delay' => $delay]));

    // No routing key is passed: the producer does not need to know any queue name.
    $channel->basic_publish($message, $exchange);
} finally {
    // Always release the channel and the connection, even when publishing fails.
    $channel->close();
    $connection->close();
}
- 消费者实现(消费者的队列名称是随意的):
// Consumer setup: open a connection and a channel on it.
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

// Declare the exchange on the consumer side too, in case it does not exist yet.
// $exchange is the exchange name.
$delayedExchangeArguments = new AMQPTable([
    'x-delayed-type' => AMQPExchangeType::FANOUT,
]);
$channel->exchange_declare(
    $exchange,
    'x-delayed-message',
    false, // passive
    true,  // durable
    false, // auto_delete
    false, // internal
    false, // nowait
    $delayedExchangeArguments
);

// $queue is the queue name.
// NOTE(review): the queue is declared non-durable and its dead-letter exchange is
// the literal 'delayed' rather than $exchange — confirm both are intended.
$queueArguments = new AMQPTable([
    'x-dead-letter-exchange' => 'delayed',
]);
$channel->queue_declare(
    $queue,
    false, // passive
    false, // durable
    false, // exclusive
    false, // auto_delete
    false, // nowait
    $queueArguments
);
# echo " [*] Waiting for messages. To exit press CTRL+C\n";

// Bind the queue to the exchange.
$channel->queue_bind($queue, $exchange);
/*
 * basic_consume() arguments, in order:
 *   queue:        Queue from where to get the messages
 *   consumer_tag: Consumer identifier
 *   no_local:     Don't receive messages published by this consumer.
 *   no_ack:       If set to true, automatic acknowledgement mode will be used by this consumer.
 *                 See https://www.rabbitmq.com/confirms.html for details.
 *   exclusive:    Request exclusive consumer access, meaning only this consumer can access the queue
 *   nowait:       Presumably "do not wait for the broker's reply", as in AMQP basic.consume — confirm.
 *   callback:     A PHP Callback
 *
 * Example callback:
 *   function (AMQPMessage $message) {
 *       $headers = $message->get('application_headers');
 *       $nativeData = $headers->getNativeData();
 *       var_dump($nativeData['x-delay']);
 *       var_dump($message->getBody());
 *       $message->ack(); // required: no_ack is false below, so each message must be acked
 *   }
 */
// $callback is the callable that processes each delivered message.
$channel->basic_consume(
    $queue,
    '',    // consumer_tag
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    $callback
);

// Close the channel and then the connection when the script shuts down.
register_shutdown_function(
    function () use ($channel, $connection) {
        $channel->close();
        $connection->close();
    }
);
// Consume loop: keep waiting for deliveries while at least one consumer callback
// is registered on the channel.
while ($channel->is_consuming()) {
    try {
        // Block for up to a randomly chosen 33-44 seconds waiting for the next frame.
        $channel->wait(null, false, rand(33, 44));
    } catch (AMQPTimeoutException $e) {
        // Nothing arrived within the timeout; this is the normal idle case, so wait again.
    } catch (Throwable $e) {
        echo $e->getMessage(), "\n";
        // Once the connection or channel is gone every wait() fails immediately.
        // Leave the loop instead of spinning and printing the same error forever.
        if (!$connection->isConnected() || !$channel->is_open()) {
            break;
        }
    }
}