(SOLVED) How to use the incoming value from two subscribed MQTT topics

Hello all.

I bod a MLX90614 sensor.I have it connected to a ESP32 and it post the temps to MQTT topics.
Now a also have a Sensirion SCD30 CO2 sensor, that is connected to a ESP-12 and also post its Temperature , humidity and CO2 to MQTT topics.
Now I have the ESP32 subscribe to the MQTT topics that receive the temperature and humidity from the SCD30 sensor. They are posted to the serial monitor so I can see if it work, witch it does.
But i would like to use the incoming temperature and humidity to make calculations.
But i have no idea how to excess the actual value from the topics.
Now in the bottom of my code i have this:

Serial.println(mlx.readAmbientTempC() / mlx.readObjectTempC());

But i would like to use the incoming value for the calculations.
How do iI do that ?

The incoming topics are:
SCD30/temperature
SCD30/humidity

Below the full code i have. I am not a programmer, so code might not be professional but it is working.

#include <OneWire.h>
#include <Adafruit_MLX90614.h>
#include <WiFi.h>
#include <PubSubClient.h>

Adafruit_MLX90614 mlx = Adafruit_MLX90614();




//Wifi Settings
const char* ssid = "SSID";
const char* password =  "PASS";

//MQTT Settings
const char* mqttServer = "MQTT IP";
const int mqttPort = 1883;
const char* mqttUser = "MQTT USER";
const char* mqttPassword = "MQTT PASS";

WiFiClient espClient;
PubSubClient client(espClient);




int period = 5000;
unsigned long time_now = 0;

long lastReconnectAttempt = 0;

uint64_t chipid = ESP.getEfuseMac(); // MAC address of ESP32
uint16_t chip = (uint16_t)(chipid >> 32);
char clientid[25];

boolean reconnect() {
  if (client.connect(clientid, mqttUser, mqttPassword )) {
    // Once connected, publish an announcement...
    client.subscribe("SCD30/temperature");
    client.subscribe("SCD30/humidity");
    client.publish("MLX90614/Info", "Reconnected", true);
  }
  return client.connected();
}



void callback(char* topic, byte* payload, unsigned int length)
{
  Serial.print("Message arrived in topic: ");
  Serial.println(topic);
  Serial.print("Message:");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
  Serial.println("-----------------------");

}


void setup() {
  // Start the Serial Monitor
  Serial.begin(115200);
  mlx.begin();



  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Connecting to WiFi..");
  }
  Serial.println("Connected to the WiFi network");

  snprintf(clientid, 25, "ESP32-%08X", chip);

  client.setServer(mqttServer, mqttPort);

  client.setCallback(callback);

  while (!client.connected()) {
    Serial.println("Connecting to MQTT...");

    if (client.connect(clientid, mqttUser, mqttPassword )) {

      Serial.println("connected");


    } else {

      Serial.print("failed with state ");
      Serial.print(client.state());
      delay(2000);
      lastReconnectAttempt = 0;
    }

  }
  client.subscribe("SCD30/temperature");
  client.subscribe("SCD30/humidity");
}



void loop() {
  time_now = millis();
  //sensors.requestTemperatures();
  // float temperatureC = sensors.getTempCByIndex(0);
  float  ambientCelsius = mlx.readAmbientTempC();
  float  objectCelsius = mlx.readObjectTempC();



  if (WiFi.status() != WL_CONNECTED) {
    WiFi.begin(ssid, password);
  }


  if (!client.connected()) {
    unsigned long now = millis();
    if (now - lastReconnectAttempt > 5000UL) {
      lastReconnectAttempt = now;
      // Attempt to reconnect
      Serial.print("Reconnecting");
      if (reconnect()) {
        lastReconnectAttempt = 0;
      }
    }
  } else {
    // Client connected

    while (millis() < time_now + period) {
      //wait approx. [period] ms
    }


    {
      Serial.print("Ambient = ");
      Serial.print(mlx.readAmbientTempC());
      Serial.print("*C\tObject = ");
      Serial.print(mlx.readObjectTempC());
      Serial.println("*C");
      Serial.println(mlx.readAmbientTempC() / mlx.readObjectTempC());


      client.publish("MLX90614/Temp/Ambient",  String(ambientCelsius, 2).c_str());

      client.publish("MLX90614/Temp/Object",  String(objectCelsius, 2).c_str());

    }

  }
  client.loop();
}

Perhaps the following code may be of help:

#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h" // include the connection infor for WiFi and MQTT
#include "sdkconfig.h" // used for log printing
#include "esp_system.h"
#include "freertos/FreeRTOS.h" //freeRTOS items to be used
#include "freertos/task.h"
#include <SPI.h>
#include <Adafruit_Sensor.h>
#include "Adafruit_BME680.h"
#include <Adafruit_GFX.h>    // Core graphics library
#include <Adafruit_ST7789.h> // Hardware-specific library for ST7789
#include <driver/adc.h>
#include "esp32-hal-ledc.h"
#include <HardwareSerial.h>
#include <SimpleKalmanFilter.h>
#include "MHZ19.h"
////
MHZ19 myMHZ19;
////
Adafruit_BME680 bme( GPIO_NUM_5 ); // use hardware SPI, set GPIO pin to use
//Adafruit_ST7789 tft = Adafruit_ST7789( TFT_CS     , TFT_DC    , TFT_MOSI   , TFT_SCLK   , TFT_RST     );
Adafruit_ST7789 tft   = Adafruit_ST7789( GPIO_NUM_15, GPIO_NUM_0, GPIO_NUM_13, GPIO_NUM_14, GPIO_NUM_22 );
WiFiClient   wifiClient; // do the WiFi instantiation thing
PubSubClient MQTTclient( mqtt_server, mqtt_port, wifiClient ); //do the MQTT instantiation thing
//////
#define evtDoParticleRead     ( 1 << 0 ) // declare an event
#define evtWaitForBME         ( 1 << 1 )
#define evtParseMQTT          ( 1 << 3 )
EventGroupHandle_t eg; // variable for the event group handle
//////
QueueHandle_t xQ_WindChillDewPoint;
QueueHandle_t xQ_eData; // environmental data to be displayed on the screen
struct stu_eData
{
  float  Temperature = 0.0f;
  float  Pressure    = 0.0f;
  float  Humidity    = 0.0f;
  float  IAQ         = 0.0f; // Index Air Quality
  float  RM0         = 0.0f; // Remaining Moisture from sensor 0
  float  PM2         = 0.0f; // particles in air
  float  WS          = 0.0f; // wind speed
  String WD          = "";   // wind direction
  float  RF          = 0.0f; // rainfall
  float  WSV         = 0.0f; // weather station volts
  float  WSC         = 0.0f;  // weather station current
  float  WSP         = 0.0f;  // weather station power
  float  WindChill   = 0.0f; //windchill
  float  DewPoint    = 0.0f;   //dew point or dew index
} x_eData; // environmental data
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
const int payloadSize = 100;
struct stu_message
{
  char payload [payloadSize] = {'\0'};
  String topic ;
} x_message;
////
const float oGasResistanceBaseLine = 149598.0f;
int mqttOK = 0;
int CO2    = 0;
//////
esp_timer_handle_t oneshot_timer; //veriable to store the hardware timer handle
//////
SemaphoreHandle_t sema_MQTT_KeepAlive;
SemaphoreHandle_t sema_PublishPM;
SemaphoreHandle_t sema_mqttOK;
////
//serial(2) = pin25 RX, pin26 TX
HardwareSerial co2Serial ( 2 );
//////
// interrupt service routine for WiFi events put into IRAM
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  switch (event) {
    case SYSTEM_EVENT_STA_CONNECTED:
      log_i("Connected to WiFi access point");
      break;
    case SYSTEM_EVENT_STA_DISCONNECTED:
      log_i("Disconnected from WiFi access point");
      break;
    case SYSTEM_EVENT_AP_STADISCONNECTED:
      log_i("WiFi client disconnected");
      break;
    default: break;
  }
} // void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
//////
void IRAM_ATTR oneshot_timer_callback( void* arg )
{
  BaseType_t xHigherPriorityTaskWoken;
  xEventGroupSetBitsFromISR( eg, evtDoParticleRead, &xHigherPriorityTaskWoken );
} //void IRAM_ATTR oneshot_timer_callback( void* arg )
//////
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  x_message.topic = ""; //clear topic string buffer
  x_message.topic = topic; //store new topic
  int i = 0; // extract payload
  for ( i; i < length; i++)
  {
    x_message.payload[i] = ((char)payload[i]);
  }
  x_message.payload[i] = '\0';
  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
