一、介绍

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

  1. MQ特点: MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

  2. 含义:RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

  3. 概念:RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。

RabbitMQ的结构图如下:
在这里插入图片描述
在这里插入图片描述

Broker:简单来说就是消息队列服务器实体:

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接受消息的程序。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

  • 客户端连接到消息队列服务器,打开一个channel。
  • 客户端声明一个exchange,并设置相关属性。
  • 客户端声明一个queue,并设置相关属性。
  • 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  • 客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing
key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号#匹配一个或多个词,符号*匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

————————————————
版权声明:以上为CSDN博主「默-存」的原创文章,遵循CC 4.0 BY-SA版权协议,转载附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/typ1805/article/details/82835318

二、引入RabbitMQ配置工作

pom

	<!--RabbitMq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

yml

spring:
  #配置rabbitmq
  rabbitmq:
    host: 127.0.0.1  #服务地址
    port: 5672       #端口号
    username: guest  #账号
    password: guest  #密码

config:

	/**
	*为了让保存到消息队列中的对象能以json的格式展示需要
	*/
	@Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

三、基本使用

SpringBoot项目中可以使用RabbitTemplate直接对RabbitMQ进行操作。

	@Autowired
    private RabbitTemplate rabbitTemplate;

一、队列——Queue

在这里插入图片描述
生产者发送的消息,根据先后顺便被保存到消息队列中,等待着被消费者取出,一旦一条消息被取出之后,这条消息在队列中就会被删除。
在这里插入图片描述
多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

新建队列的几个属性:

  • name:队列名字
  • durable:是否支持持久化,默认true
  • exclusive:表示该消息队列是否只在当前connection中生效,默认是false
  • autoDelete:没有消息和连接时是否自动删除,默认时false

在这里插入图片描述

    //添加一个名为hello的队列
     @Bean
    public Queue helloQueue() {

        /**
         * name:队列名字
         * durable:是否支持持久化,默认true
         * exclusive:表示该消息队列是否只在当前connection中生效,默认是false
         * autoDelete:没有消息和连接时是否自动删除,默认时false
         */
        Queue hello = new Queue("hello",true,false,true);
        return hello;
    }

添加消息进队列:因为没有交换器,所以绑定key(routingKey)和队列名字(queue)相同

 	/**
     * 直接把消息发送到hello队列中
     * (这里的hello实际上是routingKey,因为没有交换器,所以直接匹配名字为“hello”的队列)
     */
    @GetMapping("/sendHello")
    public R sendHello(){
        String message = "hello:helloQueue";
        log.info("发送消息到hellod队列:"+message);
        rabbitTemplate.convertAndSend("hello",message);
        return R.ok();
    }

项目启动执行sendHello方法可以发现队列hello已经加到rabbitmq中且有一条消息
在这里插入图片描述
取出消息有两种方式:

  1. 手动取出消息
 	/**
     * 从hello队列中取出发送的消息
     */
    @GetMapping("/getHello")
    public R getHello(){
        Object helloMessage = rabbitTemplate.receiveAndConvert("hello");
        log.info("从hello队列中取出发送的消息:"+helloMessage);
        return R.ok(helloMessage);
    }
  1. 通过监听队列自动获取消息——@RabbitListener
	@RabbitListener(queues = "hello")
    public void getHelloLer(String helloMessage){
        log.info("监听hello队列获取消息:"+helloMessage);
    }

二、交换器

生产者可以通过把消息放给交换器,交换器根据一定的绑定规则,当一条消息发送过来的时候传递给一个或者多个队列,使每个被传递的队列中都有这条消息。
在这里插入图片描述
在这里插入图片描述

新建交换器时的几个属性

  • name:交换器的名字
  • durable:是否支持持久化,默认为true
  • autoDelete:不使用时是否自动删除,默认为false

在这里插入图片描述

交换器分为四个类型:
类型一、direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
在这里插入图片描述

