Parsing MQTT Payload Data

This is my first Arduino / ESP32 project and I am working off this Random Nerd Tutorial and have everything working as laid out on this website.

I added additional code to also pull in data from my Davis Weather station via MQTT. I have successfully connected to my MQTT broker and have the following data/format being updated to a global variable named "davisdata".

{"time":"12:20:57","windspeed":9,"winddir":60,"press":29.82}

How do I extract and place just the windspeed into a variable for use elsewhere in my code?

I assume I can do this within the Callback function?

Below is my full sketch, any recommendations for my code will be very well received!

#include <Arduino.h>
#include <WiFi.h>
#include <AsyncTCP.h> 
#include <ESPAsyncWebServer.h>
#include "SPIFFS.h"
#include <Arduino_JSON.h>
#include <Adafruit_BME280.h>
#include <Adafruit_Sensor.h>
#include <PubSubClient.h>
WiFiClient WifiClient;
PubSubClient mqttClient(WifiClient);


char* mqttServer = "*.*.*.*";
int mqttPort = 1883;
char davisdata[1000];

// MQTT message handler: copies the incoming payload into the global
// `davisdata` buffer as a NUL-terminated C string.
//
// topic   - topic the message arrived on (unused here)
// payload - raw message bytes; treated as not NUL-terminated
// length  - number of valid bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  // Clamp so an oversized message can never write past the end of davisdata,
  // always leaving one byte for the terminator.
  const unsigned int maxLen = sizeof(davisdata) - 1;
  const unsigned int copyLen = (length < maxLen) ? length : maxLen;
  for (unsigned int i = 0; i < copyLen; i++) {
    davisdata[i] = char(payload[i]);
  }
  // Terminate the string so a shorter message does not leave the tail of a
  // longer, earlier message behind in the buffer.
  davisdata[copyLen] = '\0';
}
// Configure the MQTT client: register the message handler and set the
// broker address and port. No network connection is made here.
void setupMQTT()
{
  mqttClient.setCallback(callback);
  mqttClient.setServer(mqttServer, mqttPort);
}

// Replace with your network credentials
const char* ssid = "**********";
const char* password = "********";

// Create AsyncWebServer object on port 80
AsyncWebServer server(80);

// Create an Event Source on /events
AsyncEventSource events("/events");

// Json Variable to Hold Sensor Readings
JSONVar readings;

// Timer variables
unsigned long lastTime = 0;
unsigned long timerDelay = 1000;

// Create a sensor object
Adafruit_BME280 bme; // BME280 connect to ESP32 I2C (GPIO 21 = SDA, GPIO 22 = SCL)

// Start the BME280 at I2C address 0x77. If the sensor does not respond,
// report it on the serial port and halt here forever.
void initBME()
{
  const bool sensorFound = bme.begin(0x77);
  if (sensorFound) {
    return;
  }
  Serial.println("Could not find a valid BME280 sensor, check wiring!");
  for (;;) {
  }
}

// Read the BME280, store the scaled values (as text) in the global
// `readings` object and return that object serialised as a JSON string.
String getSensorReadings()
{
  // 1.8 * x + 32 is the Celsius-to-Fahrenheit form.
  readings["temperature"] = String(1.8 * bme.readTemperature() + 32);
  // The +3.1 offset and the 3312.50 divisor are the sketch author's own
  // scaling constants.
  readings["humidity"] = String(bme.readHumidity() + 3.1);
  readings["pressure"] = String(bme.readPressure() / 3312.50);
  // Wind speed from the MQTT topic is not added yet.
  return JSON.stringify(readings);
}

 
// Initialize SPIFFS, which holds the files used to build the webpage.
// Reports either the mount failure or the success on the serial port,
// never both.
void initSPIFFS() {
  if (!SPIFFS.begin()) {
    Serial.println("An error has occurred while mounting SPIFFS");
    // Do not fall through to the success message below.
    return;
  }
  Serial.println("SPIFFS mounted, ESP32 Booting...");
}

// Connect to WiFi
void initWiFi() {
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);   
  while (WiFi.status() != WL_CONNECTED) {
   }     
}

