#include "mqttclient.h"

// NOTE(review): these four header names are reconstructed from the Qt symbols
// used in this file (qDebug, QMessageBox, QMetaObject, QPushButton) -- confirm
// against the project's original include list.
#include <QDebug>
#include <QMessageBox>
#include <QMetaObject>
#include <QPushButton>

#include "../global.h"
#include "../logger.h"

namespace {

/**
 * Shows a modal dialog telling the user that the MQTT connection was lost and
 * offering a "reconnect" and a "cancel" button.
 *
 * @return true when the user clicked the reconnect button, false otherwise
 *         (cancel button, or the dialog was dismissed some other way).
 */
bool promptReconnect()
{
    QMessageBox msgBox;
    msgBox.setWindowTitle("MQtt 错误");
    msgBox.setText("mqtt连接失败, 请检查网络连接或MQTT服务器状态");

    QPushButton *reconnectButton = msgBox.addButton("重连", QMessageBox::YesRole);
    QPushButton *cancelButton = msgBox.addButton("取消", QMessageBox::NoRole);

    reconnectButton->setStyleSheet(
        "QPushButton { background-color:rgb(5, 58, 156); color: white; padding: 5px 15px; "
        "border-radius: 4px; }");
    cancelButton->setStyleSheet(
        "QPushButton { background-color: #f44336; color: white; padding: 5px 15px; "
        "border-radius: 4px; }");

    msgBox.exec();
    return msgBox.clickedButton() == reconnectButton;
}

} // namespace

/**
 * Creates the underlying QMqttClient (parented to this object) and wires its
 * signals to this wrapper's slots.
 *
 * @param parent Optional QObject parent.
 */
MqttClient::MqttClient(QObject *parent) : QObject(parent), _mqClient(new QMqttClient(this))
{
    // Wire up signals and slots.
    connect(_mqClient, &QMqttClient::stateChanged, this, &MqttClient::onStateChanged);

    // NOTE(review): this handler runs for every `disconnected` signal; presumably
    // that includes disconnects requested via disconnectFromMqttBroker(), in which
    // case the "connection failed" dialog would also appear -- confirm intent.
    connect(_mqClient, &QMqttClient::disconnected, this, [this]() {
        // The dialog is posted as a queued call so the modal exec() loop does not
        // run inside the emission of the `disconnected` signal itself.
        QMetaObject::invokeMethod(
            this,
            [this]() {
                if (promptReconnect()) {
                    _mqClient->connectToHost();
                }
            },
            Qt::ConnectionType::QueuedConnection);
    });

    connect(_mqClient, &QMqttClient::messageReceived, this, &MqttClient::onMessageReceived);
    connect(_mqClient, &QMqttClient::errorChanged, this, &MqttClient::onError);
    // connect(_mqClient, &QMqttClient::connected, this, &MqttClient::onConnected);
}

/**
 * Configures the underlying client and starts connecting to the broker.
 *
 * @param hostname Broker host name.
 * @param port     Broker port.
 * @param username User name used for authentication.
 * @param password Password used for authentication.
 * @param clientId MQTT client identifier.
 */
void MqttClient::connectToMqttBroker(const QString &hostname, quint16 port, const QString &username,
                                     const QString &password, const QString &clientId)
{
    _mqClient->setHostname(hostname);
    _mqClient->setPort(port);
    _mqClient->setKeepAlive(20);

    // Set username and password.
    _mqClient->setUsername(username);
    _mqClient->setPassword(password);

    // Set the client ID.
    _mqClient->setClientId(clientId);

    _mqClient->connectToHost();

    // Save the list of topics to subscribe to.
    // m_subscribeTopics = topicsToSubscribe;
}

/**
 * Logs that the connection was established.
 *
 * The matching connect() call in the constructor is commented out, so nothing
 * in this file invokes this slot.
 */
void MqttClient::onConnected()
{
    Logger::getInstance().debug("MQTT conncted");
    // subscribeToTopics(m_subscribeTopics);
}

/**
 * Subscribes to every topic in @p topics by calling subscribeToTopic() on each.
 *
 * @param topics Topic filters to subscribe to.
 */
void MqttClient::subscribeToTopics(const QStringList &topics)
{
    for (const auto &topic : topics) {
        subscribeToTopic(topic);
    }
}

