在这里插入图片描述

Ⅰ. 安装 RabbitMQ

RabbitMQ 服务安装与使用

sudo apt install -y rabbitmq-server

​ 一般来说默认端口为 15672

# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service

# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个 administrator 用户,才可以做为远程登录和发表订阅消息:
 # 添加用户和密码
sudo rabbitmqctl add_user root 123456
 # 设置用户tag
sudo rabbitmqctl set_user_tags root administrator
 # 设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"

# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management

​ 网页访问的默认端口为 15672:(注意 rabbitmq 服务的端口是 5672,注意区分!

在这里插入图片描述

安装 RabbitMQ 的 C++ 客户端库

  • C 语言库:https://github.com/alanxz/rabbitmq-c
  • C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master

​ 我们这里使用 AMQP-CPP 库来编写客户端程序!

安装 AMQP-CPP

sudo apt install -y libev-dev #libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make 
sudo make install

​ 如果安装时候出现了 SSL 相关的版本问题,可以通过以下命令来解决:

dpkg -l |grep ssl
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev 
sudo apt --fix-broken install

​ 然后重新 make 即可!

Ⅱ. RabbitMQ 的介绍

RabbitMQ 是一个基于 AMQPAdvanced Message Queuing Protocol)的开源消息代理,广泛用于消息传递和消息队列管理。其核心功能是 提供异步的消息传递机制,支持多个消费者和生产者之间的松耦合通信