void setup() {
  // Serial port for debugging purposes
  Serial.begin(115200);
  initSPIFFS();
  setupMQTT();
  initBME();
  initWiFi();
  
  
// Connect to MQTT Broker
while (!mqttClient.connected()) {
  
  if (mqttClient.connect("ESP32")){ 
  mqttClient.subscribe("my MQTT topic");  
  }
else {
  Serial.print("Connection failed ");
    Serial.print(mqttClient.state());
   }
}
  // Web Server Root URL
  server.on("/", HTTP_GET, [](AsyncWebServerRequest *request){
    request->send(SPIFFS, "/index.html", "text/html");
  });

  server.serveStatic("/", SPIFFS, "/");

  // Request for the latest sensor readings
  server.on("/readings", HTTP_GET, [](AsyncWebServerRequest *request){
    String json = getSensorReadings();
    request->send(200, "application/json", json);
    json = String();
  });

  events.onConnect([](AsyncEventSourceClient *client){
    if(client->lastId()){
      Serial.printf("Client reconnected! Last message ID that it got is: %u\n", client->lastId());
    }
    // send event with message "hello!", id current millis
    // and set reconnect delay to 1 second
    client->send("hello!", NULL, millis(), 1000);
  });
  server.addHandler(&events);

  // Start server
  server.begin();
}


void loop() {
 if ((millis() - lastTime) > timerDelay) {
    // Send Events to the client with the Sensor Readings Every 10 seconds
    events.send("ping",NULL,millis());
    events.send(getSensorReadings().c_str(),"new_readings" ,millis());
    lastTime = millis();
           
  } 
mqttClient.loop(); 

}

Welcome to the forum

Yes, or in a function called from it

Did you invent the data format of the message and do you have control of its layout ? It looks very like JSON to me but it is not something that I have had any dealings with

If it is JSON then there are libraries with functions to parse it for you

The data is actually being generated by another application: CumulusMX. I too assumed it was JSON but didn't have any luck trying JASON.Parse, in or out of the Callback function. I am still missing something.

What exactly is "JASON.Parse" that you refer to ?

sorry, not JASON but json rather

For what it is worth, Home Assistant is able to parse this data format into individual sensors.

The Arduino ArduinoJson parses it too

#include <ArduinoJson.h>

void setup()
{
    // Initialize serial port
    Serial.begin(115200);
    while (!Serial) continue;
    StaticJsonDocument<300> doc;
    char json[] = "{\"time\":\"12:20:57\",\"windspeed\":9,\"winddir\":60,\"press\":29.82}";

    // Deserialize the JSON document
    DeserializationError error = deserializeJson(doc, json);

    if (error)
    {
        Serial.print(F("deserializeJson() failed: "));
        Serial.println(error.f_str());
        return;
    }

    const char* time = doc["time"];
    int windspeed = doc["windspeed"];
    int winddir = doc["winddir"];
    float press = doc["press"];

    Serial.printf("time : %s\n",time);
    Serial.printf("windspeed : %d\n",windspeed);
    Serial.printf("direction : %d\n",winddir);
    Serial.printf("press : %f\n",press);
}

// Intentionally empty: this example does all of its work in setup().
void loop() {}

Output

time : 12:20:57
windspeed : 9
direction : 60
press : 29.820000

Thank you for taking the time, this is much appreciated!! My mind is still missing one more piece...

How do I link the data from the MQTT payload, "davisdata" into this code snippet?

Thank you again Bob!

First you need to turn the payload into a string

Something like this

// MQTT message handler: copies the payload into a local buffer, terminates it
// so it can be used as a C string, and prints it.
//
// topic   - topic the message arrived on (unused here)
// payload - raw message bytes
// length  - number of valid bytes in payload
void callback(char* topic, byte* payload, unsigned int length)
{
  char messageBuffer[200];       //an array to hold the payload
  // Never copy more than the buffer can hold, and keep one byte free for
  // the terminator; an oversized payload is truncated.
  const unsigned int maxLength = sizeof(messageBuffer) - 1;
  const unsigned int copyLength = (length < maxLength) ? length : maxLength;
  memcpy(messageBuffer, payload, copyLength);   //copy the payload into the array
  messageBuffer[copyLength] = '\0';     //turn the char array into a C string
  Serial.printf("%s\n", messageBuffer);     //print the string
}

Now that the payload is in the messageBuffer string, you should be able to use it as the input for parsing the JSON message

