Repeat sending mqtt message until an acknowledgment

Hello,

I use an esp8266 to send the readings of a sensor to my mqtt broker / to node-red.
From time to time an MQTT message does not reach Node-RED, and I lose that reading. I don't know exactly why, but it may have something to do with the network quality. Anyway.
I reconfigured my node-red flow in that way, that as soon as a message arrives, it sends another message "back" to the broker/my esp8266.

So I want to change my code so that it tries to send the message to the broker more than once, repeating until it receives the answer.
I have a void callback in my code and I am able to receive mqtt messages.
But I do not know how to build a kind of loop that sends the message, waits one second, and tries again if no confirmation has arrived. (It would be best if it did not block the rest of the code, i.e. the ongoing measurement of the sensor, but it would also be OK if that is not possible or too complicated.)

Do you know any example of this?

Does your MQTT broker send an acknowledgement?

Basically, If you are using pubsubclient you can set QoS to 1.

There are some basic settings that PubSubClient has that can increase reliability. One setting would be to change the default MQTT disconnect time and the other would be to set for your messages to be retained.

Basically, I am unsure how to modify your code to do this, because I cannot see your code posted, well formatted, in code tags.

Perhaps this code will give you a few ideas for creating a more robust and reliable MQTT connection.

/*
   Chappie Weather upgrade/addition
   process wind speed direction and rain fall.
*/
#include "esp32/ulp.h"
//#include "ulptool.h"
#include "driver/rtc_io.h"
#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h"
#include "sdkconfig.h"
#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"
#include "driver/pcnt.h"
#include <driver/adc.h>
#include <SimpleKalmanFilter.h>
#include <ESP32Time.h>
////
// Shared driver objects: RTC wrapper, TCP client and the MQTT client that uses it.
// mqtt_server / mqtt_port are not defined in this file (presumably in certs.h — TODO confirm).
ESP32Time rtc;
WiFiClient wifiClient;
PubSubClient MQTTclient(mqtt_server, mqtt_port, wifiClient);
////
// Latest measurements, written by the sensor tasks and read by the publishing tasks.
float CalculatedVoltage = 0.0f; // written by fReadBattery, read by fReadCurrent (guarded by sema_CalculatedVoltage)
float kph = 0.0f;               // written by fAnemometer, read by fWindDirection
float rain  = 0.0f;             // written by fRainFall, read by fWindDirection
/*
   PCNT PCNT_UNIT_0, PCNT_CHANNEL_0 GPIO_NUM_15 = pulse input pin
   PCNT PCNT_UNIT_1, PCNT_CHANNEL_0 GPIO_NUM_4 = pulse input pin
*/
pcnt_unit_t pcnt_unit00 = PCNT_UNIT_0; //pcnt unit 0 channel 0
pcnt_unit_t pcnt_unit10 = PCNT_UNIT_1; //pcnt unit 1 channel 0
//
// Hardware timer handle; configured in setup() to fire onTimer().
hw_timer_t * timer = NULL;
//
// Event-group bits used to wake the once-a-minute tasks.
#define evtAnemometer  ( 1 << 0 )
#define evtRainFall    ( 1 << 1 )
#define evtParseMQTT   ( 1 << 2 )
EventGroupHandle_t eg;
// Bits set together by the timer ISR.
#define OneMinuteGroup ( evtAnemometer | evtRainFall )
////
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
const int payloadSize = 100;
// One received MQTT message: NUL-terminated payload text plus its topic.
// NOTE(review): this struct holds a String and is passed through a FreeRTOS queue,
// which copies items byte-wise; the copy then shares the String's heap buffer with
// the original — confirm this is safe, or switch topic to a fixed char array.
struct stu_message
{
  char payload [payloadSize] = {'\0'};
  String topic ;
} x_message;
////
SemaphoreHandle_t sema_MQTT_KeepAlive;    // serialises MQTTclient.loop() against publish calls
SemaphoreHandle_t sema_mqttOK;            // guards mqttOK
SemaphoreHandle_t sema_CalculatedVoltage; // guards CalculatedVoltage
////
int mqttOK = 0; // stores a count value that is used to cause an esp reset
volatile bool TimeSet = false; // false requests that fparseMQTT set the RTC from the next OK message
////
/*
   A single subject has been subscribed to, the mqtt broker sends out "OK" messages if the client receives an OK message the mqttOK value is set back to zero.
*/
////
/**
 * PubSubClient receive callback: copies the incoming topic and payload into
 * the shared x_message buffer and posts it to xQ_Message for fparseMQTT.
 *
 * @param topic   NUL-terminated topic string supplied by PubSubClient.
 * @param payload Raw payload bytes (not NUL-terminated).
 * @param length  Number of valid bytes in payload.
 *
 * Payloads longer than payloadSize - 1 bytes are truncated; the original loop
 * wrote past the end of x_message.payload in that case.
 */
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  // Reserve the last byte of the buffer for the terminator.
  const unsigned int maxCopy = (unsigned int)( payloadSize - 1 );
  const unsigned int copyLen = ( length < maxCopy ) ? length : maxCopy;
  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  x_message.topic = topic; // store new topic (replaces the previous one)
  memcpy( x_message.payload, payload, copyLen );
  x_message.payload[copyLen] = '\0';
  // Queue length is 1, so overwrite keeps only the newest message.
  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