void setup()
{
  co2Serial.begin( 9600 , SERIAL_8N1, 25, 26 ); // pin25 RX, pin26 TX
  x_eData.WD.reserve(50);
  x_message.topic.reserve( payloadSize );
  xQ_WindChillDewPoint = xQueueCreate( 1, sizeof(stu_eData) );
  xQ_Message  = xQueueCreate( 1, sizeof(stu_message) );
  xQ_eData    = xQueueCreate( 1, sizeof(stu_eData) ); // sends a queue copy of the structure
  //
  sema_PublishPM = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_PublishPM );
  sema_mqttOK    =  xSemaphoreCreateBinary();
  xSemaphoreGive( sema_mqttOK );
  //
  ledcSetup( 4, 12000, 8 ); // ledc: 4  => Group: 0, Channel: 2, Timer: 1, led frequency, resolution  bits
  ledcAttachPin( GPIO_NUM_12, 4 );   // gpio number and channel
  ledcWrite( 4, 0 ); // write to channel number 4
  //
  eg = xEventGroupCreate(); // get an event group handle
  // output mode
  gpio_config_t io_cfg = {}; // initialize the gpio configuration structure
  io_cfg.mode = GPIO_MODE_OUTPUT; // set gpio mode
  io_cfg.pin_bit_mask = ( (1ULL << GPIO_NUM_4) ); //bit mask of the pins to set
  gpio_config(&io_cfg); // configure the gpio based upon the parameters as set in the configuration structure
  gpio_set_level( GPIO_NUM_4, LOW); // set air particle sensor trigger pin to LOW
  // input mode
  io_cfg = {}; // reinitialize the gpio configuration structure
  io_cfg.mode = GPIO_MODE_INPUT; // set gpio mode. GPIO_NUM_0 input from water level sensor
  io_cfg.pin_bit_mask = ( (1ULL << GPIO_NUM_0) | (1ULL << GPIO_NUM_27)  ); //bit mask of the pins to set, assign gpio number to be configured
  gpio_config(&io_cfg); // configure the gpio based upon the parameters as set in the configuration structure
  // set up A:D channels, refer: https://dl.espressif.com/doc/esp-idf/latest/api-reference/peripherals/adc.html
  adc1_config_width(ADC_WIDTH_12Bit);
  adc1_config_channel_atten(ADC1_CHANNEL_0, ADC_ATTEN_DB_11);// using GPIO 36
  // https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/system/esp_timer.html?highlight=hardware%20timer High Resoultion Timer API
  esp_timer_create_args_t oneshot_timer_args = {}; // initialize High Resoulition Timer (HRT) configuration structure
  oneshot_timer_args.callback = &oneshot_timer_callback; // configure for callback, name of callback function
  esp_timer_create( &oneshot_timer_args, &oneshot_timer ); // assign configuration to the HRT, receive timer handle
  //
  xTaskCreatePinnedToCore( fparseMQTT, "fparseMQTT", 7000,  NULL, 5, NULL, 1 );
  xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 15000, NULL, 6, NULL, 1 );
  xTaskCreatePinnedToCore( DoTheBME680Thing, "DoTheBME280Thing", 20000, NULL, 5, NULL, 1);
  xTaskCreatePinnedToCore( fDoParticleDetector, "fDoParticleDetector", 6000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fDoTheDisplayThing, "fDoTheDisplayThing", 23000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fScreenBlanking, "fScreenBlanking", 2000, NULL, 2, NULL, 1 );
  xTaskCreatePinnedToCore( fGetCO2, "fGetCO2", 4500, NULL, 2, NULL, 1 );
  xTaskCreatePinnedToCore( fParseDewPointWindChill, "fParseDewPointWindChill", 4500, NULL, 2, NULL, 1 );
} //void setup()
////
/*
  250-400ppm Normal background concentration in outdoor ambient air
  400-1,000ppm  Concentrations typical of occupied indoor spaces with good air exchange
  1,000-2,000ppm  Complaints of drowsiness and poor air.
  2,000-5,000 ppm Headaches, sleepiness and stagnant, stale, stuffy air. Poor concentration, loss of attention, increased heart rate and slight nausea may also be present.
  5,000 Workplace exposure limit (as 8-hour TWA) in most jurisdictions.
  >40,000 ppm Exposure may lead to serious oxygen deprivation resulting in permanent brain damage, coma, even death.
*/
void fParseDewPointWindChill( void *pvParameters )
{
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  struct stu_message px_message;
  String sDewPoint = "";
  String sWindChill = "";
  sDewPoint.reserve( payloadSize );
  sWindChill.reserve( payloadSize );
  for (;;)
  {
    if ( xQueueReceive(xQ_WindChillDewPoint, &px_message, portMAX_DELAY) == pdTRUE )
    {
      sDewPoint = px_message.payload;
      int commaIndex = sDewPoint.indexOf(',');
      sWindChill.concat ( sDewPoint.substring(0, commaIndex) );
      sDewPoint.remove( 0, (commaIndex + 1) );
      x_eData.WindChill = sWindChill.toFloat();
      x_eData.DewPoint = sDewPoint.toFloat();
      sDewPoint = "";
      sWindChill = "";
    }
    //log_i( " high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete( NULL );
}
////
void fGetCO2 ( void *pvParameters )
{
  uint64_t TimePastKalman  = esp_timer_get_time();
  myMHZ19.begin( co2Serial );
  myMHZ19.autoCalibration();
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 1000; //delay for mS
  SimpleKalmanFilter KF_CO2( 1.0f, 1.0f, .01f );
  for ( ;; )
  {
    KF_CO2.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f );
    CO2 = KF_CO2.updateEstimate( myMHZ19.getCO2() ); // apply simple Kalman filter
    TimePastKalman = esp_timer_get_time();
    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
    MQTTclient.publish( topicCO2, String(CO2).c_str() );
    xSemaphoreGive( sema_MQTT_KeepAlive );
    // process wind chill and dew point
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    //log_i( " high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete( NULL );
} //void fMHZ19B ( void *pvParameters )
////
void fScreenBlanking( void *pvParameters )
{
  int       TimeOfPause = 10000 * 1000;
  uint64_t  PauseStartTime = esp_timer_get_time();
  bool      Pause = false;
  const int brightness = 250;
  int       countUpDown = brightness;
  for ( ;; )
  {
    if (!Pause )
    {
      //if motion detect then show display otherwise blank display
      if ( !(gpio_get_level( GPIO_NUM_27)) )
      {
        for ( countUpDown; countUpDown-- > 0; )
        {
          ledcWrite( 4, countUpDown ); // write to channel number 4, dim backlight
          vTaskDelay( 7 );
        }
      } else {
        Pause = true;
        PauseStartTime = esp_timer_get_time();
        ledcWrite( 4, brightness );
        countUpDown = brightness;
      }
    } else {
      // still detecting movement reset blanking pause time
      if ( gpio_get_level( GPIO_NUM_27) )
      {
        PauseStartTime = esp_timer_get_time(); // extend pause blanking time
      }
      if ( (esp_timer_get_time() - PauseStartTime) >= TimeOfPause )
      {
        Pause = false;
      }
    }
    vTaskDelay( 250 );
  }
  vTaskDelete( NULL );
} //void fScreenBlanking( void *pvParameters )
//////
void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {
      xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
      mqttOK = 0;
      xSemaphoreGive( sema_mqttOK );
      if ( px_message.topic == topicRemainingMoisture_0 )
      {
        x_eData.RM0  = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == topicWindSpeed )
      {
        x_eData.WS = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == topicWindDirection )
      {
        x_eData.WD = "";
        x_eData.WD = String(px_message.payload);
      }
      if ( px_message.topic == topicRainfall )
      {
        x_eData.RF = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == topicWSVolts )
      {
        x_eData.WSV = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == topicWSCurrent )
      {
        x_eData.WSC = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == topicWSPower )
      {
        x_eData.WSP = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == topicDPnWI )
      {
        xQueueSend( xQ_WindChillDewPoint, (void *) &px_message, 1 );
      }
    } //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
  } //for(;;)
  vTaskDelete( NULL );
} // void fparseMQTT( void *pvParameters )
////
void fDoTheDisplayThing( void * parameter )
{
  tft.init( 240, 320 ); // Init ST7789 320x240
  tft.setRotation( 3 );
  tft.setTextSize( 3 );
  tft.fillScreen( ST77XX_BLACK );
  tft.setTextWrap( false );
  struct stu_eData px_eData;
  const int brightness = 250;
  ledcWrite( 4, brightness ); //backlight set
  const int MaxString      = 20;
  String oldTempString     = "";
  String oldHumidityString = "";
  String oldAQIString      = "";
  String oldRainfall       = "";
  String oldWindDirection  = "";
  String oldAirPressure    = "";
  String oldRMO            = "";
  String oldPM2            = "";
  String oldPower          = "";
  oldHumidityString.reserve( MaxString );
  oldWindDirection.reserve( MaxString );
  oldAirPressure.reserve( MaxString );
  oldTempString.reserve( MaxString );
  oldAQIString.reserve( MaxString );
  oldRainfall.reserve( MaxString );
  oldPower.reserve( MaxString );
  oldRMO.reserve( MaxString );
  oldPM2.reserve( MaxString );
  bool Tick = true;
  const int numOfColors = 40;
  /* https://chrishewett.com/blog/true-rgb565-colour-picker/#:~:text=A%20true%20RGB565%20colour%20picker%2021st%20Oct%202017,in%205%20bits%20and%20green%20in%206%20bits. */
  int colors[numOfColors] = { ST77XX_BLACK, ST77XX_RED, ST77XX_WHITE, ST77XX_BLUE, ST77XX_GREEN, ST77XX_CYAN, ST77XX_MAGENTA, ST77XX_YELLOW, 0xd55b, 0xee09,
                              0x2e15, 0xcb43, 0x6bad, 0x126f, 0x1264, 0xe264, 0xe7e4, 0x87e4, 0x87fe, 0x876a,
                              0xe304, 0x1cc4, 0xf4c4, 0xf4da, 0xcf66, 0xa879, 0x7f28, 0x4f37, 0xfa97, 0x6195,
                              0X8162, 0xc962, 0x517b, 0x325b, 0xea5b, 0x179b, 0xff80, 0xf960, 0x416d, 0x7bd1
                            };
  int colorCounter = 1;
  for (;;)
  {
    if ( xQueueReceive(xQ_eData, &px_eData, portMAX_DELAY) == pdTRUE )
    {
      tft.setCursor( 0, 0 );
      tft.setTextColor( colors[0] );
      tft.print( oldTempString );
      tft.setCursor( 0, 0 );
      tft.setTextColor( colors[colorCounter] );
      oldTempString = "";
      if ( Tick )
      {
        oldTempString.concat( "iTemp " + String(px_eData.Temperature) + "F" );
      } else {
        oldTempString.concat( "Wind Chill " + String(px_eData.WindChill) + "F" );
      }
      tft.println( oldTempString );
      tft.setCursor( 0, 30 );
      tft.setTextColor( colors[0] );
      tft.print( oldHumidityString );
      tft.setCursor( 0, 30 );
      tft.setTextColor( colors[colorCounter] );
      oldHumidityString = "";
      oldHumidityString.concat( "iHum  " + String(px_eData.Humidity) + "%" );
      tft.println( oldHumidityString );
      tft.setCursor( 0, 60 );
      tft.setTextColor( colors[0] );
      tft.print( oldAirPressure );
      tft.setCursor( 0, 60 );
      tft.setTextColor( colors[colorCounter] );
      oldAirPressure = "";
      //oldAirPressure.concat( "Pres " + String(px_eData.Pressure) + "mmHg" );
      oldAirPressure.concat( "Dew Pt. " + String(px_eData.DewPoint) + "F" );
      tft.println( oldAirPressure );
      tft.setCursor( 0, 90 );
      tft.setTextColor( colors[0] );
      tft.print( oldAQIString );
      tft.setCursor( 0, 90 );
      tft.setTextColor( colors[colorCounter] );
      oldAQIString = "";
      oldAQIString.concat( "iAQI " + String(px_eData.IAQ) + "%" );
      tft.println( oldAQIString );
      tft.setCursor( 0, 120 );
      tft.setTextColor( colors[0] );
      tft.print( oldRMO );
      tft.setCursor( 0, 120 );
      tft.setTextColor( colors[colorCounter] );
      oldRMO = "";
      if ( Tick )
      {
        oldRMO.concat( "iRM0 " + String(px_eData.RM0) + "%" );
      } else {
        oldRMO.concat( "iCO2 " + String(CO2) + "ppm" );
      }
      tft.println( oldRMO );
      tft.setCursor( 0, 150 );
      tft.setTextColor( colors[0] );
      tft.print( oldPM2 );
      tft.setCursor( 0, 150 );
      tft.setTextColor( colors[colorCounter] );
      oldPM2 = "";
      oldPM2.concat( "PM2 " + String(px_eData.PM2) + "ug/m3" );
      tft.println( oldPM2 );
      tft.setCursor( 0, 180 );
      tft.setTextColor( colors[0] );
      tft.print( oldPower );
      tft.setCursor( 0, 180 );
      tft.setTextColor( colors[colorCounter] );
      oldPower = "";
      oldPower.concat(  String(px_eData.WSV) + " Volts" );
      //oldPower.concat(  String(px_eData.WSV) + "V " + String(int(px_eData.WSC * 1000.0f)) + "mA " + String((int(px_eData.WSP * 1000.0f))) + "mW" );
      tft.println( oldPower );
      colorCounter++;
      if ( colorCounter > (numOfColors - 1) )
      {
        colorCounter = 1;
      }
      Tick = !Tick;
      //log_i( " high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
    } //if ( xQueueReceive(xQ_eData, &px_eData, portMAX_DELAY) == pdTRUE )
  } //for (;;)
  vTaskDelete( NULL );
} //void fDoTheDisplayTHing( void * parameter )
////
void fmqttWatchDog( void * paramater )
{
  int maxNonMQTTresponse = 5;
  for (;;)
  {
    vTaskDelay( 1000 );
    if ( mqttOK >= maxNonMQTTresponse )
    {
      ESP.restart();
    }
  }
  vTaskDelete( NULL );
}
////
float fCalulate_IAQ_Index( int gasResistance, float Humidity)
{
  float hum_baseline = 40.0f;
  float hum_weighting = 0.25f;
  float gas_offset = 0.0f;
  float hum_offset = 0.0f;
  float hum_score = 0.0f;
  float gas_score = 0.0f;
  gas_offset = oGasResistanceBaseLine - float( gasResistance );
  hum_offset = float( Humidity ) - hum_baseline;
  // calculate hum_score as distance from hum_baseline
  if ( hum_offset > 0.0f )
  {
    hum_score = 100.0f - hum_baseline - hum_offset;
    hum_score /= ( 100.0f - hum_baseline );
    hum_score *= ( hum_weighting * 100.0f );
  } else {
    hum_score = hum_baseline + hum_offset;
    hum_score /= hum_baseline;
    hum_score *= ( 100.0f - (hum_weighting * 100.0f) );
  }
  //calculate gas score as distance from baseline
  if ( gas_offset > 0.0f )
  {
    gas_score = float( gasResistance ) / oGasResistanceBaseLine;
    gas_score *= ( 100.0f - (hum_weighting * 100.0f ) );
  } else {
    gas_score = 100.0f - ( hum_weighting * 100.0f );
  }
  return ( hum_score + gas_score );
} //void fCalulate_IAQ_Index( int gasResistance, float Humidity):
////
void fDoParticleDetector( void * parameter )
{
  /*
    ug/m3     AQI                 Lvl AQ (Air Quality)
    (air Quality Index)
    0-35     0-50                1   Excellent
    35-75    51-100              2   Average
    75-115   101-150             3   Light pollution
    115-150  151-200             4   moderate
    150-250  201-300             5   heavy
    250-500  >=300               6   serious
  */
  float ADbits = 4095.0f;
  float uPvolts = 3.3f;
  float adcValue = 0.0f;
  float dustDensity = 0.0f;
  float Voc = 0.6f; // Set the typical output voltage, when there is zero dust.
  const float K = 0.5f; // Use the typical sensitivity in units of V per 100ug/m3.
  xEventGroupWaitBits (eg, evtWaitForBME, pdTRUE, pdTRUE, portMAX_DELAY );
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 100; //delay for mS
  for (;;)
  {
    //enable sensor led
    gpio_set_level( GPIO_NUM_4, HIGH ); // set gpio 4 to high to turn on sensor internal led for measurement
    esp_timer_start_once( oneshot_timer, 280 ); // trigger one shot timer for a 280uS timeout, warm up time.
    xEventGroupWaitBits (eg, evtDoParticleRead, pdTRUE, pdTRUE, portMAX_DELAY ); // event will be triggered by the timer expiring, wait here for the 280uS
    adcValue = float( adc1_get_raw(ADC1_CHANNEL_0) ); //take a raw ADC reading from the dust sensor
    gpio_set_level( GPIO_NUM_4, LOW );//Shut off the sensor LED
    adcValue = ( adcValue * uPvolts ) / ADbits; //calculate voltage
    dustDensity = (adcValue / K) * 100.0; //convert volts to dust density
    if ( dustDensity < 0.0f )
    {
      dustDensity = 0.00f; // make negative values a 0
    }
    if ( xSemaphoreTake( sema_PublishPM, 0 ) == pdTRUE )  // don't wait for semaphore to be available
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      //log_i( "ADC volts %f Dust Density = %ug / m3 ", adcValue, dustDensity ); // print the calculated voltage and dustdensity
      MQTTclient.publish( topicInsidePM, String(dustDensity).c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      x_eData.PM2 = dustDensity;
    }
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    //log_i( " high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete( NULL );
}// end fDoParticleDetector()
////
void DoTheBME680Thing( void *pvParameters )
{
  SPI.begin(); // initialize the SPI library
  vTaskDelay( 10 );
  if (!bme.begin()) {
    log_i("Could not find a valid BME680 sensor, check wiring!");
    while (1);
  }
  // Set up oversampling and filter initialization
  bme.setTemperatureOversampling(BME680_OS_8X);
  bme.setHumidityOversampling(BME680_OS_2X);
  bme.setPressureOversampling(BME680_OS_4X);
  bme.setIIRFilterSize(BME680_FILTER_SIZE_3);
  bme.setGasHeater(320, 150); // 320*C for 150 ms
  //wait for a mqtt connection
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  xEventGroupSetBits( eg, evtWaitForBME );
  TickType_t xLastWakeTime    = xTaskGetTickCount();
  const TickType_t xFrequency = 1000 * 15; //delay for mS
  String bmeInfo = "";
  bmeInfo.reserve( 100 );
  for (;;)
  {
    x_eData.Temperature  = bme.readTemperature();
    x_eData.Temperature  = ( x_eData.Temperature * 1.8f ) + 32.0f; // (Celsius x 1.8) + 32
    x_eData.Pressure     = bme.readPressure();
    x_eData.Pressure     = x_eData.Pressure / 133.3223684f; //converts to mmHg
    x_eData.Humidity     = bme.readHumidity();
    x_eData.IAQ          = fCalulate_IAQ_Index( bme.readGas(), x_eData.Humidity );
    //log_i( " temperature % f, Pressure % f, Humidity % f IAQ % f", x_eData.Temperature, x_eData.Pressure, x_eData.Humidity, x_eData.IAQ);
    bmeInfo.concat( String(x_eData.Temperature, 2) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(x_eData.Pressure, 2) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(x_eData.Humidity, 2) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(x_eData.IAQ, 2) );
    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
    if ( MQTTclient.connected() )
    {
      MQTTclient.publish( topicInsideInfo, bmeInfo.c_str() );
    }
    xSemaphoreGive( sema_MQTT_KeepAlive );
    xSemaphoreGive( sema_PublishPM ); // release publish of dust density
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK ++;
    xSemaphoreGive( sema_mqttOK );
    xQueueOverwrite( xQ_eData, (void *) &x_eData );// send data to display
    //
    bmeInfo = ""; // empty the string buffer
    findDewPointWithHumidity( x_eData.Humidity, x_eData.Temperature );
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    // log_i( "DoTheBME280Thing high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete ( NULL );
}
////
/*
  Important to not set vTaskDelay/vTaskDelayUntil to less then 10. Errors begin to develop with the MQTT and network connection.
  makes the initial wifi/mqtt connection and works to keeps those connections open.
*/
void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 250; //delay for ms
  for (;;)
  {
    //check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more realible than just a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // whiles MQTTlient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status % s WiFi status % s", String(wifiClient.connected()), String(WiFi.status()) );
      if ( !(wifiClient.connected()) || !(WiFi.status() == WL_CONNECTED) )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
////
void connectToMQTT()
{
  byte mac[5]; // create client ID from mac address
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe  ( topicOK );
  MQTTclient.subscribe  ( topicRemainingMoisture_0 );
  MQTTclient.subscribe  ( topicWindSpeed );
  MQTTclient.subscribe  ( topicWindDirection );
  MQTTclient.subscribe  ( topicRainfall );
  MQTTclient.subscribe  ( topicWSVolts );
  MQTTclient.subscribe  ( topicWSCurrent );
  MQTTclient.subscribe  ( topicWSPower );
  MQTTclient.subscribe  ( topicDPnWI );
} //void connectToMQTT()
void connectToWiFi()
{
  int TryCount = 0;
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  WiFi.onEvent( WiFiEvent );
}
////
float findDewPointWithHumidity( float humi, float temperature )
{
  //Celcius
  float ans =  (temperature - (14.55 + 0.114 * temperature) * (1 - (0.01 * humi)) - pow(((2.5 + 0.007 * temperature) * (1 - (0.01 * humi))), 3) - (15.9 + 0.117 * temperature) * pow((1 - (0.01 * humi)), 14));
  //log_i( "%f", ans );
  return ans;
}
void loop() { }
////

See void fparseMQTT( void *pvParameters ) how the payload is broken out and applied to variables that are used elsewhere.

1 Like

[quote="Idahowalker, post:2, topic:931843"]

Thanks I am going to dive in to your code

I have been playing around with the code, but don't seem to get it working.
Can you have a look at what I did and see if I am on the right track?

#include <OneWire.h>
#include <Adafruit_MLX90614.h>
#include <WiFi.h>
#include <PubSubClient.h>

Adafruit_MLX90614 mlx = Adafruit_MLX90614();

QueueHandle_t xQ_eData; // environmental data to be displayed on the screen
struct stu_eData
{
  float  SCDtemperature = 0.0f;
  float  SCDhumidity;

} x_eData; // environmental data
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
const int payloadSize = 100;
struct stu_message
{
  char payload [payloadSize] = {'\0'};
  String topic ;
} x_message;

//Wifi Settings
const char* ssid = "SSID";
const char* password =  "PASS";

//MQTT Settings
const char* mqttServer = "MQTT IP";
const int mqttPort = 1883;
const char* mqttUser = "MQTT USER";
const char* mqttPassword = "MQTT PASS";

WiFiClient espClient;
PubSubClient client(espClient);




int period = 5000;
unsigned long time_now = 0;

long lastReconnectAttempt = 0;

uint64_t chipid = ESP.getEfuseMac(); // MAC address of ESP32
uint16_t chip = (uint16_t)(chipid >> 32);
char clientid[25];

boolean reconnect() {
  if (client.connect(clientid, mqttUser, mqttPassword )) {
    // Once connected, publish an announcement...
    client.subscribe("SCD30/temperature");
    client.subscribe("SCD30/humidity");
    client.publish("MLX90614/Info", "Reconnected", true);
  }
  return client.connected();
}



void callback(char* topic, byte* payload, unsigned int length)
{
  Serial.print("Message arrived in topic: ");
  Serial.println(topic);
  Serial.print("Message:");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
  Serial.println("-----------------------");

}


void setup() {
  // Start the Serial Monitor
  Serial.begin(115200);
  mlx.begin();



  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Connecting to WiFi..");
  }
  Serial.println("Connected to the WiFi network");

  snprintf(clientid, 25, "ESP32-%08X", chip);

  client.setServer(mqttServer, mqttPort);

  client.setCallback(callback);

  while (!client.connected()) {
    Serial.println("Connecting to MQTT...");

    if (client.connect(clientid, mqttUser, mqttPassword )) {

      Serial.println("connected");


    } else {

      Serial.print("failed with state ");
      Serial.print(client.state());
      delay(2000);
      lastReconnectAttempt = 0;
    }

  }
  client.subscribe("SCD30/temperature");
  client.subscribe("SCD30/humidity");
}

void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {

      if ( px_message.topic == "SCD30/temperature" )
      {
        x_eData.SCDtemperature  = String(px_message.payload).toFloat();
      }
      if ( px_message.topic == "SCD30/humidity" )
      {
        x_eData.SCDhumidity = String(px_message.payload).toFloat();
      }
    }
  }

}


  void loop() {
    time_now = millis();
    //sensors.requestTemperatures();
    // float temperatureC = sensors.getTempCByIndex(0);
    float  ambientCelsius = mlx.readAmbientTempC();
    float  objectCelsius = mlx.readObjectTempC();





    if (WiFi.status() != WL_CONNECTED) {
      WiFi.begin(ssid, password);
    }


    if (!client.connected()) {
      unsigned long now = millis();
      if (now - lastReconnectAttempt > 5000UL) {
        lastReconnectAttempt = now;
        // Attempt to reconnect
        Serial.print("Reconnecting");
        if (reconnect()) {
          lastReconnectAttempt = 0;
        }
      }
    } else {
      // Client connected

      while (millis() < time_now + period) {
        //wait approx. [period] ms
      }


      {
        Serial.print("Ambient = ");
        Serial.print(mlx.readAmbientTempC());
        Serial.print("*C\tObject = ");
        Serial.print(mlx.readObjectTempC());
        Serial.println("*C");
        Serial.println(ambientCelsius + objectCelsius);


        client.publish("MLX90614/Temp/Ambient",  String(ambientCelsius, 2).c_str());
        client.publish("MLX90614/Temp/Object",  String(objectCelsius, 2).c_str());

        client.publish("MLX90614/Temp/Ambient/SCD30temperature",  String(x_eData.SCDtemperature, 2).c_str());
        client.publish("MLX90614/Temp/Object/SCD30humidity",  String(x_eData.SCDhumidity, 2).c_str());

      }

    }
    client.loop();
  }

