RabbitMQ延迟队列


什么是延迟队列

延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

使用场景

  • 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
  • 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成

RabbitMQ的两个特性

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  • 消息因为设置了TTL而过期。
  • 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

流程图

将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果

代码实现

首先我们在Intellij中创建一个Spring Boot工程,并且添加spring-boot-starter-amqp扩展。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: 123456
    connection-timeout: 15000
    # 生产者配置: 启动消息确认模式
    publisher-confirm-type: correlated
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        # 消费者配置: 表示消费成功之后需要手工进行签收(ack),默认为auto
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
        # 消费者配置: 批量消息消费则一条条的消费
        prefetch: 1

从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。

MqConstant

String DELAY_QUEUE = "test-delay-queue"; // 实际的消费队列
String DEAD_QUEUE = "dead-letter-queue";  // 缓冲队列

String DEAD_EXCHANGE = "dead-letter-exchange"; // 缓冲队列exchange

String DELAY_ROUTING_KEY = "test-delay-routingKey.*"; // 实际消费队列routing-key
String DEAD_ROUTING_KEY = "dead-letter-routingKey.*"; // 缓冲队列routing-key

创建缓冲队列:

@Bean
public Queue deadLetterQueue() {
    HashMap<String, Object> map = new HashMap<>(2);
    // 出现dead letter之后将dead letter重新发送到指定exchange
    map.put("x-dead-letter-exchange", MqConstant.DEAD_EXCHANGE);
    // 出现dead letter之后将dead letter重新按照指定的routing-key发送
    map.put("x-dead-letter-routing-key", MqConstant.DELAY_ROUTING_KEY);
    return QueueBuilder.nonDurable(MqConstant.DEAD_QUEUE).withArguments(map).build();
}

创建缓冲队列交换机

@Bean
public Exchange deadLetterExchange() {
    return ExchangeBuilder.directExchange(MqConstant.DEAD_EXCHANGE).durable(true).build();
}

绑定缓冲队列和缓冲队列交换机

@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, Exchange deadLetterExchange) {
    return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(MqConstant.DEAD_ROUTING_KEY).noargs();
}

创建实际消费队列

@Bean
public Queue redirectQueue() {
    return QueueBuilder.nonDurable(MqConstant.DELAY_QUEUE).build();
}

将实际消费队列与缓冲队列的交换机绑定

@Bean
public Binding redirectBinding(Queue redirectQueue, Exchange deadLetterExchange) {
    return BindingBuilder.bind(redirectQueue).to(deadLetterExchange).with(MqConstant.DELAY_ROUTING_KEY).noargs();
}

消息消费者,监听实际消费队列

@RabbitListener(queues = MqConstant.DELAY_QUEUE)
public void delay(Message<String> message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    log.info("延迟队列: {}", message.getPayload());
    channel.basicAck(tag, false);
}

发送消息, 向缓冲队列发送

@Slf4j
@SpringBootTest
public class RabbitMQExampleApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void send() {
        Message<String> msg = MessageBuilder.withPayload("hello delay").build();
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                MqConstant.DEAD_EXCHANGE,
                MqConstant.DEAD_ROUTING_KEY,
                msg,
                message -> {
                    // 设置该消息的延迟时间
                    long millis = TimeUnit.SECONDS.toMillis(10);
                    message.getMessageProperties().setExpiration(String.valueOf(millis));
                    return message;
                },
                correlationData);
        log.info("已发送延迟队列消息");
    }
}

结果

14:17:14.534  INFO 9400 --- [           main] c.s.e.r.RabbitMQExampleApplicationTest   : 已发送延迟队列消息
// 10s 后
14:17:24.586  INFO 9396 --- [ntContainer#1-1] c.s.e.rabbitmq.receive.RabbitReceive     : 延迟队列: hello delay

使用RabbitMQ插件实现延迟队列

首先去rabbitmq官网下载插件: https://www.rabbitmq.com/community-plugins.html, 找到rabbitmq_delayed_message_exchange 下载
安装

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

需要改一下RabbitMQ的Java配置文件

@Configuration
public class DelayedConfig {

    public final static String QUEUE_NAME = "delayed.goods.order";
    public final static String EXCHANGE_NAME = "delayedec";

    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable(DelayedConfig.QUEUE_NAME).build();
    }

    /**
     * 默认交换机
     * @return
     */
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * 绑定队列到交换器
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }

}

其他不变,和上面的保持一致


Author: Re:0
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source Re:0 !
 Previous
建造者模式 建造者模式
定义所谓万丈高楼平地起,但是我们建造(Build)高楼时,需要经历很多阶段,比如打地基、搭框架、浇筑水泥、封顶等,这些都是很难一气呵成的。所以一般我们是先建造组成高楼的各个部分,然后将其一个个地组装起来,好比搭积木一般,分阶段拼接后组装成一
2022-03-07
Next 
单例模式 单例模式
定义大家都知道,一个对象的产生都是通过 new 关键字实现的(当然也存在其它方式,比如反射、复制等),new 的实现又是依托于构造函数的,默认一个类会自动生成一个无参的构造函数在不指定构造函数的情况下。构造函数一般都是 public 权限修
2022-03-07
  TOC