MQTT losing the connection after 30 min

I'm using the MQTT protocol to save my data in a database, but MQTT only works for about 30 minutes after I start or restart my ESPs, and then they stop receiving data. Any idea what is happening?

Yes, the Broker could be disconnecting the app or WiFi could be disconnecting the app.

Here is code that I use to keep the MQTT broker connection and the WiFi connection up 24/7:

/*
   10/17/2020 Ambrose Campbell
   Project to display MQTT data on a OLED screen
*/
#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h"
#include "sdkconfig.h"
#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1351.h>
#include "time.h"
//
// Hardware timer handle; configured in setup() to call onTimer().
hw_timer_t * timer = NULL;
// Event group used to wake the display, history and MQTT-parser tasks.
EventGroupHandle_t eg;
// Event bits: one bit per task that waits on the event group.
#define evtDoDisplay             ( 1 << 1 )
#define evtCollectHistory        ( 1 << 2 )
#define evtParseMQTT             ( 1 << 3 )
// Bits set together by the timer ISR (history collection + display refresh).
// Author's note: the tasks woken by these bits have been given a delay to avoid a possible race condition.
#define evtGroupBits ( evtCollectHistory | evtDoDisplay )
// SSD1351 OLED, 128x128, software-selected pins:
//                                                 CS GPIO_NUM_27, DC GPIO_NUM_12, DIN GPIO_NUM_13, CLK GPIO_NUM_14, RST GPIO_NUM_26
Adafruit_SSD1351 tft = Adafruit_SSD1351( 128, 128, GPIO_NUM_27,    GPIO_NUM_12,    GPIO_NUM_13,     GPIO_NUM_14,     GPIO_NUM_26 );
////
// Network objects. mqtt_server / mqtt_port are not defined in this file (presumably in certs.h).
WiFiClient   wifiClient;
PubSubClient MQTTclient( mqtt_server, mqtt_port, wifiClient );
//////
// Number of cells in the pressure history ring buffer.
const int DataCells = 60;
// Palette order: black, blue, red, green, cyan, magenta, yellow, white, lightyellow, brown
// NOTE(review): the last two entries (0xFFFFD8, 0xFF8040) are wider than 16 bits, so they are not
// valid RGB565 values like the first eight; they are not used by the code visible here.
int    ColorPalette[10] = { 0x0000, 0x001F, 0xF800, 0x07E0, 0x07FF, 0xF81F, 0xFFE0, 0xFFFF, 0xFFFFD8, 0xFF8040 };
// Index of the history cell currently being written by fCollectHistory().
int    arrayCellPtr = 0;
float  oPressureHistory[DataCells] = {0.0f};
// Topic and payload of the most recent MQTT message; written by mqttCallback(), consumed by fparseMQTT().
String str_eTopic;
// NOTE(review): NULL is used here as a char initialiser; '\0' is presumably what was meant.
char   strPayload [300] = {NULL};
// Latest values parsed from MQTT.
float  oTemperature = 0.0f;
float  oHumidity = 0.0f;
float  oIAQ = 0.0f; // Indexed Air Quality
float  oPressure = 0.0f;
byte mac[6];
// Incremented by fUpdateDisplay(), reset to 0 by fparseMQTT(); fmqttWatchDog() restarts the ESP when it reaches its limit.
int mqttOK = 0;
////
////
// Binary semaphores used as locks.
SemaphoreHandle_t sema_HistoryCompleted; // guards oPressureHistory / arrayCellPtr
SemaphoreHandle_t sema_MQTT_Parser;;     // guards str_eTopic / strPayload (the second ';' is a harmless empty declaration)
SemaphoreHandle_t sema_MQTT_KeepAlive;   // held while MQTTclient.loop() runs
SemaphoreHandle_t sema_mqttOK;           // guards mqttOK
////
// Tick counter incremented by the timer ISR.
volatile int iDoTheThing = 0;
////
/*
   PubSubClient message callback.
   Copies the topic and payload of the received message into the shared
   str_eTopic / strPayload buffers (under sema_MQTT_Parser) and then sets
   evtParseMQTT so that fparseMQTT() processes them.

   topic   - NUL-terminated topic string supplied by PubSubClient
   payload - raw payload bytes (not NUL-terminated)
   length  - number of payload bytes

   The payload is truncated to fit strPayload; without this limit a message of
   300 bytes or more would write past the end of the buffer.
*/
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  // Leave room for the terminating '\0'.
  const unsigned int maxPayload = sizeof( strPayload ) - 1;
  const unsigned int copyLength = ( length < maxPayload ) ? length : maxPayload;
  xSemaphoreTake( sema_MQTT_Parser, portMAX_DELAY);
  str_eTopic = topic;
  for ( unsigned int i = 0; i < copyLength; i++ )
  {
    strPayload[i] = (char)payload[i];
  }
  strPayload[copyLength] = '\0';
  xSemaphoreGive ( sema_MQTT_Parser );
  xEventGroupSetBits( eg, evtParseMQTT ); // trigger tasks
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
/*
   WiFi event handler (registered with WiFi.onEvent).
   Logs station connect / disconnect events and soft-AP client disconnects;
   every other event is ignored.
*/
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  if ( event == SYSTEM_EVENT_STA_CONNECTED )
  {
    log_i("Connected to access point");
    return;
  }
  if ( event == SYSTEM_EVENT_STA_DISCONNECTED )
  {
    log_i("Disconnected from WiFi access point");
    return;
  }
  if ( event == SYSTEM_EVENT_AP_STADISCONNECTED )
  {
    log_i("WiFi client disconnected");
    return;
  }
  // any other event: nothing to report
}
////
/*
   Hardware timer ISR (alarm period is configured in setup()).
   Counts alarms in iDoTheThing and, every 60000 alarms, sets evtGroupBits so
   the history and display tasks run, then restarts the count.

   xHigherPriorityTaskWoken must start as pdFALSE: the FromISR call only ever
   sets it to pdTRUE, so an uninitialised value could be read as "yield needed".
*/
void IRAM_ATTR onTimer()
{
  BaseType_t xHigherPriorityTaskWoken = pdFALSE;
  iDoTheThing++;
  if ( iDoTheThing >= 60000 )
  {
    iDoTheThing = 0;
    xEventGroupSetBitsFromISR( eg, evtGroupBits, &xHigherPriorityTaskWoken );
  }
  // Ask for a context switch on ISR exit if the call above readied a higher-priority task.
  if ( xHigherPriorityTaskWoken == pdTRUE )
  {
    portYIELD_FROM_ISR();
  }
}
////
/*
   Arduino entry point: starts the hardware timer, creates the event group and
   semaphores, initialises the display and creates the five worker tasks.
   No WiFi or MQTT connection is made here; the MQTTkeepalive task does that.
*/
void setup()
{
  /* Use 4th timer of 4.
    1 tick 1/(80MHZ/80) = 1us set divider 80 and count up.
    Attach onTimer function to timer
    Set alarm to call timer ISR, every 1000uS and repeat / reset ISR (true) after each alarm
    Start an timer alarm
  */
  timer = timerBegin( 3, 80, true );
  timerAttachInterrupt( timer, &onTimer, true );
  timerAlarmWrite(timer, 1000, true);
  timerAlarmEnable(timer);
  // pre-allocate the topic string so assignments in the MQTT callback do not need to grow it
  str_eTopic.reserve(300);
  // event group used by the ISR and callback to wake tasks
  // (the ISR first touches it after 60000 alarms, i.e. long after this line has run)
  eg = xEventGroupCreate();
  // display: initialise, rotate, clear to black
  tft.begin();
  tft.setRotation( 1 );
  tft.fillScreen(0x0000);
  // binary semaphores are created "taken"; the ones given below start out available.
  // sema_MQTT_Parser is not given here; fparseMQTT() gives it when that task starts.
  sema_mqttOK    =  xSemaphoreCreateBinary();
  sema_MQTT_Parser      = xSemaphoreCreateBinary();
  sema_HistoryCompleted = xSemaphoreCreateBinary();
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_HistoryCompleted );
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  xSemaphoreGive( sema_mqttOK );
  //// tasks: function, name, stack size, parameter, priority, handle, core
  xTaskCreatePinnedToCore( fparseMQTT,      "fparseMQTT",      7000,  NULL, 5, NULL, 1 ); // assign all to core 1, WiFi in use.
  xTaskCreatePinnedToCore( fCollectHistory, "fCollectHistory", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( MQTTkeepalive,   "MQTTkeepalive",   7000,  NULL, 2, NULL, 1 ); //this task makes a WiFi and MQTT connection.
  xTaskCreatePinnedToCore( fUpdateDisplay,  "fUpdateDisplay",  50000, NULL, 5, NULL, 1 );
  xTaskCreatePinnedToCore( fmqttWatchDog,   "fmqttWatchDog", 3000, NULL, 3, NULL, 1 ); // assign all to core 1
  ////
} //setup() END
////
////
void fmqttWatchDog( void * paramater )
{
  int maxNonMQTTresponse = 5;
  //wait for a mqtt connection
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 1000; //delay for mS
  for (;;)
  {
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    if ( mqttOK >= maxNonMQTTresponse )
    {
      ESP.restart();
    }
  }
  vTaskDelete( NULL );
} //void fmqttWatchDog( void * paramater )
////
/*
   Pressure history task.
   Woken by evtCollectHistory (set by the timer ISR). On every wake-up the
   current pressure is written to the active cell of oPressureHistory; after
   every 120th wake-up the active cell index advances, wrapping at DataCells.
   The counter starts at 119 so the very first wake-up already advances the cell.
   sema_HistoryCompleted keeps the display task from reading the history while
   it is being updated.
*/
void fCollectHistory( void * parameter )
{
  const int triggersPerCell = 120;
  int triggersSeen = 119;
  while ( true )
  {
    xEventGroupWaitBits (eg, evtCollectHistory, pdTRUE, pdTRUE, portMAX_DELAY );
    xSemaphoreTake( sema_HistoryCompleted, portMAX_DELAY );
    oPressureHistory[arrayCellPtr] = oPressure;
    ++triggersSeen;
    log_i( "storage trigger count %d", triggersSeen );
    if ( triggersSeen == triggersPerCell )
    {
      triggersSeen = 0;
      ++arrayCellPtr;
      log_i( "arrayCellPtr %d", arrayCellPtr );
      if ( arrayCellPtr == DataCells )
      {
        arrayCellPtr = 0; // wrap around to the first cell
      }
    }
    xSemaphoreGive( sema_HistoryCompleted );
  }
  vTaskDelete( NULL );
} //void fCollectHistory( void * parameter )
////
// Display task.
// Woken by evtDoDisplay (set by the timer ISR). Each pass bumps the mqttOK
// counter, clears the screen, prints temperature, humidity and air quality in a
// colour derived from the value, prints the pressure and draws the pressure
// history graph. sema_HistoryCompleted keeps fCollectHistory() from changing
// the history while the graph is drawn.
//
// Fixes relative to the posted version:
//  - `xOff` and `newGreen` were used without a declaration (would not compile);
//  - the graph loop ran to i <= DataCells and read one element past the array;
//  - red components were shifted by 10, which lands on the top green bit of an
//    RGB565 value (red is bits 15..11, cf. ColorPalette[2] == 0xF800);
//  - the high-humidity branch computed the red component but OR-ed the raw
//    value into the blue bits instead;
//  - the low-humidity offset is now clamped like its high-humidity sibling so
//    the green term cannot go negative.
////
void fUpdateDisplay( void * parameter )
{
  // Colours are 16-bit RGB565; see http://www.barth-dev.de/online/rgb565-color-picker/
  // ColorPalette index: 0 black, 1 blue, 2 red, 3 green, 4 cyan, 5 magenta, 6 yellow, 7 white
  const int rowRef = 80;     // baseline row of the pressure graph
  const int nextPoint = 2;   // horizontal step between graph samples
  int hRef = 0;              // pressure of cell 0, used as the graph's reference height
  int nextCol = 0;
  int16_t aqColor = 216; //minimum value
  for (;;)
  {
    xEventGroupWaitBits (eg, evtDoDisplay, pdTRUE, pdTRUE, portMAX_DELAY ); //
    // One more display period; fparseMQTT() zeroes this whenever a message arrives.
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK ++;
    xSemaphoreGive( sema_mqttOK );
    vTaskDelay( 1 );
    tft.fillScreen( ColorPalette[0] );
    tft.setTextSize( 2 );
    log_i( "oTemperature %f", oTemperature );
    // ---- temperature colour ----
    if ( (oTemperature >= 73.0f) && (oTemperature <= 100.0f) )
    {
      log_i( "if ( (oTemperature >= 73.0f) && (oTemperature <= 100.0f) )" );
      // mix of red and green, no blue
      // NOTE(review): tempDiff shrinks as the temperature rises, so red falls and green
      // rises towards 100 -- the opposite of "green to decrease and red to increase". Confirm intent.
      int tempDiff = int( 100.0f - oTemperature );
      log_i( "oTemperature %f tempDiff %d", oTemperature, tempDiff );
      int NewRed = tempDiff << 11;                  // red field is bits 15..11
      int NewGreen =  0x07E0 - (tempDiff << 5 );
      log_i( "NewRed %d NewGreen %d", NewRed, NewGreen );
      NewRed |= NewGreen ;
      log_i( "NewRed | NewGreen %d ", NewRed );
      tft.setTextColor( NewRed );
    }
    if ( oTemperature <= 0.0f )
    {
      log_i("white");
      tft.setTextColor( ColorPalette[7] ); // white
    }
    if ((oTemperature >= 32.0f) && (oTemperature < 72.0f) )
    {
      //blue decreases, no red, green increases
      int tempDiff = int( oTemperature - 32.0f );
      if( tempDiff > 31 ) tempDiff = 31;
      log_i(" oTemperature %f tempDiff %d", oTemperature, tempDiff );
      int NewBlue =  ColorPalette[1] - tempDiff; // decrease blue
      log_i( "NewBlue %d", NewBlue );
      int NewGreen =  (tempDiff) << 5 ;// increase green
      log_i( "NewGreen %d", NewGreen );
      NewGreen |= NewBlue; // combine blue into newgreen
      log_i( "NewGreen or'ed %d", NewGreen );
      tft.setTextColor( NewGreen );
    }
    if ((oTemperature >= 72.0f) && (oTemperature < 73.0f) ) tft.setTextColor( ColorPalette[3] ); //green
    if ( (oTemperature < 32.0f) && (oTemperature >= 0.0f) )
    {
      log_i( "if ( (oTemperature < 32.0f) && (oTemperature >= 0.0f) )" );
      int NewBlue = ColorPalette[1]; // blue
      //blue is max; for each step below 32 red and green increase by one
      int tempDiff = int( 32.0f - oTemperature );
      if ( tempDiff > 31 ) tempDiff = 31;            // keep the 5-bit red field in range
      NewBlue = NewBlue + ( (tempDiff) << 5); //add green
      NewBlue =  NewBlue | (tempDiff << 11);  //add red
      tft.setTextColor( NewBlue );
    } //if( (oTemperature < 32.0f) && (oTemperature >= 0.0f) )
    if ( oTemperature >= 100.0f ) tft.setTextColor( ColorPalette[2] ); // red
    tft.setCursor( 0, 0 );
    if ( oTemperature < 100.0f )
    {
      tft.print( "Tmp " + String(oTemperature) );
    } else
    {
      // three-digit temperatures are printed without decimals
      tft.print( "Tmp " + String(int(oTemperature)) );
    }
    // ---- humidity colour ----
    tft.setCursor( 0, 20 );
    tft.setTextSize( 2 );
    //below 50%
    if ( oHumidity < 49.0f )
    {
      //reduce green component, increase blue component, red @0
      //twice the distance from 50%, limited to the 5-bit blue field
      int xOff = 2 * ( abs(50 - int(oHumidity)) );
      if ( xOff > 31 ) xOff = 31;
      int newGreen = ColorPalette[3] - ( (xOff) << 5); //green falls off by xOff
      newGreen = newGreen | (xOff);                    //blue rises by xOff
      tft.setTextColor( newGreen );
    }
    //@50%
    if ( (oHumidity <= 51.0f) && (oHumidity >= 49.0f) )
    {
      tft.setTextColor( ColorPalette[3] ); //green
    }
    // above
    if ( oHumidity > 51.0f )
    {
      //reduce green component, increase red component, blue @0
      int tempDiff = (int)oHumidity - 50;
      if( tempDiff > 31 ) tempDiff = 31;
      int newGreen = ColorPalette[3] - ( tempDiff << 5 ); //green falls off by tempDiff
      int newRed = tempDiff << 11;                        //red rises by tempDiff
      newGreen =  newGreen | newRed;
      tft.setTextColor( newGreen );
    }
    tft.print( "Hum " + String(oHumidity) );
    // ---- AQI colour ----
    // NOTE(review): the *216 scaling and the << 10 shift below produce values far wider than
    // 16 bits before they are folded into the int16_t aqColor, and aqColor carries state from
    // one pass to the next. Left as posted because the intended mapping cannot be inferred.
    if ( (oIAQ * 216) >= (90.0f * 216))
    {
      int redColor = (int)oIAQ - 90;
      redColor *= 216;
      aqColor &= 0x7FF; //set green to and blue to max
      redColor = redColor << 10;
      aqColor |= redColor;
      tft.setTextColor( aqColor );
    }
    if ( oIAQ * 216 > 60.0f * 216 && oIAQ * 216 < 90.0f * 216 )
    {
      int blueColor = (int)oIAQ - 60;
      blueColor *= 216;
      aqColor &= (0x7E0 - (blueColor << 5) ); //set green reduce green by blue
      aqColor |= blueColor;
      tft.setTextColor( aqColor );
    }
    if ( oIAQ * 216 <= 60.0f * 216 )
    {
      int greenColor = oIAQ * 216;
      greenColor = greenColor << 5;
      tft.setTextColor( greenColor );
    }
    // display AQI
    tft.setCursor( 0, 40 );
    tft.print( "AQI " + String(oIAQ) );
    // ---- pressure value and history graph ----
    tft.setTextSize( 1 );
    tft.setTextColor( ColorPalette[1] ); //set graph line color
    tft.setCursor( 0, 118 );
    tft.print( "  Pres: " + String(oPressure) + "mmHg" );
    xSemaphoreTake( sema_HistoryCompleted, portMAX_DELAY );
    hRef = int( oPressureHistory[0] ); // read under the lock, like the rest of the history
    nextCol = 0;
    for (int i = 0; i < DataCells; i++)
    {
      int hDisplacement = hRef - int( oPressureHistory[i] ); // cell height displacement from baseline
      tft.setCursor( nextCol , (rowRef + hDisplacement) );
      tft.print( "_" );
      nextCol += nextPoint;
      //place vertical bar representing a 24 hour time period.
      if ( (nextCol % 24) == 0 )
      {
        //tft.drawLine( x0, y0, x1, y1, color );
        tft.drawLine( uint16_t(nextCol), uint16_t(rowRef + 5), uint16_t(nextCol), uint16_t(rowRef + 11), ColorPalette[7] );
      }
    }
    // mark the cell currently being written; the marker advances 2 pixels per cell like the graph
    tft.setCursor( (arrayCellPtr * nextPoint), (rowRef + 3) );
    tft.print( "I" );
    xSemaphoreGive( sema_HistoryCompleted );
  }
  vTaskDelete( NULL );
} // void fUpdateDisplay( void * parameter )
/*
    Connection task: makes the initial WiFi/MQTT connection and keeps both open.
    Every 250 ticks it either services the MQTT client (MQTTclient.loop(), under
    sema_MQTT_KeepAlive) or, when the TCP client or WiFi reports disconnected,
    reconnects WiFi if needed and then MQTT.
    Author's note: do not set the delay to less than 10; errors begin to develop
    with the MQTT and network connection.

    Fix: the status log passed String objects to a "%s" format, which is not a
    valid variadic argument; the statuses are now logged as integers.
*/
void MQTTkeepalive( void *pvParameters )
{
  // setting must be set before a mqtt connection is made
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 250; // 250mS
  for (;;)
  {
    //check both the TCP client and the WiFi status; the author found checking both more reliable than a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d", (int)wifiClient.connected(), (int)WiFi.status() );
      if ( WiFi.status() != WL_CONNECTED )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    // restart the period from "now": a reconnect above can block for a long time
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
////
/*
   Blocks until the MQTT client is connected, then installs the message
   callback and subscribes to mqtt_topic.
   The client ID is built from bytes 0 and 4 of the MAC address.

   Fixes: the MAC buffer was 5 bytes although WiFi.macAddress() writes 6, and
   the client ID was logged by passing a String object to "%s".
*/
void connectToMQTT()
{
  // create client ID from mac address
  byte mac[6];
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  log_i( "connect to mqtt as client %s", clientID.c_str() );
  while ( !MQTTclient.connected() )
  {
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password );
    log_i( "connecting to MQTT" );
    vTaskDelay( 250 );
  }
  log_i("MQTT Connected");
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( mqtt_topic );
}
//
/*
   Blocks until WiFi reports WL_CONNECTED, restarting the connection attempt
   every 4000 ticks.

   Fix: the event handler used to be registered on every call, so each
   reconnect added another copy of it; it is now registered only once.
*/
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;
  log_i( "connect to wifi" );
  while ( WiFi.status() != WL_CONNECTED )
  {
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    log_i(" waiting on wifi connection" );
    vTaskDelay( 4000 );
  }
  log_i( "Connected to WiFi" );
  if ( !eventHandlerRegistered )
  {
    WiFi.onEvent( WiFiEvent );
    eventHandlerRegistered = true;
  }
}
////
void fparseMQTT( void *pvParameters )
{
  xSemaphoreGive ( sema_MQTT_Parser );
  for (;;)
  {
    xEventGroupWaitBits (eg, evtParseMQTT, pdTRUE, pdTRUE, portMAX_DELAY ); //
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK = 0;
    xSemaphoreGive( sema_mqttOK );
    xSemaphoreTake( sema_MQTT_Parser, portMAX_DELAY );
    if ( str_eTopic == topicOutsideTemperature )
    {
      oTemperature = String(strPayload).toFloat();
    }
    if ( (String)str_eTopic == topicOutsideHumidity )
    {
      oHumidity = String(strPayload).toFloat();
    }
    if ( (String)str_eTopic == topicAQIndex )
    {
      oIAQ = String(strPayload).toFloat();
    }
    if ( String(str_eTopic) == topicOutsidePressure )
    {
      oPressure = String(strPayload).toFloat();
    }
    // clear pointer locations
    memset( strPayload, '\0', 300 );
    str_eTopic = ""; //clear string buffer
    xSemaphoreGive( sema_MQTT_Parser );
  }
} // void fparseMQTT( void *pvParameters )
////
////
//void GetTheTime()
//{
//  char* ntpServer = "2.us.pool.ntp.org";
//  int gmtOffset_sec = -(3600 * 7 );
//  int daylightOffset_sec = 3600;
//  configTime(gmtOffset_sec, daylightOffset_sec, ntpServer);
//}
////
// http://www.cplusplus.com/reference/ctime/strftime/
////
//int getHour()
//{
//  struct tm timeinfo;
//  getLocalTime(&timeinfo);
//  char _time[ 5 ];
//  strftime( _time, 80, "%T", &timeinfo );
//  return String(_time).toInt();
//}
////
//void printLocalTime()
//{
//  struct tm timeinfo;
//  getLocalTime(&timeinfo);
//  char _time[ 80 ];
//  strftime( _time, 80, "%T", &timeinfo );
//  log_i( "%s", _time);
//}
////
// Intentionally empty: all work is done by the FreeRTOS tasks created in setup().
void loop() { }