Try this

#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>

const char* ssid = "XXXXXXX";
const char* password = "XXXXXXX";
const char* mqtt_server = "192.168.1.112";  //change to your server address

WiFiClient espClient;

PubSubClient client(espClient);
long lastMsg = 0;
char msg[50];
int value = 0;

// Joins the WiFi network given by `ssid`/`password`, printing a dot every
// 500 ms while waiting, then reports the assigned IP address.
void setup_wifi()
{
    delay(10);

    // Announce which network is being joined.
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);

    WiFi.begin(ssid, password);
    for (;;)
    {
        if (WiFi.status() == WL_CONNECTED)
        {
            break;
        }
        delay(500);
        Serial.print(".");
    }

    Serial.println("");
    Serial.println("WiFi connected");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
}

// MQTT message handler: turns the payload into a C string, prints it and
// hands it to parseMessage().
//
// topic   - topic the message arrived on (unused here)
// payload - raw message bytes
// length  - number of valid bytes in payload
void callback(char* topic, byte* payload, unsigned int length)
{
    char messageBuffer[300];                 //an array to hold the payload

    // A payload that does not fit (including its terminator) would overflow
    // the stack buffer, and a truncated copy would not be valid JSON anyway,
    // so drop it.
    if (length >= sizeof(messageBuffer))
    {
        Serial.println("MQTT payload too large, message dropped");
        return;
    }

    memcpy(messageBuffer, payload, length);  //copy the payload into the array
    messageBuffer[length] = '\0';            //turn the char array into a C string
    Serial.printf("%s\n", messageBuffer);    //print the string
    parseMessage(messageBuffer);
}