// WiFi event handler (placed in IRAM): logs station and soft-AP disconnects,
// ignores every other event.
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  if ( event == SYSTEM_EVENT_STA_DISCONNECTED )
  {
    log_i("Disconnected from WiFi access point");
  }
  else if ( event == SYSTEM_EVENT_AP_STADISCONNECTED )
  {
    log_i("WiFi client disconnected");
  }
  // SYSTEM_EVENT_STA_CONNECTED and all remaining events need no action.
} // void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
////
// Hardware-timer ISR: sets the OneMinuteGroup bits so the anemometer and
// rain-fall tasks wake up and take their readings.
void IRAM_ATTR onTimer()
{
  // Must start as pdFALSE; the FromISR call only ever sets it to pdTRUE.
  BaseType_t xHigherPriorityTaskWoken = pdFALSE;
  xEventGroupSetBitsFromISR(eg, OneMinuteGroup, &xHigherPriorityTaskWoken);
  if ( xHigherPriorityTaskWoken == pdTRUE )
  {
    // Ask for a context switch on ISR exit so the woken task runs promptly.
    portYIELD_FROM_ISR();
  }
} // void IRAM_ATTR onTimer()
////
// One-time initialisation: creates the event group, queue and semaphores,
// configures the ADC channels, the periodic hardware timer and both pulse
// counters, then starts all worker tasks on core 1.
void setup()
{
  eg = xEventGroupCreate(); // get an event group handle
  x_message.topic.reserve(100); // pre-allocate the topic String used by mqttCallback
  adc1_config_width(ADC_WIDTH_12Bit);
  adc1_config_channel_atten(ADC1_CHANNEL_6, ADC_ATTEN_DB_11);// using GPIO 34 wind direction
  adc1_config_channel_atten(ADC1_CHANNEL_3, ADC_ATTEN_DB_11);// using GPIO 39 current
  adc1_config_channel_atten(ADC1_CHANNEL_0, ADC_ATTEN_DB_11);// using GPIO 36 battery volts

  // hardware timer 4 set for one minute alarm
  // (timer index 3, divider 80, auto-reloading alarm every 60000000 ticks that calls onTimer)
  timer = timerBegin( 3, 80, true );
  timerAttachInterrupt( timer, &onTimer, true );
  timerAlarmWrite(timer, 60000000, true);
  timerAlarmEnable(timer);
  /* Initialize PCNT's counter */
  int PCNT_H_LIM_VAL         = 3000;
  int PCNT_L_LIM_VAL         = -10;
  // 1st PCNT counter (read by fAnemometer)
  pcnt_config_t pcnt_config  = {};
  pcnt_config.pulse_gpio_num = GPIO_NUM_15;// Set PCNT input signal and control GPIOs
  pcnt_config.ctrl_gpio_num  = PCNT_PIN_NOT_USED;
  pcnt_config.channel        = PCNT_CHANNEL_0;
  pcnt_config.unit           = PCNT_UNIT_0;
  // What to do on the positive / negative edge of pulse input?
  pcnt_config.pos_mode       = PCNT_COUNT_INC;   // Count up on the positive edge
  pcnt_config.neg_mode       = PCNT_COUNT_DIS;   // Count down disable
  // What to do when control input is low or high?
  pcnt_config.lctrl_mode     = PCNT_MODE_KEEP; // Keep the primary counter mode if low
  pcnt_config.hctrl_mode     = PCNT_MODE_KEEP;    // Keep the primary counter mode if high
  // Set the maximum and minimum limit values to watch
  pcnt_config.counter_h_lim  = PCNT_H_LIM_VAL;
  pcnt_config.counter_l_lim  = PCNT_L_LIM_VAL;
  pcnt_unit_config(&pcnt_config); // Initialize PCNT unit
  pcnt_set_filter_value( PCNT_UNIT_0, 1); //Configure and enable the input filter
  pcnt_filter_enable( PCNT_UNIT_0 );
  pcnt_counter_pause( PCNT_UNIT_0 );
  pcnt_counter_clear( PCNT_UNIT_0 );
  pcnt_counter_resume( PCNT_UNIT_0); // start the show
  // setup 2nd PCNT (read by fRainFall); same settings as unit 0 but on GPIO 4 and without the input filter
  pcnt_config = {};
  pcnt_config.pulse_gpio_num = GPIO_NUM_4;
  pcnt_config.ctrl_gpio_num  = PCNT_PIN_NOT_USED;
  pcnt_config.channel        = PCNT_CHANNEL_0;
  pcnt_config.unit           = PCNT_UNIT_1;
  pcnt_config.pos_mode       = PCNT_COUNT_INC;
  pcnt_config.neg_mode       = PCNT_COUNT_DIS;
  pcnt_config.lctrl_mode     = PCNT_MODE_KEEP;
  pcnt_config.hctrl_mode     = PCNT_MODE_KEEP;
  pcnt_config.counter_h_lim  = PCNT_H_LIM_VAL;
  pcnt_config.counter_l_lim  = PCNT_L_LIM_VAL;
  pcnt_unit_config(&pcnt_config);
  //pcnt_set_filter_value( PCNT_UNIT_1, 1 );
  //pcnt_filter_enable  ( PCNT_UNIT_1 );
  pcnt_counter_pause  ( PCNT_UNIT_1 );
  pcnt_counter_clear  ( PCNT_UNIT_1 );
  pcnt_counter_resume ( PCNT_UNIT_1 );
  // Single-slot queue carrying the most recent MQTT message from mqttCallback to fparseMQTT
  xQ_Message = xQueueCreate( 1, sizeof(stu_message) );
  // Binary semaphores used as locks; each is given once so the first take succeeds
  sema_CalculatedVoltage = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_CalculatedVoltage );
  sema_mqttOK = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_mqttOK );
  // Not given here; the MQTTkeepalive task gives it before its first use
  sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  ///
  // Arguments: task function, name, stack size, parameter, priority, handle, core
  xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 15000, NULL, 5, NULL, 1 );
  xTaskCreatePinnedToCore( fparseMQTT, "fparseMQTT", 10000, NULL, 5, NULL, 1 ); // assign all to core 1, WiFi in use.
  xTaskCreatePinnedToCore( fReadBattery, "fReadBattery", 4000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fReadCurrent, "fReadCurrent", 4000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fWindDirection, "fWindDirection", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( fAnemometer, "fAnemometer", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( fRainFall, "fRainFall", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 ); // assign all to core 1
} //void setup()
// Empty placeholder; it has no body and is not called anywhere in the code shown here.
static void init_ulp_program()
{

}
////
/**
 * FreeRTOS task: samples the wind-vane ADC channel every 100 ticks, filters
 * it, maps the voltage to a compass letter and, roughly every 30 samples,
 * publishes "kph,direction,rain" on topicWSWDRF.
 *
 * Changes from the previous version:
 *  - xLastWakeTime is no longer reset before every vTaskDelayUntil() call;
 *    resetting it turned the fixed-period delay into a plain relative delay.
 *    It is now initialised after the wait-for-MQTT loop so the first period
 *    starts from the moment sampling begins.
 *  - Logical && replaces bitwise & in the range tests.
 *  - Unused locals (high, low) removed.
 *
 * NOTE(review): voltages in (0.6, 1.7) and >= 3.3 match no direction, so the
 * published direction field is empty for them — confirm this is intended.
 */
void fWindDirection( void *pvParameters )
{
  float adcValue = 0.0f;
  uint64_t TimePastKalman  = esp_timer_get_time();
  SimpleKalmanFilter KF_ADC( 1.0f, 1.0f, .01f );
  const float ADscale = 3.3f / 4096.0f; // volts per ADC count
  const TickType_t xFrequency = 100; //delay for mS
  int count = 0;
  String windDirection;
  windDirection.reserve(20);
  String MQTTinfo = "";
  MQTTinfo.reserve( 150 );
  // Do not start sampling until the keep-alive task has an MQTT connection.
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  TickType_t xLastWakeTime = xTaskGetTickCount();
  for (;;)
  {
    windDirection = "";
    adcValue = float( adc1_get_raw(ADC1_CHANNEL_6) ); //take a raw ADC reading
    KF_ADC.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f ); //time, in seconds, since last reading
    adcValue = KF_ADC.updateEstimate( adcValue ); // apply simple Kalman filter
    TimePastKalman = esp_timer_get_time(); // time of update complete
    adcValue = adcValue * ADscale;
    if ( (adcValue >= 0.0f) && (adcValue <= .25f) )
    {
      windDirection.concat( "N" );
    }
    if ( (adcValue > .25f) && (adcValue <= .6f) )
    {
      windDirection.concat( "E" );
    }
    if ( (adcValue > 2.0f) && (adcValue < 3.3f) )
    {
      windDirection.concat( "S");
    }
    if ( (adcValue >= 1.7f) && (adcValue < 2.0f) )
    {
      windDirection.concat( "W" );
    }
    if ( count >= 30 )
    {
      MQTTinfo.concat( String(kph, 2) );
      MQTTinfo.concat( ",");
      MQTTinfo.concat( windDirection );
      MQTTinfo.concat( ",");
      MQTTinfo.concat( String(rain, 2) );
      // Hold the keep-alive lock so MQTTclient.loop() cannot run during the publish.
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      MQTTclient.publish( topicWSWDRF, MQTTinfo.c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      count = 0;
    }
    count++;
    MQTTinfo = "";
    // vTaskDelayUntil advances xLastWakeTime itself; do not reset it here.
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
// read rainfall
/**
 * FreeRTOS task: woken once a minute by the evtRainFall bit, reads and clears
 * the rain-gauge pulse counter (PCNT_UNIT_1) and converts clicks to mm.
 *
 * Fix: count is now int16_t, the type pcnt_get_counter_value() writes
 * through; the previous int did not match the int16_t* parameter.
 *
 * NOTE(review): the counter is cleared every minute, so rain holds the amount
 * from the most recent minute with a non-zero count rather than a running
 * daily total — confirm which is wanted.
 */
void fRainFall( void *pvParemeters )
{
  int16_t count = 0;
  pcnt_counter_pause( PCNT_UNIT_1 );
  pcnt_counter_clear( PCNT_UNIT_1 );
  pcnt_counter_resume( PCNT_UNIT_1 );
  for  (;;)
  {
    // Blocks until the timer ISR sets the bit; the bit is cleared on exit.
    xEventGroupWaitBits (eg, evtRainFall, pdTRUE, pdTRUE, portMAX_DELAY);
    pcnt_counter_pause( PCNT_UNIT_1 );
    pcnt_get_counter_value( PCNT_UNIT_1, &count );
    pcnt_counter_clear( PCNT_UNIT_1 );
    pcnt_counter_resume( PCNT_UNIT_1 );
    if ( count != 0 )
    {
      // 0.2794mm of rain per click clear clicks at mid night
      rain = 0.2794f * (float)count;
      //log_i( "count %d, rain rain = %f mm", count, rain );
    }
    // At 00:00 reset the published value and the counter.
    if ( (rtc.getHour(true) == 0) && (rtc.getMinute() == 0) )
    {
      pcnt_counter_pause( PCNT_UNIT_1 );
      rain = 0.0;
      count = 0;
      pcnt_counter_clear( PCNT_UNIT_1 );
      pcnt_counter_resume( PCNT_UNIT_1 );
    }
  }
  vTaskDelete ( NULL );
}
////
/**
 * FreeRTOS task: woken once a minute by the evtAnemometer bit, reads and
 * clears the anemometer pulse counter (PCNT_UNIT_0) and stores the wind
 * speed in kph.
 *
 * Fix: count was used but its declaration had been commented out, so the
 * function did not compile. It is declared as int16_t, the type
 * pcnt_get_counter_value() writes through.
 */
void fAnemometer( void *pvParameters )
{
  int16_t count = 0;
  pcnt_counter_clear(PCNT_UNIT_0);
  pcnt_counter_resume(PCNT_UNIT_0);
  for (;;)
  {
    // Blocks until the timer ISR sets the bit; the bit is cleared on exit.
    xEventGroupWaitBits (eg, evtAnemometer, pdTRUE, pdTRUE, portMAX_DELAY);
    pcnt_counter_pause( PCNT_UNIT_0 );
    pcnt_get_counter_value( PCNT_UNIT_0, &count);
    // A wind speed of 2.4km/h causes the switch to close once per second;
    // count covers the 60-second window, hence the division.
    kph = 2.4f * ((float)count / 60.0f);
    pcnt_counter_clear( PCNT_UNIT_0 );
    pcnt_counter_resume( PCNT_UNIT_0 );
  }
  vTaskDelete ( NULL );
}
//////
// FreeRTOS task, one pass per 1000 ticks: restarts the ESP once mqttOK has
// reached the limit (fparseMQTT resets it to zero whenever a message is
// received), and once per day's worth of passes clears TimeSet so the clock
// is refreshed from the broker.
void fmqttWatchDog( void * paramater )
{
  const int ticksPerDay = 86400;   // seconds in a day
  const int restartLimit = 60;     // passes without an MQTT message before restart
  int ticksSinceTimeSync = 86300;  // first time update happens after 100 passes
  for (;;)
  {
    vTaskDelay( 1000 );
    if ( mqttOK >= restartLimit )
    {
      ESP.restart();
    }
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    ++mqttOK;
    xSemaphoreGive( sema_mqttOK );
    ++ticksSinceTimeSync;
    if ( ticksSinceTimeSync < ticksPerDay )
    {
      continue;
    }
    TimeSet = false; // request a fresh time from the next OK message
    ticksSinceTimeSync = 0;
  }
  vTaskDelete( NULL );
}
//////
// Converts up to `digits` ASCII decimal characters starting at text[start]
// into an int; stops early at the first non-digit.
static int fparseMQTT_field( const char *text, int start, int digits )
{
  int value = 0;
  for ( int i = start; i < start + digits; i++ )
  {
    if ( (text[i] < '0') || (text[i] > '9') )
    {
      break;
    }
    value = ( value * 10 ) + ( text[i] - '0' );
  }
  return value;
}

/**
 * FreeRTOS task: waits for messages posted by mqttCallback. For an OK-topic
 * message, when TimeSet is false, it sets the RTC from the payload. After
 * every received message it resets the mqttOK watchdog counter.
 *
 * The payload is read at fixed offsets: year at 0-3, month at 5-6, day at
 * 8-9, hour at 11-12, minute at 14-15.
 *
 * Changes from the previous version:
 *  - rtc.getTime() was passed straight to a "%s" format; it is now converted
 *    with .c_str().
 *  - Payloads shorter than 16 characters are ignored instead of being parsed
 *    as a date.
 *  - Fields are parsed in place instead of through temporary Strings.
 */
void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {
      // parse the time from the OK message and update MCU time
      if ( String(px_message.topic) == topicOK )
      {
        if ( !TimeSet && (strlen( px_message.payload ) >= 16) )
        {
          const char *text = px_message.payload;
          int year  = fparseMQTT_field( text, 0, 4 );
          int month = fparseMQTT_field( text, 5, 2 );
          int day   = fparseMQTT_field( text, 8, 2 );
          int hour  = fparseMQTT_field( text, 11, 2 );
          int min   = fparseMQTT_field( text, 14, 2 );
          rtc.setTime( 0, min, hour, day, month, year );
          log_i( "rtc  %s ", rtc.getTime().c_str() );
          TimeSet = true;
        }
      }
      //
    } //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    // Any received message counts as proof of life for the watchdog.
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK = 0;
    xSemaphoreGive( sema_mqttOK );
  }
} // void fparseMQTT( void *pvParameters )
//////
/**
 * FreeRTOS task: once per 1000 ticks reads the current-sensor ADC channel,
 * filters the result and, on every 60th pass, publishes
 * "voltage,current,power" on topicPower.
 *
 * Fix: xLastWakeTime is no longer reset before each vTaskDelayUntil() call;
 * resetting it turned the fixed-period delay into a plain relative delay.
 *
 * NOTE(review): offSET is not defined in this file (presumably in certs.h —
 * confirm). The unit comments ("mV", "mA") are inherited; the arithmetic
 * yields ref_voltage-scaled volts divided by mVperAmp, so verify the units.
 */
void fReadCurrent( void * parameter )
{
  float ADbits = 4096.0f;
  float ref_voltage = 3.3f;
  uint64_t TimePastKalman  = esp_timer_get_time(); // used by the Kalman filter UpdateProcessNoise, time since last kalman calculation
  SimpleKalmanFilter KF_I( 1.0f, 1.0f, .01f );
  float mA = 0.0f;
  int   printCount = 0;
  /*
     185mv/A = 5 AMP MODULE
     100mv/A = 20 amp module
     66mv/A = 30 amp module
  */
  const float mVperAmp = 185.0f;
  float adcValue = 0;
  float Voltage = 0;
  float Power = 0.0;
  String powerInfo = "";
  powerInfo.reserve( 150 );
  // Do not start sampling until the keep-alive task has an MQTT connection.
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 1000; //delay for mS
  for (;;)
  {
    adc1_get_raw(ADC1_CHANNEL_3); // read once discard reading
    adcValue = ( (float)adc1_get_raw(ADC1_CHANNEL_3) );
    //log_i( "adcValue I = %f", adcValue );
    Voltage = ( (adcValue * ref_voltage) / ADbits ) + offSET; // Gets you mV
    mA = Voltage / mVperAmp; // get amps
    KF_I.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f ); //time, in seconds, since last reading
    mA = KF_I.updateEstimate( mA ); // apply simple Kalman filter
    TimePastKalman = esp_timer_get_time(); // time of update complete
    printCount++;
    if ( printCount == 60 )
    {
      // CalculatedVoltage is written by fReadBattery; read it under its lock.
      xSemaphoreTake( sema_CalculatedVoltage, portMAX_DELAY);
      Power = CalculatedVoltage * mA;
      log_i( "Voltage=%f mA=%f Power=%f", CalculatedVoltage, mA, Power );
      printCount = 0;
      powerInfo.concat( String(CalculatedVoltage, 2) );
      xSemaphoreGive( sema_CalculatedVoltage );
      powerInfo.concat( ",");
      powerInfo.concat( String(mA, 4) );
      powerInfo.concat( ",");
      powerInfo.concat( String(Power, 4) );
      // Hold the keep-alive lock so MQTTclient.loop() cannot run during the publish.
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      MQTTclient.publish( topicPower, powerInfo.c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      powerInfo = "";
    }
    // vTaskDelayUntil advances xLastWakeTime itself; do not reset it here.
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete( NULL );
} //void fReadCurrent( void * parameter )
////
/**
 * FreeRTOS task: once per 1000 ticks reads the battery ADC channel, filters
 * it, scales it through the r1/r2 divider ratio and stores the result in
 * CalculatedVoltage under sema_CalculatedVoltage.
 *
 * Changes from the previous version:
 *  - xLastWakeTime is no longer reset before each vTaskDelayUntil() call;
 *    resetting it turned the fixed-period delay into a plain relative delay.
 *  - Unused printCount and the commented-out logging block removed.
 */
void fReadBattery( void * parameter )
{
  float adcValue = 0.0f;
  const float r1 = 50500.0f; // R1 in ohm, 50K
  const float r2 = 10000.0f; // R2 in ohm, 10k potentiometer
  float Vbatt = 0.0f;
  // ADC count -> pin volts -> battery volts via the divider ratio
  float vRefScale = (3.3f / 4096.0f) * ((r1 + r2) / r2);
  uint64_t TimePastKalman  = esp_timer_get_time(); // used by the Kalman filter UpdateProcessNoise, time since last kalman calculation
  SimpleKalmanFilter KF_ADC_b( 1.0f, 1.0f, .01f );
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 1000; //delay for mS
  for (;;)
  {
    adc1_get_raw(ADC1_CHANNEL_0); //read and discard
    adcValue = float( adc1_get_raw(ADC1_CHANNEL_0) ); //take a raw ADC reading
    KF_ADC_b.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f ); //time, in seconds, since last reading
    adcValue = KF_ADC_b.updateEstimate( adcValue ); // apply simple Kalman filter
    Vbatt = adcValue * vRefScale;
    xSemaphoreTake( sema_CalculatedVoltage, portMAX_DELAY );
    CalculatedVoltage = Vbatt;
    xSemaphoreGive( sema_CalculatedVoltage );
    TimePastKalman = esp_timer_get_time(); // time of update complete
    // vTaskDelayUntil advances xLastWakeTime itself; do not reset it here.
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    //log_i( "fReadBattery %d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete( NULL );
}
////
/**
 * FreeRTOS task: about every 250 ticks services the MQTT client while the
 * link is up, and reconnects WiFi and MQTT when it is not.
 *
 * Changes from the previous version:
 *  - The keep-alive semaphore is created here only if it does not exist yet
 *    (setup() already creates it); creating it again unconditionally leaked
 *    the first one.
 *  - The status log passed String objects to "%s"; it now prints the two
 *    status values as integers.
 *  - The WiFi check inside the else branch was always true; it now tests the
 *    WiFi status alone.
 */
void MQTTkeepalive( void *pvParameters )
{
  if ( sema_MQTT_KeepAlive == NULL )
  {
    sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  }
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  // setting must be set before a mqtt connection is made
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  for (;;)
  {
    // Check both the TCP client and the WiFi status; checking both was found to be more reliable than a single check.
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d", (int)wifiClient.connected(), (int)WiFi.status() );
      if ( WiFi.status() != WL_CONNECTED )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    vTaskDelay( 250 ); //task runs approx every 250 mS
  }
  vTaskDelete ( NULL );
}
////
/**
 * Blocks until WiFi is connected, retrying every 4000 ticks; restarts the
 * ESP on the 10th failed attempt.
 *
 * Fix: the event handler is registered only once. Previously every call
 * registered WiFiEvent again, so each reconnect added another copy of the
 * handler.
 */
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;
  int TryCount = 0;
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  if ( !eventHandlerRegistered )
  {
    WiFi.onEvent( WiFiEvent );
    eventHandlerRegistered = true;
  }
} // void connectToWiFi()
////
/**
 * Blocks until the MQTT client is connected, then installs the receive
 * callback and subscribes to topicOK.
 *
 * Fix: the MAC buffer is now 6 bytes. WiFi.macAddress() writes a full 6-byte
 * address, so the previous 5-byte array was overrun by one byte.
 * The client ID is still built from bytes 0 and 4 so the broker sees the
 * same identity as before.
 */
void connectToMQTT()
{
  MQTTclient.setKeepAlive( 90 ); // needs be made before connecting
  byte mac[6];
  WiFi.macAddress(mac);
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    // boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password, NULL , 1, true, NULL );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
} // void connectToMQTT()
////
// Intentionally empty: all work is done by the FreeRTOS tasks created in setup().
void loop() {}

Basically. Oh, and there is always the off chance that someone will just write the code for you.

1 Like

Yes

Do you have code that receives and recognizes that an acknowledgement has been sent?

OK, I am one step further now. I noticed that my code does not receive every MQTT message. The messages for the first topic are sent once a second by the sending device.
I only get them every 5 seconds. That's absolutely OK for me.
But if I manually send a message to the second topic, it does not appear. It seems there is something like a queue of topic-1 messages in the ESP32 that is blocking other messages.
From time to time the connection to the broker is re-established, and if I send the topic-2 message just after that, it does appear on the console.

The bloody mess of the code:

//published zusätzlich den fritz!DECT-Wert   client.publish("sensors/display/solar", solar);
#include <WiFi.h>
#include <WiFiMulti.h>
#include <Wire.h>  // Only needed for Arduino 1.6.5 and earlier
#include <PubSubClient.h>
#include <ArduinoFritzApi.h>

// Fritz!Box login and the switch to query.
const char* fritz_user     = "user";
const char* fritz_password = "pw";
const char* fritz_ip       = "192.168.178.1"; // ip or fritz.local
const char* fritz_ain      = "116570483787";
/*
   The actor identification number (ain) can be found in the Fritz!Box
   web interface or on a label on the device.

*/


unsigned long previousMillis = 0;        // millis() timestamp of the last sampling pass in loop()
const long interval = 4680;           // time between sampling passes (milliseconds)

FritzApi fritz(fritz_user, fritz_password, fritz_ip);

// MQTT broker settings.
// NOTE(review): mqttPort, mqttUser and mqttPassword are not referenced by the code shown here.
const char* mqtt_server = "192.168.178.44";
const int mqttPort = 1883;
const char* mqttUser = "mosq";
const char* mqttPassword = "pw";

// NOTE(review): on cores that define LED_BUILTIN as a macro this declaration may not compile — confirm for the target board.
int LED_BUILTIN = 2;
String temp = "";  // temporary buffer used by callback() to assemble a payload
String zaehler = "0";   // last smart-meter payload, as text
String packet = "";     // JSON text built by convert2Json()
String received = "";   // last payload seen on "sensors/received"
float output = 0;
float output2 = 0;
float output3 = 0;
float einsparung = 0;
float solar = 0;
float check = 0;
float verbrauchkorr = 0;
float verbrauch = 0;
WiFiMulti wifiMulti;
WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
char msg[50];
int value = 0;
int loops = 0;  // sampling passes taken in the current averaging round
int n = 0;      // averaging rounds counted by addition()
// Running sums / averages of the three published quantities.
int pubwerteinsparung = 0;
int pubwertsolar = 0;
int pubwertverbrauch = 0;
// Second-level sums accumulated across averaging rounds in addition().
int pubwerteinsparung1 = 0;
int pubwertsolar1 = 0;
int pubwertverbrauch1 = 0;

void setup() {
  // initialize digital pin LED_BUILTIN as an output.
  pinMode(LED_BUILTIN, OUTPUT);
  Serial.begin(9600);
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);

  reconnect();
  // Initialize Fritzbox stuff;
  try {
    fritz.init();
  } catch (int e) {
    Serial.println("Could not connect to fritzbox: " + String(e));
  }
  Serial.println("Fritz connected");
}