RabbitMQ 服务与客户端的通信原理

  1. 连接建立(Connection)
  • 客户端通过 AMQP 协议与 RabbitMQ 服务器建立连接。RabbitMQ 使用 TCP/IP 作为传输协议,默认端口为 5672
  • 客户端首先创建一个与 RabbitMQTCP 连接,这个连接是双向的,可以承载多个 通道Channel)。
  1. 通道(Channel)
  • 在一个 TCP 连接上,客户端和 RabbitMQ 可以开辟多个通道。每个通道都是一个独立的、轻量级的通信渠道,用于进行消息的发送与接收。
  • 每个通道都对应一个 AMQP 会话,所有操作(如声明队列、发布消息、消费消息等)都通过通道进行。
  • 这样做的好处是,一个 TCP 连接可以并发处理多个请求,减少了连接的开销和延迟。
  1. 声明队列(Queue)
  • 在客户端和 RabbitMQ 服务器之间进行消息传递前,客户端需要声明一个队列。队列是消息的存储位置,消息会被传递到指定的队列中。
  • 队列是由生产者声明的,消费者会从队列中获取消息!
  • 也可以通过 DurabilityPersistence 来确保队列和消息在 RabbitMQ 服务器重启后不丢失。
  1. 交换机(Exchange)
  • 交换机是 RabbitMQ 中的核心组件,负责根据路由规则将消息分发到适当的队列。客户端将消息发送到交换机,而交换机会根据绑定的路由键(routing key)将消息路由到对应的队列。
  • RabbitMQ 支持几种类型的交换机:
    • Direct Exchange:根据路由键一对一将消息发送到队列。
    • Fanout Exchange:将消息广播到所有绑定到它的队列。
    • Topic Exchange:根据路由键模式(类似正则表达式)将消息路由到一个或多个队列。
    • Headers Exchange:通过消息头部的属性来决定消息的路由。
  1. 发布消息(Publishing)
  • 生产者(Producer:生产者将消息发布到交换机。生产者通常不会直接与队列通信,而是将消息发送到交换机,交换机决定消息的路由。
  • 消息可以携带属性,如 routing key(路由键),用于帮助交换机决定消息的路由方向。
  • 如果消息发布时队列未创建,生产者可以要求 RabbitMQ 自动创建队列,或自己事先声明队列。
  1. 消息消费(Consuming)
  • 消费者(Consumer:消费者订阅队列,从队列中获取消息并进行处理。一个队列可以有多个消费者,RabbitMQ 会轮流将消息分发给所有绑定的消费者。
  • 当消息被消费者成功消费后,可以根据消息的 确认机制 来决定是否从队列中删除消息。
  • 消费者可以使用两种主要的确认方式:
    • 自动确认:消费者在获取消息后会自动确认消息已处理,RabbitMQ 会将该消息从队列中移除。
    • 手动确认:消费者处理完消息后需要明确调用确认方法,确保消息在处理完成后才会被移除。
  1. 消息确认(Acknowledgment)
  • 为了保证消息的可靠性,RabbitMQ 提供了消息确认机制。当消费者从队列中获取到消息后,可以选择确认消息,表示消息已成功处理。
  • 如果消费者在处理消息时出现异常,消息不会被确认,RabbitMQ 会将消息重新放入队列供其他消费者重新消费(也可以设置消息的重试次数和延迟时间)。
  1. 消息持久化(Persistence)
  • 队列持久化:在声明队列时,可以将其设置为持久化,确保即使 RabbitMQ 服务器重启,队列本身依然存在。
  • 消息持久化:可以通过将消息标记为持久化(delivery_mode=2)来确保消息在 RabbitMQ 重启后不丢失。
  1. 消息路由(Routing)
  • 交换机根据绑定到它的队列和路由键,决定如何将消息路由到一个或多个队列。
  • 如果消息没有找到匹配的队列,通常情况下,消息会被丢弃,除非启用了死信队列 DLQDead Letter Queue)来接收无法路由的消息。

简单通信流程

在这里插入图片描述

  1. 生产者 将消息发送到指定的 交换机,消息携带一个路由键。
  2. 交换机 根据路由规则将消息发送到一个或多个队列。
  3. 消费者 监听并消费从队列中获取到的消息,进行处理。
  4. 消费者处理完消息后,通过确认机制告知 RabbitMQ 消息已处理。
  5. RabbitMQ 根据确认机制和消息持久化设置,确保消息不会丢失。

Ⅲ. AMQP-CPP 库的简单使用

一、介绍

AMQP-CPP 是一个用于实现 AMQP (Advanced Message Queuing Protocol) 协议的 C++ 库。AMQP 是一种广泛应用于消息中间件的开放标准协议,支持在分布式系统中高效、安全地交换消息。AMQP-CPP 库使得 C++ 开发者能够轻松实现 AMQP 协议的客户端和服务器,通常用于与 RabbitMQ 等消息中间件进行交互。

AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。

​ 注意事项:这个库需要 c++17 的支持

​ 主要功能有:

  • 连接和通道管理AMQP-CPP 支持通过 AMQP 协议连接到 RabbitMQ 服务器,并能够创建多个通道进行并发消息处理。
  • 消息发布与订阅:可以方便地进行消息发布(生产者)和订阅(消费者),支持不同类型的交换机(DirectTopicFanout 等)和队列。
  • 队列管理:支持创建、删除、绑定和监听队列。
  • 异常处理和重试机制:内置异常处理,能够在连接断开后自动进行重试,确保消息通信的稳定性。

二、使用

通常 AMQP-CPP 的使用有两种模式:

  1. 使用默认的 TCP 模块进行网络通信(相当于自己手动实现,比较麻烦!)

    • 实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接
    • 重写相关函数, 其中必须重写 monitor 函数
    • monitor 函数中需要实现的是将 fd 放入 eventloopselectepoll)中监控, 当 fd 可写可读就绪之后, 调用 AMQP-CPPconnection->process(fd, flags) 方法
  2. 使用扩展的 libeventlibevlibuvasio 异步通信组件进行通信(相当于有现成的组件供我们直接使用,我们优先用这种模式!

    • libev 为例, 我们不必要自己实现 monitor 函数, 可以直接使用 AMQP::LibEvHandler

​ 在本项目编译链接的时候,记得链接动态库:-lamqpcpp -lev

三、常用类与接口介绍

Channel 类

channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel

​ 因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。

namespace AMQP {
    /**
     * 通用回调函数类型,用于许多延迟处理对象
     */
    using SuccessCallback = std::function<void()>;  			    // 成功回调函数类型,无参数
    using ErrorCallback = std::function<void(const char *message)>;  // 错误回调函数类型,带错误信息
    using FinalizeCallback = std::function<void()>;  			    // 完成回调函数类型,无参数

    // 队列回调,返回队列名称、消息数和消费者数
    using QueueCallback = std::function<void(const std::string &name, 
                                             uint32_t messagecount, 
                                             uint32_t consumercount)>;  
    // 删除队列回调,返回删除的消息数
    using DeleteCallback = std::function<void(uint32_t deletedmessages)>; 
	
    // 消息接收回调,返回消息对象、交付标签和是否重新投递的标志
    using MessageCallback = std::function<void(const Message &message, 
                                               uint64_t deliveryTag, 
                                               bool redelivered)>;  
    /**
     * 发布者确认回调,返回消息交付标签和是否是多个消息
     * 当服务器确认消息已被接收和处理时,触发此回调
     */
    using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;  

    /**
     * 使用确认包裹通道时,当消息被 ack/nacked 时,调用这些回调
     */
    using PublishAckCallback = std::function<void()>;   // 发布确认回调
    using PublishNackCallback = std::function<void()>;  // 发布未确认回调
    using PublishLostCallback = std::function<void()>;  // 发布消息丢失回调

    /**
     * Channel 类定义
     * 管理与 RabbitMQ 服务器之间的消息通道
     */
    class Channel {
    public:
        // 构造函数,创建一个通道,并与指定连接绑定
        Channel(Connection *connection);

        // 检查通道是否已经连接
        bool connected();

        /**
         * 声明交换机
         * 如果交换机名称为空,服务器会自动分配一个名称
         * 支持的交换机类型:fanout(广播)、direct(直接)、topic(主题)、headers、consistent_hash、message_deduplication
         * 以下 flags 可用于交换机:
         *	 -durable 持久化,重启后交换机依然有效
         *	 -autodelete 删除所有连接的队列后,自动删除交换
         *	 -passive 仅被动检查交换机是否存在
         *	 -internal 创建内部交换
         * 此函数返回一个延迟处理程序,可以安装回调 using onSuccess(), onError() 和 onFinalize()
         * 这里我们只用到 onError(),如下所示:
         *   Deferred &onError(const char *message)
 		*/
        Deferred &declareExchange(
            const std::string_view &name,  // 交换机名称
            ExchangeType type,  // 交换机类型
            int flags,  // 交换机标志
            const Table &arguments  // 其他参数
        );

        /**
         * 声明队列
         * 如果队列名称为空,服务器会自动分配一个名称
         * flags 可以是以下值的组合:
         *	 -durable 持久队列在代理重新启动后仍然有效
         *	 -autodelete 当所有连接的使用者都离开时,自动删除队列
         *	 -passive 仅被动检查队列是否存在
         *	 -exclusive 队列仅存在于此连接,并且在连接断开时自动删除
         *
         * 可以安装的 onSuccess()回调应该具有以下签名:
             void myCallback(const std::string &name, 
             				uint32_t messageCount, 
             				uint32_t consumerCount);
             例如:
             channel.declareQueue("myqueue").onSuccess(
             		[](const std::string &name, 
             		   uint32_t messageCount,
             		   uint32_t consumerCount) {
             	std::cout << "Queue '" << name << "' ";
             	std::cout << "has been declared with ";
             	std::cout << messageCount;
                 std::cout << " messages and ";
                 std::cout << consumerCount;
                 std::cout << " consumers" << std::endl;
             });
         */
        DeferredQueue &declareQueue(
            const std::string_view &name,  // 队列名称
            int flags,  // 队列标志
            const Table &arguments  // 其他参数
        );

        /**
         * 将队列绑定到交换机
         * 参数:交换机名称、队列名称、路由密钥以及其他绑定参数
         */
        Deferred &bindQueue(
            const std::string_view &exchange,    // 交换机名称
            const std::string_view &queue,     	 // 队列名称
            const std::string_view &routingkey,  // 路由密钥
            const Table &arguments  // 其他绑定参数
        );

        /**
         * 将消息发布到交换机
         * 必须提供交换机的名称和路由密钥,然后 RabbitMQ 会将消息发送到符合条件的队列
         * 可选的 flags 参数可以指定如果消息无法路由到队列时应该发生的情况。默认情况下,不可更改的消息将被静默地丢弃。
         * 如果设置了 mandatory 或 immediate 标志,则无法处理的消息将返回到应用程序。
         * 在开始发布之前,请确保已经调用了 recall() 方法,并设置了所有适当的处理程序来处理这些返回的消息。
         * 可以提供以下 flags:
         *   -mandatory 如果设置,服务器将返回未发送到队列的消息
         *   -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
         */
        bool publish(
            const std::string_view &exchange,    // 交换机名称
            const std::string_view &routingKey,  // 路由密钥
            const std::string &message,  // 消息内容
            int flags = 0  // 可选标志
        );

        /**
         * 告诉 RabbitMQ 服务器我们已经准备好开始消费队列消息
         * 如果未指定消费者标签,服务器会自动分配一个
         * 支持以下 flags: 
         *   -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
         *   -noack 如果设置了,则不必对已消费的消息进行确认
         *   -exclusive 请求独占访问,只有此使用者可以访问队列
         
         * 可以安装的 onSuccess()回调应该具有以下格式:
         	 void myCallback(const std::string_view&tag);
         样例:
             channel.consume("myqueue").onSuccess([](const std::string_view& tag) {
             		std::cout << "Started consuming under tag ";
             		std::cout << tag << std::endl;
             });
         */
        DeferredConsumer &consume(
            const std::string_view &queue,  // 队列名称
            const std::string_view &tag,    // 消费者标签
            int flags,  // 消费者标志
            const Table &arguments  // 其他参数
        );

        /**
         * 确认接收到的消息
         * 消费者需要确认收到消息,告知 RabbitMQ 将消息从队列中移除
         * 支持多条确认:之前所有未确认的消息将一并确认
         * 当在 DeferredConsumer::onReceived() 方法中接收到消息时,必须确认该消息,以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。
         */
        bool ack(uint64_t deliveryTag, int flags = 0);  // 确认消息,返回是否成功
    };

    /**
     * DeferredConsumer 类定义
     * 管理延迟消费者,支持注册回调函数处理不同的事件
     */
    class DeferredConsumer {
    public:
        // 注册消费者启动时的回调函数
        DeferredConsumer &onSuccess(const ConsumeCallback& callback);

        // 注册接收到消息时的回调函数
        DeferredConsumer &onReceived(const MessageCallback& callback);

        // 注册接收到消息的别名
        DeferredConsumer &onMessage(const MessageCallback& callback);

        // 注册服务器取消消费者时的回调函数
        DeferredConsumer &onCancelled(const CancelCallback& callback);
    };

    /**
     * 消息类,代表通过 RabbitMQ 发送的消息
     */
    class Message : public Envelope {
    public:
        const std::string &exchange();    // 获取消息的交换机
        const std::string &routingkey();  // 获取消息的路由键
    };

    /**
     * 信封类,代表包含元数据和消息体的结构
     */
    class Envelope : public MetaData {
    public:
        const char *body();   // 获取消息体内容(没有以'\0'结尾,所以获取时候需要结合下面的bodySize()接口)
        uint64_t bodySize();  // 获取消息体的大小
    };
}

libev

libev 是一个高效的事件循环库,广泛用于事件驱动的编程模型中。它提供了一个轻量级、低延迟的事件循环实现,能够高效地处理 I/O 操作、定时器和其他异步事件。比如:

  • ev_loop:事件循环对象,管理和调度事件
  • ev_async:用于处理异步事件的结构体,通常用于线程间的异步通知
  • ev_run:启动事件循环
  • ev_break:退出事件循环
// 定义 ev_async 结构体,表示异步事件
typedef struct ev_async {
    EV_WATCHER(ev_async)  // 宏,标记此结构体为事件观察者(watcher)
    EV_ATOMIC_T sent;     // 标记是否发送过异步事件,通常是一个原子操作标志
} ev_async;

// 事件循环的中断类型枚举
enum {
    EVBREAK_CANCEL = 0,   // 取消循环,撤销unloop操作
    EVBREAK_ONE = 1,      // 退出一次循环,只停止当前事件循环
    EVBREAK_ALL = 2       // 退出所有事件循环,完全停止
};

// 默认事件循环初始化函数
// flags 参数可以指定额外的标志,默认值为 0
// EV_CPP 是用于 C++ 环境的一个宏,通常设为0表示默认行为
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP (= 0));

