MQTT (PubSubClient) - Crashing Arduino Mega2560

Hey All,
As per the topic name I am struggling with the PubSub client crashing when I receive a flood of MQTT messages.
If I send messages at a rate of more than one per second, the device hangs. I moved to MQTT thinking I could retire my own homebrew HTTP interface, which hasn’t missed a beat in years, but this is a real struggle at the moment.

On the publisher side I tried QOS 1 and QOS 2 but it hasn’t helped. I saw a thread about the standard MQTT library being prone to crashes and to use an Adafruit library.

Below is my callback function. I wonder if I am doing something to cause this, such as converting chars to Strings and so on.

Any help is appreciated.

//MQTT callback  
    void callback(char* topic, byte* payload, unsigned int length) {
      IPAddress MQTTserver(10, 0, 0, 200);
      EthernetClient ethClient2;
      PubSubClient client2(ethClient2);    
      client2.setServer(MQTTserver, 1883);
      Serial.println("MQTT message arrived");
      MQTTTopic = String(topic);
      MQTTPayload = ""; 
      Serial.println("MQTT topic is [" + MQTTTopic +"]");


      // ROLLER 1 SHUTTER
      if (MQTTTopic.indexOf("Roller_1") > 0) {
        for (int i=0;i<length;i++) {
          MQTTPayload = String(MQTTPayload + (char)payload[i]);  
        }
          Serial.println("MQTT payload is [" + MQTTPayload + "]");
           
        if (MQTTPayload == "on") {
          digitalWrite(RELAY_CH9, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH9, LOW);
          Serial.println("Toggling - Roller Shutter 1 - UP -  1000msec");
          Serial.println("Toggle Relay 9 High");
          Serial.println("Toggle Relay 9 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_1/POWER","on");
               Serial.println("Publishing stat/ARDUINO_Roller_1/POWER/on");
              }
            }
          
          }
  
        if (MQTTPayload == "off") {
          digitalWrite(RELAY_CH10, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH10, LOW);
          Serial.println("Toggling - Roller Shutter 1 - DOWN -  1000msec");
          Serial.println("Toggle Relay 10 High");
          Serial.println("Toggle Relay 10 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_1/POWER","off");
               Serial.println("Publishing stat/ARDUINO_Roller_1/POWER/off,");
              }
            }
          
          }
      }  
      
      // ROLLER 2 SHUTTER
      if (MQTTTopic.indexOf("Roller_2") > 0) {
        for (int i=0;i<length;i++) {
          MQTTPayload = String(MQTTPayload + (char)payload[i]);  
        }
          Serial.println("MQTT payload is [" + MQTTPayload + "]");
           
        if (MQTTPayload == "on") {
          digitalWrite(RELAY_CH11, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH11, LOW);
          Serial.println("Toggling - Roller Shutter 2 - UP -  1000msec");
          Serial.println("Toggle Relay 11 High");
          Serial.println("Toggle Relay 11 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_2/POWER","on");
               Serial.println("Publishing stat/ARDUINO_Roller_2/POWER/on");
              }
            }
          
          }
  
        if (MQTTPayload == "off") {
          digitalWrite(RELAY_CH12, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH12, LOW);
          Serial.println("Toggling - Roller Shutter 2 - DOWN -  1000msec");
          Serial.println("Toggle Relay 12 High");
          Serial.println("Toggle Relay 12 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_2/POWER","off");
               Serial.println("Publishing stat/ARDUINO_Roller_2/POWER/off");
              }
            }
          
          }
      } 


      
      // ROLLER 3 SHUTTER
      if (MQTTTopic.indexOf("Roller_3") > 0) {
        for (int i=0;i<length;i++) {
          MQTTPayload = String(MQTTPayload + (char)payload[i]);  
        }
          Serial.println("MQTT payload is [" + MQTTPayload + "]");
           
        if (MQTTPayload == "on") {
          digitalWrite(RELAY_CH13, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH13, LOW);
          Serial.println("Toggling - Roller Shutter 4 - UP -  1000msec");
          Serial.println("Toggle Relay 13 High");
          Serial.println("Toggle Relay 13 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_3/POWER","on");
               Serial.println("Publishing stat/ARDUINO_Roller_3/POWER/on");
              }
            }
          
          }
  
        if (MQTTPayload == "off") {
          digitalWrite(RELAY_CH14, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH14, LOW);
          Serial.println("Toggling - Roller Shutter 4 - DOWN -  1000msec");
          Serial.println("Toggle Relay 14 High");
          Serial.println("Toggle Relay 14 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_3/POWER","off");
               Serial.println("Publishing stat/ARDUINO_Roller_3/POWER/off");
              }
            }
          
          }
      }  
      
      // ROLLER 4 SHUTTER
      if (MQTTTopic.indexOf("Roller_4") > 0) {
        for (int i=0;i<length;i++) {
          MQTTPayload = String(MQTTPayload + (char)payload[i]);  
        }
          Serial.println("MQTT payload is [" + MQTTPayload + "]");
           
        if (MQTTPayload == "on") {
          digitalWrite(RELAY_CH15, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH15, LOW);
          Serial.println("Toggling - Roller Shutter 4 - UP -  1000msec");
          Serial.println("Toggle Relay 15 High");
          Serial.println("Toggle Relay 15 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_4/POWER","on");
               Serial.println("Publishing stat/ARDUINO_Roller_4/POWER/on");
              }
            }
          
          }
  
        if (MQTTPayload == "off") {
          digitalWrite(RELAY_CH16, HIGH);
          delay(1000);
          digitalWrite(RELAY_CH16, LOW);
          Serial.println("Toggling - Roller Shutter 4 - DOWN -  1000msec");
          Serial.println("Toggle Relay 16 High");
          Serial.println("Toggle Relay 16 Low");
                         
          //MQTT PUBLISH AN UPDATE BACK
          while (!client2.connected()) {
            // Attempt to connect
            if (client2.connect("arduinoClient")) {
               // Once connected, publish an announcement...
               client2.publish("stat/ARDUINO_Roller_4/POWER","off");
               Serial.println("Publishing stat/ARDUINO_Roller_4/POWER/off");
              }
            }
          
          }
      }       
}