you'll need to do something else instead of using queues to pass message between functions, is the first thing I see needs be done.

Well, you guessed correctly that you have to do something with the incoming message.

Here's how I do it:


//==================================  mqtt callback ==================================
//This function is executed when some device publishes a message to a topic that this ESP8266 is subscribed to.
//
void callback(String topic, byte *message, unsigned int length) {

  char mess[24];

  Serial.println();
  Serial.print(F("Message arrived on topic: "));
  Serial.println(topic);
  Serial.print(F("Message length= "));
  Serial.println(length);

  for (unsigned int i = 0; i < length; i++) {
    mess[i] = message[i];
  }

  toLow(mess);         //Make mess LowerCase
  Serial.print("message: ");
  Serial.print(mess);
  Serial.println();


  if (topic == cmndTopic) {
    Serial.println(F("Received cmndTopic"));

    if (!strcmp(mess, "reset")) {          //if mess=="reset", then strcmp returns a zero (false).
      diagFlag = false;
      Speed.start();
      //Serial.println(F("diagFlag=false"));
    }
  }

  if (topic == ledTopic) {
    Serial.println(F("Received ledTopic"));
    diagFlag = true;
    diagLED = atoi(mess);
    if (diagLED < 0) diagLED = 0;           //No negative LED numbers.
    Diag.setTime(10000, true);              //How long we remain in diag mode.
    Speed.stop();                           //Turn off the drip timers.
    Hold.stop();
  }

  if (topic == holdTopic) {                //How long between drips
    Serial.println(F("Received holdTopic"));
    dripHold = atoi(mess);
    if (dripHold < 0) dripHold = 2000;     //Negative defaults to 2 seconds
  }

}



