在Windows11-64位系统下使用QT开发桌面应用程序,实现mqtt客户端的发布和订阅功能。

需求:

mqtt代理服务器 --mosquitto;

mqtt客户端工具 -- mqtt.fx;

qtcreator开发工具 -- qtcreator6.8.2版本;

过程:

Windows下MQTT服务器搭建

安装mosquitto-1.6.9-install-windows-x64.exe,全部按默认安装即可,路径可以自己修改。

安装后在安装路径会有一个名为mosquitto.conf的配置文件,按照如下方式进行配置,默认端口1883。

修改allow_anonymous为true以允许主机匿名访问。

在mosquitto安装目录下启用终端(cmd),执行命令启动mosquitto服务

windows的mosquitto刚启动时候不会输出任何信息,只要能启动,就代表成功了。

如果想查看调试信息,那么就把log_type的配置项打开。

MQTT客户端工具使用

在客户端发布过程中,可以通过MQTT服务端终端查看相关debug,如下图:

QT开发

由于使用的qt6.8.2版本,使用mqtt驱动库需要重新编译,在安装插件时需要选择source

这边已经安装过,实际安装可能需要几个G的安装内存,安装过程中会出现一些下载错误的情况,不用管,直接重新下载就可以。

安装好后,在qt安装目录下能看到qtmqtt文件夹,那么就说明source源码安装成功。

接下来需要编译MQTT源码,需要CMake和Ninja两个命令。

执行cmake --version和ninja --version查看版本号,能查看说明安装成功。注意需要更新环境变量path。

CSDN上有比较详细的编译MQTT驱动方式,这里简单介绍下。

mkdir build
cd build
cmake -G "Ninja" -DCMAKE_PREFIX_PATH=C:/Qt/6.8.2/msvcxxx_64 ../
ninja
ninja install

有的情况下,可能需要配置环境变量path

一般这样就能完成mqtt编译,打开qt工程,如果出现以下输出,说明工程没有获取到mqtt驱动,再仔细检查下,还有qt工程的.pro中增加mqtt模块。

QT开发UI界面

QT代码
#include "widget.h"
#include "ui_widget.h"

Widget::Widget(QWidget *parent)
    : QWidget(parent)
    , ui(new Ui::Widget)
{
    ui->setupUi(this);
    tcpSocket = new QTcpSocket(this);
    connect(tcpSocket,SIGNAL(readyRead()),this,SLOT(readMessage()));
    //连接信号和相应槽函数
    connect(tcpSocket,SIGNAL(connected()),this,SLOT(sendMessage()));

    connect(tcpSocket,SIGNAL(error(QAbstractSocket::SocketError)),
             this,SLOT(displayError(QAbstractSocket::SocketError)));
    connect(ui->pushButton_recv,SIGNAL(clicked()),this,SLOT(pushButton_recv_clicked()));
    connect(ui->pushButton_del,SIGNAL(clicked()),this,SLOT(pushButton_del_clicked()));


    //连接mqtt
    connect(&mqtt_client, &QMqttClient::connected, [&]() {
        ui->pushButton_mqttconnect->setText("mqtt断开");
        ui->lineEdit_host->setEnabled(false);
        ui->lineEdit_port->setEnabled(false);
        ui->pushButton_mqttrecv->setEnabled(true);
        ui->pushButton_mqttsend->setEnabled(true);
    });
    //断开mqtt
    connect(&mqtt_client, &QMqttClient::disconnected, [&]() {
        ui->pushButton_mqttconnect->setText("mqtt连接");
        ui->lineEdit_host->setEnabled(true);
        ui->lineEdit_port->setEnabled(true);
    });
    // 检测连接状态
    // connect(&mqtt_client, &QMqttClient::stateChanged, [&](QMqttClient::ClientState state) {
    //     if (state == QMqttClient::Connected) {
    //         //qDebug() << "Connected to MQTT broker!";
    //         ui->textEdit_db->append("Connected to MQTT broker!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());
    //     } else if (state == QMqttClient::Disconnected) {
    //         //qDebug() << "Disconnected from MQTT broker!";
    //         ui->textEdit_db->append("Disconnected from MQTT broker!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());
    //     } else if (state == QMqttClient::Connecting) {
    //         //qDebug() << "Connecting to MQTT broker...";
    //         ui->textEdit_db->append("Connecting to MQTT broker...!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());
    //     }
    // });

    // 连接错误时的槽函数
    connect(&mqtt_client, &QMqttClient::errorChanged, [&](QMqttClient::ClientError error) {
        //qDebug() << "Error occurred:" << error;
        ui->textEdit_db->append("Error occurred: "+QString::fromStdString(std::to_string(error)));
    });
    //接收消息
    connect(&mqtt_client, &QMqttClient::messageReceived, [&](const QByteArray &message, const QMqttTopicName &topic) {
        ui->textEdit_db->append("mqtt_recv: "+topic.name()+":"+message);
    });

    connect(ui->pushButton_mqttconnect,SIGNAL(clicked()),this,SLOT(pushButton_mqttconnect_clicked()));
    connect(ui->pushButton_mqttrecv,SIGNAL(clicked()),this,SLOT(pushButton_mqttrecv_clicked()));
    connect(ui->pushButton_mqttsend,SIGNAL(clicked()),this,SLOT(pushButton_mqttsend_clicked()));
    ui->pushButton_mqttrecv->setEnabled(false);
    ui->pushButton_mqttsend->setEnabled(false);
    // 设置 QTextEdit 无法选择文本
    ui->textEdit_db->setTextInteractionFlags(Qt::NoTextInteraction);
}