Perhaps you can use it as a model for your project.

Note things like the MQTTkeepalive and fmqttWatchDog tasks, which do the job of connecting, maintaining the connection, and rebooting if necessary.

1 Like

Just to confirm what Idahowalker said: there are plenty of reasons why the connection can get lost, and most simple examples do not show how to keep a connection alive 24/7.

I write all my WiFi code in a way that it is handled by a state machine and no code in setup. The state machine checks at different levels (this depends on the application) and restarts everything if the connection is lost. The rest of the application can run without WiFi until everything is back up.

Make sure to find the functions that free resources used by your libraries of choice. Otherwise if you call begin() too often without end(), you are likely running out of memory and your code will crash maybe after a day or a week.

1 Like

Indeed, I'm just calling begin() without end(). Do you have an example of how I can connect to WiFi and MQTT, collect the data, and then disconnect and connect again?

Here is a basic example of a state machine handling WiFi and MQTT, I posted a while ago. See reply #31 in the following post.

https://forum.arduino.cc/t/wifi-not-stable-over-longer-periods-is-this-normal-ideas/636846/31

It is important that you run loop as often as possible. Avoid the use of delay or other blocking functions as much as possible. The WiFi task will do one little thing every time it is called and then returns.

Here is an example of how I implement MQTT on my Wemos D1 Mini projects