void toUp(char *p) {
  while (*p) {
    *p = toupper(*p);
    p++;
  }
}

void toLow(char *p) {
  while (*p) {
    *p = tolower(*p);
    p++;
  }
}

What you need to see is how I handle mess, how I test mess, and how I do something depending on the content of mess.

I do tend to make a mess of my code....

Thanks for the replys.

I cant even get this to work:

void callback(char* topic, byte* payload, unsigned int length) {

  Serial.print("Message arrived in topic: ");
  Serial.println(topic);
  Serial.print("Message:");

  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }

  if (topic == "SCD/temperature") {
    Serial.println(F("Received cmndTopic"));
  }

}

I do receive this in the monitor:
23:27:31.092 -> Ambient = 20.47C Object = 20.05C

But its nor posting "Received cmndTopic"

How do i wright this part?

  if (topic == "SCD/temperature")

The topic i use is SCD with sub topic temperature, if i post the topic without the " i get SCD in not declared. I think this is the first thing i need to work. If this does not work it will never do enything.
How do i declare the topic?
.

Edit:
@SteveMann as far as i understand your code.
You creat a char named mess.
Then you give the incoming value/text to mess
Then you turn mess in to lowercase letters.
Then you check if mess is reset or not.
Then if mess has not been reset, you give the mess value to diagLED

