【c++中间件】RabbitMQ介绍 && AMQP-CPP库的使用 && 二次封装
RabbitMQ 安装与 C++ 客户端开发指南 本文介绍了 RabbitMQ 消息队列服务的安装配置及 C++ 客户端开发方法。主要内容包括: RabbitMQ 服务安装 通过 apt 安装 RabbitMQ 服务 创建管理员用户并设置权限 启用 Web 管理界面(端口 15672) C++ 客户端开发 安装 AMQP-CPP 库(支持 C++17) 两种开发模式:原生 TCP 实现或使用 li
文章目录
Ⅰ. 安装 RabbitMQ
RabbitMQ 服务安装与使用
sudo apt install -y rabbitmq-server
一般来说默认端口为 15672。
# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service
# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个 administrator 用户,才可以做为远程登录和发表订阅消息:
# 添加用户和密码
sudo rabbitmqctl add_user root 123456
# 设置用户tag
sudo rabbitmqctl set_user_tags root administrator
# 设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management
网页访问的默认端口为 15672:(注意 rabbitmq 服务的端口是 5672,注意区分!)

安装 RabbitMQ 的 C++ 客户端库
- C 语言库:https://github.com/alanxz/rabbitmq-c
- C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master
我们这里使用 AMQP-CPP 库来编写客户端程序!
安装 AMQP-CPP
sudo apt install -y libev-dev #libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
sudo make install
如果安装时候出现了
SSL相关的版本问题,可以通过以下命令来解决:dpkg -l |grep ssl sudo dpkg -P --force-all libevent-openssl-2.1-7 sudo dpkg -P --force-all openssl sudo dpkg -P --force-all libssl-dev sudo apt --fix-broken install 然后重新
make即可!
Ⅱ. RabbitMQ 的介绍
RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)的开源消息代理,广泛用于消息传递和消息队列管理。其核心功能是 提供异步的消息传递机制,支持多个消费者和生产者之间的松耦合通信。
RabbitMQ 服务与客户端的通信原理
- 连接建立(Connection)
- 客户端通过
AMQP协议与RabbitMQ服务器建立连接。RabbitMQ使用TCP/IP作为传输协议,默认端口为 5672。 - 客户端首先创建一个与
RabbitMQ的TCP连接,这个连接是双向的,可以承载多个 通道(Channel)。
- 通道(Channel)
- 在一个
TCP连接上,客户端和RabbitMQ可以开辟多个通道。每个通道都是一个独立的、轻量级的通信渠道,用于进行消息的发送与接收。 - 每个通道都对应一个
AMQP会话,所有操作(如声明队列、发布消息、消费消息等)都通过通道进行。 - 这样做的好处是,一个
TCP连接可以并发处理多个请求,减少了连接的开销和延迟。
- 声明队列(Queue)
- 在客户端和
RabbitMQ服务器之间进行消息传递前,客户端需要声明一个队列。队列是消息的存储位置,消息会被传递到指定的队列中。 - 队列是由生产者声明的,消费者会从队列中获取消息!
- 也可以通过
Durability和Persistence来确保队列和消息在RabbitMQ服务器重启后不丢失。
- 交换机(Exchange)
- 交换机是
RabbitMQ中的核心组件,负责根据路由规则将消息分发到适当的队列。客户端将消息发送到交换机,而交换机会根据绑定的路由键(routing key)将消息路由到对应的队列。 RabbitMQ支持几种类型的交换机:- Direct Exchange:根据路由键一对一将消息发送到队列。
- Fanout Exchange:将消息广播到所有绑定到它的队列。
- Topic Exchange:根据路由键模式(类似正则表达式)将消息路由到一个或多个队列。
- Headers Exchange:通过消息头部的属性来决定消息的路由。
- 发布消息(Publishing)
- 生产者(
Producer):生产者将消息发布到交换机。生产者通常不会直接与队列通信,而是将消息发送到交换机,交换机决定消息的路由。 - 消息可以携带属性,如
routing key(路由键),用于帮助交换机决定消息的路由方向。 - 如果消息发布时队列未创建,生产者可以要求
RabbitMQ自动创建队列,或自己事先声明队列。
- 消息消费(Consuming)
- 消费者(
Consumer):消费者订阅队列,从队列中获取消息并进行处理。一个队列可以有多个消费者,RabbitMQ会轮流将消息分发给所有绑定的消费者。 - 当消息被消费者成功消费后,可以根据消息的 确认机制 来决定是否从队列中删除消息。
- 消费者可以使用两种主要的确认方式:
- 自动确认:消费者在获取消息后会自动确认消息已处理,
RabbitMQ会将该消息从队列中移除。 - 手动确认:消费者处理完消息后需要明确调用确认方法,确保消息在处理完成后才会被移除。
- 自动确认:消费者在获取消息后会自动确认消息已处理,
- 消息确认(Acknowledgment)
- 为了保证消息的可靠性,
RabbitMQ提供了消息确认机制。当消费者从队列中获取到消息后,可以选择确认消息,表示消息已成功处理。 - 如果消费者在处理消息时出现异常,消息不会被确认,
RabbitMQ会将消息重新放入队列供其他消费者重新消费(也可以设置消息的重试次数和延迟时间)。
- 消息持久化(Persistence)
- 队列持久化:在声明队列时,可以将其设置为持久化,确保即使
RabbitMQ服务器重启,队列本身依然存在。 - 消息持久化:可以通过将消息标记为持久化(
delivery_mode=2)来确保消息在RabbitMQ重启后不丢失。
- 消息路由(Routing)
- 交换机根据绑定到它的队列和路由键,决定如何将消息路由到一个或多个队列。
- 如果消息没有找到匹配的队列,通常情况下,消息会被丢弃,除非启用了死信队列
DLQ(Dead Letter Queue)来接收无法路由的消息。
简单通信流程