// Sketch name and version; printed in the serial banner, and SKETCH is also used as the node name.
#define SKETCH "mqttExample"
#define VERSION "1.0"

/*
   A demonstration of how I implement MQTT in my projects.
   This is tested on a Wemos D1 mini

   Kaywinnet.h holds my WiFi credentials: #defines for
   MQTT_SERVER, MY_SSID, and MY_PASSWORD
*/

//GPIO Pins
const int greenLED = D1;
const int yellowLED = D2;



//--------------- WiFi declarations ---------------
#include <ESP8266WiFi.h>        // Not needed if also using the Arduino OTA Library...
#include <Kaywinnet.h>          // WiFi credentials
char macBuffer[24];             // Holds the last three bytes of the MAC as six hex digits.
char hostName[24];              // Holds nodeName + "-" + the last three bytes of the MAC address.
char nodeName[] = SKETCH;       // Give this node a name



//--------------- MQTT declarations ---------------
#include <PubSubClient.h>       // connect to a MQTT broker and publish/subscribe messages in topics.
// Declare an object of class WiFiClient, which allows to establish a connection to a specific IP and port
// Declare an object of class PubSubClient, which is constructed from the WiFiClient declared just before it.
// NOTE(review): the original comment says "The constructor MUST be unique on the network"; what has to be
// unique per broker is presumably the MQTT client ID passed to connect() (hostName here) -- confirm.
WiFiClient xyzzyx;
PubSubClient client(xyzzyx);