How are you publishing a message to the topic?

Thanks for the reply.
This is from the SCD30 co2 sensor that is publishing the messages to the topic's

   if (airSensor.dataAvailable())
    {

      Serial.print(F("SCD30 co2(ppm) : "));
      Serial.print(airSensor.getCO2());
      co2 = airSensor.getCO2();
      client.publish("SCD/co2",  String(co2, 0).c_str());

      Serial.print(F("\ttemp(C): "));
      temps = airSensor.getTemperature();
      Serial.print(temps, 2);
      client.publish("SCD/temperature",  String(temps).c_str());

      Serial.print(F("\thumidity(%): "));
      hums = airSensor.getHumidity();
      Serial.print(hums, 1);
      client.publish("SCD/humidity",  String(hums).c_str());

      Serial.print(F("\tVPD(kPa): "));
      Serial.print(VPD);
      client.publish("SCD/VPD",  String(VPD).c_str());

      Serial.println();
    }

You really need MQTT Explorer to see what is being published through your broker.

Thanks for the reply.

I do have MQTT.fx

I have a ESP12 connected to a Sensirion SDC30 CO2 sensor.
Its has been working fine for more then a year.
The CO2, temp and humidity are received by the broker.
I also have them displayed on my sitemap on Openhab3.
Below is the code of that:

#include <Wire.h>
#include "paulvha_SCD30.h"
#include <ESP8266WiFi.h>
#include <PubSubClient.h>


//////////////////////////////////////////////////////////////////////////
// set SCD30 driver debug level (ONLY NEEDED CASE OF SCD30 ERRORS)      //
//                                                                      //
// 0 : no messages                                                      //
// 1 : request sending and receiving                                    //
// 2 : request sending and receiving + show protocol errors             //
//////////////////////////////////////////////////////////////////////////
#define scd_debug 0

//Wifi Settings
const char* ssid = "SSID";
const char* password =  "PASSWD";

//MQTT Settings
const char* mqttServer = "IP";
const int mqttPort = 1883;
const char* mqttUser = "USER";
const char* mqttPassword = "PASSWD";


WiFiClient espClient;
PubSubClient client(espClient);


long lastReconnectAttempt = 0;

uint32_t chipid = ESP.getChipId();
char clientid[25];

boolean reconnect() {
  if (client.connect(clientid, mqttUser, mqttPassword )) {
    // Once connected, publish an announcement...
    client.publish("SCD30/Info", "Reconnected");
  }
  return client.connected();
}



//////////////////////////////////////////////////////////////////////////
//////////////// NO CHANGES BEYOND THIS POINT NEEDED /////////////////////
//////////////////////////////////////////////////////////////////////////