//==========================direct==========================================================

    /**
     * 直连交换器
     */
    @Bean
    public DirectExchange directExchange(){
        /**
         * name:交换器的名字
         * durable:是否支持持久化,默认为true
         * autoDelete:不使用时是否自动删除,默认为false
         */
        return new DirectExchange("directExchange",true,true);
    }

    /**
     * user队列
     */
    @Bean
    public Queue userQueue() {
        return new Queue("user",true,false,true);
    }

    /**
     * user2队列
     */
    @Bean
    public Queue userQueue2() {
        return new Queue("user2",true,false,true);
    }

    /**
     * 把user队列绑定到 directExchange 交换器中 routingKey 为 user
     * @param userQueue user队列,注意参数名要和user队列的方法名统一
     * @param directExchange 直连交换器,注意参数名要和直连交换器方法名统一
     */
    @Bean
    public Binding bindingDirectUserQueue(Queue userQueue,DirectExchange directExchange){
        return BindingBuilder.bind(userQueue).to(directExchange).with("user_routingKey");
    }
    /**
     * 把user2队列绑定到 directExchange 交换器中 routingKey 为 user
     * @param userQueue2 user队列,注意参数名要和user队列的方法名统一
     * @param directExchange 直连交换器,注意参数名要和直连交换器方法名统一
     */
    @Bean
    public Binding bindingDirectUserQueue2(Queue userQueue2,DirectExchange directExchange){
        return BindingBuilder.bind(userQueue2).to(directExchange).with("user_routingKey");
    }

启动项目可以发现绑定关系已经生效
在这里插入图片描述

发送消息给directExchange。注意这次不是发送消息给队列,而是发送给交换器

	/**
     * 发送消息到 directExchange 交换器中
     * @return
     */
    @GetMapping("/sendUser")
    public R sendUser(){

        User user = new User();
        user.setCreateTime(new Date());
        user.setId(12);
        user.setGuid("liguanghao");
        log.info("发送user消息到交换器:"+user);
		//发送消息给 directExchange,设置 routingKey 为 user_routingKey
        rabbitTemplate.convertAndSend("directExchange","user_routingKey",user);
        return R.ok();
    }

在这里插入图片描述
两个队列各自取出其中的消息

	/**
     * 从user队列中手动取出消息
     */
    @GetMapping("/getUser")
    public R getUser(){
        User user = (User) rabbitTemplate.receiveAndConvert("user");
        log.info("从user队列中手动取出消息:"+user);
        return R.ok(user);
    }

    /**
     * 从user2队列中手动取出消息
     * @return
     */
    @GetMapping("/getUser2")
    public R getUser2(){
        User user = (User) rabbitTemplate.receiveAndConvert("user2");
        log.info("从user2队列中手动取出消息:"+user);
        return R.ok(user);
    }

监听模式取出消息

	/**
     * 监听 user 和 user2 队列,自动取出消息
     * @param user
     */
    @RabbitListener(queues = {"user","user2"})
    public void getUser2Ler(User user){
        log.info("监听user和user2队列,自动取出消息:"+user);
    }
类型二、fanout

消息路由到那些binding key与routing key完全匹配的Queue中。
在这里插入图片描述

添加队列,交换器并绑定

//==============================fanout================================================================

   /**
     * 广播类型的交换器
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange",true,true);
    }

    // 创建消息队列 A、B、C

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A",true,false,true);
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B",true,false,true);
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C",true,false,true);
    }

    // 队列和交换器绑定

    @Bean
    public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

发送消息和取出消息

    /**
     * 发送广播消息
     */
    @GetMapping("/sendFanout")
    public R sendFanout(){
        String message = "fanout-Message";//发送到队列的消息内容
        
        //因为时广播,所以路由key:routingKey 设置为空就可以了,也可以随意写。但是不能没有这个入参!
        rabbitTemplate.convertAndSend("fanoutExchange","",message);
        return R.ok();
    }

    //从三个队列中自动取出消息

    @RabbitListener(queues = "fanoutQA")
    public void getFanoutA(String message){
        log.info("fanoutQA:"+message);
    }

    @RabbitListener(queues = "fanoutQB")
    public void getFanoutB(String message){
        log.info("fanoutQB:"+message);
    }

    @RabbitListener(queues = "fanoutQC")
    public void getFanoutC(String message){
        log.info("fanoutQC:"+message);
    }

