PC端QT实现mqtt客户端发布和订阅
在Windows11-64位系统下使用QT开发桌面应用程序,实现mqtt客户端的发布和订阅功能。需求:mqtt代理服务器 --mosquitto;mqtt客户端工具 -- mqtt.fx;qtcreator开发工具 -- qtcreator6.8.2版本;
在Windows11-64位系统下使用QT开发桌面应用程序,实现mqtt客户端的发布和订阅功能。
需求:
mqtt代理服务器 --mosquitto;
mqtt客户端工具 -- mqtt.fx;
qtcreator开发工具 -- qtcreator6.8.2版本;
过程:
Windows下MQTT服务器搭建
安装mosquitto-1.6.9-install-windows-x64.exe,全部按默认安装即可,路径可以自己修改。
安装后在安装路径会有一个名为mosquitto.conf的配置文件,按照如下方式进行配置,默认端口1883。
修改allow_anonymous为true以允许主机匿名访问。

在mosquitto安装目录下启用终端(cmd),执行命令启动mosquitto服务

windows的mosquitto刚启动时候不会输出任何信息,只要能启动,就代表成功了。
如果想查看调试信息,那么就把log_type的配置项打开。


MQTT客户端工具使用



在客户端发布过程中,可以通过MQTT服务端终端查看相关debug,如下图:

QT开发
由于使用的qt6.8.2版本,使用mqtt驱动库需要重新编译,在安装插件时需要选择source

这边已经安装过,实际安装可能需要几个G的安装内存,安装过程中会出现一些下载错误的情况,不用管,直接重新下载就可以。

安装好后,在qt安装目录下能看到qtmqtt文件夹,那么就说明source源码安装成功。

接下来需要编译MQTT源码,需要CMake和Ninja两个命令。
执行cmake --version和ninja --version查看版本号,能查看说明安装成功。注意需要更新环境变量path。

CSDN上有比较详细的编译MQTT驱动方式,这里简单介绍下。
mkdir build
cd build
cmake -G "Ninja" -DCMAKE_PREFIX_PATH=C:/Qt/6.8.2/msvcxxx_64 ../
ninja
ninja install
有的情况下,可能需要配置环境变量path

一般这样就能完成mqtt编译,打开qt工程,如果出现以下输出,说明工程没有获取到mqtt驱动,再仔细检查下,还有qt工程的.pro中增加mqtt模块。


QT开发UI界面