// Blocks until the MQTT client is connected, then subscribes to `topic`.
// A client ID of eight random capital letters is generated once per call;
// failed attempts are retried every 5 seconds.
void reconnect()
{
    // String literals are const, so hold the topic through a const pointer.
    const char* topic = "BBB";  //change to your required topic

    const int idLength = 8;
    char clientID[20];
    for (int c = 0; c < idLength; c++)
    {
        clientID[c] = random('A', 'Z' + 1);
    }
    clientID[idLength] = '\0';  // terminate once, after the last letter
    Serial.printf("clientID : %s\n", clientID);

    while (!client.connected())
    {
        Serial.print("Attempting MQTT connection...");
        if (client.connect(clientID))
        {
            Serial.println("connected");
            client.subscribe(topic);
        }
        else
        {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}

// One-time initialisation: serial port, WiFi, then the MQTT client's broker
// address (port 1883) and message handler.
void setup()
{
    Serial.begin(115200);
    setup_wifi();

    client.setCallback(callback);
    client.setServer(mqtt_server, 1883);
}

// Main loop: re-establish the MQTT connection if it is down, then service
// the client.
void loop()
{
    const bool linkUp = client.connected();
    if (!linkUp)
    {
        reconnect();
    }
    client.loop();
}

void parseMessage(char* json)
{
    StaticJsonDocument<300> doc;
    DeserializationError error = deserializeJson(doc, json);

    if (error)
    {
        Serial.print(F("deserializeJson() failed: "));
        Serial.println(error.f_str());
        return;
    }

    const char* time = doc["time"];
    int windspeed = doc["windspeed"];
    int winddir = doc["winddir"];
    float press = doc["press"];

    Serial.printf("time : %s\n", time);
    Serial.printf("windspeed : %d\n", windspeed);
    Serial.printf("direction : %d\n", winddir);
    Serial.printf("press : %f\n", press);
}

This works as expected! Now to incorporate what you have shown me into my original project...

Thank you again for taking the time!! I will keep you posted with my progress. I am building a wall display to show live (every 2.5 seconds) outdoor reading from my Davis Vue System, indoor readings from the BME280 and then eventually use a Open weather API to pull in the 5-day forecast. All hosted from a ESP32 board. Building this myself give complete control of what and how the data is displayed.

I am glad that it works

It could be neater but I just bodged the parsing code into an existing MQTT test sketch that I wrote a couple of years ago to produce the example

Good luck with your project going forward and come back here if you need more help

OK, I copied over the "callback" and "parseMessage" functions to my project; result below. I am getting "StaticJsonDocument" was not declared in this scope at the first line of the parseMessage function. I did notice your code is using the "ArduinoJson.h" library and mine is "Arduino_JSON.h"; I am not sure yet if that is an issue.

#include <Arduino.h>
#include <WiFi.h>
#include <AsyncTCP.h> 
#include <ESPAsyncWebServer.h>
#include "SPIFFS.h"
#include <Arduino_JSON.h>
#include <Adafruit_BME280.h>
#include <Adafruit_Sensor.h>
#include <PubSubClient.h>
WiFiClient WifiClient;
PubSubClient mqttClient(WifiClient);


char* mqttServer = "my server";
int mqttPort = 1883;
char davisdata[1000];
long lastMsg = 0;
char msg[50];
int value = 0;



void callback(char* topic, byte* payload, unsigned int length) 
{        
  char messageBuffer[200];       //an array to hold the payload
  memcpy(messageBuffer, payload, length);   //copy the payload into the array
  messageBuffer[length] = '\0';     //turn the char array into a C string
  //Serial.printf("%s\n", messageBuffer);     //print the string
 }

 
 void parseMessage(char* json)
{
    StaticJsonDocument<300> doc;
    DeserializationError error = deserializeJson(doc, json);

    if (error)
    {
        Serial.print(F("deserializeJson() failed: "));
        Serial.println(error.f_str());
        return;
    }

    const char* time = doc["time"];
    int windspeed = doc["windspeed"];
    int winddir = doc["winddir"];
    float press = doc["press"];

    //Serial.printf("time : %s\n", time);
    //Serial.printf("windspeed : %d\n", windspeed);
    //Serial.printf("direction : %d\n", winddir);
    //Serial.printf("press : %f\n", press);
    Serial.print(windspeed);
}
 


// Configure the MQTT client: register the message handler and set the
// broker address and port. No network connection is made here.
void setupMQTT()
{
  mqttClient.setCallback(callback);
  mqttClient.setServer(mqttServer, mqttPort);
}

// Replace with your network credentials
const char* ssid = "";
const char* password = "";

// Create AsyncWebServer object on port 80
AsyncWebServer server(80);

// Create an Event Source on /events
AsyncEventSource events("/events");

// Json Variable to Hold Sensor Readings
JSONVar readings;

// Timer variables
unsigned long lastTime = 0;
unsigned long timerDelay = 1000;

// Create a sensor object
Adafruit_BME280 bme; // BME280 connect to ESP32 I2C (GPIO 21 = SDA, GPIO 22 = SCL)

// Start the BME280 at I2C address 0x77. If the sensor does not respond,
// report it on the serial port and halt here forever.
void initBME()
{
  if (bme.begin(0x77)) {
    return;
  }
  Serial.println("Could not find a valid BME280 sensor, check wiring!");
  while (true) {
  }
}

// Read the BME280, store the scaled values (as text) in the global
// `readings` object and return that object serialised as a JSON string.
String getSensorReadings()
{
  // 1.8 * x + 32 is the Celsius-to-Fahrenheit form.
  readings["temperature"] = String(1.8 * bme.readTemperature() + 32);
  // The +3.1 offset and the 3312.50 divisor are the sketch author's own
  // scaling constants.
  readings["humidity"] = String(bme.readHumidity() + 3.1);
  readings["pressure"] = String(bme.readPressure() / 3312.50);
  // Wind speed (and the raw davisdata buffer) are not included yet.

  return JSON.stringify(readings);
}


 
// Initialize SPIFFS, which holds the files used to build the webpage.
// Reports either the mount failure or the success on the serial port,
// never both.
void initSPIFFS() {
  if (!SPIFFS.begin()) {
    Serial.println("An error has occurred while mounting SPIFFS");
    // Do not fall through to the success message below.
    return;
  }
  Serial.println("SPIFFS mounted, ESP32 Booting...");
}

// Connect to WiFi in station mode, blocking until the link is up, then
// report the assigned IP address.
void initWiFi() {
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);

  // Only the progress message belongs inside the wait loop: while this loop
  // runs the board is, by definition, not connected yet.
  while (WiFi.status() != WL_CONNECTED) {
    Serial.println("Connecting to WiFi");
    delay(1000);
  }

  // Reached only once the status is WL_CONNECTED, so the IP is valid here.
  Serial.println("Connected to the WiFi network");
  Serial.print("ESP32 IP: ");
  Serial.println(WiFi.localIP());
}


void setup() {
  // Serial port for debugging purposes
  Serial.begin(115200);
  initSPIFFS();
  setupMQTT();
  initBME();
  initWiFi();
  
  
// Connect to MQTT Broker
while (!mqttClient.connected()) {
  Serial.println("Connecting to MQTT broker 10.0.0.167");
  delay(1000);
  if (mqttClient.connect("ESP32")){
  Serial.println("Connected to MQTT Broker!");
  delay(1000);
  Serial.println("Subscribing to CumulusMX/DataUpdate");
  mqttClient.subscribe("CumulusMX/DataUpdate");
  delay(1000);
  Serial.println("Subscribed! ");
  delay(1000);
  Serial.println("System Ready!");   
  }
else {
  Serial.print("Connection failed ");
    Serial.print(mqttClient.state());
   }
}



  // Web Server Root URL
  server.on("/", HTTP_GET, [](AsyncWebServerRequest *request){
    request->send(SPIFFS, "/index.html", "text/html");
  });

  server.serveStatic("/", SPIFFS, "/");

  // Request for the latest sensor readings
  server.on("/readings", HTTP_GET, [](AsyncWebServerRequest *request){
    String json = getSensorReadings();
    request->send(200, "application/json", json);
    json = String();
  });

  events.onConnect([](AsyncEventSourceClient *client){
    if(client->lastId()){
      Serial.printf("Client reconnected! Last message ID that it got is: %u\n", client->lastId());
    }
    // send event with message "hello!", id current millis
    // and set reconnect delay to 1 second
    client->send("hello!", NULL, millis(), 1000);
  });
  server.addHandler(&events);

  // Start server
  server.begin();
}


void loop() {
 if ((millis() - lastTime) > timerDelay) {
    // Send Events to the client with the Sensor Readings Every 10 seconds
    events.send("ping",NULL,millis());
    events.send(getSensorReadings().c_str(),"new_readings" ,millis());
    lastTime = millis();
           
  } 
mqttClient.loop(); 
}

The two libraries are not the same, hence the error that you got. You can install the ArduinoJson library that I used from the IDE Library Manager

Yea, I am figuring that out, When I use ArduinoJson.h, which I am reading is an all-round better choice, I get past the "StaticJsonDocument" was not declared in this scope error only to now get 'JSONVar' does not name a type error @ this line:
// Json Variable to Hold Sensor Readings
JSONVar readings;. I plan to stick it out with ArduinoJson.h and work to correct the new syntax errors. This is the challenge I get for trying to piece together different programs. So I am working on this JSONVar statement and why now it needs a type defined.

You now seem to be halfway between using the two libraries, but they don't use the same function names or, if they do, the syntax is different. For instance, look how the JSON document is declared in my sketch

    StaticJsonDocument<300> doc;

and yours

JSONVar readings;

The example sketch in post #10 works and should provide the basis of what you want to do, as long as you revise your code to match the library

After several hours I am still stuck editing my code to work with #include <ArduinoJson.h> instead of #include <Arduino_JSON.h>
I replaced 'JsonVar readings' with 'StaticJsonDocument<400> readings;' and now I am on to what should be the final piece.
This "get sensorReadings" function:

// Get Sensor Readings and return them as a JSON string.
//
// `readings` is an ArduinoJson document here, so it is serialised with
// serializeJson(); JSON.stringify() belongs to the Arduino_JSON library and
// does not exist in ArduinoJson.
String getSensorReadings(){
  // Reset the document first: it is reused on every call, and without this
  // each call's string values would keep consuming its fixed memory pool.
  readings.clear();

  readings["temperature"] = String(1.8*bme.readTemperature()+32);
  readings["humidity"] =  String(bme.readHumidity()+3.1);
  readings["pressure"] = String(bme.readPressure()/3312.50);
 // readings["windspeed"] = String("10");// Test Value

  // Local string, so every call starts from an empty result.
  String jsonString;
  serializeJson(readings, jsonString);

  return jsonString;
 }

The tutorial states we need to take the "readings" array, convert it to JSON String and save it to the "jsonString" variable.

I can not seem to get JSON.stringify to work. Perhaps using 'StaticJsonDocument<400> readings;' is a problem?

I don't know if I am going to be able to help due to my lack of experience with JSON, but start by posting your sketch as it is now

Which tutorial are you following and exactly what problem are you having ?

You have been a huge help! And I have made good progress...

By changing the variable types used in the "String getSensorReadings()" function to:

StaticJsonDocument<100> readings;
and
String jsonString; , then adding this line to the function:

serializeJson(readings, jsonString);

My code now compiles and runs! Not great but it does run.

The jsonString variable was not resetting, so data just started to pile up each time the function was called. I added the line jsonString = ""; as a temporary fix. I also noticed an issue with how the code was capturing the BME sensor readings: values randomly report as NULL

image

So I still need to work on the "String getSensorReadings()" function...

THANK YOU for all your help Sir!!

Current sketch below..

#include <Arduino.h>
#include <WiFi.h>
#include <AsyncTCP.h> 
#include <ESPAsyncWebServer.h>
#include "SPIFFS.h"
#include <ArduinoJson.h>
#include <Adafruit_BME280.h>
#include <Adafruit_Sensor.h>
#include <PubSubClient.h>
WiFiClient WifiClient;
PubSubClient mqttClient(WifiClient);



void callback(char* topic, byte* payload, unsigned int length)
{  
      
  char messageBuffer[200];       //an array to hold the payload
  memcpy(messageBuffer, payload, length);   //copy the payload into the array
  messageBuffer[length] = '\0';     //turn the char array into a C string
 // Serial.printf("%s\n", messageBuffer);     //print the string
  parseMessage(messageBuffer);
   }
 

void parseMessage(char* json)
{
    StaticJsonDocument<300> doc;
    DeserializationError error = deserializeJson(doc, json);

    if (error)
    {
        Serial.print(F("deserializeJson() failed: "));
        Serial.println(error.f_str());
        return;
    }

    const char* time = doc["time"];
    int windspeed = doc["windspeed"];
    int winddir = doc["winddir"];
    float press = doc["press"];

    Serial.printf("time : %s\n", time);
    Serial.printf("windspeed : %d\n", windspeed);
    Serial.printf("direction : %d\n", winddir);
    Serial.printf("press : %f\n", press);
   
}

// MQTT broker settings. This sketch uses both names below but does not
// declare them anywhere, so they are declared here.
const char* mqttServer = "*.*.*.*";  // replace with your broker's address
const int mqttPort = 1883;

// Configure the MQTT client: set the broker address and port and register
// the message handler. No network connection is made here.
void setupMQTT() {
  mqttClient.setServer(mqttServer, mqttPort);
  mqttClient.setCallback(callback);
}

// Replace with your network credentials
const char* ssid = "***";
const char* password = "***";

// Create AsyncWebServer object on port 80
AsyncWebServer server(80);

// Create an Event Source on /events
AsyncEventSource events("/events");

// Json Variable to Hold Sensor Readings 
StaticJsonDocument<100> readings;
String jsonString; 


// Timer variables
unsigned long lastTime = 0;
unsigned long timerDelay = 2000;

// Create a sensor object
Adafruit_BME280 bme; // BME280 connect to ESP32 I2C (GPIO 21 = SDA, GPIO 22 = SCL)

// Start the BME280 at I2C address 0x77. If the sensor does not respond,
// report it on the serial port and halt here forever.
void initBME()
{
  const bool sensorFound = bme.begin(0x77);
  if (sensorFound) {
    return;
  }
  Serial.println("Could not find a valid BME280 sensor, check wiring!");
  for (;;) {
  }
}

// Get Sensor Readings and return them as a JSON string.
// Combines the scaled BME280 values with the latest `windspeed`.
String getSensorReadings(){
  jsonString = "";  // serializeJson() appends to a String, so start empty

  // Reset the document as well: it is a global that is reused on every call,
  // and without this each call's string values would keep consuming its
  // fixed memory pool until new values no longer fit.
  readings.clear();

  readings["temperature"] = String(1.8*bme.readTemperature()+32);
  readings["humidity"] =  String(bme.readHumidity()+3.1);
  readings["pressure"] = String(bme.readPressure()/3312.50);
  readings["windspeed"] = windspeed;
  serializeJson(readings, jsonString);
  return jsonString;
 }


 
// Initialize SPIFFS, which holds the files used to build the webpage.
// Reports either the mount failure or the success on the serial port,
// never both.
void initSPIFFS() {
  if (!SPIFFS.begin()) {
    Serial.println("An error has occurred while mounting SPIFFS");
    // Do not fall through to the success message below.
    return;
  }
  Serial.println("SPIFFS mounted, ESP32 Booting...");
}

// Connect to WiFi in station mode, blocking until the link is up, then
// report the assigned IP address.
void initWiFi() {
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);

  // Only the progress message belongs inside the wait loop: while this loop
  // runs the board is, by definition, not connected yet.
  while (WiFi.status() != WL_CONNECTED) {
    Serial.println("Connecting to WiFi");
    delay(1000);
  }

  // Reached only once the status is WL_CONNECTED, so the IP is valid here.
  Serial.println("Connected to the WiFi network");
  Serial.print("ESP32 IP: ");
  Serial.println(WiFi.localIP());
}