const char *mqttServer = MQTT_SERVER;         // Local broker defined in Kaywinnet.h
const int mqttPort = 1883;

// Buffers for the topic strings. Only cmndTopic is filled in (by setup_mqtt()) in the code shown here.
char statusTopic[20];
char cmndTopic[20];
char rssiTopic[20];


// --------------- setup ---------------
// Configures the LED pins, opens the serial port, connects to WiFi, builds the
// topic names and makes the first MQTT connection.
void setup() {
  //configure GPIO pins
  pinMode (greenLED, OUTPUT);
  pinMode (yellowLED, OUTPUT);

  // Open the serial port and print the sketch name.
  Serial.begin(115200);
  Serial.println();
  Serial.print(SKETCH);
  Serial.println(F(".ino"));

  beginSerial();                      // opens the port again and prints the versioned banner
  setup_wifi();                       // MUST be before setup_mqtt(); also builds hostName, used as the MQTT client ID
  setup_mqtt();                       // Generate the topics

  // Call the setServer method on the PubSubClient object
  // (mqttConnect() sets the server again before connecting)
  client.setServer(mqttServer, mqttPort);
  mqttConnect();
}


//--------------- loop ---------------
// Runs continuously: services the MQTT client and reconnects when needed.
void loop() {
  mqttReconnect();          //Make sure we stay connected to the mqtt broker.
  delay(1);                //Other loop stuff goes here.
}