/**
 * Logs the current client state and disconnects from the broker if the client
 * is currently connected; otherwise does nothing beyond logging.
 */
void MqttClient::disconnectFromMqttBroker()
{
    // The state is formatted explicitly: adding the enum to a string literal
    // would be pointer arithmetic on the literal, not string concatenation.
    Logger::getInstance().info(QString("MQTT client disconnecting from broker, state: %1")
                                   .arg(static_cast<int>(_mqClient->state())));

    if (_mqClient->state() == QMqttClient::Connected) {
        _mqClient->disconnectFromHost();
    }
}

/**
 * @return true when the underlying client is in the Connected state.
 */
bool MqttClient::isConnected() const
{
    return _mqClient->state() == QMqttClient::Connected;
}

/**
 * Subscribes to a single topic.
 *
 * @param topic Topic filter to subscribe to.
 * @return Whatever QMqttClient::subscribe() returns for @p topic.
 */
QMqttSubscription *MqttClient::subscribeToTopic(const QString &topic)
{
    return _mqClient->subscribe(topic);
}

/**
 * Publishes @p message on @p topic when connected and logs the outcome;
 * when not connected, only logs an error.
 *
 * @param topic         Topic name to publish to.
 * @param message       Payload bytes.
 * @param qos           Quality-of-service level passed to publish().
 * @param isRetainedMsg Whether the broker should retain the message.
 */
void MqttClient::sendMessage(const QString &topic, const QByteArray &message, quint8 qos,
                             bool isRetainedMsg)
{
    if (_mqClient->state() != QMqttClient::Connected) {
        qDebug() << "Not connected to MQTT server";
        Logger::getInstance().error(
            QString("MQTT client is not connected to the server. topic: %1, msg: %2")
                .arg(topic, QString(message)));
        return;
    }

    const auto publishId = _mqClient->publish(QMqttTopicName(topic), message, qos, isRetainedMsg);

    // publish() reporting -1 is treated as a failed send.
    if (publishId == -1) {
        Logger::getInstance().error(
            QString("Error MQTT client sent message to topic: %1, msg: %2, qos: %3, isRetainedMsg: %4")
                .arg(topic, QString(message), QString::number(qos),
                     isRetainedMsg ? "true" : "false"));
    } else {
        Logger::getInstance().info(
            QString("MQTT client sent message to topic: %1, msg: %2, qos: %3, isRetainedMsg: %4")
                .arg(topic, QString(message), QString::number(qos),
                     isRetainedMsg ? "true" : "false"));
    }
}

/**
 * Routes an incoming message to one of two signals depending on its topic.
 *
 * Messages whose topic name equals MQTT_TOPIC_COMPANY_PROJECTS_SUB (defined
 * outside this file, presumably in global.h) are emitted as
 * proMessageReceived(); all others as messageReceived().
 *
 * @param message Payload bytes.
 * @param topic   Topic the message arrived on.
 */
void MqttClient::onMessageReceived(const QByteArray &message, const QMqttTopicName &topic)
{
    if (topic.name() == MQTT_TOPIC_COMPANY_PROJECTS_SUB) {
        emit proMessageReceived(message, topic);
    } else {
        emit messageReceived(message, topic);
    }
}

/**
 * Logs every state transition of the underlying client.
 *
 * @param state New client state.
 */
void MqttClient::onStateChanged(QMqttClient::ClientState state)
{
    switch (state) {
    case QMqttClient::ClientState::Disconnected:
        Logger::getInstance().info("mqtt disconnected");
        break;
    case QMqttClient::ClientState::Connecting:
        Logger::getInstance().info("mqtt connecting");
        break;
    case QMqttClient::ClientState::Connected:
        Logger::getInstance().info("mqtt connected");
        break;
    default:
        // Format the numeric value explicitly; `"literal" + state` would offset
        // the literal's pointer instead of appending the state.
        Logger::getInstance().info(
            QString("mqtt other state %1").arg(static_cast<int>(state)));
        break;
    }
}

/**
 * Logs the numeric value of an error reported by the underlying client.
 *
 * @param error Error code reported by QMqttClient.
 */
void MqttClient::onError(QMqttClient::ClientError error)
{
    Logger::getInstance().error(QString("MQTT client error: %1").arg(static_cast<int>(error)));
}