// 宏定义,表示初始化默认的事件循环
# define EV_DEFAULT ev_default_loop(0)

// 启动事件循环,loop 是事件循环对象
// 事件循环将监听并处理所有注册的事件和回调
int ev_run(struct ev_loop *loop);

// 退出事件循环
// break_type 表示退出的类型,控制如何退出循环
//   -EVBREAK_CANCEL: 撤销之前的 unloop 操作
//   -EVBREAK_ONE: 停止一次循环
//   -EVBREAK_ALL: 停止所有循环,彻底退出
void ev_break(struct ev_loop *loop, int32_t break_type);

// 异步事件回调函数的类型定义
// 该回调函数会在事件触发时被调用
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);

// 初始化一个 ev_async 结构体,并设置其回调函数
// 当事件触发时,ev_async 的回调函数会被调用
void ev_async_init(ev_async *w, callback cb);

// 将异步事件 watcher 添加到事件循环 loop 中开始监听并等待触发
// 该事件会被添加到事件循环中,
void ev_async_start(struct ev_loop *loop, ev_async *w);

// 发送一个异步事件到事件循环中,通常用来唤醒事件循环中的等待状态
// 一旦事件被发送,事件循环将处理该异步事件
void ev_async_send(struct ev_loop *loop, ev_async *w);