类型三、topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
  • 比如:
abcd       不匹配     abc#        更不匹配   abc*
abc.de     不匹配     abc.d#      更不匹配   abc.d*

上边两种的routingKey的写法都是错误的。

abc.de     匹配       abc.#       也匹配   abc.*
abc.de.fg  匹配       abc.#       不匹配   abc.*

总结:被 . 分割的字符串算是一个单词;# 和 * 代表被 . 分割的单词;# 可以代表多个被 . 分割的单词即 # 和 * 两边必须是 . 。

在这里插入图片描述
创建队列和交换器并绑定

//========================topic=====================================================================

    /**
     * 根据规则匹配实现多对多的交换器
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange",true,true);
    }

    // 创建两个队列

    @Bean
    public Queue queueMessage1() {
        return new Queue("topicQ1",true,false,true);
    }

    @Bean
    public Queue queueMessage2() {
        return new Queue("topicQ2",true,false,true);
    }

    // 队列和交换器绑定

    /**
     * 将队列topic.messages1与exchange绑定,binding_key为topic.#,模糊匹配
     */
    @Bean
    public Binding bindingExchangeMessage1(Queue queueMessage1, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessage1).to(topicExchange).with("topic.#");
    }

    /**
     * 将队列topic.messages2与exchange绑定,binding_key为topic.#,模糊匹配
     */
    @Bean
    public Binding bindingExchangeMessage2(Queue queueMessage2, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessage2).to(topicExchange).with("topic.*");
    }

发送消息和取出消息

 	/**
     * 发送topic消息
     * @return
     */
    @GetMapping("/sendTopic")
    public R sendTopic(String s){
        String message = "topic-Message";
        String routingKey = "topic."+s;

        log.info("发送topic消息,消息内容为:"+message+"路由key为:"+routingKey);

        rabbitTemplate.convertAndSend("topicExchange",routingKey,message);
        return R.ok();
    }

    //两个队列自动取消息

    @RabbitListener(queues = "topicQ1")
    public void getTopic1(String message){
        log.info("topicQ1:"+message);
    }

    @RabbitListener(queues = "topicQ2")
    public void getTopic2(String message){
        log.info("topicQ2:"+message);
    }
类型四、headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。

三、注解版的使用

注解版的rabbitmq整合springboot不需要再config配置类中通过@Bean来配置队列、交换器和绑定,而是通过在消费者区消息的时候通过@RabbitListener及里边的参数来创建队列或者交换器并绑定的。

  1. 直接创建队列并发送消息
	/**
     * 发送消息
     */
    @GetMapping("/sentNote")
    public void sentNote(){
       	String message = "noteMessage";
        rabbitTemplate.convertAndSend("noteRK",message);
        log.info("发送注解消息:"+message);
    }

    /**
     * 自动取出消息
     * queuesToDeclare:创建队列
     * name:队列名
     * autoDelete:当没有人用到队列的时候是否自动删除
     */
    @RabbitListener(
            queuesToDeclare = @Queue(name = "noteRK",autoDelete = "true")
    )
    public void getNoteLer(String s){
        log.info("取出消息:"+s);
    }

@RabbitListener可以标注在类上通过@RabbitHandler标注方法取出消息

@RabbitListener(queuesToDeclare = @Queue("noteRK"))
@Slf4j
public class NoteLerController {

    @RabbitHandler
    public void getNoteLer(String s){
      log.info("自动取出消息类:"+s);
    }

}
  1. 通过交换器发送消息
