2

超30min订单自动取消,RabbitMQ做延迟队列,下单成功把订单号推入RabbitMQ,超30min订单自动进入DLX死信队列,消费端监听死信队列得到超时订单,订单状态置为超时,这个已完成,目前卡在30min内支付(不超时)时如何取消相应的订单?即如何在RabbitMQ中删除指定的消息?消息确认后会清除,怎样确认指定的消息?不知道我的思路有问题还是怎样,目前卡在这了,请各位大神指点一下

相关代码

下单

@Override
@Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public Result order(Long productId, Long userId) {
    // redis中商品名
    String productNameKey = "PRODUCT_NAME_" + productId;
    // redis中商品库存
    String productNumberKey = "PRODUCT_" + productId;
    // redis中已抢购成功的用户ID
    String userIdKey = "USER_" + productId;

    String productName = redisUtil.get(productNameKey, 0);
    // 缓存中有商品
    if (NumberUtils.toInt(redisUtil.get(productNumberKey, 0)) > 0) {
        // 当前用户是否已抢购成功
        if (!redisUtil.sismember(userIdKey, userId + "")) {
            // 减缓存
            redisUtil.decr(productNumberKey);
            // 减库存
            boolean flag = productMapper.reduceRepertory(productId) == 1;
            if (flag) {
                // 生成订单
                Order order = new Order();
                // 订单号
                String orderNo = "ORDER_" + DATE_TIME_FORMATTER.format(LocalDateTime.now()) + "_" + productId + "_" + userId;
                order.setOrderNo(orderNo);
                order.setOrderPrice(new BigDecimal(100));
                order.setOrderStatus(1);
                order.setUserId(userId);
                save(order);
                // 建立订单商品关系
                ProductOrder productOrder = new ProductOrder();
                productOrder.setOrderId(order.getId());
                productOrder.setProductId(productId);
                productOrderMapper.insert(productOrder);
                // 缓存userId
                redisUtil.sadd(userIdKey, userId + "");
                // 订单号orderNo 加入MQ, 30分钟未支付加入死信队列
                messageProducer.sendMessage(orderNo, 30 * 60 * 1000);
                return Result.success("下单成功, 订单号" + order.getId());
            } else {
                Product product = productMapper.selectOne(new QueryWrapper<Product>().eq("id", productId));
                // 减库存失败,重置缓存
                redisUtil.set(productNumberKey, JacksonUtil.toJson(product.getProductNumber()), 0);
                log.error("用户{}下单失败, 商品名: {}", userId, productName);
                return Result.error("202", "用户" + userId + "下单失败, 商品名" + productName);
            }
        } else {
            // 该用户已抢到商品
            log.error("您已抢到商品{},订单已生成,请去收银台支付", productName);
            return Result.error("203", "您已抢到商品" + productName + ",订单已生成,请去收银台支付");
        }
    } else {
        // 商品售罄
        log.error("商品{}已被抢光", productName);
        return Result.error("201", "商品" + productName + "已被抢光");
    }
}

支付,这里如何删除RabbitMQ中指定的消息(下单时把订单都投入RabbitMQ里了)

@Override
public Result pay(String orderNo, Long userId) {
    Order order = orderMapper.selectOne(new QueryWrapper<Order>().eq("order_no", orderNo).eq("user_id", userId));
    order.setOrderStatus(2); // 已支付
    orderMapper.updateById(order);
    // MQ中删除支付成功的订单 TODO

    return Result.success("订单" + orderNo + "支付成功");
}

发消息

@Component
@Slf4j
public class MessageProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     *
     * @param message 消息
     * @param ttl     有效时间
     */
    public void sendMessage(String message, int ttl) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TTL, RabbitConfig.ROUTINGKEY_TTL, message, message1 -> {
            MessageProperties messageProperties = message1.getMessageProperties();
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
            messageProperties.setExpiration(ttl + ""); // 消息TTL,单位毫秒
            return message1;
        }, correlationId);
        log.info("发送消息: {}, 有效时间: {}秒, 回调ID: {}, 当前时间: {}", message, ttl / 1000, correlationId, LocalDateTime.now().toString());
    }
}

rabbit配置,声明TTL交换器及对应队列,声明AE备份交换器及对应队列绑定到TTL交换器上,保证消息路由失败不丢失,声明DLX死信交换器及队列绑定到TTL交换器上,实现延迟效果