Widget::~Widget()
{
    delete ui;
}

void Widget::connect_db()
{
    QStringList drivers = QSqlDatabase::drivers();
    foreach(QString driver, drivers) {
        ui->textEdit_db->append(driver);
    }
    ui->textEdit_db->append("test.");
    db = QSqlDatabase::addDatabase("QMYSQL");
    db.setHostName("118.31.2.73");
    db.setPort(3306); // MySQL的默认端口是3306
    db.setDatabaseName("test_db");
    db.setUserName("root");
    db.setPassword("123456");

    if (!db.open()) {
        ui->textEdit_db->append("Error: Unable to connect to the database.");
        return;
    }
    //ui->pushButton_mqttconnect->setText("数据库断连");
}

void Widget::select_db()
{
    /*
    check_flag = 0;
    check_value.fill("Default", 10);  // 设置列表大小为 10,并用 "Default" 填充
    if(ui->checkBox_ar->isChecked())
    {
        check_value[check_flag] = ui->checkBox_ar->text();
        ++check_flag;
    }
    if(ui->checkBox_hum->isChecked())
    {
        check_value[check_flag] = ui->checkBox_hum->text();
        ++check_flag;
    }
    if(ui->checkBox_tem->isChecked())
    {
        check_value[check_flag] = ui->checkBox_tem->text();
        ++check_flag;
    }

    model = new QStandardItemModel(0, 0, this);  // 0 行 0 列
    switch (check_flag) {
    case 1:
        model->setHorizontalHeaderLabels({check_value[0], "date"});
        break;
    case 2:
        model->setHorizontalHeaderLabels({check_value[0], check_value[1], "date"});
        break;
    case 3:
        model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "date"});
        break;
    default:
        model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "date"});
        break;
    }

    QSqlQuery query;
    query.exec("SELECT * FROM sensor_data");

    while (query.next()) {
        QString hum = query.value(0).toString(); // 获取第一列的值
        QString tem = query.value(1).toString(); // 获取第二列的值
        QString ar = "";
        QString created_at = query.value(2).toString(); // 获取第三列的值
        QList<QStandardItem*> row;
        switch(check_flag)
        {
        case 1:
            if(check_value[0] == "hum")
                row << new QStandardItem(hum) << new QStandardItem(created_at);
            if(check_value[0] == "tem")
                row << new QStandardItem(tem) << new QStandardItem(created_at);
            if(check_value[0] == "ar")
                row << new QStandardItem(ar) << new QStandardItem(created_at);
            break;
        case 2:

            if(check_value[0] == "hum" && check_value[1] == "tem")
                row << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
            if(check_value[0] == "ar" && check_value[1] == "hum")
                row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(created_at);
            if(check_value[0] == "ar" && check_value[1] == "tem")
                row << new QStandardItem(ar) << new QStandardItem(tem) << new QStandardItem(created_at);
            break;
        case 3:
            row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
            break;
        default:
            row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
            break;
        }

        model->appendRow(row);  // 在末尾添加一行
    }
    ui->tableView->setModel(model);
*/
    if (!db.open()) {
        ui->textEdit_db->append("db disconnect,cannot select db");
        return;
    }
    ui->textEdit_db->append("test select_button");
}

void Widget::close_db()
{
    db.close(); //关闭数据库连接以释放资源
}