QT代码
#include "widget.h"
#include "ui_widget.h"
Widget::Widget(QWidget *parent)
: QWidget(parent)
, ui(new Ui::Widget)
{
ui->setupUi(this);
tcpSocket = new QTcpSocket(this);
connect(tcpSocket,SIGNAL(readyRead()),this,SLOT(readMessage()));
//连接信号和相应槽函数
connect(tcpSocket,SIGNAL(connected()),this,SLOT(sendMessage()));
connect(tcpSocket,SIGNAL(error(QAbstractSocket::SocketError)),
this,SLOT(displayError(QAbstractSocket::SocketError)));
connect(ui->pushButton_recv,SIGNAL(clicked()),this,SLOT(pushButton_recv_clicked()));
connect(ui->pushButton_del,SIGNAL(clicked()),this,SLOT(pushButton_del_clicked()));
//连接mqtt
connect(&mqtt_client, &QMqttClient::connected, [&]() {
ui->pushButton_mqttconnect->setText("mqtt断开");
ui->lineEdit_host->setEnabled(false);
ui->lineEdit_port->setEnabled(false);
ui->pushButton_mqttrecv->setEnabled(true);
ui->pushButton_mqttsend->setEnabled(true);
});
//断开mqtt
connect(&mqtt_client, &QMqttClient::disconnected, [&]() {
ui->pushButton_mqttconnect->setText("mqtt连接");
ui->lineEdit_host->setEnabled(true);
ui->lineEdit_port->setEnabled(true);
});
// 检测连接状态
// connect(&mqtt_client, &QMqttClient::stateChanged, [&](QMqttClient::ClientState state) {
// if (state == QMqttClient::Connected) {
// //qDebug() << "Connected to MQTT broker!";
// ui->textEdit_db->append("Connected to MQTT broker!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());
// } else if (state == QMqttClient::Disconnected) {
// //qDebug() << "Disconnected from MQTT broker!";
// ui->textEdit_db->append("Disconnected from MQTT broker!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());
// } else if (state == QMqttClient::Connecting) {
// //qDebug() << "Connecting to MQTT broker...";
// ui->textEdit_db->append("Connecting to MQTT broker...!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());
// }
// });
// 连接错误时的槽函数
connect(&mqtt_client, &QMqttClient::errorChanged, [&](QMqttClient::ClientError error) {
//qDebug() << "Error occurred:" << error;
ui->textEdit_db->append("Error occurred: "+QString::fromStdString(std::to_string(error)));
});
//接收消息
connect(&mqtt_client, &QMqttClient::messageReceived, [&](const QByteArray &message, const QMqttTopicName &topic) {
ui->textEdit_db->append("mqtt_recv: "+topic.name()+":"+message);
});
connect(ui->pushButton_mqttconnect,SIGNAL(clicked()),this,SLOT(pushButton_mqttconnect_clicked()));
connect(ui->pushButton_mqttrecv,SIGNAL(clicked()),this,SLOT(pushButton_mqttrecv_clicked()));
connect(ui->pushButton_mqttsend,SIGNAL(clicked()),this,SLOT(pushButton_mqttsend_clicked()));
ui->pushButton_mqttrecv->setEnabled(false);
ui->pushButton_mqttsend->setEnabled(false);
// 设置 QTextEdit 无法选择文本
ui->textEdit_db->setTextInteractionFlags(Qt::NoTextInteraction);
}
Widget::~Widget()
{
delete ui;
}
void Widget::connect_db()
{
QStringList drivers = QSqlDatabase::drivers();
foreach(QString driver, drivers) {
ui->textEdit_db->append(driver);
}
ui->textEdit_db->append("test.");
db = QSqlDatabase::addDatabase("QMYSQL");
db.setHostName("118.31.2.73");
db.setPort(3306); // MySQL的默认端口是3306
db.setDatabaseName("test_db");
db.setUserName("root");
db.setPassword("123456");
if (!db.open()) {
ui->textEdit_db->append("Error: Unable to connect to the database.");
return;
}
//ui->pushButton_mqttconnect->setText("数据库断连");
}
void Widget::select_db()
{
/*
check_flag = 0;
check_value.fill("Default", 10); // 设置列表大小为 10,并用 "Default" 填充
if(ui->checkBox_ar->isChecked())
{
check_value[check_flag] = ui->checkBox_ar->text();
++check_flag;
}
if(ui->checkBox_hum->isChecked())
{
check_value[check_flag] = ui->checkBox_hum->text();
++check_flag;
}
if(ui->checkBox_tem->isChecked())
{
check_value[check_flag] = ui->checkBox_tem->text();
++check_flag;
}
model = new QStandardItemModel(0, 0, this); // 0 行 0 列
switch (check_flag) {
case 1:
model->setHorizontalHeaderLabels({check_value[0], "date"});
break;
case 2:
model->setHorizontalHeaderLabels({check_value[0], check_value[1], "date"});
break;
case 3:
model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "date"});
break;
default:
model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "date"});
break;
}
QSqlQuery query;
query.exec("SELECT * FROM sensor_data");
while (query.next()) {
QString hum = query.value(0).toString(); // 获取第一列的值
QString tem = query.value(1).toString(); // 获取第二列的值
QString ar = "";
QString created_at = query.value(2).toString(); // 获取第三列的值
QList<QStandardItem*> row;
switch(check_flag)
{
case 1:
if(check_value[0] == "hum")
row << new QStandardItem(hum) << new QStandardItem(created_at);
if(check_value[0] == "tem")
row << new QStandardItem(tem) << new QStandardItem(created_at);
if(check_value[0] == "ar")
row << new QStandardItem(ar) << new QStandardItem(created_at);
break;
case 2:
if(check_value[0] == "hum" && check_value[1] == "tem")
row << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
if(check_value[0] == "ar" && check_value[1] == "hum")
row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(created_at);
if(check_value[0] == "ar" && check_value[1] == "tem")
row << new QStandardItem(ar) << new QStandardItem(tem) << new QStandardItem(created_at);
break;
case 3:
row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
break;
default:
row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
break;
}
model->appendRow(row); // 在末尾添加一行
}
ui->tableView->setModel(model);
*/
if (!db.open()) {
ui->textEdit_db->append("db disconnect,cannot select db");
return;
}
ui->textEdit_db->append("test select_button");
}
void Widget::close_db()
{
db.close(); //关闭数据库连接以释放资源
}
bool isValidIP(const QString &ip)
{
QRegularExpression ipRegex("^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$");
return ipRegex.match(ip).hasMatch();
}
bool isValidPort(int port)
{
return port > 0 && port <= 65535;
}
void Widget::newConnect()
{
// 创建数据模型
model = new QStandardItemModel(0, 0, this); // 0 行 0 列
QString ip = ui->hostLineEdit->text();
if (!isValidIP(ip)) {
ui->textEdit_db->append("Invalid IP address!");
return;
}
int port = ui->portLineEdit->text().toInt();
if (!isValidPort(port)) {
ui->textEdit_db->append("Invalid port number!");
return;
}
check_flag = 0;
check_value.fill("Default", 10); // 设置列表大小为 10,并用 "Default" 填充
if(ui->checkBox_ar->isChecked())
{
check_value[check_flag] = ui->checkBox_ar->text();
++check_flag;
}
if(ui->checkBox_hum->isChecked())
{
check_value[check_flag] = ui->checkBox_hum->text();
++check_flag;
}
if(ui->checkBox_tem->isChecked())
{
check_value[check_flag] = ui->checkBox_tem->text();
++check_flag;
}
switch (check_flag) {
case 1:
model->setHorizontalHeaderLabels({check_value[0], "created_at"});
break;
case 2:
model->setHorizontalHeaderLabels({check_value[0], check_value[1], "created_at"});
break;
case 3:
model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "created_at"});
break;
default:
model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "created_at"});
break;
}
//连接到主机,这里从界面获取主机地址和端口号
tcpSocket->connectToHost(ui->hostLineEdit->text(),
ui->portLineEdit->text().toInt());
// 你可以检查连接状态来确定是否已成功断开,如何已经断开连接,waitForDisconnected会直接返回0
if (tcpSocket->waitForDisconnected(3000)) { // 等待最多3000毫秒
ui->textEdit_db->append("Disconnected successfully.");
} else {
ui->textEdit_db->append("Failed to disconnect within the timeout period.");
}
}
void Widget::readMessage()
{
QDataStream in(tcpSocket);
in.setVersion(QDataStream::Qt_5_14);
if (tcpSocket->bytesAvailable() == 0) return;
QByteArray data = tcpSocket->readAll();
QString receive_data = QString::fromUtf8(data);
QStringList lines = receive_data.split('\n');
for(const QString &line : lines){
// 跳过空行(可选)
if (line.trimmed().isEmpty()) {
continue;
}
ui->textEdit_db->setText(line);
int columnCount = model->columnCount();
QList<QStandardItem*> row;
int i = 0;
for(; i < columnCount-1; i++)
{
QVariant columnName = model->headerData(i, Qt::Horizontal, Qt::DisplayRole);
// 正则表达式用于匹配数字(整数和浮点数)
QRegularExpression regex(QString(R"(%1=(\d+(?:\.\d+)?))").arg(columnName.toString()));
QRegularExpressionMatchIterator j = regex.globalMatch(line);
while(j.hasNext())
{
QRegularExpressionMatch match = j.next();
QString matchedString = match.captured(1);
row << new QStandardItem(matchedString);
}
}
QVariant columnName1 = model->headerData(i, Qt::Horizontal, Qt::DisplayRole);
ui->textEdit_db->append(columnName1.toString());
// 正则表达式用于匹配日期
QRegularExpression regex1(QString(R"(%1=(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))").arg(columnName1.toString()));
QRegularExpressionMatchIterator k = regex1.globalMatch(line);
while(k.hasNext())
{
QRegularExpressionMatch match1 = k.next();
QString matchedString1 = match1.captured(1);
row << new QStandardItem(matchedString1);
}
model->appendRow(row);
/*
//2025-03-03 10:11:52
QRegularExpression re("hum=(\\d+),tem=(\\d+\\.\\d+),created_at=(\\d+\\-\\d+\\-\\d+\\ \\d+\\:\\d+\\:\\d+)");
QRegularExpressionMatch match = re.match(line);
if(match.hasMatch()) {
QString hum = match.captured(1);
QString tem = match.captured(2);
QString ar = "";
QString created_at = match.captured(3);
//QString displayMessage = QString("Humidity: %1, Temperature: %2,time: %3").arg(hum).arg(tem).arg(created_at);
QList<QStandardItem*> row;
switch(check_flag)
{
case 1:
if(check_value[0] == "hum")
row << new QStandardItem(hum) << new QStandardItem(created_at);
if(check_value[0] == "tem")
row << new QStandardItem(tem) << new QStandardItem(created_at);
if(check_value[0] == "ar")
row << new QStandardItem(ar) << new QStandardItem(created_at);
break;
case 2:
if(check_value[0] == "hum" && check_value[1] == "tem")
row << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
if(check_value[0] == "ar" && check_value[1] == "hum")
row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(created_at);
if(check_value[0] == "ar" && check_value[1] == "tem")
row << new QStandardItem(ar) << new QStandardItem(tem) << new QStandardItem(created_at);
break;
case 3:
row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
break;
default:
row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
break;
}
model->appendRow(row); // 在末尾添加一行
}
*/
}
ui->tableView->setModel(model);
//tcpSocket->abort(); //立即关闭套接字,而不保证所有数据都已发送或接收。
tcpSocket->disconnectFromHost();//取消已有的连接
//tcpSocket->abort();
}
void Widget::sendMessage()
{
//用于暂存我们要发送的数据
QByteArray block;
//设置数据流
QDataStream out(&block,QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_DefaultCompiledVersion);
QString message_buf;
switch(check_flag)
{
case 1:
message_buf = check_value[0];
break;
case 2:
message_buf = check_value[0]+","+check_value[1];
break;
case 3:
message_buf = check_value[0]+","+check_value[1]+","+check_value[2];
break;
default:
//message_buf = check_flag;
break;
}
send_data = message_buf.toUtf8();
out<<static_cast<quint8>(check_flag);
block.append(send_data);
tcpSocket->write(block);
//发送数据成功后,显示提示
ui->textEdit_db->append("send message successful!!!");
}
void Widget::displayError(QAbstractSocket::SocketError)
{
//ui->textEdit_db->setText("error "+tcpSocket->errorString()); //输出错误信息
ui->textEdit_db->append("error "+tcpSocket->errorString());
}
void Widget::pushButton_recv_clicked() //连接按钮
{
newConnect(); //请求连接
}
void Widget::pushButton_del_clicked() //连接按钮
{
model->clear(); // 清空模型中的数据
}
void Widget::pushButton_mqttconnect_clicked()
{
if (ui->pushButton_mqttconnect->text() == "mqtt连接"){
// 将QString转换为int
bool ok;
int value = ui->lineEdit_port->text().toInt(&ok);
// 检查转换是否成功
if (ok) {
ui->textEdit_db->append("hostname:"+ui->lineEdit_host->text());
ui->textEdit_db->append("port:"+ui->lineEdit_port->text());
} else {
ui->textEdit_db->append("port failed");
return;
}
// 设置MQTT代理地址和端口
mqtt_client.setHostname(ui->lineEdit_host->text()); // 公共MQTT代理
mqtt_client.setPort(value); // MQTT默认端口
// 连接MQTT代理
mqtt_client.connectToHost();
}
else if (ui->pushButton_mqttconnect->text() == "mqtt断开"){
ui->textEdit_db->append("pushButton_db_clicked!");
mqtt_client.disconnectFromHost();
ui->pushButton_mqttrecv->setEnabled(false);
ui->pushButton_mqttsend->setEnabled(false);
}
}
void Widget::pushButton_mqttrecv_clicked()
{
// 订阅主题
mqtt_client.subscribe(ui->lineEdit_recvtopic->text());
ui->textEdit_db->append("recv-topic:"+ui->lineEdit_recvtopic->text());
}
void Widget::pushButton_mqttsend_clicked()
{
// 发布消息
mqtt_client.publish(ui->lineEdit_sendtopic->text(), ui->textEdit_send->toPlainText().toUtf8());
}
UI左侧通过选择控件(checkBox)将需要的数据内容通过tcp发送到云端获取数据,并在表格控件(tableView)中展示,UI右侧通过获取host、port等对应的输入,将这些参数通过mqtt处理连接代理服务器进行发布和订阅。该代码只是实现基本功能,各位可以增加一些前置条件,比如检测输入数据的格式是否正确、检测连接是否正常。在文本控件(textEdit)中输出一些debug打印。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)