ESP32 PubSubClient Disconnection Problem

Good day. I'm trying to send data over MQTT with the PubSubClient library using a W5500 Ethernet module, but I'm having disconnection problems for 3-5 minutes, maybe longer. What solution would you suggest?


/////////////////////////////////////////GLOBAL LIB/////////////////////////////////////

#include <Ethernet.h>
#include <ArduinoJson.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <WiFi.h>
#include <ESP32Time.h>
#include <SPI.h>

/////////////////////////////////////ESP LIB////////////////////////////////////////////

#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"

//////////////////////// FREERTOS CONFIG ///////////////////////////////////////////////

// Event-group bit that mqttA sets on `eg` at the end of every publish cycle.
#define evtDoMQTTwd          ( 1 << 1 )
// Event group created in setup().
EventGroupHandle_t eg;

// Binary semaphore created by the MQTTkeepalive task; taken around calls into
// the shared PubSubClient instance (client.loop() / client.publish()).
SemaphoreHandle_t sema_MQTT_KeepAlive;
// Binary semaphore created in setup(); taken while mqttOK is being modified.
SemaphoreHandle_t sema_mqttOK;

// One-slot queue (created in setup()) that mqttCallback overwrites with the
// most recently received message.
QueueHandle_t xQ_Message;
// Size in bytes of stu_message::payload, including the terminating '\0'.
const int     payloadSize = 800;
// Set to false by fmqttWatchDog when its interval counter reaches its trigger.
bool          TimeSet = false;
// Last received MQTT message: payload as a C string plus the topic name.
// NOTE(review): the queue copies this struct by value, and it contains an
// Arduino String; presumably a raw copy of a String is unsafe for a consumer
// to use - confirm before adding a queue reader.
struct        stu_message
{
  char        payload [payloadSize] = {'\0'};
  String      topic ;
} x_message;

// Watchdog counter: fmqttWatchDog increments it once per wake-up and restarts
// the ESP when it reaches its threshold.
int mqttOK = 0;

////////////////////////DHT22 CONFIG ///////////////////////////////////////////////////

// Value handed to the DHT constructor as its first argument
// (presumably the GPIO number of the sensor data line - confirm against wiring).
byte temp = 15;
// Sensor model selector passed to the DHT driver.
#define DHTTYPE DHT22
// Sensor driver instance; started in setup() and read by mqttA.
DHT dht = DHT(temp, DHTTYPE);

/////////////////////////////////////Client/////////////////////////////////////////////

// TCP client on the Ethernet interface; used as the transport for `client`.
EthernetClient net;
// MQTT client bound to `net`; the broker address is a placeholder, port 1883.
PubSubClient client("xx.xx.xxx.xxx", 1883, net);

/////////////////////////////////////MQTT Callback//////////////////////////////////////

/**
 * PubSubClient receive callback: stores the incoming message in x_message
 * and publishes a copy to the one-slot queue xQ_Message.
 *
 * @param topic   NUL-terminated topic the message arrived on.
 * @param payload Raw message bytes (not NUL-terminated).
 * @param length  Number of bytes in payload.
 *
 * Payloads longer than payloadSize - 1 bytes are truncated so that the copy
 * and its terminating '\0' always stay inside x_message.payload.
 */
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  // Leave room for the terminator; never copy more than the buffer can hold.
  const unsigned int maxCopy = payloadSize - 1;
  const unsigned int copyLen = ( length < maxCopy ) ? length : maxCopy;

  // Zero the whole buffer first, so the copied bytes are always followed by '\0'.
  memset( x_message.payload, '\0', payloadSize );
  memcpy( x_message.payload, payload, copyLen );

  x_message.topic = topic;

  // Replace whatever message is currently waiting in the queue.
  xQueueOverwrite( xQ_Message, (void *) &x_message );
}

/////////////////////////////////////Setup//////////////////////////////////////////////
// Hardware (MAC) address passed to Ethernet.begin() in setup_eth().
byte mac[] = {0x9C, 0x9C, 0x1F, 0xCB, 0x14, 0x9C};

void setup() {
  // Serial.begin(57600);
  dht.begin();
  Ethernet.init(5);

  x_message.topic.reserve( payloadSize );
  xQ_Message  = xQueueCreate( 1, sizeof(stu_message) );
  sema_mqttOK = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_mqttOK );
  eg = xEventGroupCreate();

  xTaskCreatePinnedToCore( mqttA, "mqqtA", 10000, NULL, 3, NULL, 1);
  xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 20000, NULL, 5, NULL, 1 );
}

/////////////////////////////////////WatchDog///////////////////////////////////////////

void fmqttWatchDog( void * paramater )
{
  // Serial.println("fmqttWatchDog RUN");
  int UpdateImeTrigger = 86400;
  int UpdateTimeInterval = 85000;
  int maxNonMQTTresponse = 3;
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 60000;
  for (;;)
  {
    // Serial.println("fmqttWatchDog for(;;) IN");
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY ); // update mqttOK
    mqttOK++;
    xSemaphoreGive( sema_mqttOK );
    if ( mqttOK >= maxNonMQTTresponse )
    {
      // Serial.println("mqttOK IN");
      vTaskDelay( 200 );
      ESP.restart();
    }
    UpdateTimeInterval++;
    if ( UpdateTimeInterval >= UpdateImeTrigger )
    {
      //  Serial.println("UpdateTimeInterval IN");
      TimeSet = false;
      UpdateTimeInterval = 0;
    }
  }
  // Serial.println("fmqttWatchDog for(;;) OUT");
  vTaskDelete( NULL );
}

