Browse Source

async mqtt

main
Hendrik Langer 5 years ago
parent
commit
5ee042410e
  1. 7
      src/main.cpp
  2. 77
      src/network/XD0MQTT.cpp
  3. 9
      src/network/XD0MQTT.h

7
src/main.cpp

@ -660,8 +660,8 @@ void sendValues() {
jsonDoc["timestamp"] = sensor_readings.lastUpdate;
serializeJson(jsonDoc, buf, JSON_BUF_LEN);
String topic_json = String("thomas/sensor/") + ota.getMAC() + String("/json");
mqtt.publish(topic_json.c_str(), buf, strlen(buf), 1, 1);
delay(10);
int msg_id = mqtt.publish(topic_json.c_str(), buf, strlen(buf), 1, 1);
mqtt.waitForMsg(msg_id, 5000 / portTICK_PERIOD_MS);
}
@ -821,6 +821,9 @@ void loop()
}
getSensorMeasurements();
if (!mqtt.waitForConnectResult(5000)) {
ESP_LOGE(TAG, "failed to establish full connection");
}
sendValues();
delay(1);
displayValues();

77
src/network/XD0MQTT.cpp

@ -39,7 +39,12 @@ esp_err_t XD0MQTT::mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
connected = true;
for (const auto &subscription : subscriptions_) {
int msg_id = esp_mqtt_client_subscribe(client, subscription.topic, subscription.qos);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
if (msg_id == -1) {
ESP_LOGE(TAG, "Failed subscribing to \"%s\"", subscription.topic);
} else {
connectWaitingForMsg.push_back(msg_id);
ESP_LOGI(TAG, "sent subscribe, msg_id=%d", msg_id);
}
}
break;
case MQTT_EVENT_DISCONNECTED:
@ -48,12 +53,16 @@ esp_err_t XD0MQTT::mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
connectWaitingForMsg.erase(std::remove(connectWaitingForMsg.begin(), connectWaitingForMsg.end(), event->msg_id), connectWaitingForMsg.end());
wakeTask(event->msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
wakeTask(event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
wakeTask(event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
@ -126,6 +135,7 @@ void XD0MQTT::end(void) {
//}
subscriptions_.clear();
esp_mqtt_client_stop(client);
waitingForMsg.clear();
delay(10);
esp_mqtt_client_destroy(client);
}
@ -134,9 +144,10 @@ bool XD0MQTT::isConnected(void) {
return connected;
}
bool XD0MQTT::publish(const char* topic, const char* data, int len, int qos, int retain) {
int XD0MQTT::publish(const char* topic, const char* data, int len, int qos, int retain) {
int msg_id = esp_mqtt_client_publish(client, topic, data, len, qos, retain);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
return msg_id;
}
bool XD0MQTT::publishf(const char* topic, const char* format, ...) {
@ -166,6 +177,14 @@ bool XD0MQTT::publishf2(const char* topic, int qos, int retain, const char* form
bool XD0MQTT::subscribe(const char* topic, const cb_t &cb, int qos) {
subscription_t subscription = {topic, cb, qos};
subscriptions_.push_back(subscription);
if (connected) {
int msg_id = esp_mqtt_client_subscribe(client, subscription.topic, subscription.qos);
if (msg_id == -1) {
ESP_LOGE(TAG, "Failed subscribing to \"%s\"", subscription.topic);
return false;
}
}
return true;
}
@ -180,3 +199,57 @@ bool XD0MQTT::unsubscribe(const char* topic) {
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
return true;
}
bool XD0MQTT::waitForMsg(const int msgid, TickType_t delay) {
uint32_t ulNotifiedValue; BaseType_t ret;
waitingForMsg.insert(std::make_pair(msgid, xTaskGetCurrentTaskHandle()));
ret = xTaskNotifyWait( 0x00, // don't clear notification bits on entry
ULONG_MAX, // clear all notification bits on exit
&ulNotifiedValue,
delay );
auto p = waitingForMsg.equal_range(msgid);
for (auto& it = p.first; it != p.second; ++it) {
if (it->second == xTaskGetCurrentTaskHandle()) {
waitingForMsg.erase(it);
break;
}
}
if (ret == pdFALSE) {
ESP_LOGI(TAG, "waitForMsg(): msg_id=%d timeout", msgid);
return false;
} else if (ulNotifiedValue == msgid) {
ESP_LOGI(TAG, "waitForMsg(): msg_id=%d successfully delivered", msgid);
return true;
} else {
ESP_LOGW(TAG, "waitForMsg(): msg_id=%d received unexpected notification value", msgid);
return false;
}
}
bool XD0MQTT::wakeTask(const int msgid) {
int woken = 0;
auto p = waitingForMsg.equal_range(msgid);
for (auto& it = p.first; it != p.second; ++it) {
TaskHandle_t xTaskToNotify = it->second;
configASSERT( xTaskToNotify != NULL );
if( xTaskNotify(xTaskToNotify, (uint32_t)msgid, eSetValueWithoutOverwrite) == pdPASS ) {
woken++;
} else {
ESP_LOGW(TAG, "woke task waiting for msg_id=%d, but it had other pending notifications", msgid);
}
}
if (woken > 0) return true;
return false;
}
bool XD0MQTT::waitForConnectResult(int delay_ms) {
int start_time = millis();
while(!isConnected() || connectWaitingForMsg.size() > 0) {
delay(10);
if (millis() - start_time >= delay_ms) {
ESP_LOGI(TAG, "mqtt still waiting for connection (%d subscriptions pending)\n", connectWaitingForMsg.size());
return false;
}
}
return true;
}

9
src/network/XD0MQTT.h

@ -14,6 +14,7 @@
#include <Arduino.h>
#include <functional>
#include <vector>
#include <map>
#include "mqtt_client.h"
@ -30,7 +31,7 @@ class XD0MQTT {
XD0MQTT(void);
bool begin(void);
void end(void);
bool publish(const char* topic, const char* data, int len, int qos=1, int retain=0);
int publish(const char* topic, const char* data, int len, int qos=1, int retain=0);
bool publishf(const char* topic, const char* format, ...);
bool publishf2(const char* topic, int qos, int retain, const char* format, ...);
bool publishf2(const char* topic, int qos, int retain, const char* format, va_list argp);
@ -38,9 +39,15 @@ class XD0MQTT {
bool unsubscribe(const char* topic);
esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event);
bool isConnected(void);
bool waitForConnectResult(int delay_ms = -1);
bool waitForMsg(const int msgid, TickType_t delay);
std::vector<subscription_t> subscriptions_;
private:
bool connected = false;
std::multimap<int, TaskHandle_t> waitingForMsg;
std::vector<int> connectWaitingForMsg;
bool wakeTask(const int msgid);
// openssl s_client -showcerts -connect home.xd0.de:8883 </dev/null 2>/dev/null|openssl x509 -outform PEM >mqtt_xd0.de.pem
const char* rootCACertificate = \
"-----BEGIN CERTIFICATE-----\n" \

Loading…
Cancel
Save