//==============direct类型的交换器=========================================================
    /**
     * 发送direct消息
     */
    @GetMapping("/sentNoteDic")
    public void sentNoteDic(){
        log.info("sentNoteDic");

        String message = "noteDicMessage";

        rabbitTemplate.convertAndSend("noteDicExchange","noteDicRK",message);

        log.info("发送direct消息:"+message);
    }

    /**
     * 自动取出direct消息
     * bindings:创建绑定关系
     * value:创建队列  @Queue:后边可以什么都不写,什么都不写的情况会创建一个临时队列名字随机生成一个如:Queue spring.gen-YaKy7ELPRkKgaKAkVWJ0ng,临时的队列autoDelete为true
     * exchange:创建交换器 :name:指定交换器的名字; type:指定交换器的类型,默认为:direct;  autoDelete:不适用时是否自动删除 ; declare:是否持久化
     * key:绑定关系的key,可以是多个
     */
    @RabbitListener(
            bindings =@QueueBinding(
                    value = @Queue(name = "noteDicQ",autoDelete = "true"),
                    exchange = @Exchange(name = "noteDicExchange",type = ExchangeTypes.DIRECT,autoDelete = "true",declare = "true"),
                    key = {"noteDicRK","noteDicRK2"}
            )
    )
    public void getNoteDicLer(String s){
        log.info("取出direct消息:"+s);
    }

    //==============fanout类型的交换器=========================================================

    /**
     * 发送direct消息
     */
    @GetMapping("/sentNoteFan")
    public void sentNoteFan(){
        String message = "noteFanMessage";
        rabbitTemplate.convertAndSend("noteFanExchange","noteFanRK",message);
        log.info("发送fanout消息:"+message);
    }

    /**
     * 自动取出fanout消息
     * fanout是广播类型的交换器,所以不需要配置 key 消息发送给所有绑定了他的队列
     */
    @RabbitListener(
            bindings =@QueueBinding(
                    value = @Queue(name = "noteFanQ",autoDelete = "true"),
                    exchange = @Exchange(name = "noteFanExchange",type = ExchangeTypes.FANOUT,autoDelete = "true")
            )
    )
    public void getNoteFanLer(String s){
        log.info("取出fanout消息:"+s);
    }

    //==============topic类型的交换器=========================================================


    /**
     * 发送topic消息
     */
    @GetMapping("/sentNoteTop")
    public void sentNoteTop(){
        String message = "noteTopMessage";
        rabbitTemplate.convertAndSend("noteTopExchange","noteTop.key",message);
        log.info("发送topic消息:"+message);
    }

    /**
     * 自动取出fanout消息
     * key:可以通过模糊匹配
     */
    @RabbitListener(
            bindings =@QueueBinding(
                    value = @Queue(name = "noteTopQ",autoDelete = "true"),
                    exchange = @Exchange(name = "noteTopExchange",type = ExchangeTypes.TOPIC,autoDelete = "true"),
                    key = {"noteTop.#","#.key"}
            )
    )
    public void getNoteTopLer(String s){
        log.info("取出topic消息:"+s);
    }

四、一些属性的解释说明:

autoDelete:是否自动删除。true:自动删除;false:不自动删除
  1. 在Queue中使用:
  • 当设置成true时,如果队列创建之后没有被消费者订阅过(@RabbitListener)那么就算所有消费客户端连接断开,这个消息队列依然不会删除,只有被订阅过的消息队列,在所有消费客户端连接断开后,才会自动删除。
  • 当设置成true时,队列是否会产出和队列中是否有消息没有关系,也就是说就算队列中有未被消费的消息,当满足自动删除的条件之后依然会被直接删除。
  1. 在Exchange中的使用
  • 当设置成true时,当没有绑定的队列的时候,会被自动删除。
durable:是否支持持久化。true:持久化;false:不持久化
  1. 在Queue中使用:
  • 当设置成true时,如果RabbmitMQ服务器在使用的过程中异常宕机了,那么队列和队列中的消息就会写入服务器的磁盘中,当服务器正常启动后就会自动生成之前的消息队列和消息。
  1. 在Exchange中的使用:
  • 当设置成true时,Exchange会和队列一样持久化,没有消息。

当Queue和Exchange的durable都为true时,他们的绑定关系也是持久化的。

exclusive:表示该消息队列是否只在当前connection中生效

