1. rabbitmq集成和redisson做延迟队列的集成可以看我的另外两篇文档spring boot 集成redisson实现延迟队列-CSDN博客    springboot集成Rabbitmq-CSDN博客
  2. 核心代码,我使用的是rabbitmq的手动消息确认机制,当我调用第三方接口返回的状态码code不等于200 时,我会放入延迟队列中,10秒后再次放入rebbitmq中,在redis中使用一个键记录重试的次数,重试三次后如果仍然失败,basicReject拒绝消息后将会自动放入死信队列中。注意:每一条消息都需要basicAck,不然消息不会删除,而是以unasked还保留在正常队列中,如果重启服务,会造成二次消费
     try {
                // 处理消息逻辑
                log.info("接收到支付回调消息:" + message);
     //业务逻辑
                if (code.equals("200")) {
                    // 手动确认消息
                    channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false);
                    return;
                } else {
                    //判断该订单的key是否存在
                    Boolean aBoolean = redisService.hasKey(out_trade_no);
                    if (aBoolean) {
                        int cacheObject = redisService.getCacheObject(out_trade_no);
                        if (cacheObject>3){
                                // 重试多次之后仍失败,进入死信队列
                                channel.basicReject(rabbitMessage.getMessageProperties().getDeliveryTag(), false);
                                return;
                        }
                        //消息重试小于3次,重新放入消息队列中重试
                        cacheObject++;
                        System.out.println("重试" + (cacheObject-1) + "次");
                        channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false);
                        redisService.setCacheObject(out_trade_no, cacheObject, 300L, TimeUnit.SECONDS);
                        redisDelayQueueServer.addDelayQueue(jsonObject.toString(),10L,TimeUnit.SECONDS, RedisDelayQueueEnum.TEST_QUEUE.getQueueName());
    //                    rabbitTemplate.convertAndSend("testExchange","test",jsonObject.toJSONString(),retryMsg -> {
    //                        // 设置延迟毫秒值
    //                        retryMsg.getMessageProperties().setExpiration(String.valueOf(10 * 1000));
    //                        return retryMsg;
    //                    });
    
                        return;
                    } else if (!aBoolean) {
                        //第一次消息重试,新建缓存,记录重试次数
                        channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false);
                        redisService.setCacheObject(out_trade_no, 1, 300L, TimeUnit.SECONDS);
                        //rabbitTemplate.convertAndSend("testExchange","test",jsonObject.toJSONString());
                        redisDelayQueueServer.addDelayQueue(jsonObject.toString(),10L,TimeUnit.SECONDS, RedisDelayQueueEnum.TEST_QUEUE.getQueueName());
                        return;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                // 处理异常,可以选择重新抛出或者记录日志
                // 如果不发送确认,RabbitMQ将重新发送消息
                channel.basicNack(rabbitMessage.getMessageProperties().getDeliveryTag(), false, false); // 放入死信队列
                // 或者
                // throw e; // 让异常传播,Spring Retry等机制可以捕获并处理
            }
            return;
        }

    3. 死信队列声明和绑定

    //普通队列指定死信队列
     @Bean
        public Queue testQueue() {
            return QueueBuilder.durable(TEST_NAME)
                    .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                    .withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_NAME)
                    .withArgument("x-max-priority", 10) // 可选:设置队列的最大优先级
                    .build();
        }
    //死信交换机
    @Bean
        public DirectExchange deadLetterExchange() {
            return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
        }
    //死信队列
        @Bean
        public Queue failQueue() {
            return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).build();
        }
    
    //死信队列与交换机绑定
        @Bean
        public Binding bindingDeadLetterQueue(Queue failQueue, DirectExchange deadLetterExchange) {
            // 注意:死信队列的绑定通常不需要在普通业务逻辑中显式定义,除非你有特殊的路由需求
            return BindingBuilder.bind(failQueue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_NAME);
        }

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