SCD30 airSensor;


void setup()
{
  char buf[10];

  Wire.begin();

  Serial.begin(115200);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Connecting to WiFi..");
  }
  Serial.println("Connected to the WiFi network");

  snprintf(clientid, 25, "ESP12-%08X", chipid);

  client.setServer(mqttServer, mqttPort);
  while (!client.connected()) {
    Serial.println("Connecting to MQTT...");
    if (client.connect(clientid, mqttUser, mqttPassword )) {
      Serial.println("connected");
    } else {
      Serial.print("failed with state ");
      Serial.print(client.state());
      delay(2000);
      lastReconnectAttempt = 0;
    }
  }


  // This will cause readings to occur every two seconds and automatic calibration
  // on an ESP8266 must called last to set the clock stretching correct for SCD30
  airSensor.begin(Wire);
  //This will cause SCD30 readings to occur every two seconds
  if (airSensor.begin() == false)
  {
    Serial.println("The SCD30 did not respond. Please check wiring.");
    while (1);
  }
  // Read SCD30 serial number as printed on the device
  // buffer MUST be at least 7 digits (6 serial + 0x0)
  airSensor.getSerialNumber(buf);
  Serial.print("serial number: ");
  Serial.println(buf);
}



void loop()
{
  float co2; 
  float temps; 
  float hums;
  float VPsat = 610.7 * pow(10, (7.5 * temps / (237.3 + temps)))/1000; // Saturation vapor pressure in Pascals
  float VPair = 610.7 * pow(10, (7.5 * temps / (237.3 + temps)))/1000 * (hums /100);  // Actual vapor pressure in Pascals
  float VPD = VPsat - VPair;  // Vapor Pressure Deficit in Pascals



  if (!client.connected()) {
    unsigned long now = millis();
    if (now - lastReconnectAttempt > 5000UL) {
      lastReconnectAttempt = now;
      // Attempt to reconnect
      if (reconnect()) {
        lastReconnectAttempt = 0;
      }
    }
  } else {
    if (airSensor.dataAvailable())
    {

      Serial.print(F("SCD30 co2(ppm) : "));
      Serial.print(airSensor.getCO2());
      co2 = airSensor.getCO2();
      client.publish("SCD/co2",  String(co2, 0).c_str());

      Serial.print(F("\ttemp(C): "));
      temps = airSensor.getTemperature();
      Serial.print(temps, 2);
      client.publish("SCD/temperature",  String(temps).c_str());

      Serial.print(F("\thumidity(%): "));
      hums = airSensor.getHumidity();
      Serial.print(hums, 1);
      client.publish("SCD/humidity",  String(hums).c_str());

      Serial.print(F("\tVPD(kPa): "));
      Serial.print(VPD);
      client.publish("SCD/VPD",  String(VPD).c_str());

      Serial.println();
    }
    else
      Serial.println(F("No SCD30 data"));
    client.loop();
  }
  delay(2000);
}

