ESP8266 can't connect to MQTT broker

I'm attempting to connect an ESP8266 to an MQTT broker with a simple setup. The ESP prints " Attempting MQTT connection...failed, rc=-2 try again in 5 seconds ". This is my first time trying MQTT and I don't know how to fix it.

This is my code:

#include <ESP8266WiFi.h>
#include <PubSubClient.h>

WiFiClient espClient;
PubSubClient client(espClient);

const char * wifi_ssid = "********";
const char * wifi_password = "********";
const char * mqtt_sever = "localhost";

// One-time initialisation: open the serial console, join the WiFi network,
// configure the MQTT client (broker address/port and message handler) and
// make the first broker connection.
void setup() {
  Serial.begin(9600);
  connectToWifi();
  // Both setters return the client itself, so the two calls can be chained.
  client.setServer(mqtt_sever, 1883).setCallback(callback);
  reconnect();
}

// Main loop: re-establish the MQTT session if it has dropped, service the
// client once, then pause for five seconds before the next pass.
void loop() {
  const bool mqttUp = client.connected();
  if (!mqttUp) {
    reconnect();
  }
  client.loop();
  delay(5000);
}

// Joins the configured WiFi network in station mode and blocks until the
// connection is up, printing a "." line every 500 ms while waiting and the
// assigned IP address once connected.
void connectToWifi() {
  Serial.println("----------------------");
  Serial.print("Wifi connecting to ");
  Serial.println( wifi_ssid );

  WiFi.mode(WIFI_STA);
  WiFi.begin(wifi_ssid, wifi_password);

  Serial.println();
  Serial.print("Connecting");
  for (;;) {
    if ( WiFi.status() == WL_CONNECTED ) {
      break;
    }
    delay(500);
    Serial.println(".");
  }

  Serial.print("Connected, IP address: ");
  Serial.println(WiFi.localIP());
}

// Blocks until the MQTT client is connected to the broker.
// Every failed attempt prints the client state code ("rc=") and waits five
// seconds before retrying. On success a greeting is published to "event"
// and the same topic is subscribed.
void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    const bool linked = client.connect("ESP8266Client");
    if (!linked) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }
    Serial.println("connected");
    client.publish("event", "hello world");
    client.subscribe("event");
  }
}

// MQTT message handler registered through client.setCallback().
//   topic   - topic the message arrived on
//   message - raw payload bytes (the loop below relies on length, not on a terminator)
//   length  - number of valid bytes in message
// Echoes the topic and the payload to the serial console.
void callback(String topic, byte* message, unsigned int length) {
  Serial.print("Message arrived on topic: ");
  Serial.print(topic);
  Serial.print(". Message: ");

  // The index is unsigned to match length. The payload is printed directly;
  // the previous per-byte copy into a String was never read afterwards.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print(static_cast<char>(message[i]));
  }
  Serial.println();
}

As you can upload, it no longer is an issue that belongs in Installation and Troubleshooting :wink:

@ducle27, your topic has been moved to a more suitable location on the forum.

Does the Wifi connect?

As a note the callback should be placed before setup().

As a note it is a bad idea to print from the callback under normal operations. Troubleshooting, ok but remove the prints afterwards.

Whoa! A delay of 5000ms, take that out. client.loop() needs to run as fast as possible.

the function reconnect does NOT check for a valid WiFI connection before checking on MQTT connection.

Yeah, WiFi part is working fine!!

Could you show a print out of your connection?

Showing that WiFI connected OK and that MQTT connects OK?

When you fix the code issues, as mentioned earlier, could you post the fixed code?

The function reconnect() does not check for a valid WiFi connection.

Here is a working example of an MQTT connection that you can use as a model for your own code, or not:

#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h" // include the connection infor for WiFi and MQTT
#include "sdkconfig.h" // used for log printing
#include "esp_system.h"
#include "freertos/FreeRTOS.h" //freeRTOS items to be used
#include "freertos/task.h"
#include <SPI.h>
#include <Adafruit_Sensor.h>
#include "Adafruit_BME680.h"
#include <Adafruit_GFX.h>    // Core graphics library
#include <Adafruit_ST7789.h> // Hardware-specific library for ST7789
#include <driver/adc.h>
#include "esp32-hal-ledc.h"
//#include "LinearRegression.h"
////
Adafruit_BME680 bme( GPIO_NUM_5 ); // use hardware SPI, set GPIO pin to use
//Adafruit_ST7789 tft = Adafruit_ST7789( TFT_CS     , TFT_DC    , TFT_MOSI   , TFT_SCLK   , TFT_RST     );
Adafruit_ST7789 tft   = Adafruit_ST7789( GPIO_NUM_15, GPIO_NUM_0, GPIO_NUM_13, GPIO_NUM_14, GPIO_NUM_22 );
WiFiClient   wifiClient; // do the WiFi instantiation thing
PubSubClient MQTTclient( mqtt_server, mqtt_port, wifiClient ); //do the MQTT instantiation thing
//LinearRegression lr; // disabled: the LinearRegression.h include is commented out and lr is only referenced from commented-out code
//////
#define evtDoParticleRead     ( 1 << 0 ) // declare an event
#define evtWaitForBME         ( 1 << 1 )
#define evtParseMQTT          ( 1 << 3 )
EventGroupHandle_t eg; // variable for the event group handle
//////
QueueHandle_t xQ_eData; // environmental data to be displayed on the screen
QueueHandle_t xQ_lrData; // linear regression data
// Latest environmental readings. The global instance x_eData is filled in by
// the sensor/MQTT tasks and a copy is sent through xQ_eData to the display task.
struct stu_eData
{
  float Temperature = 0.0f; // degrees Fahrenheit (BME680 reading converted with C * 1.8 + 32)
  float Pressure    = 0.0f; // mmHg (BME680 reading divided by 133.3223684)
  float Humidity    = 0.0f; // BME680 humidity reading, displayed with a "%" suffix
  float IAQ         = 0.0f; // result of fCalulate_IAQ_Index()
  float RM0         = 0.0f; // value parsed from the topicRemainingMoisture_0 MQTT payload
  float PM2         = 0.0f; // dust density computed by fDoParticleDetector(), displayed as ug/m3
} x_eData; // environmental data
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
const int payloadSize = 100;
// One received MQTT message: payload text plus the topic it arrived on.
// The global instance x_message is filled by mqttCallback() and handed to
// fparseMQTT() through the xQ_Message queue.
// NOTE(review): the queue presumably copies this struct byte-for-byte; String
// is not a plain-data type, so the receiver's copy would share the sender's
// character buffer - confirm this is safe or switch topic to a char array.
struct stu_message
{
  char payload [payloadSize] = {'\0'}; // NUL-terminated payload text, at most payloadSize - 1 characters
  String topic ;                       // topic name of the message
} x_message;
////
QueueHandle_t xQ_pMessage;
////
const float oGasResistanceBaseLine = 149598.0f;
int mqttOK = 0;
//////
esp_timer_handle_t oneshot_timer; //veriable to store the hardware timer handle
//////
SemaphoreHandle_t sema_MQTT_KeepAlive;
SemaphoreHandle_t sema_PublishPM;
SemaphoreHandle_t sema_mqttOK;
//////
// interrupt service routine for WiFi events put into IRAM
// WiFi event handler (registered with WiFi.onEvent() in connectToWiFi()).
// Logs station connect/disconnect and soft-AP client disconnect events;
// every other event is ignored.
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  if ( event == SYSTEM_EVENT_STA_CONNECTED )
  {
    log_i("Connected to WiFi access point");
  }
  else if ( event == SYSTEM_EVENT_STA_DISCONNECTED )
  {
    log_i("Disconnected from WiFi access point");
  }
  else if ( event == SYSTEM_EVENT_AP_STADISCONNECTED )
  {
    log_i("WiFi client disconnected");
  }
} // void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
//////
// Callback of the one-shot high-resolution timer started by
// fDoParticleDetector(). Sets evtDoParticleRead in the event group so the
// waiting task can take its ADC sample.
//   arg - unused
void IRAM_ATTR oneshot_timer_callback( void* arg )
{
  (void) arg;
  // Must start out as pdFALSE: the FromISR call only ever sets it to pdTRUE,
  // so an uninitialised value would be indeterminate afterwards.
  BaseType_t xHigherPriorityTaskWoken = pdFALSE;
  xEventGroupSetBitsFromISR( eg, evtDoParticleRead, &xHigherPriorityTaskWoken ); //freeRTOS event trigger made for ISR's
} //void IRAM_ATTR oneshot_timer_callback( void* arg )
//////
// MQTT message handler (registered in connectToMQTT()).
// Copies the topic and payload into x_message and posts it to xQ_Message,
// replacing any message still waiting in that one-slot queue.
//   topic   - topic name of the incoming message
//   payload - raw payload bytes
//   length  - number of bytes in payload
// Payloads longer than payloadSize - 1 bytes are truncated so the copy can
// never run past the end of x_message.payload.
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  const unsigned int maxCopy = payloadSize - 1; // keep one byte for the terminator
  const unsigned int copyLen = ( length < maxCopy ) ? length : maxCopy;
  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  x_message.topic = topic; // store new topic (replaces the previous one)
  memcpy( x_message.payload, payload, copyLen ); // extract payload
  x_message.payload[copyLen] = '\0';
  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