void setup() {
  // Serial port for debugging purposes
  Serial.begin(115200);
  initSPIFFS();
  setupMQTT();
  initBME();
  initWiFi();
  
  
// Connect to MQTT Broker
while (!mqttClient.connected()) {
  Serial.println("Connecting to MQTT broker ********");
  delay(1000);
  if (mqttClient.connect("ESP32")){
  Serial.println("Connected to MQTT Broker!");
  delay(1000);
  Serial.println("Subscribing to CumulusMX/DataUpdate");
  mqttClient.subscribe("CumulusMX/DataUpdate");
  delay(1000);
  Serial.println("Subscribed! ");
  delay(1000);
  Serial.println("System Ready!");   
  }
else {
  Serial.print("Connection failed ");
    Serial.print(mqttClient.state());
   }
}
  // Web Server Root URL
  server.on("/", HTTP_GET, [](AsyncWebServerRequest *request){
    request->send(SPIFFS, "/index.html", "text/html");
  });

  server.serveStatic("/", SPIFFS, "/");

  // Request for the latest sensor readings
  server.on("/readings", HTTP_GET, [](AsyncWebServerRequest *request){
    String json = getSensorReadings();
    request->send(200, "application/json", json);
    json = String();
  });

  events.onConnect([](AsyncEventSourceClient *client){
    if(client->lastId()){
      Serial.printf("Client reconnected! Last message ID that it got is: %u\n", client->lastId());
    }
    // send event with message "hello!", id current millis
    // and set reconnect delay to 1 second
    client->send("hello!", NULL, millis(), 1000);
  });
  server.addHandler(&events);

  // Start server
  server.begin();
}