In the code you shared, you have this:

  if (topic == cmndTopic) {
    Serial.println(F("Received cmndTopic"));

I also have seen many post on the internet with this, but then the name "cmndTopic" is different.
Is "cmndTopic" the topic where the value/message you look for is posted?
If yes, how would you have typed the topic if you had the value/message you are looking for was posted in sub topic of "cmndTopic" with lets say the name "test"

If i type this:

  if (topic == SCD/temperature) {

I get the error: SCD nor declared

if (topic == "SCD/temperature") {

In my example cmndTopic is a char array declared at the start of the sketch.

1 Like

An example of a declared topic that is located on another tab:

const char* topicSRSSDDT             = "Home/SRSSDDT"; // sunrise sunset dusk dawn transit
1 Like

If i have this in my skets:

void callback(char* topic, byte* payload, unsigned int length)
{
  Serial.print("Message arrived in topic: ");
  Serial.println(topic);
  Serial.print("Message:");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
  Serial.println("-----------------------");

}

It should print to serial monitor the topic and value of the topic i am subscribe to?

  client.subscribe("SCD/temperature");
  client.subscribe("SCD/humidity");
  

EDIT:
Strange, now this part works. But hé its a step forward.

I now have a output in my serial monitor, but it looks weird.
I has the value, but also allot of text end question marks
This is my code:

void callback(char* topic, byte* payload, unsigned int length)
{

  String strTopic;

  strTopic = String((char*)topic);
  if (strTopic  == "SCD/temperature") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    String result = String((char*)payload);
    Serial.print("Message1:");
    Serial.println(result);
  }
}

And this is the output I am getting:

04:29:43.918 -> Connecting to WiFi..
04:29:43.918 -> Connected to the WiFi network
04:29:43.918 -> Connecting to MQTT...
04:29:44.339 -> connected
04:29:49.371 -> Ambient = 21.09*C	Object = 20.81*C
04:29:49.371 -> 41.82
04:29:54.360 -> Ambient = 21.07*C	Object = 20.77*C
04:29:54.360 -> 41.90
04:29:59.358 -> Ambient = 21.09*C	Object = 20.81*C
04:29:59.358 -> 41.84
04:29:59.358 -> Message1:29.63ient/SCD30temperature0.00debaas⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮xV⸮⸮t⸮⸮?4⸮⸮
04:30:04.376 -> Ambient = 21.09*C	Object = 20.81*C
04:30:04.376 -> 41.90
04:30:09.378 -> Ambient = 21.07*C	Object = 20.83*C
04:30:09.378 -> 41.90
04:30:09.378 -> Message1:29.65ient/SCD30temperature0.00debaas⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮⸮xV⸮⸮t⸮⸮?4⸮⸮
04:30:14.396 -> Ambient = 21.07*C	Object = 20.73*C
04:30:14.396 -> 41.90

The value part of it is the correct value.
Can you point me in a derection of what i did wrong here

@SteveMann @Idahowalker Thanks for helping end sharing you skets.
I have it working, maybe not a it should, but like i sad I am not a programmer.
I now print the temp on the serial monitor and in a new MQTT topic, so I can now do calculations with it, witch is my goal from all of this.
Will share my full code below.
Again thanks, I really appreciate this!

void callback

void callback(char* topic, byte* payload, unsigned int length)
{

  String strTopic;
  
  strTopic = String((char*)topic);
  if (strTopic  == "SCD/temperature") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    temp = String((char*)payload).toFloat();
    Serial.print("Message1:");
    Serial.println(temp);
  }

    strTopic = String((char*)topic);
  if (strTopic  == "SCD/humidity") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    hum = String((char*)payload).toFloat();
    Serial.print("Message2:");
    Serial.println(hum);
  }
}

Full code

#include <OneWire.h>
#include <Adafruit_MLX90614.h>
#include <WiFi.h>
#include <PubSubClient.h>

Adafruit_MLX90614 mlx = Adafruit_MLX90614();



//Wifi Settings
const char* ssid = "SSID";
const char* password =  "PASS";

//MQTT Settings
const char* mqttServer = "MQTT IP";
const int mqttPort = 1883;
const char* mqttUser = "MQTT USER";
const char* mqttPassword = "MQTT PASS";



WiFiClient espClient;
PubSubClient client(espClient);


float  temp;
float hum;

int period = 5000;
unsigned long time_now = 0;

long lastReconnectAttempt = 0;

uint64_t chipid = ESP.getEfuseMac(); // MAC address of ESP32
uint16_t chip = (uint16_t)(chipid >> 32);
char clientid[25];

boolean reconnect() {
  if (client.connect(clientid, mqttUser, mqttPassword )) {
    // Once connected, publish an announcement...
    client.subscribe("SCD/temperature");
    client.subscribe("SCD/humidity");
    client.publish("MLX90614/Info", "Reconnected", true);
  }
  return client.connected();
}



void callback(char* topic, byte* payload, unsigned int length)
{

  String strTopic;
  
  strTopic = String((char*)topic);
  if (strTopic  == "SCD/temperature") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    temp = String((char*)payload).toFloat();
    Serial.print("Message1:");
    Serial.println(temp);
  }

    strTopic = String((char*)topic);
  if (strTopic  == "SCD/humidity") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    hum = String((char*)payload).toFloat();
    Serial.print("Message2:");
    Serial.println(hum);
  }
}

void setup() {
  // Start the Serial Monitor
  Serial.begin(115200);
  mlx.begin();



  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Connecting to WiFi..");
  }
  Serial.println("Connected to the WiFi network");

  snprintf(clientid, 25, "ESP32-%08X", chip);

  client.setServer(mqttServer, mqttPort);


  client.setCallback(callback);

  while (!client.connected()) {
    Serial.println("Connecting to MQTT...");

    if (client.connect(clientid, mqttUser, mqttPassword )) {

      Serial.println("connected");

      client.subscribe("SCD/temperature");
      client.subscribe("SCD/humidity");

    } else {

      Serial.print("failed with state ");
      Serial.print(client.state());
      delay(2000);
      lastReconnectAttempt = 0;
    }

  }
}




void loop() {
  time_now = millis();
  //sensors.requestTemperatures();
  // float temperatureC = sensors.getTempCByIndex(0);
  float  ambientCelsius = mlx.readAmbientTempC();
  float  objectCelsius = mlx.readObjectTempC();





  if (WiFi.status() != WL_CONNECTED) {
    WiFi.begin(ssid, password);
  }


  if (!client.connected()) {
    unsigned long now = millis();
    if (now - lastReconnectAttempt > 5000UL) {
      lastReconnectAttempt = now;
      // Attempt to reconnect
      Serial.print("Reconnecting");
      if (reconnect()) {
        lastReconnectAttempt = 0;
      }
    }
  } else {
    // Client connected

    while (millis() < time_now + period) {
      //wait approx. [period] ms
    }


    {
      Serial.print("Ambient = ");
      Serial.print(mlx.readAmbientTempC());
      Serial.print("*C\tObject = ");
      Serial.print(mlx.readObjectTempC());
      Serial.println("*C");
      Serial.println(ambientCelsius + objectCelsius);


      client.publish("MLX90614/Temp/Ambient",  String(ambientCelsius, 2).c_str());
      client.publish("MLX90614/Temp/Object",  String(objectCelsius, 2).c_str());

      client.publish("MLX90614/Temp/Ambient/SCD30temperature",  String(temp, 2).c_str());
      client.publish("MLX90614/Temp/Object/SCD30humidity",  String(hum, 2).c_str());

    }

  }
  client.loop();
}

EDIT:

A bit cleaner

void callback(char* topic, byte* payload, unsigned int length)
{

  String strTopic = String((char*)topic);
  

  if (strTopic  == "SCD/temperature") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    temp = String((char*)payload).toFloat();
    Serial.print("Message1:");
    Serial.println(temp);
  }

    
  if (strTopic  == "SCD/humidity") {
    char buffer[6];
    memcpy(buffer, payload, length);  //copy the payload to the buffer
    buffer[length] = '\0';  //terminate the string (NOT a String !
    hum = String((char*)payload).toFloat();
    Serial.print("Message2:");
    Serial.println(hum);
  }
}
1 Like

This topic was automatically closed 180 days after the last reply. New replies are no longer allowed.