Browse Source

mqtt subscriptions

ir
Hendrik Langer 5 years ago
parent
commit
582f1278b7
  1. 51
      src/network/XD0MQTT.cpp
  2. 15
      src/network/XD0MQTT.h

51
src/network/XD0MQTT.cpp

@ -27,32 +27,27 @@ static const char *TAG = "MQTT";
XD0MQTT::XD0MQTT() {} XD0MQTT::XD0MQTT() {}
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) esp_err_t XD0MQTT::mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{ {
esp_mqtt_client_handle_t client = event->client; esp_mqtt_client_handle_t client = event->client;
int msg_id; int msg_id;
//int mbedtls_err; esp_err_t err; //int mbedtls_err; esp_err_t err;
// your_context_t *context = event->context;
switch (event->event_id) { switch (event->event_id) {
case MQTT_EVENT_CONNECTED: case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); connected = true;
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); for (const auto &subscription : subscriptions_) {
int msg_id = esp_mqtt_client_subscribe(client, subscription.topic, subscription.qos);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
}
msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_DISCONNECTED: case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
connected = false;
break; break;
case MQTT_EVENT_SUBSCRIBED: case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_UNSUBSCRIBED: case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
@ -62,8 +57,13 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
break; break;
case MQTT_EVENT_DATA: case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA"); ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); ESP_LOGI(TAG, "TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data); ESP_LOGI(TAG, "DATA=%.*s\r\n", event->data_len, event->data);
for (const auto &subscription : subscriptions_) {
if (strcmp(event->topic, subscription.topic) == 0) {
subscription.cb(event->topic, event->data);
}
}
break; break;
case MQTT_EVENT_ERROR: case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
@ -81,8 +81,9 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
//static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { //static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) { static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) {
esp_mqtt_client_handle_t client = event->client; //esp_mqtt_client_handle_t client = event->client;
mqtt_event_handler_cb(event); XD0MQTT *context = (XD0MQTT*)event->user_context;
context->mqtt_event_handler_cb(event);
//ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); //ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
//mqtt_event_handler_cb(event_data); //mqtt_event_handler_cb(event_data);
} }
@ -113,3 +114,21 @@ bool XD0MQTT::publish(const char* topic, const char* data, int len, int qos, int
int msg_id = esp_mqtt_client_publish(client, topic, data, len, qos, retain); int msg_id = esp_mqtt_client_publish(client, topic, data, len, qos, retain);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
} }
bool XD0MQTT::subscribe(const char* topic, const cb_t &cb, int qos) {
subscription_t subscription = {topic, cb, qos};
subscriptions_.push_back(subscription);
return true;
}
bool XD0MQTT::unsubscribe(const char* topic) {
std::vector<subscription_t>::iterator it = subscriptions_.begin();
for (auto it = std::begin(subscriptions_); it != std::end(subscriptions_);) {
if (strcmp(topic, it->topic) == 0) {
it = subscriptions_.erase(it);
}
}
int msg_id = esp_mqtt_client_unsubscribe(client, topic);
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
return true;
}

15
src/network/XD0MQTT.h

@ -12,14 +12,29 @@
#define _XD0MQTT_H #define _XD0MQTT_H
#include <Arduino.h> #include <Arduino.h>
#include <functional>
#include <vector>
#include "mqtt_client.h" #include "mqtt_client.h"
typedef std::function<void(char*, char*)> cb_t;
struct subscription_t {
const char* topic;
cb_t cb;
int qos;
};
class XD0MQTT { class XD0MQTT {
public: public:
XD0MQTT(void); XD0MQTT(void);
bool begin(void); bool begin(void);
bool publish(const char* topic, const char* data, int len, int qos=1, int retain=0); bool publish(const char* topic, const char* data, int len, int qos=1, int retain=0);
bool subscribe(const char* topic, const cb_t &cb, int qos=1);
bool unsubscribe(const char* topic);
esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event);
bool connected = false;
std::vector<subscription_t> subscriptions_;
private: private:
// openssl s_client -showcerts -connect home.xd0.de:8883 </dev/null 2>/dev/null|openssl x509 -outform PEM >mqtt_xd0.de.pem // openssl s_client -showcerts -connect home.xd0.de:8883 </dev/null 2>/dev/null|openssl x509 -outform PEM >mqtt_xd0.de.pem
const char* rootCACertificate = \ const char* rootCACertificate = \

Loading…
Cancel
Save