diff --git a/src/main.cpp b/src/main.cpp index 8f1699c..0245f06 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -660,8 +660,8 @@ void sendValues() { jsonDoc["timestamp"] = sensor_readings.lastUpdate; serializeJson(jsonDoc, buf, JSON_BUF_LEN); String topic_json = String("thomas/sensor/") + ota.getMAC() + String("/json"); - mqtt.publish(topic_json.c_str(), buf, strlen(buf), 1, 1); - delay(10); + int msg_id = mqtt.publish(topic_json.c_str(), buf, strlen(buf), 1, 1); + mqtt.waitForMsg(msg_id, 5000 / portTICK_PERIOD_MS); } @@ -821,6 +821,9 @@ void loop() } getSensorMeasurements(); + if (!mqtt.waitForConnectResult(5000)) { + ESP_LOGE(TAG, "failed to establish full connection"); + } sendValues(); delay(1); displayValues(); diff --git a/src/network/XD0MQTT.cpp b/src/network/XD0MQTT.cpp index 7ba6a03..97f920b 100644 --- a/src/network/XD0MQTT.cpp +++ b/src/network/XD0MQTT.cpp @@ -39,7 +39,12 @@ esp_err_t XD0MQTT::mqtt_event_handler_cb(esp_mqtt_event_handle_t event) connected = true; for (const auto &subscription : subscriptions_) { int msg_id = esp_mqtt_client_subscribe(client, subscription.topic, subscription.qos); - ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + if (msg_id == -1) { + ESP_LOGE(TAG, "Failed subscribing to \"%s\"", subscription.topic); + } else { + connectWaitingForMsg.push_back(msg_id); + ESP_LOGI(TAG, "sent subscribe, msg_id=%d", msg_id); + } } break; case MQTT_EVENT_DISCONNECTED: @@ -48,12 +53,16 @@ esp_err_t XD0MQTT::mqtt_event_handler_cb(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); + connectWaitingForMsg.erase(std::remove(connectWaitingForMsg.begin(), connectWaitingForMsg.end(), event->msg_id), connectWaitingForMsg.end()); + wakeTask(event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); + wakeTask(event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); + wakeTask(event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); @@ -126,6 +135,7 @@ void XD0MQTT::end(void) { //} subscriptions_.clear(); esp_mqtt_client_stop(client); + waitingForMsg.clear(); delay(10); esp_mqtt_client_destroy(client); } @@ -134,9 +144,10 @@ bool XD0MQTT::isConnected(void) { return connected; } -bool XD0MQTT::publish(const char* topic, const char* data, int len, int qos, int retain) { +int XD0MQTT::publish(const char* topic, const char* data, int len, int qos, int retain) { int msg_id = esp_mqtt_client_publish(client, topic, data, len, qos, retain); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); + return msg_id; } bool XD0MQTT::publishf(const char* topic, const char* format, ...) { @@ -164,9 +175,17 @@ bool XD0MQTT::publishf2(const char* topic, int qos, int retain, const char* form } bool XD0MQTT::subscribe(const char* topic, const cb_t &cb, int qos) { - subscription_t subscription = {topic, cb, qos}; - subscriptions_.push_back(subscription); - return true; + subscription_t subscription = {topic, cb, qos}; + subscriptions_.push_back(subscription); + + if (connected) { + int msg_id = esp_mqtt_client_subscribe(client, subscription.topic, subscription.qos); + if (msg_id == -1) { + ESP_LOGE(TAG, "Failed subscribing to \"%s\"", subscription.topic); + return false; + } + } + return true; } bool XD0MQTT::unsubscribe(const char* topic) { @@ -180,3 +199,57 @@ bool XD0MQTT::unsubscribe(const char* topic) { ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); return true; } + +bool XD0MQTT::waitForMsg(const int msgid, TickType_t delay) { + uint32_t ulNotifiedValue; BaseType_t ret; + waitingForMsg.insert(std::make_pair(msgid, xTaskGetCurrentTaskHandle())); + ret = xTaskNotifyWait( 0x00, // don't clear notification bits on entry + ULONG_MAX, // clear all notification bits on exit + &ulNotifiedValue, + delay ); + auto p = waitingForMsg.equal_range(msgid); + for (auto& it = p.first; it != p.second; ++it) { + if (it->second == xTaskGetCurrentTaskHandle()) { + waitingForMsg.erase(it); + break; + } + } + if (ret == pdFALSE) { + ESP_LOGI(TAG, "waitForMsg(): msg_id=%d timeout", msgid); + return false; + } else if (ulNotifiedValue == msgid) { + ESP_LOGI(TAG, "waitForMsg(): msg_id=%d successfully delivered", msgid); + return true; + } else { + ESP_LOGW(TAG, "waitForMsg(): msg_id=%d received unexpected notification value", msgid); + return false; + } +} + +bool XD0MQTT::wakeTask(const int msgid) { + int woken = 0; + auto p = waitingForMsg.equal_range(msgid); + for (auto& it = p.first; it != p.second; ++it) { + TaskHandle_t xTaskToNotify = it->second; + configASSERT( xTaskToNotify != NULL ); + if( xTaskNotify(xTaskToNotify, (uint32_t)msgid, eSetValueWithoutOverwrite) == pdPASS ) { + woken++; + } else { + ESP_LOGW(TAG, "woke task waiting for msg_id=%d, but it had other pending notifications", msgid); + } + } + if (woken > 0) return true; + return false; +} + +bool XD0MQTT::waitForConnectResult(int delay_ms) { + int start_time = millis(); + while(!isConnected() || connectWaitingForMsg.size() > 0) { + delay(10); + if (millis() - start_time >= delay_ms) { + ESP_LOGI(TAG, "mqtt still waiting for connection (%d subscriptions pending)\n", connectWaitingForMsg.size()); + return false; + } + } + return true; +} diff --git a/src/network/XD0MQTT.h b/src/network/XD0MQTT.h index 43837da..1ef3a89 100644 --- a/src/network/XD0MQTT.h +++ b/src/network/XD0MQTT.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "mqtt_client.h" @@ -30,7 +31,7 @@ class XD0MQTT { XD0MQTT(void); bool begin(void); void end(void); - bool publish(const char* topic, const char* data, int len, int qos=1, int retain=0); + int publish(const char* topic, const char* data, int len, int qos=1, int retain=0); bool publishf(const char* topic, const char* format, ...); bool publishf2(const char* topic, int qos, int retain, const char* format, ...); bool publishf2(const char* topic, int qos, int retain, const char* format, va_list argp); @@ -38,9 +39,15 @@ class XD0MQTT { bool unsubscribe(const char* topic); esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event); bool isConnected(void); + bool waitForConnectResult(int delay_ms = -1); + bool waitForMsg(const int msgid, TickType_t delay); std::vector subscriptions_; private: bool connected = false; + std::multimap waitingForMsg; + std::vector connectWaitingForMsg; + + bool wakeTask(const int msgid); // openssl s_client -showcerts -connect home.xd0.de:8883 /dev/null|openssl x509 -outform PEM >mqtt_xd0.de.pem const char* rootCACertificate = \ "-----BEGIN CERTIFICATE-----\n" \