bool isValidIP(const QString &ip)
{
    QRegularExpression ipRegex("^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$");
    return ipRegex.match(ip).hasMatch();
}
bool isValidPort(int port)
{
    return port > 0 && port <= 65535;
}

void Widget::newConnect()
{
    // 创建数据模型
    model = new QStandardItemModel(0, 0, this);  // 0 行 0 列
    QString ip = ui->hostLineEdit->text();
    if (!isValidIP(ip)) {
        ui->textEdit_db->append("Invalid IP address!");
        return;
    }
    int port = ui->portLineEdit->text().toInt();
    if (!isValidPort(port)) {
        ui->textEdit_db->append("Invalid port number!");
        return;
    }

    check_flag = 0;
    check_value.fill("Default", 10);  // 设置列表大小为 10,并用 "Default" 填充
    if(ui->checkBox_ar->isChecked())
    {
        check_value[check_flag] = ui->checkBox_ar->text();
        ++check_flag;
    }
    if(ui->checkBox_hum->isChecked())
    {
        check_value[check_flag] = ui->checkBox_hum->text();
        ++check_flag;
    }
    if(ui->checkBox_tem->isChecked())
    {
        check_value[check_flag] = ui->checkBox_tem->text();
        ++check_flag;
    }

    switch (check_flag) {
    case 1:
        model->setHorizontalHeaderLabels({check_value[0], "created_at"});
        break;
    case 2:
        model->setHorizontalHeaderLabels({check_value[0], check_value[1], "created_at"});
        break;
    case 3:
        model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "created_at"});
        break;
    default:
        model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "created_at"});
        break;
    }
    //连接到主机,这里从界面获取主机地址和端口号
    tcpSocket->connectToHost(ui->hostLineEdit->text(),
                             ui->portLineEdit->text().toInt());
    // 你可以检查连接状态来确定是否已成功断开,如何已经断开连接,waitForDisconnected会直接返回0
    if (tcpSocket->waitForDisconnected(3000)) { // 等待最多3000毫秒
        ui->textEdit_db->append("Disconnected successfully.");
    } else {
        ui->textEdit_db->append("Failed to disconnect within the timeout period.");
    }
}

void Widget::readMessage()
{
    QDataStream in(tcpSocket);
    in.setVersion(QDataStream::Qt_5_14);

    if (tcpSocket->bytesAvailable() == 0) return;

    QByteArray data = tcpSocket->readAll();
    QString receive_data = QString::fromUtf8(data);

    QStringList lines = receive_data.split('\n');
    for(const QString &line : lines){
        // 跳过空行(可选)
        if (line.trimmed().isEmpty()) {
            continue;
        }
        ui->textEdit_db->setText(line);

        int columnCount = model->columnCount();

        QList<QStandardItem*> row;
        int i = 0;
        for(; i < columnCount-1; i++)
        {
            QVariant columnName = model->headerData(i, Qt::Horizontal, Qt::DisplayRole);

            // 正则表达式用于匹配数字(整数和浮点数)
            QRegularExpression regex(QString(R"(%1=(\d+(?:\.\d+)?))").arg(columnName.toString()));
            QRegularExpressionMatchIterator j = regex.globalMatch(line);
            while(j.hasNext())
            {
                QRegularExpressionMatch match = j.next();
                QString matchedString = match.captured(1);
                row << new QStandardItem(matchedString);
            }

        }
        QVariant columnName1 = model->headerData(i, Qt::Horizontal, Qt::DisplayRole);
        ui->textEdit_db->append(columnName1.toString());
        // 正则表达式用于匹配日期
        QRegularExpression regex1(QString(R"(%1=(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))").arg(columnName1.toString()));
        QRegularExpressionMatchIterator k = regex1.globalMatch(line);
        while(k.hasNext())
        {
            QRegularExpressionMatch match1 = k.next();
            QString matchedString1 = match1.captured(1);
            row << new QStandardItem(matchedString1);
        }

        model->appendRow(row);
        /*
        //2025-03-03 10:11:52
        QRegularExpression re("hum=(\\d+),tem=(\\d+\\.\\d+),created_at=(\\d+\\-\\d+\\-\\d+\\ \\d+\\:\\d+\\:\\d+)");
        QRegularExpressionMatch match = re.match(line);

        if(match.hasMatch()) {
            QString hum = match.captured(1);
            QString tem = match.captured(2);
            QString ar = "";
            QString created_at = match.captured(3);
            //QString displayMessage = QString("Humidity: %1, Temperature: %2,time: %3").arg(hum).arg(tem).arg(created_at);

            QList<QStandardItem*> row;
            switch(check_flag)
            {
            case 1:
                if(check_value[0] == "hum")
                    row << new QStandardItem(hum) << new QStandardItem(created_at);
                if(check_value[0] == "tem")
                    row << new QStandardItem(tem) << new QStandardItem(created_at);
                if(check_value[0] == "ar")
                    row << new QStandardItem(ar) << new QStandardItem(created_at);
                break;
            case 2:

                if(check_value[0] == "hum" && check_value[1] == "tem")
                    row << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
                if(check_value[0] == "ar" && check_value[1] == "hum")
                    row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(created_at);
                if(check_value[0] == "ar" && check_value[1] == "tem")
                    row << new QStandardItem(ar) << new QStandardItem(tem) << new QStandardItem(created_at);
                break;
            case 3:
                row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
                break;
            default:
                row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);
                break;
            }

            model->appendRow(row);  // 在末尾添加一行
        }
*/
    }
    ui->tableView->setModel(model);
    //tcpSocket->abort(); //立即关闭套接字,而不保证所有数据都已发送或接收。
    tcpSocket->disconnectFromHost();//取消已有的连接

    //tcpSocket->abort();
}