@Configuration
@Slf4j
public class RabbitConfig {

@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private Boolean publisherConfirms;
@Value("${spring.rabbitmq.publisher-returns}")
private Boolean publisherReturns;

/**
 * 备份交换器
 */
public static final String EXCHANGE_AE = "EXCHANGE_AE";
/**
 * TTL交换器
 */
public static final String EXCHANGE_TTL = "EXCHANGE_TTL";
/**
 * 死信交换器
 */
public static final String EXCHANGE_DLX = "EXCHANGE_DLX";

/**
 * 备份队列
 */
public static final String QUEUE_AE = "QUEUE_AE";
/**
 * TTL队列
 */
public static final String QUEUE_TTL = "QUEUE_TTL";
/**
 * 死信队列
 */
public static final String QUEUE_DLX = "QUEUE_DLX";

/**
 * TTL Routing_Key
 * 特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
 */
public static final String ROUTINGKEY_TTL = "routing.ttl.#";
/**
 * 死信Routing_Key
 */
public static final String ROUTINGKEY_DLX = "routing.dlx";

@Resource
private PublishConfirm publishConfirm;
@Resource
private PublishReturnCallBack publishReturnCallBack;

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(addresses);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    // 消息发送到RabbitMQ交换器后接收ack回调
    connectionFactory.setPublisherConfirms(publisherConfirms);
    // 消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
    connectionFactory.setPublisherReturns(publisherReturns);
    return connectionFactory;
}

@Bean
//    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setConfirmCallback(publishConfirm);
    rabbitTemplate.setReturnCallback(publishReturnCallBack);
    return rabbitTemplate;
}

/**
 * 备份队列常用FanoutExchange
 */
@Bean
public FanoutExchange alternateExchange() {
    return new FanoutExchange(EXCHANGE_AE);
}

/**
 * TTL队列, 指定AE
 */
@Bean
public TopicExchange ttlExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("alternate-exchange", EXCHANGE_AE);
    return new TopicExchange(EXCHANGE_TTL, true, false, args);
}

/**
 * 死信交换器
 */
@Bean
public DirectExchange deathLetterExchange() {
    return new DirectExchange(EXCHANGE_DLX);
}

/**
 * 备份队列
 */
@Bean
public Queue queueAE() {
//        return new Queue(QUEUE_AE, true); //队列持久
    return QueueBuilder.durable(QUEUE_AE).build();
}

/**
 * TTL队列,指定DLX
 */
@Bean
public Queue queueTTL() {
    Map<String, Object> args = new HashMap<>();
    // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
    args.put("x-dead-letter-exchange", EXCHANGE_DLX);
    // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
    args.put("x-dead-letter-routing-key", ROUTINGKEY_DLX);
//        return new Queue(QUEUE_TTL, true, false, false, args);
    return QueueBuilder.durable(QUEUE_TTL).withArguments(args).build();
}

/**
 * 死信队列
 */
@Bean
public Queue queueDLX() {
//        return new Queue(QUEUE_DLX, true); //队列持久
    return QueueBuilder.durable(QUEUE_DLX).build();
}

/**
 * AE绑定队列
 */
@Bean
public Binding bindingAE() {
    return BindingBuilder.bind(queueAE()).to(alternateExchange());
}

/**
 * TTL绑定队列
 */
@Bean
public Binding bindingTTL() {
    return BindingBuilder.bind(queueTTL()).to(ttlExchange()).with(ROUTINGKEY_TTL);
}

/**
 * DLX绑定队列
 */
@Bean
public Binding bindingDLX() {
    return BindingBuilder.bind(queueDLX()).to(deathLetterExchange()).with(ROUTINGKEY_DLX);
}

监听死信队列,获取超时订单(未实现,目前只是demo)

@RabbitHandler // 声明接收方法
@RabbitListener(queues = {RabbitConfig.QUEUE_DLX})
public void processDLX(Message message, Channel channel) {
    String payload = new String(message.getBody());
    log.info("接收处理DLX队列当中的消息: {}, 当前时间: {}", payload, LocalDateTime.now().toString());
    try {
        // TODO 通知 MQ 消息已被成功消费,可以ACK了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (IOException e) {
        // TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
    }
}

另外,百度上找到的一个项目RabbitTemplate有这样的配置(RabbitTemplate必须是prototype类型) ,其他人项目并没有这个配置,想知道到底怎样才算对,小白一个,望大神多多指点

@Bean
//    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setConfirmCallback(publishConfirm);
    rabbitTemplate.setReturnCallback(publishReturnCallBack);
    return rabbitTemplate;
}
6月2日提问
3 个回答
1

感觉不需要删除吧。消费超时订单消息的方法内,判断一下这个订单的状态是已支付还是未支付,未支付-删除订单并恢复库存,ack=true,正常消费消息。

0

建议超时订单也不用死信队列,采用普通的队列处理即可;
实际中单据真出现在死信队列中,无法分辨,存在二义性;
可以考虑采用补偿法,而且生成订单和实际支付应该是分开的步骤,请考虑实际的业务场景在做相应的处理!
希望我的回答能帮到你!

0

我是用node做的...也记不清当时的想法了...看代码也看不懂了...捂脸...

这是我的大概做法, 不保证正确哈:
大概是 3个队列

未支付订单, 处理中订单, 超时订单

未支付的订单30秒变为死信, 转发到处理中去处理(查数据库)
处理分两种情况(1.已支付: 啥也不做了, 2.未支付: 如果创建时间不足30分钟, 再次转发到未支付队列, 如果超过, 则发送到超时订单处理)

就这些了(印象中rabbitmq貌似不是特别推荐修改队列内容还是啥...忘了)

撰写答案

推广链接