此属性只在Queue中使用,当为true时:

  • 只在首次声明它的连接(Connection)可使用
  • 会在其连接断开的时候自动删除,不管是否配置了autoDelete-自动删除。

临时队列的exclusive就为true。

四、消息确认机制

  1. SpringBoot整合的RabbitMQ默认机制是开启了自动确认消息的机制,当消息生产者发送消息到队列之后,队列直接按照平均分配把所有的消息分配给了消费者,队列中的这些消息就被自动删除了。但是在生产环境中,消费者在接收到分配给自己的消息后,在处理的过程中可能会出现异常,或者可能在消费一半的时候出现宕机,那么这时候就会造成消息丢失的情况。
  2. 还有一种情况可能会造成数据丢失,那就是默认情况下消息生产者发送消息给交换器之,并不知道是否发送成功,就算整个交换器也不一定有与之绑定的队列,或者不一定有满足这条消息路由key的队列。这时,这条消息就会丢失,而消息生产者是不知道的。

从上面两方面考虑,要想保证消息不丢失,需要满足如下的条件:

  • 队列不能直接把消息全部分配给消费者,最好能一次分配一条消息
  • 消费者在接收消息的时候不要自动确认消息,而是在自己正确处理完消息之后告诉队列。
  • 消息生产者在发送消息后,需要知道消息是否正确发送到了交换器且又是否发送到了队列。

生产者配置

需要在yml中配置开启交互类型

spring:
	#配置rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

    #确认消息已经发送到交换器(exchange)--ConfirmCallback
    #publisher-confirms: true               #旧版的配置-已过时
    publisher-confirm-type: correlated      #新版的配置,选择确认类型为 交互类型

    #开启消息找不到队列返回机制--ReturnCallback(新版本不配此项好像也可以)
    publisher-returns: true

配置RabbitTemplate

/**
     * 给RabbitTemplate配置属性
     * @return
     */
    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory){

        //注意创建rabbitTemplate的写法
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        /**
          *  以下方法要想起作用,yml需要配置 publisher-confirm-type: correlated
         */

        //消息发送给没有绑定队列或者通过路由键匹配不到队列的交换机时,消息会返回给发送者,调用ConfirmCallback方法
        rabbitTemplate.setMandatory(true);

        //确认消息是否发送到exchange
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
            if(ack){//发送成功
                log.info("RabbitMq消息发送到exchange成功");
            }else{//发送失败
                log.info("RabbitMq消息发送到exchange失败,原因:{"+cause+"}");
            }
        });

        //消息发送失败执行方法此方法,成功不会执行此方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
            String correlationId = new String(message.getBody());
            log.info("RabbitMQ:{"+correlationId+"}消息路由失败," +
                     "应答码:{"+replyCode+"}," +
                     "原因:{"+replyText+"}," +
                     "交互器:{"+exchange+"}," +
                     "路由键:{"+routingKey+"}");
        });

        return rabbitTemplate;

    }

通过以上两个配置可以解决的问题是:消息生产者可以知道消息是否正确发送给了交换器;消息生产者可以知道交换器是否又与之绑定的队列且有匹配的路由key最终发送到了队列中。
至此,我们可以确认到消息从 【产生------>交换器------>队列】保证不丢失。

消费者配置

需要在yml中添加配置①开启手动应答模式,关闭自动确认;②设置每次处理请求个数

spring:
  #配置rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
 	#添加配置
    listener:
      simple:
        acknowledge-mode: manual  # 采用手动应答
        concurrency: 1            # 消费者最小数量
        max-concurrency: 10       # 消费者最大数量
        prefetch: 1               # 在单个请求中处理的消息个数

