Spring Cloud Stream 进阶配置——使用延迟队列实现“定时关闭超时未支付订单”

时间:2018-09-25 09:41:35来源:杰瑞文章网点击:作文字数:300字
ps: 本文所有代码可在 这里 查看。 延迟队列 延迟队列 操作的对象是延迟消息,所谓 “延迟消息” 是指当消息被发送以后,并不想让消费者立刻消费消息,而是等待特定时间后,消费者才能拿到消息进行消费。 延迟队列比较经典的使用场景有: 在订单系统中,用户下单后,如果未在规定时间内(比如30分钟)支付,那么该订单会被关闭,即自动取消订单。 用户希望通过手机远程控制家里的智能设备在指定的时间进行工作。这时候可以将用户指令发送到延迟队列,当指令时间到了,再将指令推送到智能设备。 基于 RabbitMQ 的延迟队列 使用死信队列实现延迟队列 在 AMQP 协议中,或者 v3.5.8 之前的 RabbitMQ 本身并没有直接支持延迟队列功能,要想实现类似延迟队列的功能,可以通过死信队列的配合。即定义一组 ttl 为特定时长的队列,比如:5秒,10秒,30秒,1分钟等,然后再对这些队列,分别定义死信队列,当消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。 使用延迟消息交换机插件实现延迟队列 上面介绍的延迟队列实现方式,其实是比较繁琐的,好在,在版本 v3.5.8之后,RabbitMQ 推出了一个延迟消息交换机插件:rabbitmq_delayed_message_exchange,当启用该插件后,如果有一个队列声明为延迟交换机,那么当有消息发送到该交换机后,会根据延迟时长来决定投递的顺序,而如果延迟时长小于零,那么会立刻投递到相应的队列。 第一种实现方式,不在本文的讨论范围,就不细说,下面将对第二种实现方式进行介绍。 ps:RabbitMQ 的版本最好是 3.6.x 及以上,Erlang/OTP 的版本要在 18.0 以上。 使用延迟消息交换机插件 下载插件 因为该插件默认是没有在 RabbitMQ 的软件包的 plugins 目录下,需要自己下载然后放到 plugins 目录下,下载地址如下: 3.7.x / 3.8.x 3.6.x 下载下来后,解压,然后拷贝到 plugins 目录下,如果是通过 rpm 是方式安装,目录应该是:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.15/plugins;如果是 Mac 用户,且使用 brew 安装,目录则在:/usr/local/Cellar/rabbitmq/3.7.7/plugins。 启用插件 # 启用插件 rabbitmq-delayed-message-exchange rabbitmq-plugins enable rabbitmq-delayed-message-exchange 配合 Spring Cloud Stream 使用延迟交换机 首先来看一下延迟交换机如何配置: spring: cloud: stream: bindings: delayedQueueOutput: destination: delayedQueueTopic content-type: application/json binder: rabbit delayedQueueInput: destination: delayedQueueTopic content-type: application/json group: ${spring.application.name} binder: rabbit rabbit: bindings: delayedQueueOutput: producer: delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange delayedQueueInput: consumer: delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange 重点关注2个配置:spring.cloud.stream.rabbit.bindings.ChannelName.producer.delayedExchange 和 spring.cloud.stream.rabbit.bindings.ChannelName.consumer.delayedExchange。 这2个配置分别属于生产者和消费者的配置,但都是用于告诉 Spring Cloud Stream 是否将交换机声明为一个延迟消息交换机。这2个是成对出现,如果少配置了一个,服务启动时会报一个警告,下文会说明。 延迟消息交换机的相关配置就这么简单,接下来通过测试用例来看一下效果。 ScasDelayedTest @Data @NoArgsConstructor @AllArgsConstructor public class DelayModel { /** * 延迟投递的时长. 单位: ms */ private long delay; } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest @ActiveProfiles("delayed") @EnableBinding({ScasDelayedTest.MessageSink.class, ScasDelayedTest.MessageSource.class}) public class ScasDelayedTest { @Autowired private DelayedQueueProducer delayedQueueProducer; @Test public void test() throws InterruptedException { for (int i = 0; i < 5; i++) { // 随机延迟 3-8 秒 long delay = RandomUtil.randomLong(3, 8) * 1000; delayedQueueProducer.publish(new DelayModel(delay)); } Thread.sleep(1000000); } @Component public static class DelayedQueueProducer { @Autowired private MessageSource messageSource; public void publish(DelayModel model) { long delay = model.getDelay(); Message message = MessageBuilder.withPayload(model).setHeader("x-delay", delay).build(); messageSource.delayedQueueOutput().send(message); log.info("发布延迟队列消息: {}", model); } } @Component public static class DelayedQueueHandler { @StreamListener("delayedQueueInput") public void handle(DelayModel model) throws InterruptedException { log.info("消费延迟队列的消息. model: [{}].", model); } } public interface MessageSink { @Input("delayedQueueInput") SubscribableChannel delayedQueueInput(); } public interface MessageSource { @Output("delayedQueueOutput") MessageChannel delayedQueueOutput(); } } 上面的代码很简单,重点是在构建消息时,比平常多了一个步骤,即 .setHeader("x-delay", delay),其中变量 delay 为该消息需要延迟多久才被消费。 很好理解,通过 setHeader 方法,对 Message 添加一个名为 x-delay 的头部,对应的值则为延迟时长,单位为 ms。当该消息被投递到延迟交换机后,获取头部 x-delay 的值,如果小于0,那么立即将消息路由到相应的队列被消费,如果大于0,则延迟对应时间。 启动 ScasDelayedTest 启动测试用例后,控制台会出现类似如下图的输出: ScasDelayedTest 查看延迟投递的消息数量 怎么查看延迟投递的消息数量?可以在 RabbitMQ Management 的对应交换机页面查看, http://localhost:15672/#/exchanges/%2F/delayedQueueTopic 延迟投递的消息数量 ps: 为达到查看效果,可以适当增加延迟时长。 使用延迟队列实现“定时关闭超时未支付订单” 上面简单介绍了延迟交换机的使用方法,现在回到正题,如何使用延迟队列来实现 “定时关闭超时未支付订单” 呢? 针对上面的场景,一般的思路是:定义一个定时任务,比如每分钟查询一下订单表,找出接下来1、2钟内需要关闭的订单,然后再对每一笔订单执行 关闭订单 操作,当然在关闭之前需要再次确认订单是否 “已支付”。 为了简单,再通过一个测试用例来模拟一下具体场景。 ScasCloseUnpaidOrderTest @Data @NoArgsConstructor @AllArgsConstructor public class OrderModel { /** * 订单id */ private Long id; /** * 订单失效时间 */ private Long expireTime; @Override public String toString() { return "OrderModel{" + "id=" + id + ", expireTime=" + TimeUtil.format(TimeUtil.toLocalDateTime(expireTime)) + '}'; } } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest @ActiveProfiles("delayed") @EnableBinding({ScasCloseUnpaidOrderTest.MessageSink.class, ScasCloseUnpaidOrderTest.MessageSource.class}) public class ScasCloseUnpaidOrderTest { @Autowired private CloseUnpaidOrderProducer closeUnpaidOrderProducer; @Test public void test() throws InterruptedException { // 模拟每分钟的0秒执行定时任务 long toSleep = 60000 - System.currentTimeMillis() % 60000; Thread.sleep(toSleep); List models = buildUnpaidOrderModel(); for (OrderModel model : models) { closeUnpaidOrderProducer.publish(model); } Thread.sleep(1000000); } private List buildUnpaidOrderModel() { long now = System.currentTimeMillis(); List models = new ArrayList<>(5); for (int i = 0; i < 5; i++) { long id = RandomUtil.randomLong(10000, 100000); // 模拟 订单将在小于60s内过期 long expireTime = now + RandomUtil.randomLong(0, 60) * 1000; OrderModel model = new OrderModel(); model.setId(id); model.setExpireTime(expireTime); models.add(model); } return models; } @Component public static class CloseUnpaidOrderProducer { @Autowired private MessageSource messageSource; public void publish(OrderModel model) { long now = System.currentTimeMillis(); long delay = model.getExpireTime() - now; Message message = MessageBuilder.withPayload(model).setHeader("x-delay", delay).build(); messageSource.closeUnpaidOrderOutput().send(message); log.info("发布 [关闭超时未支付订单] 消息. delay: {}, model: {}", delay, model); } } @Component public static class CloseUnpaidOrderHandler { private Random random = new Random(); @StreamListener("closeUnpaidOrderInput") public void handle(OrderModel model) throws InterruptedException { log.info("检查订单状态, 关闭支付超时订单. model: {}", model); if (isPaySuccess()) { log.info("订单 [{}] 支付超时. 关闭订单.", model.getId()); } else { log.info("订单 [{}] 支付完成.", model.getId()); } } private boolean isPaySuccess() { // 模拟从支付系统查询支付状态. return random.nextInt(10) % 3 == 0; } } public interface MessageSource { @Output("closeUnpaidOrderOutput") MessageChannel closeUnpaidOrderOutput(); } public interface MessageSink { @Input("closeUnpaidOrderInput") SubscribableChannel closeUnpaidOrderInput(); } } 配置文件跟上一个测试用例基本一样: spring: cloud: stream: bindings: closeUnpaidOrderOutput: destination: closeUnpaidOrderTopic content-type: application/json binder: rabbit closeUnpaidOrderInput: destination: closeUnpaidOrderTopic content-type: application/json group: ${spring.application.name} binder: rabbit rabbit: bindings: closeUnpaidOrderOutput: producer: delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange closeUnpaidOrderInput: consumer: delayedExchange: true # 是否将目标exchange声明为一个延迟消息交换机,默认false。即消息productor发布消息到延迟exchange后,延迟n长时间后才将消息推送到指定的queue中。 -RabbitMQ需要安装/启用插件: rabbitmq-delayed-message-exchange 启动 ScasCloseUnpaidOrderTest 启动后,可以看到控制台有类似输出: ScasCloseUnpaidOrderTest 相信上面的代码对应各位看官来说,理解起来肯定是毫无压力的,这里就不在赘述。 相关链接 https://www.rabbitmq.com/community-plugins.html https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 推荐阅读 Spring Cloud Stream 进阶配置——高可用(二)——死信队列 Spring Cloud Stream 进阶配置——高可用(一)——失败重试 Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch) Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量 Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者
作文投稿

Spring Cloud Stream 进阶配置——使用延迟队列实现“定时关闭超时未支付订单”一文由杰瑞文章网免费提供,本站为公益性作文网站,此作文为网上收集或网友提供,版权归原作者所有,如果侵犯了您的权益,请及时与我们联系,我们会立即删除!

杰瑞文章网友情提示:请不要直接抄作文用来交作业。你可以学习、借鉴、期待你写出更好的作文。

说说你对这篇作文的看法吧