- 生产者 将消息发送到指定的 交换机,消息携带一个路由键。
- 交换机 根据路由规则将消息发送到一个或多个队列。
- 消费者 监听并消费从队列中获取到的消息,进行处理。
- 消费者处理完消息后,通过确认机制告知
RabbitMQ消息已处理。 RabbitMQ根据确认机制和消息持久化设置,确保消息不会丢失。
Ⅲ. AMQP-CPP 库的简单使用
一、介绍
AMQP-CPP 是一个用于实现 AMQP (Advanced Message Queuing Protocol) 协议的 C++ 库。AMQP 是一种广泛应用于消息中间件的开放标准协议,支持在分布式系统中高效、安全地交换消息。AMQP-CPP 库使得 C++ 开发者能够轻松实现 AMQP 协议的客户端和服务器,通常用于与 RabbitMQ 等消息中间件进行交互。
AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
注意事项:这个库需要 c++17 的支持。
主要功能有:
- 连接和通道管理:
AMQP-CPP支持通过AMQP协议连接到RabbitMQ服务器,并能够创建多个通道进行并发消息处理。 - 消息发布与订阅:可以方便地进行消息发布(生产者)和订阅(消费者),支持不同类型的交换机(
Direct、Topic、Fanout等)和队列。 - 队列管理:支持创建、删除、绑定和监听队列。
- 异常处理和重试机制:内置异常处理,能够在连接断开后自动进行重试,确保消息通信的稳定性。
二、使用
通常 AMQP-CPP 的使用有两种模式:
-
使用默认的
TCP模块进行网络通信(相当于自己手动实现,比较麻烦!)- 实现一个类继承自
AMQP::TcpHandler类, 它负责网络层的TCP连接 - 重写相关函数, 其中必须重写
monitor函数 - 在
monitor函数中需要实现的是将fd放入eventloop(select、epoll)中监控, 当fd可写可读就绪之后, 调用AMQP-CPP的connection->process(fd, flags)方法
- 实现一个类继承自
-
使用扩展的
libevent、libev、libuv、asio异步通信组件进行通信(相当于有现成的组件供我们直接使用,我们优先用这种模式!)- 以
libev为例, 我们不必要自己实现monitor函数, 可以直接使用AMQP::LibEvHandler
- 以
在本项目编译链接的时候,记得链接动态库:-lamqpcpp -lev。
三、常用类与接口介绍
Channel 类
channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel。
因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。
namespace AMQP {
/**
* 通用回调函数类型,用于许多延迟处理对象
*/
using SuccessCallback = std::function<void()>; // 成功回调函数类型,无参数
using ErrorCallback = std::function<void(const char *message)>; // 错误回调函数类型,带错误信息
using FinalizeCallback = std::function<void()>; // 完成回调函数类型,无参数
// 队列回调,返回队列名称、消息数和消费者数
using QueueCallback = std::function<void(const std::string &name,
uint32_t messagecount,
uint32_t consumercount)>;
// 删除队列回调,返回删除的消息数
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
// 消息接收回调,返回消息对象、交付标签和是否重新投递的标志
using MessageCallback = std::function<void(const Message &message,
uint64_t deliveryTag,
bool redelivered)>;
/**
* 发布者确认回调,返回消息交付标签和是否是多个消息
* 当服务器确认消息已被接收和处理时,触发此回调
*/
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
/**
* 使用确认包裹通道时,当消息被 ack/nacked 时,调用这些回调
*/
using PublishAckCallback = std::function<void()>; // 发布确认回调
using PublishNackCallback = std::function<void()>; // 发布未确认回调
using PublishLostCallback = std::function<void()>; // 发布消息丢失回调
/**
* Channel 类定义
* 管理与 RabbitMQ 服务器之间的消息通道
*/
class Channel {
public:
// 构造函数,创建一个通道,并与指定连接绑定
Channel(Connection *connection);
// 检查通道是否已经连接
bool connected();
/**
* 声明交换机
* 如果交换机名称为空,服务器会自动分配一个名称
* 支持的交换机类型:fanout(广播)、direct(直接)、topic(主题)、headers、consistent_hash、message_deduplication
* 以下 flags 可用于交换机:
* -durable 持久化,重启后交换机依然有效
* -autodelete 删除所有连接的队列后,自动删除交换
* -passive 仅被动检查交换机是否存在
* -internal 创建内部交换
* 此函数返回一个延迟处理程序,可以安装回调 using onSuccess(), onError() 和 onFinalize()
* 这里我们只用到 onError(),如下所示:
* Deferred &onError(const char *message)
*/
Deferred &declareExchange(
const std::string_view &name, // 交换机名称
ExchangeType type, // 交换机类型
int flags, // 交换机标志
const Table &arguments // 其他参数
);
/**
* 声明队列
* 如果队列名称为空,服务器会自动分配一个名称
* flags 可以是以下值的组合:
* -durable 持久队列在代理重新启动后仍然有效
* -autodelete 当所有连接的使用者都离开时,自动删除队列
* -passive 仅被动检查队列是否存在
* -exclusive 队列仅存在于此连接,并且在连接断开时自动删除
*
* 可以安装的 onSuccess()回调应该具有以下签名:
void myCallback(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount);
例如:
channel.declareQueue("myqueue").onSuccess(
[](const std::string &name,
uint32_t messageCount,
uint32_t consumerCount) {
std::cout << "Queue '" << name << "' ";
std::cout << "has been declared with ";
std::cout << messageCount;
std::cout << " messages and ";
std::cout << consumerCount;
std::cout << " consumers" << std::endl;
});
*/
DeferredQueue &declareQueue(
const std::string_view &name, // 队列名称
int flags, // 队列标志
const Table &arguments // 其他参数
);
/**
* 将队列绑定到交换机
* 参数:交换机名称、队列名称、路由密钥以及其他绑定参数
*/
Deferred &bindQueue(
const std::string_view &exchange, // 交换机名称
const std::string_view &queue, // 队列名称
const std::string_view &routingkey, // 路由密钥
const Table &arguments // 其他绑定参数
);
/**
* 将消息发布到交换机
* 必须提供交换机的名称和路由密钥,然后 RabbitMQ 会将消息发送到符合条件的队列
* 可选的 flags 参数可以指定如果消息无法路由到队列时应该发生的情况。默认情况下,不可更改的消息将被静默地丢弃。
* 如果设置了 mandatory 或 immediate 标志,则无法处理的消息将返回到应用程序。
* 在开始发布之前,请确保已经调用了 recall() 方法,并设置了所有适当的处理程序来处理这些返回的消息。
* 可以提供以下 flags:
* -mandatory 如果设置,服务器将返回未发送到队列的消息
* -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
*/
bool publish(
const std::string_view &exchange, // 交换机名称
const std::string_view &routingKey, // 路由密钥
const std::string &message, // 消息内容
int flags = 0 // 可选标志
);
/**
* 告诉 RabbitMQ 服务器我们已经准备好开始消费队列消息
* 如果未指定消费者标签,服务器会自动分配一个
* 支持以下 flags:
* -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
* -noack 如果设置了,则不必对已消费的消息进行确认
* -exclusive 请求独占访问,只有此使用者可以访问队列
* 可以安装的 onSuccess()回调应该具有以下格式:
void myCallback(const std::string_view&tag);
样例:
channel.consume("myqueue").onSuccess([](const std::string_view& tag) {
std::cout << "Started consuming under tag ";
std::cout << tag << std::endl;
});
*/
DeferredConsumer &consume(
const std::string_view &queue, // 队列名称
const std::string_view &tag, // 消费者标签
int flags, // 消费者标志
const Table &arguments // 其他参数
);
/**
* 确认接收到的消息
* 消费者需要确认收到消息,告知 RabbitMQ 将消息从队列中移除
* 支持多条确认:之前所有未确认的消息将一并确认
* 当在 DeferredConsumer::onReceived() 方法中接收到消息时,必须确认该消息,以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。
*/
bool ack(uint64_t deliveryTag, int flags = 0); // 确认消息,返回是否成功
};
/**
* DeferredConsumer 类定义
* 管理延迟消费者,支持注册回调函数处理不同的事件
*/
class DeferredConsumer {
public:
// 注册消费者启动时的回调函数
DeferredConsumer &onSuccess(const ConsumeCallback& callback);
// 注册接收到消息时的回调函数
DeferredConsumer &onReceived(const MessageCallback& callback);
// 注册接收到消息的别名
DeferredConsumer &onMessage(const MessageCallback& callback);
// 注册服务器取消消费者时的回调函数
DeferredConsumer &onCancelled(const CancelCallback& callback);
};
/**
* 消息类,代表通过 RabbitMQ 发送的消息
*/
class Message : public Envelope {
public:
const std::string &exchange(); // 获取消息的交换机
const std::string &routingkey(); // 获取消息的路由键
};
/**
* 信封类,代表包含元数据和消息体的结构
*/
class Envelope : public MetaData {
public:
const char *body(); // 获取消息体内容(没有以'\0'结尾,所以获取时候需要结合下面的bodySize()接口)
uint64_t bodySize(); // 获取消息体的大小
};
}
libev
libev 是一个高效的事件循环库,广泛用于事件驱动的编程模型中。它提供了一个轻量级、低延迟的事件循环实现,能够高效地处理 I/O 操作、定时器和其他异步事件。比如:
ev_loop:事件循环对象,管理和调度事件ev_async:用于处理异步事件的结构体,通常用于线程间的异步通知ev_run:启动事件循环ev_break:退出事件循环
// 定义 ev_async 结构体,表示异步事件
typedef struct ev_async {
EV_WATCHER(ev_async) // 宏,标记此结构体为事件观察者(watcher)
EV_ATOMIC_T sent; // 标记是否发送过异步事件,通常是一个原子操作标志
} ev_async;
// 事件循环的中断类型枚举
enum {
EVBREAK_CANCEL = 0, // 取消循环,撤销unloop操作
EVBREAK_ONE = 1, // 退出一次循环,只停止当前事件循环
EVBREAK_ALL = 2 // 退出所有事件循环,完全停止
};
// 默认事件循环初始化函数
// flags 参数可以指定额外的标志,默认值为 0
// EV_CPP 是用于 C++ 环境的一个宏,通常设为0表示默认行为
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP (= 0));
// 宏定义,表示初始化默认的事件循环
# define EV_DEFAULT ev_default_loop(0)
// 启动事件循环,loop 是事件循环对象
// 事件循环将监听并处理所有注册的事件和回调
int ev_run(struct ev_loop *loop);
// 退出事件循环
// break_type 表示退出的类型,控制如何退出循环
// -EVBREAK_CANCEL: 撤销之前的 unloop 操作
// -EVBREAK_ONE: 停止一次循环
// -EVBREAK_ALL: 停止所有循环,彻底退出
void ev_break(struct ev_loop *loop, int32_t break_type);
// 异步事件回调函数的类型定义
// 该回调函数会在事件触发时被调用
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 初始化一个 ev_async 结构体,并设置其回调函数
// 当事件触发时,ev_async 的回调函数会被调用
void ev_async_init(ev_async *w, callback cb);
// 将异步事件 watcher 添加到事件循环 loop 中开始监听并等待触发
// 该事件会被添加到事件循环中,
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送一个异步事件到事件循环中,通常用来唤醒事件循环中的等待状态
// 一旦事件被发送,事件循环将处理该异步事件
void ev_async_send(struct ev_loop *loop, ev_async *w);
使用样例
publish.cc 文件:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
// 1. 实例化底层网络通信框架的I/O事件监控句柄
auto *loop = EV_DEFAULT;
// 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
AMQP::LibEvHandler handler(loop);
// 2.5 实例化连接对象
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
// 3. 实例化信道对象
AMQP::TcpChannel channel(&connection);
// 4. 声明交换机
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) // 一对一消息传递模式
.onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交换机创建成功!" << std::endl;
});
// 5. 声明队列
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 队列创建成功!" << std::endl;
});
// 6. 针对交换机和队列进行绑定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
});
// 7. 向交换机发布消息
for (int i = 0; i < 10; i++) {
std::string msg = "Hello lirendada-" + std::to_string(i);
bool ret = channel.publish("test-exchange", "test-queue-key", msg);
if (ret == false) {
std::cout << "publish 失败!\n";
}
}
// 启动底层网络通信框架--开启I/O
ev_run(loop, 0);
return 0;
}
consume.cc 文件:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
//消息回调处理函数的实现
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{
std::string msg;
msg.assign(message.body(), message.bodySize());
std::cout << msg << std::endl;
channel->ack(deliveryTag); // 对消息进行确认
}
int main()
{
// 1. 实例化底层网络通信框架的I/O事件监控句柄
auto *loop = EV_DEFAULT;
// 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来
AMQP::LibEvHandler handler(loop);
// 2.5 实例化连接对象
AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
AMQP::TcpConnection connection(&handler, address);
// 3. 实例化信道对象
AMQP::TcpChannel channel(&connection);
// 4. 声明交换机
channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
.onError([](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange 交换机创建成功!" << std::endl;
});
// 5. 声明队列
channel.declareQueue("test-queue")
.onError([](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-queue 队列创建成功!" << std::endl;
});
// 6. 针对交换机和队列进行绑定
channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
.onError([](const char *message) {
std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;
exit(0);
})
.onSuccess([](){
std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;
});
// 7. 订阅队列消息 -- 设置消息处理回调函数
auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
channel.consume("test-queue", "consume-tag") //返回值 DeferredConsumer
.onReceived(callback)
.onError([](const char *message){
std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;
exit(0);
}); // 返回值是 AMQP::Deferred
// 8. 启动底层网络通信框架--开启I/O
ev_run(loop, 0);
return 0;
}
makefile 文件:
all : publish consume
publish : publish.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
consume : consume.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
Ⅳ. 二次封装
在项目中使用 rabbitmq 的时候,我们目前只需要交换机与队列的直接交换,实现一台主机将消息发布给另一台主机进行处理的功能,因此在这里可以对 rabbitmq 的操作进行简单的封装,使 rabbitmq 的操作在项目中更加简便:
封装一个 MQClient:
- 提供声明和绑定指定交换机与队列的功能
- 提供向指定交换机发布消息的功能
- 提供订阅指定队列消息,并设置回调函数进行消息消费处理的功能
// rabbitmq.hpp
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "logger.hpp"
class MQClient
{
public:
using ptr = std::shared_ptr<MQClient>;
using MessageCallback = std::function<void(const char*, size_t)>; // 定义消息回调类型
// 构造函数,初始化连接信息,连接到AMQP服务器
MQClient(const std::string &user,
const std::string passwd,
const std::string host)
{
// 初始化Libev事件循环
_loop = EV_DEFAULT;
// 创建AMQP的LibEv事件循环处理器
_handler = std::make_unique<AMQP::LibEvHandler>(_loop);
// 构建AMQP连接地址(例如:amqp://root:123456@127.0.0.1:5672/)
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
// 创建TCP连接
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
// 创建TCP通道
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
// 启动一个新线程,运行事件循环
_loop_thread = std::thread([this]() {
ev_run(_loop, 0);
});
}
// 析构函数,关闭事件循环并清理资源
~MQClient()
{
// 初始化异步事件
ev_async_init(&_async_watcher, watcher_callback);
ev_async_start(_loop, &_async_watcher);
ev_async_send(_loop, &_async_watcher);
// 等待事件循环线程结束
_loop_thread.join();
_loop = nullptr;
}
// 声明交换机、队列及绑定
void declareComponents(const std::string &exchange, // 交换机名称
const std::string &queue, // 队列名称
const std::string &routing_key = "routing_key",
AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct) // 一对一模式
{
// 声明交换机
_channel->declareExchange(exchange, echange_type)
.onError([](const char *message) {
LOG_ERROR("声明交换机失败:{}", message);
exit(0);
})
.onSuccess([exchange](){
LOG_ERROR("{} 交换机创建成功!", exchange);
});
// 声明队列
_channel->declareQueue(queue)
.onError([](const char *message) {
LOG_ERROR("声明队列失败:{}", message);
exit(0);
})
.onSuccess([queue](){
LOG_ERROR("{} 队列创建成功!", queue);
});
// 绑定交换机与队列
_channel->bindQueue(exchange, queue, routing_key)
.onError([exchange, queue](const char *message) {
LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
exit(0);
})
.onSuccess([exchange, queue, routing_key](){
LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, routing_key);
});
}
// 发布消息到指定交换机
bool publish(const std::string &exchange,
const std::string &msg,
const std::string &routing_key = "routing_key")
{
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg);
if (ret == false) {
LOG_ERROR("{} 发布消息失败:", exchange);
return false;
}
return true;
}
// 从指定队列消费消息
void consume(const std::string &queue, const MessageCallback &cb)
{
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag") // 返回值为DeferredConsumer,而不是Deferred,所以要先用onReceived再用onError
.onReceived([this, cb](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
cb(message.body(), message.bodySize()); // 消息处理回调
_channel->ack(deliveryTag); // 确认消息处理
})
.onError([queue](const char *message){
LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
exit(0);
});
}
private:
// Libev事件循环异步回调
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL); // 停止事件循环
}
private:
struct ev_async _async_watcher; // 异步事件监视器
struct ev_loop *_loop; // Libev事件循环
std::unique_ptr<AMQP::LibEvHandler> _handler; // Libev事件处理器
std::unique_ptr<AMQP::TcpConnection> _connection; // TCP连接
std::unique_ptr<AMQP::TcpChannel> _channel; // AMQP通道
std::thread _loop_thread; // 事件循环线程
};
测试代码
publish.cc 文件:
#include "../../../header/rabbitmq.hpp"
#include "../../../header/logger.hpp"
#include <gflags/gflags.h>
DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
int main(int argc, char *argv[])
{
google::ParseCommandLineFlags(&argc, &argv, true);
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
// 创建客户端对象
MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
// 声明交换机、队列及绑定
client.declareComponents("test-exchange", "test-queue");
// 发布消息
for (int i = 0; i < 10; i++)
{
std::string msg = "Hello lirendada-" + std::to_string(i);
bool ret = client.publish("test-exchange", msg);
if (ret == false) {
LOG_WARN("publish 失败!\n");
}
}
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
consume.cc 文件:
#include "../../../header/rabbitmq.hpp"
#include "../../../header/logger.hpp"
#include <gflags/gflags.h>
DEFINE_string(user, "root", "rabbitmq访问用户名");
DEFINE_string(pswd, "123456", "rabbitmq访问密码");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");
DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");
void callback(const char *body, size_t sz)
{
std::string msg;
msg.assign(body, sz);
LOG_DEBUG("{}", msg);
}
int main(int argc, char *argv[])
{
google::ParseCommandLineFlags(&argc, &argv, true);
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
// 创建客户端对象
MQClient client(FLAGS_user, FLAGS_pswd, FLAGS_host);
// 声明交换机、队列及绑定
client.declareComponents("test-exchange", "test-queue");
// 订阅消息
client.consume("test-queue", callback);
std::this_thread::sleep_for(std::chrono::seconds(60));
return 0;
}
makefile 文件:
all : publish consume
publish : publish.cc
g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags
consume : consume.cc
g++ -g -std=c++17 $^ -o $@ -lamqpcpp -lev -lfmt -lspdlog -lgflags

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)