/////////////////////////////////////MQTT Connect///////////////////////////////////////

void connect() {
  byte r = 0;
  //Serial.println("CONNECT FUNC IN");
  client.setKeepAlive( 90 );
  String clientId = "ESP32Client-";
  clientId += String(random(0xffff), HEX);
  while (!client.connected()) {
    //Serial.println("!client.connected() IN");
    client.connect(clientId.c_str(), "mosquitto", "mosquitto", NULL , 1, true, NULL );
    vTaskDelay( 100 );
    r++;
    if (r == 50) {
      ESP.restart();
    }

  }
  //Serial.println("!client.connected() OUT");
  client.setCallback( mqttCallback );
  client.subscribe("/PUB/data");
  //Serial.println("\nconnected!");
}

/////////////////////////////////////MQTT PUBLISH///////////////////////////////////////

void mqttA(void * parameter) {
  //Serial.println("mqttA IN");
  TickType_t xLastWakeTime    = xTaskGetTickCount();
  const TickType_t xFrequency = 1000 * 5;

  while (!client.connected()) {
    //Serial.println("mqttA !client.connected() IN");
    vTaskDelay( 250 );
  }
  // Serial.println("mqttA !client.connected() OUT");
  for (;;) {
    //Serial.println("mqttA for(;;) IN");
    StaticJsonBuffer<800> JSON;
    JsonObject& JSONencoder = JSON.createObject();

    JSONencoder["DEVICE = "] = WiFi.macAddress();
    JSONencoder["TEMP = "] = dht.readTemperature();
    JSONencoder["HUMD = "] = dht.readHumidity();
    JSONencoder["STATE = "] = true;
    JSONencoder["RAM = "] = esp_get_free_heap_size();
    JSONencoder["RAM2 = "] = uxTaskGetStackHighWaterMark(NULL);
    JSONencoder["RAM3 = "] = ESP.getFreeHeap();
    JSONencoder["RAM4 = "] = xPortGetFreeHeapSize();

    char JSONmessageBuffer[800];
    JSONencoder.printTo(JSONmessageBuffer, sizeof(JSONmessageBuffer));


    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );

    client.publish("/MQTT/data", JSONmessageBuffer);

    xSemaphoreGive( sema_MQTT_KeepAlive );

    xEventGroupSetBits( eg, evtDoMQTTwd );

    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  // Serial.println("mqttA for(;;) OUT");
  vTaskDelete ( NULL );
}

/////////////////////////////////////MQTT KeepAlive/////////////////////////////////////

/**
 * Keep-alive task. Creates sema_MQTT_KeepAlive, then every 250 ticks either
 * services the MQTT client (client.loop()) while the TCP connection and the
 * Ethernet link are up, or re-establishes Ethernet and the MQTT session.
 *
 * The semaphore is held for the whole service / reconnect pass, including
 * the connection probes, so that a publish from another task cannot run on
 * the same client and Ethernet objects while they are being re-initialised.
 *
 * @param pvParameters Unused task argument.
 */
void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive );
  client.setKeepAlive( 90 );
  for (;;)
  {
    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );

    // Probe once per pass so both branches decide on the same snapshot.
    const bool tcpUp = net.connected();
    const auto link  = Ethernet.linkStatus();

    if ( tcpUp && (link == LinkON) )
    {
      client.loop();
    }
    else
    {
      // Bring Ethernet back first when the socket is gone or the link is down.
      if ( !tcpUp || (link == LinkOFF) )
      {
        setup_eth();
      }
      connect();
    }

    xSemaphoreGive( sema_MQTT_KeepAlive );
    vTaskDelay( 250 );
  }
  vTaskDelete ( NULL );
}

/////////////////////////////////////Ethernet Connect///////////////////////////////////

void setup_eth() {
  //Serial.println("setup_eth() IN");
  Ethernet.init(5);
  byte TryCount = 0;
  if (Ethernet.begin(mac) == 0) {
    // Serial.println("setup_eth() Ethernet.linkStatus() == LinkOFF IN");
    //  Serial.println("Failed to configure Ethernet using DHCP");
    for (;;) {
      TryCount++;
      net.stop();
      vTaskDelay( 100 );
      if ( TryCount == 50 )
      {
        // Serial.println("ESP Restart IN");
        ESP.restart();
      }
      // Serial.println("setup_eth() Ethernet.linkStatus() == LinkOFF OUT");
    }
  }
  Ethernet.begin(mac, Ethernet.localIP());
  //Serial.println("setup_eth() OUT");
}

// Empty: the sketch's work is done by the tasks created in setup().
void loop() {}

I already answered in another thread. Don't use Arduino libraries in multiple threads. Most of them are not thread-safe.

This topic was automatically closed 120 days after the last reply. New replies are no longer allowed.