Look at some examples of MQTT callbacks. Yours looks unusual, I wouldn't expect to see a new client being declared there.

OK. So you want to process that much code, with serial prints, on a Mega, in the callback function. Nope. Not going to work.

PubSubClient does not do QOS2.

Oi! delay(1000); in a callback function, and your complaint is that you cannot process messages faster than one per second, when your callback function is sleeping for 1 second at the least. Your callback contains eight delay(1000) calls, which is 8000 milliseconds of delay in total.

Many rules for a callback function are broken with your code.

PubSubClient works great and lasts a long time.

An MQTT callback

// PubSubClient message handler: stores the topic and payload in the shared
// buffers (str_eTopic / strPayload) under sema_MQTT_Parser, then sets the
// evtParseMQTT event bit so the parser task can process them. No parsing is
// done here, which keeps the callback short.
//   topic   - NUL-terminated topic of the received message
//   payload - raw payload bytes (not NUL-terminated)
//   length  - number of bytes in payload
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  // fparseMQTT clears strPayload as a 300-byte buffer, so at most 299 payload
  // bytes plus the terminator may be stored; longer payloads are truncated
  // rather than written past the end of the buffer.
  // NOTE(review): assumes strPayload really is at least 300 bytes - confirm
  // against its declaration.
  const unsigned int PAYLOAD_CAPACITY = 300;
  unsigned int copyLength = length;
  if ( copyLength > PAYLOAD_CAPACITY - 1 )
  {
    copyLength = PAYLOAD_CAPACITY - 1;
  }

  xSemaphoreTake( sema_MQTT_Parser, portMAX_DELAY);
  str_eTopic = topic;
  for ( unsigned int i = 0; i < copyLength; i++ )
  {
    strPayload[i] = ((char)payload[i]);
  }
  strPayload[copyLength] = '\0'; // terminate so the parser can treat it as a C string
  //log_i( "topic %s payload %s" ,str_eTopicPtr, strPayloadPtr );
  xSemaphoreGive ( sema_MQTT_Parser );
  xEventGroupSetBits( eg, evtParseMQTT ); // trigger tasks
} // void mqttCallback(char* topic, byte* payload, unsigned int length)

that I use. It's simple and does nothing but get the publication and topic, put them somewhere, and signal the parser to do the thing.

// Parser task: waits for the evtParseMQTT event bit, then converts the stored
// payload to a float and assigns it to the variable that matches the stored
// topic (outside temperature, humidity, air-quality index or pressure).
// Never returns.
//   pvParameters - task argument; not used in this function
void fparseMQTT( void *pvParameters )
{
  // Give the parser semaphore once before entering the loop.
  // NOTE(review): presumably this makes it available for the first take in
  // the MQTT callback - confirm how the semaphore is created.
  xSemaphoreGive ( sema_MQTT_Parser );
  for (;;)
  {
    xEventGroupWaitBits (eg, evtParseMQTT, pdTRUE, pdTRUE, portMAX_DELAY ); // block until evtParseMQTT is set; the bit is cleared on exit
    // Reset mqttOK while holding its own semaphore.
    // NOTE(review): mqttOK looks like a keep-alive/watchdog counter - confirm.
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK = 0;
    xSemaphoreGive( sema_mqttOK );
    // Hold the parser semaphore while reading and clearing the shared buffers.
    xSemaphoreTake( sema_MQTT_Parser, portMAX_DELAY );
    if ( str_eTopic == topicOutsideTemperature )
    {
      oTemperature = String(strPayload).toFloat();
    }
    if ( (String)str_eTopic == topicOutsideHumidity )
    {
      oHumidity = String(strPayload).toFloat();
    }
    if ( (String)str_eTopic == topicAQIndex )
    {
      oIAQ = String(strPayload).toFloat();
    }
    if ( String(str_eTopic) == topicOutsidePressure )
    {
      oPressure = String(strPayload).toFloat();
    }
    // clear pointer locations (zeroes 300 bytes of strPayload)
    memset( strPayload, '\0', 300 );
    str_eTopic = ""; //clear string buffer
    xSemaphoreGive( sema_MQTT_Parser );
  }
} // void fparseMQTT( void *pvParameters )

^^^^Parsing code.

DO NOT DO SERIAL PRINTS IN A CALLBACK.

DO NOT USE DELAYS IN A CALLBACK.

DO NOT PUT A LOT OF CODE IN A CALLBACK.


The MCU that code comes from processes 11 MQTT publications per second.

All,
I refactored my code to be non-blocking. Your points are understood and crystal clear.

Thanks

This topic was automatically closed 120 days after the last reply. New replies are no longer allowed.