使用样例

publish.cc 文件:

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>

int main()
{
    // 1. 实例化底层网络通信框架的I/O事件监控句柄
    auto *loop = EV_DEFAULT;

    // 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
    AMQP::LibEvHandler handler(loop);

    // 2.5 实例化连接对象
    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);

    // 3. 实例化信道对象
    AMQP::TcpChannel channel(&connection);

    // 4. 声明交换机
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) // 一对一消息传递模式
        .onError([](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange 交换机创建成功!" << std::endl;
        });

    // 5. 声明队列
    channel.declareQueue("test-queue")
        .onError([](const char *message) {
            std::cout << "声明队列失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-queue 队列创建成功!" << std::endl;
        });

    // 6. 针对交换机和队列进行绑定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message) {
            std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
        });

    // 7. 向交换机发布消息
    for (int i = 0; i < 10; i++) {
        std::string msg = "Hello lirendada-" + std::to_string(i);
        bool ret = channel.publish("test-exchange", "test-queue-key", msg);
        if (ret == false) {
            std::cout << "publish 失败!\n";
        }
    }

    // 启动底层网络通信框架--开启I/O
    ev_run(loop, 0);
    return 0;
}

consume.cc 文件:

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>

//消息回调处理函数的实现
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{
    std::string msg;
    msg.assign(message.body(), message.bodySize());
    std::cout << msg << std::endl;
    channel->ack(deliveryTag); // 对消息进行确认
}