void Widget::sendMessage()
{
    //用于暂存我们要发送的数据
    QByteArray block;
    //设置数据流
    QDataStream out(&block,QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_DefaultCompiledVersion);

    QString message_buf;

    switch(check_flag)
    {
    case 1:
        message_buf = check_value[0];
        break;
    case 2:
        message_buf = check_value[0]+","+check_value[1];
        break;
    case 3:
        message_buf = check_value[0]+","+check_value[1]+","+check_value[2];
        break;
    default:
        //message_buf = check_flag;
        break;
    }
    send_data = message_buf.toUtf8();
    out<<static_cast<quint8>(check_flag);

    block.append(send_data);

    tcpSocket->write(block);

    //发送数据成功后,显示提示
    ui->textEdit_db->append("send message successful!!!");
}

void Widget::displayError(QAbstractSocket::SocketError)
{
    //ui->textEdit_db->setText("error "+tcpSocket->errorString()); //输出错误信息
    ui->textEdit_db->append("error "+tcpSocket->errorString());
}
void Widget::pushButton_recv_clicked() //连接按钮
{
    newConnect(); //请求连接
}
void Widget::pushButton_del_clicked() //连接按钮
{
    model->clear();  // 清空模型中的数据
}
void Widget::pushButton_mqttconnect_clicked()
{
    if (ui->pushButton_mqttconnect->text() == "mqtt连接"){
        // 将QString转换为int
        bool ok;
        int value = ui->lineEdit_port->text().toInt(&ok);

        // 检查转换是否成功
        if (ok) {
            ui->textEdit_db->append("hostname:"+ui->lineEdit_host->text());
            ui->textEdit_db->append("port:"+ui->lineEdit_port->text());
        } else {
            ui->textEdit_db->append("port failed");
            return;
        }

        // 设置MQTT代理地址和端口
        mqtt_client.setHostname(ui->lineEdit_host->text()); // 公共MQTT代理
        mqtt_client.setPort(value); // MQTT默认端口

        // 连接MQTT代理
        mqtt_client.connectToHost();

    }
    else if (ui->pushButton_mqttconnect->text() == "mqtt断开"){
        ui->textEdit_db->append("pushButton_db_clicked!");
        mqtt_client.disconnectFromHost();

        ui->pushButton_mqttrecv->setEnabled(false);
        ui->pushButton_mqttsend->setEnabled(false);
    }
}
void Widget::pushButton_mqttrecv_clicked()
{
        // 订阅主题
        mqtt_client.subscribe(ui->lineEdit_recvtopic->text());
        ui->textEdit_db->append("recv-topic:"+ui->lineEdit_recvtopic->text());
}

void Widget::pushButton_mqttsend_clicked()
{
        // 发布消息
        mqtt_client.publish(ui->lineEdit_sendtopic->text(), ui->textEdit_send->toPlainText().toUtf8());
}

UI左侧通过选择控件(checkBox)将需要的数据内容通过tcp发送到云端获取数据,并在表格控件(tableView)中展示,UI右侧通过获取host、port等对应的输入,将这些参数通过mqtt处理连接代理服务器进行发布和订阅。该代码只是实现基本功能,各位可以增加一些前置条件,比如检测输入数据的格式是否正确、检测连接是否正常。在文本控件(textEdit)中输出一些debug打印。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