// Registers the two known access points and makes one connection attempt;
// prints the assigned IP address when that attempt succeeds.
void setup_wifi() {
  delay(10);

  wifiMulti.addAP("ssid", "password");
  wifiMulti.addAP("ssid2", "pw2");

  Serial.println("Connecting Wifi...");
  const bool online = (wifiMulti.run() == WL_CONNECTED);
  if (!online) {
    return;  // nothing is reported when the attempt fails
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  String sTopic = String(topic);
  Serial.print("topic empfangen: ");
  Serial.println(String(topic));

  if (sTopic == "strom/smartmeter/sensor/1/obis/1-0:16.7.0/255/value") {

    temp = "";

    for (int i = 0; i < length; i++) {
      temp += ((char)payload[i]);
    }
    zaehler = temp;
    Serial.println(zaehler);

  }
  else  if (sTopic == "sensors/received") {

    temp = "";

    for (int i = 0; i < length; i++) {
      temp += ((char)payload[i]);
    }
    received = temp;
    Serial.println(received);

  }
  else {
      Serial.print("Diese Topic kenne ich nicht: ");
      Serial.println(sTopic);}

}

void reconnect() {

      if(wifiMulti.run() != WL_CONNECTED) {
        Serial.println("WiFi not connected!");
        void setup_wifi();
    }
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    //  uint32_t chipid=ESP.getChipId();
    char clientid[25];
    snprintf(clientid, 25, "WIFI-Display-%08X", "12345"); //this adds the mac address to the client for a unique id
    Serial.print("Client ID: ");
    Serial.println(clientid);
    if (client.connect(clientid)) {
      Serial.println("connected");

      client.subscribe("strom/smartmeter/sensor/1/obis/1-0:16.7.0/255/value");
      client.subscribe("sensors/received");

    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

// Main loop: keeps the MQTT link alive and, once per `interval`, either takes
// a sample (first 5 passes of a round) or averages the round and hands it to
// addition().
//
// Changes from the previous version:
//  - client.loop() now runs on every pass. It used to run only once per
//    interval (about every 4.7 s), which starved incoming messages.
//  - The switch power is read once per pass; a non-zero reading is used
//    directly instead of querying the Fritz!Box a second time.
void loop()
{

  reconnect();
  client.loop();  // service MQTT on every pass so callbacks fire promptly
  digitalWrite(LED_BUILTIN, LOW);    // turn the LED off by making the voltage LOW
  unsigned long currentMillis = millis();
  if (currentMillis - previousMillis >= interval) {
    if ( loops < 5) {
      check = fritz.getSwitchPower(fritz_ain);
      if ( check != 0 )  {
        solar = check;  // keep the previous value when the reading is 0
      }
      verbrauch = zaehler.toFloat();  // meter reading
      digitalWrite(LED_BUILTIN, HIGH);   // turn the LED on (HIGH is the voltage level)
      Serial.print("Zähleranzeige: ");
      Serial.println(verbrauch);
      Serial.print ("Solarertrag: ");
      Serial.println (solar);
      // client.publish("sensors/display/solar", String(solar).c_str());   // formerly: client.publish("sensors/power/solar", String(solar).c_str());

      verbrauchkorr = ((verbrauch) + (solar));
      Serial.print("Zähleranzeige + Solar: ");
      Serial.println(verbrauchkorr);

      if ((verbrauch) <= 0) {
        einsparung = verbrauchkorr;
        Serial.print("Einsparung: ");
        Serial.println(einsparung);
      }
      else {
        einsparung = solar;
        Serial.print("Einsparung: ");
        Serial.println(einsparung);
      }

      pubwerteinsparung = pubwerteinsparung + einsparung;
      pubwertsolar = pubwertsolar + solar;
      pubwertverbrauch = pubwertverbrauch + verbrauch;
      Serial.print("hochaddition momentan: ");
      Serial.println(pubwertverbrauch);
      loops += 1;
      Serial.println(loops);

    }
    else  {
      // NOTE(review): the sums are divided by 6 although 5 samples are added
      // per round; the value left over from the previous round is still in
      // the sum — confirm the divisor is intended.
      pubwertverbrauch = pubwertverbrauch / 6;
      pubwertsolar = pubwertsolar / 6;
      pubwerteinsparung = pubwerteinsparung / 6;
      Serial.print("Mittelwert verbrauch momentan: ");
      Serial.println(pubwertverbrauch);
      Serial.print("Mittelwert solar momentan: ");
      Serial.println(pubwertsolar);
      Serial.print("Mittelwert einsparung momentan: ");
      Serial.println(pubwerteinsparung);
      Serial.print("hochaddierter verbrauchgeteilt durch 5: ");
      Serial.println(pubwertverbrauch);
      addition();
      loops = 0;
      Serial.print("loops zurückgesetzt auf: ");
      Serial.println(loops);
    }
    previousMillis = currentMillis;
  }
}
void addition() {

  Serial.print("n war: ");
  Serial.println(n);


  n += 1;
  Serial.print("n ist jetzt: ");
  Serial.println(n);
  output = pubwerteinsparung;
  output2 = pubwertsolar;
  output3 = pubwertverbrauch;

  //    Serial.print("pubwertverbrauch hier: ");
  //    Serial.println(pubwertverbrauch);
  //    Serial.print("wert  wird zu output3: ");
  //    Serial.println(output3);

  pubwerteinsparung1 = (pubwerteinsparung1 + output);
  Serial.print("pubwerteinsparung ist jetzt: ");
  Serial.println(pubwerteinsparung1);

  pubwertsolar1 = (pubwertsolar1 + output2);
  Serial.print("pubwertsolar ist jetzt: ");
  Serial.println(pubwertsolar1);

  pubwertverbrauch1 = (pubwertverbrauch1 + output3);
  Serial.print("pubwertverbrauch ist jetzt: ");
  Serial.println(pubwertverbrauch1);

  if ( n > 1) {

    pubwerteinsparung = pubwerteinsparung1 / 15;
    pubwertsolar = pubwertsolar1 / 15;
    pubwertverbrauch = pubwertverbrauch1 / 15;
    do {
     
    convert2Json();
    delay(3000); 
    Serial.println(received);
    } while (received != "true");
  pubwerteinsparung1 = 0;
  pubwertsolar1 = 0;
  pubwertverbrauch1 = 0;
  received = "false";
    n = 0;
    Serial.print("n zurückgesetzt auf: ");
    Serial.println(n);
  }

}

// Makes sure the MQTT link is up, builds the JSON text for the three current
// averages in `packet`, and publishes it on "sensors2/power".
void convert2Json()
{
  reconnect();

  packet = "";
  packet += "{\"einsparung\": ";
  packet += pubwerteinsparung;
  packet += ", \"solarertrag\": ";
  packet += pubwertsolar;
  packet += ", \"verbrauch\": ";
  packet += pubwertverbrauch;
  packet += "}";

  Serial.println(packet);
  client.publish("sensors2/power", packet.c_str());
  Serial.print("send to node-red: ");
  Serial.println("sensors2/power  published");
}

I can see in an MQTT client (MQTTBox) that the acknowledgement is sent every time.

???

Everytime a message arrives at node-red. Of course if not, node-red cannot answer.

From my observations and experience I had issue with keeping the dang MQTT connection open. Once I was able to keep a reliable client connection open, all my data got through.

My MQTT Broker is running on a RPi. A Python program serves as the communications broker. Once a second the Python program broadcasts a OK topic the date/time as a payload.

The client, as per the code posted previously, subscribes to the OK topic being published once a second.

In

You can see

Which clears a count.

In this function

Which runs once a second, it checks to see if 60 counts have passed and if so reset the client.

The other thing the function does is to cause the client to reacquire the time.

When a message is sent, a timer is set. The timer is reset when an ACK is received. If the timer expires, the message is resent. If this occurs 3 times, the link is reset.

Oi! So the code from post #3 can be used as a model, post#10, to cause the OP's desired outcome.

A message is sent from the client, the client starts a count. When the NodeRed replies the count is cleared. If count happens x number of time, redo the thing.

Basically.

i hope you don't mean reset the processor.

i mentioned reset the link. this is from old HDLC protocols. today, this might mean reconnect with some end-point and if not possible, reconnect through some intermediary such as a WiFi node

And if this has happened, is there a backlog of data that now has to be forwarded?

Basically, in the case of my program, I am resetting the processor. In the case of the OP they can do their own thing.

If one were to take a look at the posted code, #3, one would see there is an MQTT keep-alive function that handles WiFi and MQTT disconnect issues. From time to time I may update the RPi that is running the MQTT broker. When I reset the RPi/MQTT broker, the clients lose the broker's token. So that I do not have to climb into my attic or into the backyard or creep around in the crawl space, I have the 12 ESP32s reboot if they cannot get an MQTT broker response in x amount of time.

OK guys, it's working now. The code got a little complex, but it is reliable now.
I have added an additional millis()-based delay to wait for the acknowledgement. I have also moved the client.loop() call to another position, out of the if-statement that runs only once every 5 seconds.

// Revised main loop. Once per `interval` it samples (first 5 passes of a
// round), then averages, publishes the JSON packet and keeps re-publishing
// it until "true" arrives on sensors/received. client.loop() runs on every
// pass. `count` is a phase marker: 0 = collecting, 2 = waiting for the
// acknowledgement.
// NOTE(review): count, previousMillispub, currentMillispub and intervalpub
// are not declared in the code shown here.
void loop()
{
  reconnect();
  digitalWrite(LED_BUILTIN, LOW);    // turn the LED off by making the voltage LOW
  unsigned long currentMillis = millis();

  if (currentMillis - previousMillis >= interval) {

    Serial.print("received momentan: ");
    Serial.println(received);

    // Phase 1: take one sample per pass until 5 samples are collected.
    if (loops < 5) {
      check = fritz.getSwitchPower(fritz_ain);
      if (check != 0) {
        solar = fritz.getSwitchPower(fritz_ain);
      }
      verbrauch = zaehler.toFloat();  // meter reading
      digitalWrite(LED_BUILTIN, HIGH);   // turn the LED on (HIGH is the voltage level)
      Serial.print("Zähleranzeige: ");
      Serial.println(verbrauch);
      Serial.print ("Solarertrag: ");
      Serial.println (solar);
      // client.publish("sensors/display/solar", String(solar).c_str());   // formerly: client.publish("sensors/power/solar", String(solar).c_str());

      verbrauchkorr = verbrauch + solar;
      Serial.print("Zähleranzeige + Solar: ");
      Serial.println(verbrauchkorr);

      if (verbrauch <= 0) {
        einsparung = verbrauchkorr;
        Serial.print("Einsparung: ");
        Serial.println(einsparung);
      } else {
        einsparung = solar;
        Serial.print("Einsparung: ");
        Serial.println(einsparung);
      }

      pubwerteinsparung += einsparung;
      pubwertsolar += solar;
      pubwertverbrauch += verbrauch;
      Serial.print("hochaddition momentan: ");
      Serial.println(pubwertverbrauch);
      loops++;
      Serial.println(loops);
    }

    // Phase 2: round complete and nothing pending -> average and publish.
    if (loops >= 5 && count == 0) {
      pubwertverbrauch /= 6;
      pubwertsolar /= 6;
      pubwerteinsparung /= 6;
      Serial.print("Mittelwert verbrauch momentan: ");
      Serial.println(pubwertverbrauch);
      Serial.print("Mittelwert solar momentan: ");
      Serial.println(pubwertsolar);
      Serial.print("Mittelwert einsparung momentan: ");
      Serial.println(pubwerteinsparung);
      Serial.print("hochaddierter verbrauchgeteilt durch 5: ");
      Serial.println(pubwertverbrauch);
      addition();

      if (n > 1 && count == 0) {
        pubwerteinsparung = pubwerteinsparung1 / 15;
        pubwertsolar = pubwertsolar1 / 15;
        pubwertverbrauch = pubwertverbrauch1 / 15;
        convert2Json();  // the values are now in the JSON string, so the sums can be zeroed
        count = 2;
        pubwerteinsparung1 = 0;
        pubwertsolar1 = 0;
        pubwertverbrauch1 = 0;
      }

      if (count == 2 && received != "true") {
        Serial.print("received bisher: ");
        Serial.println(received);
        Serial.println("jetzt publishen");
        client.publish("sensors2/power", packet.c_str());
        Serial.print("count: ");
        Serial.println(count);
        previousMillispub = millis();  // start the resend timer
      }
    }

    // Resend timer: publish again while the acknowledgement is outstanding.
    currentMillispub = millis();
    if (currentMillispub - previousMillispub > intervalpub) {
      previousMillispub = currentMillispub;

      if (count == 2 && received != "true") {
        Serial.print("received bisher: ");
        Serial.println(received);
        Serial.println("jetzt wieder publishen");
        client.publish("sensors2/power", packet.c_str());
        Serial.print("count: ");
        Serial.println(count);
      }
    }

    // Acknowledgement arrived: reset all markers for the next round.
    if (count == 2 && received == "true") {
      Serial.print("received jetzt: ");
      Serial.println(received);
      received = "false";
      n = 0;
      Serial.print("n zurückgesetzt auf: ");
      Serial.println(n);
      count = 0;
      loops = 0;
      Serial.print("loops zurückgesetzt auf/bei: ");
      Serial.println(loops);
    }

    previousMillis = currentMillis;
  }

  client.loop();
}

The "count" variable is only a marker used to decide which if-statement is allowed to run.

thanks a lot for your help and input!

1 Like

you might count/report the number of times the ack-timer expires and needs to resend.

this would confirm that the code you added is really doing what is expected, that data is being communicated reliably despite being occasionally lost

This topic was automatically closed 180 days after the last reply. New replies are no longer allowed.