// =============== General functions ===============
// -------------- beginSerial() --------------
// Opens the serial port at 115200 baud and prints the sketch name and version.
// The port is opened BEFORE waiting on it: waiting on a port that has not been
// begun can block forever when this function is called on its own.
void beginSerial() {
  Serial.begin( 115200 );
  while (!Serial) {
    // wait until the port reports ready
  }
  Serial.println();
  Serial.println();
  Serial.print(SKETCH);
  Serial.print(".ino, Version ");
  Serial.println(VERSION);
  Serial.println(F("++++++++++++++++++ +"));
}


// --------------- Function to display a string for debugging. ---------------
void dbugs(const char *s, const char *v) {
  //Show a string variable. Enter with the string description and the string.
  //Example dbugs("My String= ",myString);
  Serial.print(s);
  Serial.print (F("\""));
  Serial.print(v);
  Serial.println(F("\""));
}


//--------------- yellow: invert the yellow LED ---------------
// Reads the pin's current level and writes back the opposite one.
void yellow() {
  const int levelNow = digitalRead(yellowLED);
  digitalWrite(yellowLED, !levelNow);
}

//--------------- green: invert the green LED ---------------
// Reads the pin's current level and writes back the opposite one.
void green() {
  const int levelNow = digitalRead(greenLED);
  digitalWrite(greenLED, !levelNow);
}


