什么是延迟队列
延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
使用场景
- 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
- 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成
RabbitMQ的两个特性
Time-To-Live Extensions
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。
Dead Letter Exchange
刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
- 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
- 消息因为设置了TTL而过期。
- 消息进入了一条已经达到最大长度的队列。
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。
流程图
将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。
延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果
代码实现
首先我们在Intellij中创建一个Spring Boot工程,并且添加spring-boot-starter-amqp
扩展。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
addresses: 127.0.0.1:5672
username: admin
password: 123456
connection-timeout: 15000
# 生产者配置: 启动消息确认模式
publisher-confirm-type: correlated
listener:
direct:
acknowledge-mode: manual
simple:
# 消费者配置: 表示消费成功之后需要手工进行签收(ack),默认为auto
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
# 消费者配置: 批量消息消费则一条条的消费
prefetch: 1
从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。
MqConstant
String DELAY_QUEUE = "test-delay-queue"; // 实际的消费队列
String DEAD_QUEUE = "dead-letter-queue"; // 缓冲队列
String DEAD_EXCHANGE = "dead-letter-exchange"; // 缓冲队列exchange
String DELAY_ROUTING_KEY = "test-delay-routingKey.*"; // 实际消费队列routing-key
String DEAD_ROUTING_KEY = "dead-letter-routingKey.*"; // 缓冲队列routing-key
创建缓冲队列:
@Bean
public Queue deadLetterQueue() {
HashMap<String, Object> map = new HashMap<>(2);
// 出现dead letter之后将dead letter重新发送到指定exchange
map.put("x-dead-letter-exchange", MqConstant.DEAD_EXCHANGE);
// 出现dead letter之后将dead letter重新按照指定的routing-key发送
map.put("x-dead-letter-routing-key", MqConstant.DELAY_ROUTING_KEY);
return QueueBuilder.nonDurable(MqConstant.DEAD_QUEUE).withArguments(map).build();
}
创建缓冲队列交换机
@Bean
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange(MqConstant.DEAD_EXCHANGE).durable(true).build();
}
绑定缓冲队列和缓冲队列交换机
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, Exchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(MqConstant.DEAD_ROUTING_KEY).noargs();
}
创建实际消费队列
@Bean
public Queue redirectQueue() {
return QueueBuilder.nonDurable(MqConstant.DELAY_QUEUE).build();
}
将实际消费队列与缓冲队列的交换机绑定
@Bean
public Binding redirectBinding(Queue redirectQueue, Exchange deadLetterExchange) {
return BindingBuilder.bind(redirectQueue).to(deadLetterExchange).with(MqConstant.DELAY_ROUTING_KEY).noargs();
}
消息消费者,监听实际消费队列
@RabbitListener(queues = MqConstant.DELAY_QUEUE)
public void delay(Message<String> message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("延迟队列: {}", message.getPayload());
channel.basicAck(tag, false);
}
发送消息, 向缓冲队列发送
@Slf4j
@SpringBootTest
public class RabbitMQExampleApplicationTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send() {
Message<String> msg = MessageBuilder.withPayload("hello delay").build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
MqConstant.DEAD_EXCHANGE,
MqConstant.DEAD_ROUTING_KEY,
msg,
message -> {
// 设置该消息的延迟时间
long millis = TimeUnit.SECONDS.toMillis(10);
message.getMessageProperties().setExpiration(String.valueOf(millis));
return message;
},
correlationData);
log.info("已发送延迟队列消息");
}
}
结果
14:17:14.534 INFO 9400 --- [ main] c.s.e.r.RabbitMQExampleApplicationTest : 已发送延迟队列消息
// 10s 后
14:17:24.586 INFO 9396 --- [ntContainer#1-1] c.s.e.rabbitmq.receive.RabbitReceive : 延迟队列: hello delay
使用RabbitMQ插件实现延迟队列
首先去rabbitmq官网下载插件: https://www.rabbitmq.com/community-plugins.html, 找到rabbitmq_delayed_message_exchange
下载
安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
需要改一下RabbitMQ的Java配置文件
@Configuration
public class DelayedConfig {
public final static String QUEUE_NAME = "delayed.goods.order";
public final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return QueueBuilder.nonDurable(DelayedConfig.QUEUE_NAME).build();
}
/**
* 默认交换机
* @return
*/
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
/**
* 绑定队列到交换器
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
其他不变,和上面的保持一致