springboot集成rabbitmq实现手动确认消息机制,并重试3次(redisson做延迟队列)
rabbitmq结合redisson延时队列实现手动确认消息机制和消息重试
·
- rabbitmq集成和redisson做延迟队列的集成可以看我的另外两篇文档spring boot 集成redisson实现延迟队列-CSDN博客 springboot集成Rabbitmq-CSDN博客
- 核心代码,我使用的是rabbitmq的手动消息确认机制,当我调用第三方接口返回的状态码code不等于200 时,我会放入延迟队列中,10秒后再次放入rebbitmq中,在redis中使用一个键记录重试的次数,重试三次后如果仍然失败,basicReject拒绝消息后将会自动放入死信队列中。注意:每一条消息都需要basicAck,不然消息不会删除,而是以unasked还保留在正常队列中,如果重启服务,会造成二次消费
try { // 处理消息逻辑 log.info("接收到支付回调消息:" + message); //业务逻辑 if (code.equals("200")) { // 手动确认消息 channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false); return; } else { //判断该订单的key是否存在 Boolean aBoolean = redisService.hasKey(out_trade_no); if (aBoolean) { int cacheObject = redisService.getCacheObject(out_trade_no); if (cacheObject>3){ // 重试多次之后仍失败,进入死信队列 channel.basicReject(rabbitMessage.getMessageProperties().getDeliveryTag(), false); return; } //消息重试小于3次,重新放入消息队列中重试 cacheObject++; System.out.println("重试" + (cacheObject-1) + "次"); channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false); redisService.setCacheObject(out_trade_no, cacheObject, 300L, TimeUnit.SECONDS); redisDelayQueueServer.addDelayQueue(jsonObject.toString(),10L,TimeUnit.SECONDS, RedisDelayQueueEnum.TEST_QUEUE.getQueueName()); // rabbitTemplate.convertAndSend("testExchange","test",jsonObject.toJSONString(),retryMsg -> { // // 设置延迟毫秒值 // retryMsg.getMessageProperties().setExpiration(String.valueOf(10 * 1000)); // return retryMsg; // }); return; } else if (!aBoolean) { //第一次消息重试,新建缓存,记录重试次数 channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false); redisService.setCacheObject(out_trade_no, 1, 300L, TimeUnit.SECONDS); //rabbitTemplate.convertAndSend("testExchange","test",jsonObject.toJSONString()); redisDelayQueueServer.addDelayQueue(jsonObject.toString(),10L,TimeUnit.SECONDS, RedisDelayQueueEnum.TEST_QUEUE.getQueueName()); return; } } } catch (Exception e) { e.printStackTrace(); // 处理异常,可以选择重新抛出或者记录日志 // 如果不发送确认,RabbitMQ将重新发送消息 channel.basicNack(rabbitMessage.getMessageProperties().getDeliveryTag(), false, false); // 放入死信队列 // 或者 // throw e; // 让异常传播,Spring Retry等机制可以捕获并处理 } return; }
3. 死信队列声明和绑定
//普通队列指定死信队列 @Bean public Queue testQueue() { return QueueBuilder.durable(TEST_NAME) .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_NAME) .withArgument("x-max-priority", 10) // 可选:设置队列的最大优先级 .build(); } //死信交换机 @Bean public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } //死信队列 @Bean public Queue failQueue() { return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).build(); } //死信队列与交换机绑定 @Bean public Binding bindingDeadLetterQueue(Queue failQueue, DirectExchange deadLetterExchange) { // 注意:死信队列的绑定通常不需要在普通业务逻辑中显式定义,除非你有特殊的路由需求 return BindingBuilder.bind(failQueue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_NAME); }

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)