// =================== WiFi ===================
// --------------- setup_wifi ---------------
// Connect to WiFi network so we can reach the MQTT broker and publish messages to topics.
void setup_wifi() {
#ifndef Kaywinnet
#include <Kaywinnet.h>            // Network credentials
#endif
  byte mac[6];                    // The MAC address of your Wifi

  Serial.println(F("\n"));
  Serial.print(F("Connecting to "));
  Serial.println(MY_SSID);

  WiFi.mode(WIFI_STA);
  //WiFi.enableInsecureWEP();       //For OLD routers
  WiFi.begin(MY_SSID, MY_PASSWORD);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(WiFi.status()); Serial.print(F(" "));
  }

  Serial.println(F("\nWiFi connected, "));
  Serial.print(F("MAC Address: "));
  Serial.println(WiFi.macAddress());
  Serial.print(F("IP address: "));
  Serial.println(WiFi.localIP());
  Serial.print(F("RSSI: "));
  Serial.println(WiFi.RSSI());
  Serial.println();


  // Get the last three numbers of the mac address.
  // "4C:11:AE:0D:83:86" becomes "0D8386" in macBuffer.
  WiFi.macAddress(mac);
  snprintf(macBuffer, sizeof(macBuffer), "%02X%02X%02X", mac[3], mac[4], mac[5]);

  // Build hostName from prefix + last three bytes of the MAC address.
  strcpy(hostName, nodeName);
  strcat(hostName, "-");
  strcat(hostName, macBuffer);
  WiFi.hostname(hostName);
  Serial.print(F("hostName= "));
  Serial.println(hostName);
}