void setup()
{
  x_message.topic.reserve( payloadSize );
  //
  xQ_Message  = xQueueCreate( 1, sizeof(stu_message) );
  xQ_eData    = xQueueCreate( 1, sizeof(stu_eData) ); // sends a queue copy of the structure
  xQ_lrData   = xQueueCreate( 1, sizeof(float) );
  //String x = "";
  //x.reserve(500);
  //xQ_pMessage = xQueueCreate( 1, sizeof(x) ); //set size of string queue to a string of 500 string characters.
  //
  sema_PublishPM = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_PublishPM );
  sema_mqttOK    =  xSemaphoreCreateBinary();
  xSemaphoreGive( sema_mqttOK );
  ledcSetup( 4, 12000, 8 ); // ledc: 4  => Group: 0, Channel: 2, Timer: 1, led frequency, resolution  bits // blue led
  ledcAttachPin( GPIO_NUM_12, 4 );   // gpio number and channel
  ledcWrite( 4, 0 ); // write to channel number 4
  //
  eg = xEventGroupCreate(); // get an event group handle
  //
  gpio_config_t io_cfg = {}; // initialize the gpio configuration structure
  io_cfg.mode = GPIO_MODE_OUTPUT; // set gpio mode
  io_cfg.pin_bit_mask = ( (1ULL << GPIO_NUM_4) ); //bit mask of the pins to set
  gpio_config(&io_cfg); // configure the gpio based upon the parameters as set in the configuration structure
  gpio_set_level( GPIO_NUM_4, LOW); // set air particle sensor trigger pin to LOW
  // set up A:D channels, refer: https://dl.espressif.com/doc/esp-idf/latest/api-reference/peripherals/adc.html
  adc1_config_width(ADC_WIDTH_12Bit);
  adc1_config_channel_atten(ADC1_CHANNEL_0, ADC_ATTEN_DB_11);// using GPIO 36
  // https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/system/esp_timer.html?highlight=hardware%20timer High Resoultion Timer API
  esp_timer_create_args_t oneshot_timer_args = {}; // initialize High Resoulition Timer (HRT) configuration structure
  oneshot_timer_args.callback = &oneshot_timer_callback; // configure for callback, name of callback function
  esp_timer_create( &oneshot_timer_args, &oneshot_timer ); // assign configuration to the HRT, receive timer handle
  //
  xTaskCreatePinnedToCore( fparseMQTT, "fparseMQTT", 7000,  NULL, 5, NULL, 1 );
  xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 15000, NULL, 6, NULL, 1 );
  xTaskCreatePinnedToCore( DoTheBME680Thing, "DoTheBME280Thing", 20000, NULL, 5, NULL, 1);
  xTaskCreatePinnedToCore( fDoParticleDetector, "fDoParticleDetector", 6000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fDoTheDisplayThing, "fDoTheDisplayThing", 22000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fDoTrends, "fDoTrends", 5000, NULL, 3, NULL, 1 );
  //xTaskCreatePinnedToCore( fSendMQTTpressure, "fSendMQTTpressure", 3000, NULL, 3, NULL, 1 );
} //void setup()
////
//void fSendMQTTpressure( void *pvParameters )
//{
//  //struct stu_pressureMessage px_message;
//  String apInfo = "";
//  apInfo.reserve( 150 );
//  for ( ;; )
//  {
//    if ( xQueueReceive(xQ_pMessage, &apInfo, portMAX_DELAY) == pdTRUE )
//    {
//      if ( MQTTclient.connected() )
//      {
//        xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
//        MQTTclient.publish( topicPressureInfo, apInfo.c_str() );
//        vTaskDelay( 1 );
//        xSemaphoreGive( sema_MQTT_KeepAlive );
//      } else {
//        log_i( "fSendMQTTpressure not connected" );
//      }
//      apInfo = "";
//    }
//  } // for ( ;; )
//  vTaskDelete( NULL );
//} //void fSendMQTTpresure( void *pvParameters )
//////
void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {
      xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
      mqttOK = 0;
      xSemaphoreGive( sema_mqttOK );
      if ( px_message.topic == topicRemainingMoisture_0 )
      {
        x_eData.RM0  = String(px_message.payload).toFloat();
      }
    } //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
  } //for(;;)
  vTaskDelete( NULL );
} // void fparseMQTT( void *pvParameters )
////
void fDoTheDisplayThing( void * parameter )
{
  tft.init( 240, 320 ); // Init ST7789 320x240
  tft.setRotation( 3 );
  tft.setTextSize( 3 );
  tft.fillScreen( ST77XX_BLACK );
  tft.setTextWrap( false );
  struct stu_eData px_eData;
  int OneTwoThree = 0;
  int countUpDown = 255;
  ledcWrite( 4, countUpDown ); //backlight set
  int dimDelaytime = 7;
  for (;;)
  {
    if ( xQueueReceive(xQ_eData, &px_eData, portMAX_DELAY) == pdTRUE )
    {
      for ( countUpDown; countUpDown-- > 0; )
      {
        ledcWrite( 4, countUpDown ); // write to channel number 4, dim backlight
        vTaskDelay( dimDelaytime );
      }
      tft.fillScreen(ST77XX_BLACK);
      tft.setCursor( 0, 0 );
      OneTwoThree++;
      if ( OneTwoThree == 1 )
      {
        tft.setTextColor( ST77XX_RED );
      }
      if ( OneTwoThree == 2 )
      {
        tft.setTextColor( ST77XX_WHITE );
      }
      if ( OneTwoThree == 3 )
      {
        tft.setTextColor( ST77XX_BLUE );
        OneTwoThree = 0;
      }
      tft.println( "Temp " + String(px_eData.Temperature) + "F" );
      tft.setCursor( 0, 30 );
      tft.println( "Hum  " + String(px_eData.Humidity) + "%" );
      tft.setCursor( 0, 60 );
      tft.println( "Pres " + String(px_eData.Pressure) + "mmHg" );
      tft.setCursor( 0, 90 );
      tft.println( "AQI  " + String(px_eData.IAQ) + "%" );
      tft.setCursor( 0, 120 );
      tft.println( "RM0  " + String(px_eData.RM0) + "%" );
      tft.setCursor( 0, 150 );
      tft.println( "PM2  " + String(px_eData.PM2) + "ug/m3" );
      tft.setCursor( 0, 180 );
      if ( px_eData.PM2 <= 35.0f )
      {
        //tft.setTextColor( ST77XX_GREEN );
        tft.println( "PM2 is Excellent" );
      }
      if ( (px_eData.PM2 > 35.0f) && px_eData.PM2 <= 75.0f )
      {
        tft.println( "PM2 is Average" );
      }
      if ( (px_eData.PM2 > 75.0f) && px_eData.PM2 <= 115.0f )
      {
        tft.println( "PM2 is Light" );
      }
      if ( (px_eData.PM2 > 115.0f) && px_eData.PM2 <= 150.0f )
      {
        tft.println( "PM2 is Moderate" );
      }
      if ( (px_eData.PM2 > 150.0f) && px_eData.PM2 <= 250.0f )
      {
        tft.println( "PM2 is Heavy" );
      }
      if ( (px_eData.PM2 > 250.0f) )
      {
        tft.println( "PM2 is Serious" );
      }
      //brighten blacklight level
      vTaskDelay( 400 ); // wait for screen update to be done
      for ( countUpDown; countUpDown <= 255; countUpDown++ )
      {
        ledcWrite( 4, countUpDown ); // write to channel number 4
        vTaskDelay( dimDelaytime  );
      }
      //log_i( "DoTheBME280Thing high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
    } //if ( xQueueReceive(xQ_eData, &px_eData, portMAX_DELAY) == pdTRUE )
  } //for (;;)
  vTaskDelete( NULL );
} //void fDoTheDisplayTHing( void * parameter )
////
// Task: once every 1000 ticks, restarts the ESP when mqttOK has reached 5.
// mqttOK is incremented by DoTheBME680Thing() on each publish cycle and
// zeroed by fparseMQTT() whenever a message is received.
void fmqttWatchDog( void * paramater )
{
  const int restartThreshold = 5;
  for (;;)
  {
    vTaskDelay( 1000 );
    if ( mqttOK < restartThreshold )
    {
      continue; // broker has answered recently enough
    }
    ESP.restart();
  }
  vTaskDelete( NULL );
}
////
// Computes an indoor-air-quality score from a gas-resistance reading and the
// relative humidity.
//   gasResistance - gas resistance reading (compared against oGasResistanceBaseLine)
//   Humidity      - relative humidity reading
// Returns hum_score + gas_score. Humidity contributes at most 25 points
// (peak at the 40 baseline) and gas at most 75 points (reached at or above
// the baseline resistance).
// Both humidity branches are scaled by the humidity weight. The
// below-baseline branch previously used the gas weight (75), which made the
// score jump at the baseline and let the total exceed 100.
float fCalulate_IAQ_Index( int gasResistance, float Humidity)
{
  const float hum_baseline  = 40.0f;
  const float hum_weighting = 0.25f;
  const float hum_max_score = hum_weighting * 100.0f;  // humidity share of the total
  const float gas_max_score = 100.0f - hum_max_score;  // gas share of the total
  const float gas_offset = oGasResistanceBaseLine - float( gasResistance );
  const float hum_offset = float( Humidity ) - hum_baseline;
  float hum_score = 0.0f;
  float gas_score = 0.0f;
  // calculate hum_score as distance from hum_baseline
  if ( hum_offset > 0.0f )
  {
    hum_score = 100.0f - hum_baseline - hum_offset;
    hum_score /= ( 100.0f - hum_baseline );
    hum_score *= hum_max_score;
  } else {
    hum_score = hum_baseline + hum_offset;
    hum_score /= hum_baseline;
    hum_score *= hum_max_score;
  }
  //calculate gas score as distance from baseline
  if ( gas_offset > 0.0f )
  {
    gas_score = float( gasResistance ) / oGasResistanceBaseLine;
    gas_score *= gas_max_score;
  } else {
    gas_score = gas_max_score;
  }
  return ( hum_score + gas_score );
} //void fCalulate_IAQ_Index( int gasResistance, float Humidity):
////
void fDoParticleDetector( void * parameter )
{
  /*
    ug/m3     AQI                 Lvl AQ (Air Quality)
    (air Quality Index)
    0-35     0-50                1   Excellent
    35-75    51-100              2   Average
    75-115   101-150             3   Light pollution
    115-150  151-200             4   moderate
    150-250  201-300             5   heavy
    250-500  >=300               6   serious
  */
  float ADbits = 4095.0f;
  float uPvolts = 3.3f;
  float adcValue = 0.0f;
  float dustDensity = 0.0f;
  float Voc = 0.6f; // Set the typical output voltage in Volts when there is zero dust.
  const float K = 0.5f; // Use the typical sensitivity in units of V per 100ug/m3.
  xEventGroupWaitBits (eg, evtWaitForBME, pdTRUE, pdTRUE, portMAX_DELAY );
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 100; //delay for mS
  for (;;)
  {
    //enable sensor led
    gpio_set_level( GPIO_NUM_4, HIGH ); // set gpio 4 to high to turn on sensor internal led for measurement
    esp_timer_start_once( oneshot_timer, 280 ); // trigger one shot timer for a 280uS timeout, warm up time.
    xEventGroupWaitBits (eg, evtDoParticleRead, pdTRUE, pdTRUE, portMAX_DELAY ); // event will be triggered by the timer expiring, wait here for the 280uS
    adcValue = float( adc1_get_raw(ADC1_CHANNEL_0) ); //take a raw ADC reading from the dust sensor
    gpio_set_level( GPIO_NUM_4, LOW );//Shut off the sensor LED
    adcValue = ( adcValue * uPvolts ) / ADbits; //calculate voltage
    dustDensity = (adcValue / K) * 100.0; //convert volts to dust density
    if ( dustDensity < 0.0f )
    {
      dustDensity = 0.00f; // make negative values a 0
    }
    if ( xSemaphoreTake( sema_PublishPM, 0 ) == pdTRUE )  // don't wait for semaphore to be available
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      //log_i( "ADC volts %f Dust Density = %ug / m3 ", adcValue, dustDensity ); // print the calculated voltage and dustdensity
      MQTTclient.publish( topicInsidePM, String(dustDensity).c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      x_eData.PM2 = dustDensity;
    }
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    //log_i( " high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete( NULL );
}// end fDoParticleDetector()
//// just pass the info the RPi instead of processing here
void fDoTrends( void *pvParameters )
{
  //  const int magicNumber = 96;
  //  double    values[2];
  //  int       lrCount = 0;
  float     lrData = 0.0f;
  //  float     DataPoints[magicNumber] = {0.0f};
  //  float     TimeStamps[magicNumber] = {0.0f};
  //  float     dpHigh = 702.0f;
  //  float     dpLow  = 683.0f;
  //  float     dpAtom = 0.0f;
  //  float     dpMean = 0.0f; //data point mean
  //  float     tsAtom = 0.0f;
  //  float     tsUnit = 0.0f;
  //  float     tsMean = 0.0f;
  //  bool      dpRecalculate = true;
  //  bool      FirstTimeMQTT = true;
  String    apInfo = "";
  apInfo.reserve( 150 );
  for (;;)
  {
    if ( xQueueReceive(xQ_lrData, &lrData, portMAX_DELAY) == pdTRUE )
    {
      apInfo.concat( String((float)xTaskGetTickCount() / 1000.0f) );
      apInfo.concat( "," );
      apInfo.concat( String(lrData) );
      apInfo.concat( ",0.0" );
      apInfo.concat( ",0.0" );
      if ( MQTTclient.connected() )
      {
        xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
        MQTTclient.publish( topicPressureInfo, apInfo.c_str() );
        vTaskDelay( 1 );
        xSemaphoreGive( sema_MQTT_KeepAlive );
      }
      //xQueueSend( xQ_pMessage, (void *) &apInfo, portMAX_DELAY ); // wait for queue space to become available
      apInfo = "";
      //find dpHigh and dpLow, collects historical high and low data points, used for data normalization
      //      if ( lrData > dpHigh )
      //      {
      //        dpHigh = lrData;
      //        dpRecalculate = true;
      //      }
      //      if ( lrData < dpLow )
      //      {
      //        dpLow = lrData;
      //        dpRecalculate = true;
      //      }
      //      if ( lrCount != magicNumber )
      //      {
      //        DataPoints[lrCount] = lrData;
      //        TimeStamps[lrCount] = (float)xTaskGetTickCount() / 1000.0f;
      //        log_i( "lrCount %d TimeStamp %f lrData %f", lrCount, TimeStamps[lrCount], DataPoints[lrCount] );
      //        lrCount++;
      //      } else {
      //        //shift datapoints collected one place to the left
      //        for ( int i = 0; i < magicNumber; i++ )
      //        {
      //          DataPoints[i] = DataPoints[i + 1];
      //          TimeStamps[i] = TimeStamps[i + 1];
      //        }
      //        //insert new data points and time stamp (ts) at the end of the data arrays
      //        DataPoints[magicNumber - 1] = lrData;
      //        TimeStamps[magicNumber - 1] = (float)xTaskGetTickCount() / 1000.0f;
      //        lr.Reset(); //reset the LinearRegression Parameters
      //        // use dpHigh and dpLow to calculate data mean atom for normalization
      //        if ( dpRecalculate )
      //        {
      //          dpAtom = 1.0f / (dpHigh - dpLow); // a new high or low data point has been found adjust mean dpAtom
      //          dpRecalculate = false;
      //        }
      //        //timestamp mean is ts * (1 / ts_Firstcell - ts_Lastcell[magicNumber]). ts=time stamp
      //        tsAtom = 1.0f / (TimeStamps[magicNumber - 1] - TimeStamps[0]); // no need to do this part of the calculation every for loop ++
      //        for (int i = 0; i < magicNumber; i++)
      //        {
      //          dpMean = (DataPoints[i] - dpLow) * dpAtom;
      //          tsMean = TimeStamps[i] * tsAtom;
      //          lr.Data( tsMean, dpMean ); // train lr
      //          //send to mqtt the first time
      //          if ( FirstTimeMQTT )
      //          {
      //            apInfo.concat( String(TimeStamps[i]) );
      //            apInfo.concat( "," );
      //            apInfo.concat( String(DataPoints[i]) );
      //            apInfo.concat( "," );
      //            apInfo.concat(  String(tsMean) );
      //            apInfo.concat( "," );
      //            apInfo.concat( String(dpMean) );
      //            xQueueSend( xQ_pMessage, (void *) &apInfo, portMAX_DELAY ); // wait for queue space to become available
      //            apInfo = "";
      //          }
      //        }
      //        if ( !FirstTimeMQTT )
      //        {
      //          apInfo.concat( String(TimeStamps[magicNumber - 1]) );
      //          apInfo.concat( "," );
      //          apInfo.concat( String(DataPoints[magicNumber - 1]) );
      //          apInfo.concat( "," );
      //          apInfo.concat( String(tsMean) );
      //          apInfo.concat( "," );
      //          apInfo.concat( String(dpMean) );
      //          xQueueSend( xQ_pMessage, (void *) &apInfo, portMAX_DELAY );
      //          apInfo = "";
      //        }
      //        FirstTimeMQTT = false;
      //        lr.Parameters( values );
      //        //calculate
      //        tsUnit = TimeStamps[magicNumber - 1] - TimeStamps[magicNumber - 2]; //get the time stamp quantity
      //        tsUnit += TimeStamps[magicNumber - 1]; //get a future time
      //        tsUnit *= tsAtom; //setting time units to the same scale
      //        log_i( "Calculate next x using y = %f", lr. Calculate( tsUnit ) ); //calculate next datapoint using time stamp
      //        log_i( "Correlation: %f Values: Y=%f and *X + %f ", lr.Correlation(), values[0], values[1] ); // correlation is the strength and direction of the relationship
      //        //calculate datapoint for current time stamp, use current data point against calculated datapoint to get error magnatude and direction.
      //        //log_i( "lr.Error( x_pMessage.TimeStamp, x_pMessage.nDataPoint ) %f", lr.Error(x_pMessage.nTimeStamp, x_pMessage.nDataPoint) ); //
      //      }
      //log_i( "fDoTrends high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
    } //if ( xQueueReceive(xQ_lrData, &lrData, portMAX_DELAY) == pdTRUE )
  } //for(;;)
  vTaskDelete ( NULL );
} //void fDoTrends( void *pvParameters )
////
// Task: reads the BME680 every 15000 ticks, publishes the readings over MQTT
// and feeds the display and trend tasks.
// Start-up: initialises SPI and the sensor, waits for the MQTT connection,
// then sets evtWaitForBME to release fDoParticleDetector().
// Each cycle: updates x_eData (temperature in F, pressure in mmHg, humidity,
// IAQ), publishes "temp,pressure,humidity,iaq" to topicInsideInfo when
// connected, releases sema_PublishPM, increments mqttOK and posts x_eData to
// xQ_eData. Every 240th cycle (and the very first) the pressure is also
// posted to xQ_lrData.
// If the sensor is not found the task parks itself in a sleeping loop rather
// than busy-spinning, so it no longer hogs the CPU at priority 5.
void DoTheBME680Thing( void *pvParameters )
{
  SPI.begin(); // initialize the SPI library
  vTaskDelay( 10 );
  if (!bme.begin()) {
    log_i("Could not find a valid BME680 sensor, check wiring!");
    for (;;)
    {
      vTaskDelay( 1000 ); // halt here, but yield the CPU while doing so
    }
  }
  // Set up oversampling and filter initialization
  bme.setTemperatureOversampling(BME680_OS_8X);
  bme.setHumidityOversampling(BME680_OS_2X);
  bme.setPressureOversampling(BME680_OS_4X);
  bme.setIIRFilterSize(BME680_FILTER_SIZE_3);
  bme.setGasHeater(320, 150); // 320*C for 150 ms
  //wait for a mqtt connection
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  xEventGroupSetBits( eg, evtWaitForBME );
  TickType_t xLastWakeTime    = xTaskGetTickCount();
  const TickType_t xFrequency = 1000 * 15; // ticks between readings
  const int sendLRDataTrigger = 240; // cycles between trend samples (about an hour, assuming 1 ms ticks)
  int sendLRdataCount         = sendLRDataTrigger - 1; // so the first cycle already sends a sample
  String bmeInfo = "";
  bmeInfo.reserve( 100 );
  for (;;)
  {
    x_eData.Temperature  = bme.readTemperature();
    x_eData.Temperature  = ( x_eData.Temperature * 1.8f ) + 32.0f; // (Celsius x 1.8) + 32
    x_eData.Pressure     = bme.readPressure();
    x_eData.Pressure     = x_eData.Pressure / 133.3223684f; //converts to mmHg
    sendLRdataCount++;
    if ( sendLRdataCount >= sendLRDataTrigger )
    {
      xQueueOverwrite( xQ_lrData, (void *) &x_eData.Pressure ); // send to trends
      sendLRdataCount    = 0;
    }
    x_eData.Humidity     = bme.readHumidity();
    x_eData.IAQ          = fCalulate_IAQ_Index( bme.readGas(), x_eData.Humidity );
    // comma-separated report, two decimals per value
    bmeInfo.concat( String(x_eData.Temperature, 2) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(x_eData.Pressure, 2) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(x_eData.Humidity, 2) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(x_eData.IAQ, 2) );
    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
    if ( MQTTclient.connected() )
    {
      MQTTclient.publish( topicInsideInfo, bmeInfo.c_str() );
    }
    xSemaphoreGive( sema_MQTT_KeepAlive );
    xSemaphoreGive( sema_PublishPM ); // release publish of dust density
    // count this cycle; fparseMQTT() zeroes the counter when the broker answers
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK ++;
    xSemaphoreGive( sema_mqttOK );
    xQueueOverwrite( xQ_eData, (void *) &x_eData );// send data to display
    //
    bmeInfo = ""; // empty string
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
////
/*
  Important to not set vTaskDelay/vTaskDelayUntil to less then 10. Errors begin to develop with the MQTT and network connection.
  makes the initial wifi/mqtt connection and works to keeps those connections open.
*/
// Task: makes the initial WiFi/MQTT connection and keeps both alive.
// Creates sema_MQTT_KeepAlive, which other tasks take around their publish
// calls so that MQTTclient.loop() never runs during a publish. Every 250
// ticks it either services the client (both links up) or reconnects WiFi and
// then MQTT.
// The status log passes C strings to the "%s" conversions; String objects
// cannot be passed through printf-style varargs.
void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 250; // ticks between checks
  for (;;)
  {
    //check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more reliable than just a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status %s WiFi status %s", String(wifiClient.connected()).c_str(), String(WiFi.status()).c_str() );
      // Reaching this branch already means at least one of the two links is
      // down, so no second test is needed; connectToWiFi() returns at once
      // when WiFi is still up.
      connectToWiFi();
      connectToMQTT();
    }
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
////
// Blocks until the MQTT client is connected, retrying every 250 ticks, then
// installs mqttCallback and subscribes to topicOK and
// topicRemainingMoisture_0. The client ID is built from bytes 0 and 4 of the
// WiFi MAC address.
void connectToMQTT()
{
  byte mac[6]; // a MAC address is six bytes and WiFi.macAddress() fills all six
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password );
    //log_i( "connecting to MQTT" );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
  MQTTclient.subscribe( topicRemainingMoisture_0 );
  //log_i("MQTT Connected");
} //void connectToMQTT()
// Blocks until WiFi is connected. Each attempt disconnects, calls
// WiFi.begin() and waits 4000 ticks; the ESP is restarted on the tenth
// failed attempt. Returns immediately when WiFi is already connected.
// The event handler is registered only on the first successful call. This
// function runs on every reconnect, and registering again each time would
// add another copy of the handler.
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;
  int TryCount = 0;
  //log_i( "connect to wifi" );
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    //log_i(" waiting on wifi connection" );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  //log_i( "Connected to WiFi" );
  if ( !eventHandlerRegistered )
  {
    WiFi.onEvent( WiFiEvent );
    eventHandlerRegistered = true;
  }
}
////
void loop() { } // intentionally empty: all work is done by the tasks created in setup()

It's written for an ESP32.

I'm a newbie and I don't think I understand exactly. Sorry about that!!

this is image about wifi connected:

This is my code fixed:

#include <ESP8266WiFi.h>
#include <PubSubClient.h>

WiFiClient espClient;
PubSubClient client(espClient);

const char * wifi_ssid = "********";
const char * wifi_password = "********";
const char * mqtt_sever = "localhost";

// MQTT message handler registered through client.setCallback().
//   topic   - topic the message arrived on
//   message - raw payload bytes (the loop below relies on length, not on a terminator)
//   length  - number of valid bytes in message
// Echoes the topic and the payload to the serial console.
void callback(String topic, byte* message, unsigned int length) {
  Serial.print("Message arrived on topic: ");
  Serial.print(topic);
  Serial.print(". Message: ");

  // The index is unsigned to match length. The payload is printed directly;
  // the previous per-byte copy into a String was never read afterwards.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print(static_cast<char>(message[i]));
  }
  Serial.println();
}