int main()
{
    // 1. 实例化底层网络通信框架的I/O事件监控句柄
    auto *loop = EV_DEFAULT;

    // 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
    AMQP::LibEvHandler handler(loop);

    // 2.5 实例化连接对象
    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);

    // 3. 实例化信道对象
    AMQP::TcpChannel channel(&connection);

    // 4. 声明交换机
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange 交换机创建成功!" << std::endl;
        });

    // 5. 声明队列
    channel.declareQueue("test-queue")
        .onError([](const char *message) {
            std::cout << "声明队列失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-queue 队列创建成功!" << std::endl;
        });

    // 6. 针对交换机和队列进行绑定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message) {
            std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
        });

    // 7. 订阅队列消息 -- 设置消息处理回调函数
    auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
    channel.consume("test-queue", "consume-tag")  //返回值 DeferredConsumer
        .onReceived(callback)
        .onError([](const char *message){
            std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;
            exit(0);
        }); // 返回值是 AMQP::Deferred
    
    // 8. 启动底层网络通信框架--开启I/O
    ev_run(loop, 0);
    return 0;
}

makefile 文件:

all : publish consume
publish : publish.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
consume : consume.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev

Ⅳ. 二次封装

​ 在项目中使用 rabbitmq 的时候,我们目前只需要交换机与队列的直接交换,实现一台主机将消息发布给另一台主机进行处理的功能,因此在这里可以对 rabbitmq 的操作进行简单的封装,使 rabbitmq 的操作在项目中更加简便:

封装一个 MQClient

  • 提供声明和绑定指定交换机与队列的功能
  • 提供向指定交换机发布消息的功能
  • 提供订阅指定队列消息,并设置回调函数进行消息消费处理的功能
// rabbitmq.hpp
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "logger.hpp"

class MQClient 
{
public:
    using ptr = std::shared_ptr<MQClient>;
    using MessageCallback = std::function<void(const char*, size_t)>; // 定义消息回调类型

    // 构造函数,初始化连接信息,连接到AMQP服务器
    MQClient(const std::string &user, 
             const std::string passwd,
             const std::string host) 
    {
        // 初始化Libev事件循环
        _loop = EV_DEFAULT;
        // 创建AMQP的LibEv事件循环处理器
        _handler = std::make_unique<AMQP::LibEvHandler>(_loop);

        // 构建AMQP连接地址(例如:amqp://root:123456@127.0.0.1:5672/)
        std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
        AMQP::Address address(url);

        // 创建TCP连接
        _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
        // 创建TCP通道
        _channel = std::make_unique<AMQP::TcpChannel>(_connection.get());

        // 启动一个新线程,运行事件循环
        _loop_thread = std::thread([this]() {
            ev_run(_loop, 0);
        });
    }