void loop() {
 if ((millis() - lastTime) > timerDelay) {
    // Send Events to the client with the Sensor Readings Every 10 seconds
    events.send("ping",NULL,millis());
    events.send(getSensorReadings().c_str(),"new_readings" ,millis());
    lastTime = millis();
           
  } 
mqttClient.loop(); 
}

I increased the memory for the "readings" variable.

StaticJsonDocument<1000> readings;

And updated the function as follows:

// Get Sensor Readings and return them as a JSON string.
// Combines the scaled BME280 values with the latest `windspeed`, prints the
// resulting JSON on the serial port and returns it.
String getSensorReadings()
{
  // Reset the document first: it is a global that is reused on every call,
  // and without this each call's string values would keep consuming its
  // fixed memory pool, so it only "works" while the pool is oversized.
  readings.clear();

  String jsonString;
  readings["temperature"] = String(1.8*bme.readTemperature()+32);
  readings["humidity"] =  String(bme.readHumidity()+3.1);
  readings["pressure"] = String(bme.readPressure()/3312.50);
  readings["windspeed"] = String(windspeed);
  serializeJson(readings, jsonString);
  Serial.println(jsonString);
  return jsonString;
 }

And so far the code has been running cleanly for over 30min, I am updating the BME sensor and outdoor weather data to the web site every second displaying live wind speed. When I lower the memory for the readings variable the data starts to get corrupt. So this leads me to think there are still efficiencies to build into my code.

Thank you again for helping me over this particular hurdle!!