// One-time initialisation: open the serial console, join the WiFi network,
// configure the MQTT client (broker address/port and message handler) and
// make the first broker connection.
void setup() {
  Serial.begin(9600);
  connectToWifi();

  // Both setters return the client itself, so the two calls can be chained.
  client.setServer(mqtt_sever, 1883).setCallback(callback);
  reconnect();
}

// Main loop: re-establish the MQTT session if it has dropped, then service
// the client so incoming messages and keep-alives are processed.
void loop() {
  const bool mqttUp = client.connected();
  if (!mqttUp) {
    reconnect();
  }
  client.loop();
}

// Joins the configured WiFi network and blocks until the connection is up,
// printing a "." line every 500 ms while waiting and the assigned IP address
// once connected.
void connectToWifi() {
  Serial.println("----------------------");
  Serial.print("Wifi connecting to ");
  Serial.println( wifi_ssid );

  WiFi.begin(wifi_ssid, wifi_password);

  Serial.println();
  Serial.print("Connecting");
  for (;;) {
    if ( WiFi.status() == WL_CONNECTED ) {
      break;
    }
    delay(500);
    Serial.println(".");
  }

  Serial.print("Connected, IP address: ");
  Serial.println(WiFi.localIP());
}

// Blocks until the MQTT client is connected to the broker.
// Before each attempt it reports when WiFi is up (nothing is done when it is
// not). Every failed attempt prints the client state code ("rc=") and waits
// five seconds before retrying. On success a greeting is published to
// "event" and the same topic is subscribed.
void reconnect() {
  while (!client.connected()) {
    const bool wifiUp = ( WiFi.status() == WL_CONNECTED );
    if ( wifiUp ) {
      Serial.println("Wifi connected");
    }
    Serial.print("Attempting MQTT connection...");
    const bool linked = client.connect("ESP8266Client");
    if (!linked) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }
    Serial.println("connected");
    client.publish("event", "hello world");
    client.subscribe("event");
  }
}