    // 析构函数,关闭事件循环并清理资源
    ~MQClient() 
    {
            // 初始化异步事件
            ev_async_init(&_async_watcher, watcher_callback);
            ev_async_start(_loop, &_async_watcher);
            ev_async_send(_loop, &_async_watcher);

            // 等待事件循环线程结束
            _loop_thread.join();
            _loop = nullptr;
    }

    // 声明交换机、队列及绑定
    void declareComponents(const std::string &exchange, // 交换机名称
                           const std::string &queue,    // 队列名称
                           const std::string &routing_key = "routing_key",
                           AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) // 一对一模式
    {
        // 声明交换机
        _channel->declareExchange(exchange, echange_type)
            .onError([](const char *message) {
                LOG_ERROR("声明交换机失败:{}", message);
                exit(0);
            })
            .onSuccess([exchange](){
                LOG_ERROR("{} 交换机创建成功!", exchange);
            });

        // 声明队列
        _channel->declareQueue(queue)
            .onError([](const char *message) {
                LOG_ERROR("声明队列失败:{}", message);
                exit(0);
            })
            .onSuccess([queue](){
                LOG_ERROR("{} 队列创建成功!", queue);
            });

        // 绑定交换机与队列
        _channel->bindQueue(exchange, queue, routing_key)
            .onError([exchange, queue](const char *message) {
                LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
                exit(0);
            })
            .onSuccess([exchange, queue, routing_key](){
                LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
            });
    }

    // 发布消息到指定交换机
    bool publish(const std::string &exchange, 
                 const std::string &msg, 
                 const std::string &routing_key = "routing_key")
    {
        LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
        bool ret = _channel->publish(exchange, routing_key, msg);
        if (ret == false) {
            LOG_ERROR("{} 发布消息失败:", exchange);
            return false;
        }
        return true;
    }

    // 从指定队列消费消息
    void consume(const std::string &queue, const MessageCallback &cb) 
    {
        LOG_DEBUG("开始订阅 {} 队列消息!", queue);
        _channel->consume(queue, "consume-tag")  // 返回值为DeferredConsumer,而不是Deferred,所以要先用onReceived再用onError
            .onReceived([this, cb](const AMQP::Message &message, 
                                   uint64_t deliveryTag, 
                                   bool redelivered) 
            {
                cb(message.body(), message.bodySize()); // 消息处理回调
                _channel->ack(deliveryTag); // 确认消息处理
            })
            .onError([queue](const char *message){
                LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
                exit(0);
            });
    }

private:
    // Libev事件循环异步回调
    static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
        ev_break(loop, EVBREAK_ALL);  // 停止事件循环
    }

private:
    struct ev_async _async_watcher;  // 异步事件监视器
    struct ev_loop *_loop;           // Libev事件循环
    std::unique_ptr<AMQP::LibEvHandler> _handler;      // Libev事件处理器
    std::unique_ptr<AMQP::TcpConnection> _connection;  // TCP连接
    std::unique_ptr<AMQP::TcpChannel> _channel;        // AMQP通道
    std::thread _loop_thread;  // 事件循环线程
};

测试代码

publish.cc 文件:

#include "../../../header/rabbitmq.hpp"
#include "../../../header/logger.hpp"
#include <gflags/gflags.h>

DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");

DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");

int main(int argc, char *argv[])
{
    google::ParseCommandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    // 创建客户端对象
    MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);

    // 声明交换机、队列及绑定
    client.declareComponents("test-exchange", "test-queue");

    // 发布消息
    for (int i = 0; i < 10; i++) 
    {
        std::string msg = "Hello lirendada-" + std::to_string(i);
        bool ret = client.publish("test-exchange", msg);
        if (ret == false) {
            LOG_WARN("publish 失败!\n");
        }
    }
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
}

consume.cc 文件:

#include "../../../header/rabbitmq.hpp"
#include "../../../header/logger.hpp"
#include <gflags/gflags.h>

DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");

DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");

void callback(const char *body, size_t sz)
{
    std::string msg;
    msg.assign(body, sz);
    LOG_DEBUG("{}", msg);
}

int main(int argc, char *argv[])
{
    google::ParseCommandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    // 创建客户端对象
    MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);

    // 声明交换机、队列及绑定
    client.declareComponents("test-exchange", "test-queue");

    // 订阅消息
    client.consume("test-queue", callback);
    std::this_thread::sleep_for(std::chrono::seconds(60));
    return 0;
}

makefile 文件:

all : publish consume
publish : publish.cc
	g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
consume : consume.cc
	g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags

在这里插入图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