MQTT subscription failing

Hi all, I’m hoping you can help me clarify a point on MQTT and topic subscription.

I am working on an ESP32 Dev board for a project that both publishes to MQTT topics, and subscribes to control variables, using the PubSubClient library. I have a Digital Ocean machine running Mosquitto server, and Node Red to handle the published data from the ESP32 onto a dashboard along with controls that the ESP32 is subscribed to.

Publishing topics from the MCU works just fine. The subscription, however, doesn't seem to work. The first step in my callback() function is a Serial.print line, which I use to check that the message is arriving from the MQTT server, and it never prints.

To check where I am going wrong I have stripped back the code for my project and put just the essentials into the PubSubClient example file mqtt_basic. I have also installed MQTT Explorer on my PC, and using that I can see both the messages published by the MCU, and the messages published by the Mosquitto server via Node Red. As such I definitely know that the messages are being sent out from the server just fine.

This is the code that I am using to test the MQTT usage. It’s the PubSubClient example with as little changed as possible:

#include <SPI.h>
#include <Ethernet.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <NTPClient.h>

#define ssid "**********"
#define password "**********"
#define mqttServer "***.***.***.***"
#define mqttUser "**********"
#define mqttPass "**********"
#define mqttTime  "BrewKettle/time"

//NTP Client for time retrieval
WiFiUDP ntpUDP;
NTPClient timeClient(ntpUDP);

long now;
String formattedTime;
char strTime  [9];

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i=0;i<length;i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

WiFiClient ethClient;
PubSubClient client(ethClient);

void reconnect() {
  // Loop until we're reconnected
  delay(1500);
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect("arduinoClient", mqttUser, mqttPass)) {
      Serial.println("connected");
      // Once connected, publish an announcement...
      client.publish("BrewKettle/inTemp","hello world");
      // ... and resubscribe
      client.subscribe("BrewKettle/*");
      //client.subscribe("BrewKettle/heat1");
      //client.subscribe("BrewKettle/heat2");
      //client.subscribe("BrewKettle/pump");
      //client.subscribe("BrewKettle/state");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup()
{
  Serial.begin(115200);

  client.setServer(mqttServer, 1883);
  client.setCallback(callback);

// Initialise Serial output, connect to Wifi
  Serial.begin(115200);
  Serial.println("Setting up Wifi");
  WiFi.begin(ssid, password);
  if (WiFi.status() != WL_CONNECTED) {
    Serial.println("Failed to connect to Wifi");
  }else{
    Serial.println("Wifi Connected");
  }
  // Allow the hardware to sort itself out
  delay(1500);
  Serial.println("Setting Up");

  //Initialise NTPClient for time retrieval
  timeClient.begin();
}

void loop()
{
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  if((millis()-now)>3000) {
    timeClient.forceUpdate();
    formattedTime = timeClient.getFormattedTime();
    strcpy(strTime, formattedTime.c_str());
    Serial.print("Last update: ");
    Serial.println(strTime);
    client.publish(mqttTime, strTime);
    now = millis();
  }
}

I might be wrong, but even with the actual functionality of my full project stripped out, from this I should be seeing a message printed out on the Serial Monitor any time a message under one of the subscribed topics is sent. As it is all I see over the Serial Monitor in the Arduino IDE are the time updates, even though I see the subscribed values change on MQTT Explorer running on my PC. Could you help me figure out why my code isn’t subscribing correctly?

To be honest, I don't have a great understanding of how this library works, even after trying to read up on it. I especially don't understand how the callback() function works, how it is called, and how that makes the subscription work.

I wondered if there is a problem with timing and conflicts, but that seems unlikely.

Many thanks for any help!

Have you used something like MQTT.fx to connect to the MQTT broker and poke around? I'd use MQTT.fx to see the goings-on of the BrewKettle tree. Load MQTT.fx, enter the connection details, then connect, subscribe, and scan. The scan shows the published MQTT topics. Click on one to subscribe to it and see its data and how often the data arrives.

This will prove that the data is there for the MQTT callback.

client.loop() causes the callback to trigger.
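To make that concrete, here is a minimal sketch of the idea in plain C++. FakeMqttClient, simulateIncoming and recordMessage are hypothetical names made up for illustration; this is not the PubSubClient source, only the same shape: setCallback() stores a function pointer, and nothing calls it until loop() finds a waiting message.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for an MQTT client, for illustration only.
using MqttCallback = void (*)(char* topic, uint8_t* payload, unsigned int length);

struct PendingMessage {
  std::string topic;
  std::string payload;
};

class FakeMqttClient {
 public:
  // Only stores the function pointer; the callback is not invoked here.
  void setCallback(MqttCallback cb) { callback_ = cb; }

  // Simulates a message from the broker that is waiting to be read.
  void simulateIncoming(const std::string& topic, const std::string& payload) {
    inbox_.push_back({topic, payload});
  }

  // Drains waiting messages, invoking the callback once per message.
  // Returns how many messages were handed to the callback.
  int loop() {
    int dispatched = 0;
    for (PendingMessage& msg : inbox_) {
      if (callback_ != nullptr) {
        callback_(&msg.topic[0],
                  reinterpret_cast<uint8_t*>(&msg.payload[0]),
                  static_cast<unsigned int>(msg.payload.size()));
        ++dispatched;
      }
    }
    inbox_.clear();
    return dispatched;
  }

 private:
  MqttCallback callback_ = nullptr;
  std::vector<PendingMessage> inbox_;
};

// Example callback: remembers the last message so it can be inspected.
std::string lastTopic;
std::string lastPayload;

void recordMessage(char* topic, uint8_t* payload, unsigned int length) {
  lastTopic = topic;
  // The payload is not null-terminated, so the length must be used.
  lastPayload.assign(reinterpret_cast<char*>(payload), length);
}
```

In this sketch, a message that has arrived sits in the inbox until loop() runs, which is why a sketch that stops calling client.loop() (for example, while stuck in a long while loop) never sees its callback fire.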

Why load: #include <Ethernet.h> when it’s not being used?

Add a new tab, name the tab certs.h, put your secrets in certs.h, and include certs.h to use them.

Before reconnecting or connecting, use WiFi.disconnect(). It does two things. Thing one: it disconnects if there is an open connection. Thing two: it forces the WiFi stack to reset to default values.

Dig into PubSubClient and set the keep-alive to 90 (seconds) before making the MQTT connection.

Your reconnect() function does not seem to care if WiFi is connected.
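A rough sketch of the ordering I mean, written as a plain C++ decision function so it can be read on its own. NextStep and nextStep are hypothetical names, not part of any library; in a real sketch wifiUp would be WiFi.status() == WL_CONNECTED and mqttUp would be client.connected().

```cpp
#include <cassert>

// Hypothetical names, for illustration only.
enum class NextStep { ConnectWiFi, ConnectMqtt, RunLoop };

// Decide what to do on this pass, given the state of the two links.
// WiFi is checked first: an MQTT connect attempt cannot succeed without it.
NextStep nextStep(bool wifiUp, bool mqttUp) {
  if (!wifiUp) {
    return NextStep::ConnectWiFi;
  }
  if (!mqttUp) {
    return NextStep::ConnectMqtt;  // subscribe right after this succeeds
  }
  return NextStep::RunLoop;        // both links are up: call client.loop()
}
```

The point of the design is that a dropped WiFi link is handled before any MQTT retry, instead of looping on client.connect() with no network underneath it.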

Thanks for the replies! To respond to each in turn:

Have you used something like MQTT.fx to connect to the MQTT broker and poke around? I'd use MQTT.fx to see the goings-on of the BrewKettle tree. Load MQTT.fx, enter the connection details, then connect, subscribe, and scan. The scan shows the published MQTT topics. Click on one to subscribe to it and see its data and how often the data arrives.

Is that necessary? It asks for a license key and subscription, even with a free trial. I’m using MQTT Explorer which seems to be the same thing, and that is showing received MQTT messages on the subscribed topics that are coming from the Mosquitto server. This shows that the data is there for the callback.

Why load: #include <Ethernet.h> when it’s not being used?

That shouldn’t be there, but it’s just kinda superfluous.

Add a new tab, name the tab certs.h, put your secrets in certs.h, and include certs.h to use them.

I do this in the finished code of my project; given that this is a stripped down version of the example code in order to test MQTT subscription I haven’t bothered.

Before reconnecting or connecting, use WiFi.disconnect(). It does two things. Thing one: it disconnects if there is an open connection. Thing two: it forces the WiFi stack to reset to default values.

Cool! I've added this line before beginning the WiFi connection.

Dig into PubSubClient and set the keep-alive to 90 (seconds) before making the MQTT connection.

Okay, I've done this by changing the relevant value in PubSubClient.h.

Your reconnect() function does not seem to care if WiFi is connected.

I can change that, but does that really matter for this test? I know that messages are getting through as the ESP32 publishes them.

I’ve tried these things but there’s no change in what I’m seeing in the Serial Monitor. No messages come through when published by the Mosquitto server, even though I am seeing those messages arrive on the MQTT Explorer program.

I don't seem to be any closer to getting the ESP32 to successfully subscribe to topics and respond to messages published on them. I just don't know what I'm doing wrong!

I let the broker know what I am subscribing to after a connection to the broker has been made, not before.
Here is the PubSubClient subscribe function:

boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
    size_t topicLength = strnlen(topic, this->bufferSize);
    if (topic == 0) {
        return false;
    }
    if (qos > 1) {
        return false;
    }
    if (this->bufferSize < 9 + topicLength) {
        // Too long
        return false;
    }
    if (connected()) {
        // Leave room in the buffer for header and variable length field
        uint16_t length = MQTT_MAX_HEADER_SIZE;
        nextMsgId++;
        if (nextMsgId == 0) {
            nextMsgId = 1;
        }
        this->buffer[length++] = (nextMsgId >> 8);
        this->buffer[length++] = (nextMsgId & 0xFF);
        length = writeString((char*)topic, this->buffer,length);
        this->buffer[length++] = qos;
        return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
    }
    return false;
}

Notice the line if (connected()) and what happens after it. If you subscribe before the connection is made, subscribe() returns false and the MQTT broker is NOT informed of your subscription choice.
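Once the broker does receive the subscription, it matches each published topic against the filter string you sent, so the filter itself is also worth checking. In MQTT the wildcards are + (exactly one level) and # (all remaining levels). A * has no special meaning, so a filter such as BrewKettle/* would only match a topic literally named BrewKettle/*. Below is a minimal sketch of that matching rule in plain C++. topicMatches is a hypothetical helper written for illustration, not part of PubSubClient or Mosquitto, and it assumes a well-formed filter (wildcards occupy a whole level, # comes last) and ignores the special handling of topics that start with $.

```cpp
#include <cassert>

// Hypothetical helper, for illustration only: returns true if `topic`
// would be matched by the MQTT subscription filter `filter`.
bool topicMatches(const char* filter, const char* topic) {
  while (*filter != '\0') {
    if (*filter == '#') {
      return true;  // multi-level wildcard: matches everything that remains
    }
    if (*filter == '+') {
      // Single-level wildcard: skip one whole level of the topic.
      while (*topic != '\0' && *topic != '/') {
        ++topic;
      }
      ++filter;
      continue;
    }
    if (*topic == '\0') {
      // Topic ran out first. The only way to still match is a trailing
      // "/#", because "a/#" also matches the parent level "a".
      return filter[0] == '/' && filter[1] == '#';
    }
    if (*filter != *topic) {
      return false;  // ordinary characters (including '*') must be equal
    }
    ++filter;
    ++topic;
  }
  return *topic == '\0';  // both strings must end together
}
```

With this rule, BrewKettle/# covers BrewKettle/heat1, BrewKettle/pump and so on, while BrewKettle/* covers none of them.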

There is no need to edit the PubSubClient library to set the keep-alive. Call MQTTclient.setKeepAlive( 90 ); before making a connection.

Here is my mqtt keep alive function that manages the WiFi connection, the MQTT connection, and client.loop():

void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 250; //delay for ms
  for (;;)
  {
    // check both the client connection and whether WiFi 'thinks' it is connected; checking both is more reliable than a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d", (int)wifiClient.connected(), (int)WiFi.status() );
      if ( !(wifiClient.connected()) || !(WiFi.status() == WL_CONNECTED) )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
////
void connectToMQTT()
{
  byte mac[6]; // a MAC address is 6 bytes; used to create the client ID
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password );
    log_i( "connecting to MQTT" );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
  MQTTclient.subscribe( topicRemainingMoisture_0 );
  log_i("MQTT Connected");
} //void connectToMQTT()
void connectToWiFi()
{
  int TryCount = 0;
  //log_i( "connect to wifi" );
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    //log_i(" waiting on wifi connection" );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  //log_i( "Connected to WiFi" );
  WiFi.onEvent( WiFiEvent );
}

I let the broker know what I am subscribing to after a connection to the broker has been made, not before.

This was the problem I think! That, and a problematic While Loop that was in my main code.
Thanks so much for your help in figuring it out, I've ceased banging my head against the wall.