// ===================  mqtt ===================
// ---------------  Create topic names ---------------
// Builds the command topic "<nodeName>/cmnd" and prints it.
// snprintf bounds the write to the size of cmndTopic, so a longer node name is
// truncated instead of overrunning the buffer.
void setup_mqtt() {
  //MUST follow setup_wifi()
  snprintf(cmndTopic, sizeof(cmndTopic), "%s/cmnd", nodeName);  //Incoming commands, payload is a command.
  dbugs("cmndTopic= ", cmndTopic);        //Print the cmd topic
}


// --------------- mqttReconnect ---------------
// Call from loop(): reconnects to the broker when the connection has dropped
// and services the MQTT client.
//
// The earlier fallback, client.connect(hostName) when loop() returned false,
// re-established the session without re-subscribing, leaving the client
// connected but receiving nothing. All reconnects now go through
// mqttConnect(), which also restores the callback and the subscription; a
// failed loop() is picked up by the connected() check on the next call.
void mqttReconnect() {
  if (!client.connected()) {
    mqttConnect();
  }
  client.loop();
}

// --------------- mqttConnect ---------------
// Sets the broker address, then blocks until the client is connected, retrying
// every 5 seconds. After a successful connect the message callback is installed
// and the command topic is subscribed.
void mqttConnect() {
  client.setServer(mqttServer, mqttPort);
  while (!client.connected()) {
    Serial.print(F("MQTT connecting..."));
    if (!client.connect(hostName)) {
      // report the client state code and wait before the next attempt
      Serial.print(F("failed, rc="));
      Serial.print(client.state());
      Serial.println(F("- trying again in 5-seconds."));
      delay(5000);
      continue;
    }
    Serial.println(F("connected"));

    client.setCallback(callback);

    //Subscriptions:
    client.subscribe(cmndTopic);
    Serial.print(F("Subscribing to "));
    Serial.println(cmndTopic);
  }
}