Do you know the IP address of the MQTT server? Use the IP address or the server name to connect to the server. Have you looked at your MQTT Broker log files to see if a connection attempt is being made and why it's being refused? Documentation on the Eclipse MQTT Broker is online, where you can find how to do this.

Thanks for showing the printout.

So if WiFI is not connected your code does nothing about it? How can you make a MQTT connection if WiFI is NOT connected?

Before making a WiFi connection, issue a WiFi.disconnect(). Obviously WiFi.disconnect() disconnects if connected, but the less obvious thing WiFi.disconnect() does is reset the WiFi stack to its default values. WiFi stack corruption is the biggest reason for WiFi connection failures.

1 Like

As noted, you need the name of the server running the broker to connect. localhost means "this machine", so unless you're running the broker on your ESP, that isn't it. ESP8266Client doesn't seem plausible either.

1 Like

If you are having MQTT connection issues, get and install MQTT.fx. Use MQTT.fx to connect to your MQTT Broker. MQTT.fx gives some fair feedback on why the connection does not work. Once you are able to make an MQTT.fx connection, you can use the MQTT.fx connection info in your program.

1 Like

Yeah, I created an MQTT broker on my computer and changed "localhost" to its IP address, and it works. Thanks all!!

This topic was automatically closed 120 days after the last reply. New replies are no longer allowed.