先用rabbitmq控制台创建正常的交换机,队列,死信的交换机,队列,绑定关系等,查看以下博文,
只要 以下文章的1-3步,因为这里演示另种情况,消息到死信队列
rabbitmq死信队列 用rabbitmq web控制台创建交换机,队列,绑定关系,发送TTL超时消息 来做演示_rabbitmq 控制台 添加死信队列-CSDN博客

生产者为方便,直接用rabbitmq控制台发送消息

创建一个消费者工程,并手动ack确认,部分api的使用可查看以下博文,
springboot整合rabbitmq 消费者Consumer 手动进行ack确认_rabbitmq中的basicconsume方法的使用,手动确定ack-CSDN博客
改动关键代码  channel.basicNack(deliveryTag, true, false); 
就是最后一个参数 ,并且不再重新投递到原交换机 requeue=false


剩下的改动没啥变化,直接用上面的工程就可以了,为了看起来不乱,还是直接再写下吧

1 消费者工程 application.yml文件

server:
  port: 8021
spring:
  #给项目来个名字
  application:
    name: rabbitmq-test
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: need
    password: 123456
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /testhost
 
    #ack 确认方式
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        acknowledge-mode: manual

2 消费者代码

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class xiaofeizhe_1 {

    
    //监听的队列名称
    @RabbitListener(queues = "zhengchang_queue")
    /**
     * 注意:后3个参数,需要生产者发送消息时加上,否则为报错,注解里添加 required=false 或 删除参数
     */
    public void process(Message message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                        @Header(AmqpHeaders.MESSAGE_ID) String messageId,
                        @Header(AmqpHeaders.CONSUMER_TAG) String consumerTag
                        ) throws IOException {

        //long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //或 
        //@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag

        try {
            String msgbody = new String(message.getBody());
            //1.接收转换消息
            System.out.println("消费者 1 收到消息  : " + msgbody + " 编号: " + deliveryTag);

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            //模拟出现错误
            System.out.println(500 / Double.valueOf(msgbody));

            //3. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
            这里要演示发到死信队列,就设置为false
             */
            channel.basicNack(deliveryTag, true, false);
            //channel.basicReject(deliveryTag,false);
        }
    }
}

3 用rabbitmq控制台发送消息来演示,如果发正常的数字,就可以直接消费,但是如果发乱七八糟的文字,做不了运算,就会出错,就会通过 消息被拒绝(basicReject/ basicNack)并且不再重新投递 requeue=false,发送到死信队列

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