// --------------- mqtt callback ---------------
// Executed when a message arrives on a topic this device is subscribed to.
// The payload is copied into a NUL-terminated string, cut at the first line
// feed, converted to lower case and printed. On the command topic the messages
// "yellow" and "green" toggle the corresponding LED.
//
//   topic   - topic the message arrived on
//   payload - raw payload bytes (not NUL-terminated)
//   length  - number of payload bytes
//
// Fixes: the line-feed loop used `i == strlen(message)` as its condition and so
// almost never ran; lower-casing OR-ed 0x20 into every byte, which corrupts
// digits-adjacent punctuation and turns '\n' into '*'; and the trailing
// line-feed check indexed message[-1] for an empty payload.
void callback(String topic, byte * payload, unsigned int length) {
  char message[length + 1];

  // convert the payload from a byte array to a NUL-terminated char array
  memcpy(message, payload, length);
  message[length] = '\0';

  // Sometimes in the MQTT Explorer tool, I accidentally hit "Enter" on my keyboard.
  // Cut the message at the first line feed, and fold upper-case letters to
  // lower case (only 'A'..'Z' are changed).
  for (size_t i = 0; message[i] != '\0'; i++) {
    if (message[i] == '\n') {
      message[i] = '\0';
      break;
    }
    if (message[i] >= 'A' && message[i] <= 'Z') {
      message[i] = message[i] - 'A' + 'a';
    }
  }

  Serial.println();
  Serial.println();
  Serial.print(F("Message arrived on topic: "));
  Serial.print(topic);
  Serial.println(F("."));

  Serial.print("message: ");
  Serial.println(message);
  Serial.print(F("Length= "));
  Serial.print(strlen(message));
  Serial.println();

  // --------- Command topic ---------
  if (topic == cmndTopic) {                    // Process incoming commands
    Serial.print(F("received message on cmdTopic: '"));
    Serial.print(message);
    Serial.println(F("'"));

    if (!strcmp(message, "yellow")) {        // message == "yellow": toggle the yellow LED
      Serial.println(F("Toggle the yellow LED"));
      yellow();
    }

    if (!strcmp(message, "green")) {         // message == "green": toggle the green LED
      Serial.println(F("Toggle the green LED"));
      green();
    }
  }
} //callback