使用@RabbitListener接收消息的时候需要开启ACK手动确认机制

	/**
     * 注意接收消息的方式
     */
	@RabbitListener(queues = "helloQ")
    public void getHelloLer(Message message, Channel channel)  {
        try {
            String s = new String(message.getBody(),"UTF-8");

            log.info("成功接收到helloQ的消息:"+s);


            /**手动确认接受消息,告诉队列删除对应的消息
             * 参数1:获取消息的id,表明接受的是哪个消息
             * 参数2:是否批量.true:将一次性ack所有小于deliveryTag的消息。
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

        } catch (IOException e) {

            log.error("手动接收消息出错");


            //下面的代码会把Unacked中的消息返回给到Ready中,队列又发送给此消费者或者其他消费者
			//根据实际情况判断是否手动返回消息,手动返回可能造成一直死循环
            /**手动退还消息
             * 参数1:获取消息的id,表明接受的是哪个消息
             * 参数2:是否批量.true:将一次性ack所有小于deliveryTag的消息。
             * 参数3:被拒绝的是否重新入队列
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            } catch (IOException e1) {
                log.error("手动退还消息出错");
            }
        }

    }

最后、常见问题及解决办法

1. 消费者接收消息时出现异常造成异常信息一直死循环

造成这个现象的原因是因为被@RabbitListener注解的方法在取出队列中的消息时出现异常,异常被系统抛出了,此时这条消息就会一直被这个方法取出,造成死循环。
解决办法一:使用try-catch处理取消息的操作

	@RabbitListener(queues = "user")
    public void getUserLer(User user){
        try {
            log.info("getUserLer:"+user);
            int i = 10/0;
        } catch (Exception e) {
           log.error("getUserLer出错:"+user+"message:"+e.getMessage());
        }
    }

解决办法二:设置重试次数,当超过这个次数依然失败后,这条消息就会被丢弃。
(这种方式只有在未开启手动确认消息机制的情况下才有效)

rabbitmq:
    listener:
      simple:
        retry:
            max-attempts: 5         # 设置重试次数

2.消息确认机制的消费者接收消息时出现异常问题

如果使用消息确认机制,消费方在确认消息

	/**手动确认接受消息,告诉队列删除对应的消息
	 * 参数1:获取消息的id,表明接受的是哪个消息
	 * 参数2:是否批量.true:将一次性ack所有小于deliveryTag的消息。
	 */
	channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

之前出现异常的话,消息就会因为一直为被确认而处于Unacked中

1.等待投递给消费者的消息(下图中的Ready部分)
2.已经投递给消费者,但是还没有收到消费者确认信号的消息(下图中的Unacked部分)

  • 如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
  • RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

在这里插入图片描述
此时这条消息就会一直存在unacked中,也不会被其他消费者消费,直到这个出现异常的消费者断开连接,当此消费者断开连接之后,这条消息就会进入ready中,在此之前,当有新的消息进入队列中时会进入Ready中;这个队列能存放的Unacked个数和如下配置有关,当超过这个个数之后,这个消费者将不会接收新的消息。

rabbitmq:
    listener:
      simple:
        prefetch: 3               #在单个请求中处理的消息个数

要想因为消费者异常而没有被确认的消息重新被其他消费者接收,可以通过channel的basicNack方法来实现

	@RabbitListener(queues = "helloQ")
    public void getHelloLer(Message message, Channel channel) {

        try {

            String s = new String(message.getBody(),"UTF-8");


            if (s.equals("err")){
                throw new IOException("出现err");
            }

            log.info("成功接收到helloQ的消息:"+s);


            /**手动确认接受消息,告诉队列删除对应的消息
             * 参数1:获取消息的id,表明接受的是哪个消息
             * 参数2:是否批量.true:将一次性ack所有小于deliveryTag的消息。
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

        } catch (IOException e) {

            log.error("手动接收消息出错");


            //下面的代码会把Unacked中的消息返回给到Ready中,队列又发送给此消费者或者其他消费者

            /**手动退还消息
             * 参数1:获取消息的id,表明接受的是哪个消息
             * 参数2:是否批量.true:将一次性ack所有小于deliveryTag的消息。
             * 参数3:被拒绝的是否重新入队列
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            } catch (IOException e1) {
                log.error("手动退还消息出错");
            }
        }

    }

当把消息重新返回给队列,如果没有其他正常的消费者,就会造成 问题一 那样死循环的问题。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