RabbitMQ实现延迟队列

发布网友 发布时间:2024-09-28 05:16

我来回答

1个回答

热心网友 时间:2024-09-29 14:56

延迟队列的实现方式有很多种,数据库轮询查询,判断任务延时时间是否已经到了,如果到了则执行任务,这种方案对服务器内存消耗大,存在延迟,数据库损耗大,只适合一些小项目使用。redis键过期触发事件来触发延时任务的执行,还有就是本期内容要介绍的RabbitMQ队列实现延时队列。

实现原理:给队列的消息设置过期时间(TTL),消息到期后就会投递到一个死信队列,我们就可以在这里处理延迟的任务。

一、介绍1. 死信队列

当消息在一个队列中变成死信之后,它会被重新投递到设置的Exchange中,这个Exchange就是DLX,通过routing_key的绑定投递到对应的队列,这个队列就是死信队列。

2. 死信消息

消息被拒绝(basic.reject / basic.nack),并且requeue = false。

消息TTL过期。TTL:Time To Live的简称,即过期时间。

队列达到最大长度。

3. 过期消息

通过队列进行设置,设置后该队列所有的消息都存在相同的过期时间。

通过对消息本身设置,队列中的每条消息的过期时间都可以不一样。如果要用来实现延迟队列不建议使用这种方式,因为队列只会判断第一个消息是否过期,过期则把消息投递到死信队列。如果第一个消息过期时间为30s,二个消息的过期时间为10s,那么队列等30s后把第一消息投递到死信队列,然后继续判断下一个消息,但是这样子第二个消息的延迟时间就变成30s了。

二、原理图三、上代码

composer require php-amqplib/php-amqplib 把代码贴到根目录的public.php文件运行

<?phpuse PhpAmqpLib\Message\AMQPMessage;require_once __DIR__ . '/vendor/autoload.php';//配置信息$conn_args = array('host' => '47.107.237.18','port' => '5672','login'=> 'guest','password' => 'guest','vhost'=> '/');$ttl = 10;//过期时间$exchange= 'exchange';//正常交换机$delayExchange = 'delayed_' . $exchange;//死信交换机$type= 'topic';$msg = 'hello world';$route = 'delay';$deadQueue = 'dead_queue';//死信队列$delayQueue= 'delayed_queue_' . $exchange . '_' . $ttl;//延迟队列$delayRoutingKey = $route . '_' . $ttl;$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection($conn_args['host'],$conn_args['port'],$conn_args['login'],$conn_args['password'],$conn_args['vhost']);//创建连接和channel$channel = $conn->channel();//定义延迟交换器$channel->exchange_declare($delayExchange, 'topic', false, true, false);//定义延迟队列,$channel->queue_declare($delayQueue,false,true,false,false,false,new \PhpAmqpLib\Wire\AMQPTable(array("x-dead-letter-exchange"=> $delayExchange,/*队列信息超时后投递到这个交换机*/"x-dead-letter-routing-key" => $route,/*routingKey*/"x-message-ttl" => $ttl * 1000,/*超时时间*/)));/*定义死信队列,延迟队列超时的消息就会投递到这里处理*/$channel->queue_declare($deadQueue,false,true,false,false);/*绑定死信队列到延迟交换机*/$channel->queue_bind($deadQueue, $delayExchange, $route);//绑定延迟队列到交换器上$channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);//生产者发送消息$msg = new AMQPMessage($msg);$channel->basic_publish($msg, $delayExchange, $delayRoutingKey);$channel->close();

谢谢观看! streetlamp 敬上!

原文:https://juejin.cn/post/7102346193128128548

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com