ESP32 MQTT onMessage Callback never called

Hello Guys

I use an ESP32 and connect it to AWS IOT.
After connecting to Wifi and IOT I publish an empty message to the topic "$aws/things/chicken-door/shadow/name/door/get"
AWS IOT has some logic. So the empty message to this topic triggers a new message to the topic "$aws/things/chicken-door/shadow/name/door/get/accepted" containing a json document of the things state (they call it shadow).

Until here everything works fine. I see the empty message in the "get topic" and I see the json file in the "get/accepted topic".

But unfortunately the message is never received by the ESP32. the onMessage callback function is never called.

Here is my code:


#include "secrets.h"
#include <WiFiClientSecure.h>
#include <MQTTClient.h>
#include <ArduinoJson.h>
#include "WiFi.h"

// Topics
#define AWS_IOT_PUBLISH_TOPIC   "$aws/things/chicken-door/shadow/name/door/get"
#define AWS_IOT_SUBSCRIBE_TOPIC "$aws/things/chicken-door/shadow/name/door/get/accepted"

int msgReceived = 0;
String rcvdPayload;
char sndPayloadOff[512];
char sndPayloadOn[512];

WiFiClientSecure net = WiFiClientSecure();
MQTTClient client;

void connectAWS()
{
  WiFi.mode(WIFI_STA);
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  Serial.println("");
  Serial.println("###################### Starting Execution ########################");
  Serial.println("Connecting to Wi-Fi");

  while (WiFi.status() != WL_CONNECTED){
    delay(500);
    Serial.print(".");
  }

  // Configure WiFiClientSecure to use the AWS IoT device credentials
  net.setCACert(AWS_CERT_CA);
  net.setCertificate(AWS_CERT_CRT);
  net.setPrivateKey(AWS_CERT_PRIVATE);

  // Connect to the MQTT broker on the AWS endpoint
  client.begin(AWS_IOT_ENDPOINT, 8883, net);

  // Create a message handler
  client.onMessage(messageHandler);

  Serial.println("Connecting to AWS IOT");

  while (!client.connect(THINGNAME)) {
    Serial.print(".");
    delay(100);
  }

  if(!client.connected()){
    Serial.println("AWS IoT Timeout!");
    return;
  }

  // Subscribe to a topic
  client.subscribe(AWS_IOT_SUBSCRIBE_TOPIC);

  Serial.println("AWS IoT Connected!");
}

void messageHandler(String &topic, String &payload) {
  msgReceived = 1;
  rcvdPayload = payload;
  Serial.println("something received");
}

void setup() {
  Serial.begin(115200);
  
  connectAWS();
  
  Serial.println("Get shadow");
  client.publish(AWS_IOT_PUBLISH_TOPIC, "");
  
  Serial.println("##############################################");
}

void loop() {
   if(msgReceived == 1)
    {
//      This code will run whenever a message is received on the SUBSCRIBE_TOPIC_NAME Topic
        delay(100);
        msgReceived = 0;
        Serial.print("Received Message:");
        Serial.println(rcvdPayload);
      Serial.println("##############################################");
    }
  client.loop();
}

Thanks in advance for looking into this.

What MQTT client are you using? Most people use PubSubClient, it works well with ESP and has examples to show usage with callback.

I am currently using this client: GitHub - 256dpi/arduino-mqtt: MQTT library for Arduino as it was recomended by AWS.

So you suggest to use the PubSubClient instead?

I have basically rebuilt the solution based on an example of the PubSubClient library.. (example used: ESP-MQTT-AWS-IoT-Core/PubSubClient.ino at master · debsahu/ESP-MQTT-AWS-IoT-Core · GitHub)
They are not useing the shadow logic (i.e if I publish to /get, IoT will publish to /get/accepted)

What I have noticed in the meantime is that if I subscribe directly to the topic I am publishing in, the subscription works (pub and sub to i.e /update/accepted). Only when another process (in my case AWS IOT) is publishing to the topic i don't get the message (even though i see the message when debugging in the aws console). Very strange.

#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <time.h>
#define emptyString String()

#include "secrets.h"

const int MQTT_PORT = 8883;
const char AWS_IOT_UPDATE[] = "$aws/things/chicken-door/shadow/name/door/update";
const char AWS_IOT_UPDATE_DELTA[] = "$aws/things/chicken-door/shadow/name/door/update/delta";
const char AWS_IOT_GET_SHADOW[] = "$aws/things/chicken-door/shadow/name/door/get";
const char AWS_IOT_SHADOW_ACCEPTED[] = "$aws/things/chicken-door/shadow/name/door/get/accepted";

WiFiClientSecure net;
PubSubClient client(net);

unsigned long lastMillis = 0;

void messageReceived(char *topic, byte *payload, unsigned int length)
{
  if (strcmp(topic,AWS_IOT_SHADOW_ACCEPTED)==0){
    Serial.printf("Received Message from topic : %s", AWS_IOT_SHADOW_ACCEPTED);
  }
  if (strcmp(topic,AWS_IOT_UPDATE_DELTA)==0){
    Serial.printf("Received Message from topic : %s", AWS_IOT_SHADOW_ACCEPTED);
  }
  Serial.print("Received [");
  Serial.print(topic);
  Serial.print("]: ");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

void pubSubErr(int8_t MQTTErr)
{
  if (MQTTErr == MQTT_CONNECTION_TIMEOUT)
    Serial.print("Connection tiemout");
  else if (MQTTErr == MQTT_CONNECTION_LOST)
    Serial.print("Connection lost");
  else if (MQTTErr == MQTT_CONNECT_FAILED)
    Serial.print("Connect failed");
  else if (MQTTErr == MQTT_DISCONNECTED)
    Serial.print("Disconnected");
  else if (MQTTErr == MQTT_CONNECTED)
    Serial.print("Connected");
  else if (MQTTErr == MQTT_CONNECT_BAD_PROTOCOL)
    Serial.print("Connect bad protocol");
  else if (MQTTErr == MQTT_CONNECT_BAD_CLIENT_ID)
    Serial.print("Connect bad Client-ID");
  else if (MQTTErr == MQTT_CONNECT_UNAVAILABLE)
    Serial.print("Connect unavailable");
  else if (MQTTErr == MQTT_CONNECT_BAD_CREDENTIALS)
    Serial.print("Connect bad credentials");
  else if (MQTTErr == MQTT_CONNECT_UNAUTHORIZED)
    Serial.print("Connect unauthorized");
}

void connectToMqtt(bool nonBlocking = false)
{
  Serial.print("MQTT connecting ");
  while (!client.connected())
  {
    if (client.connect(THINGNAME))
    {
      if (!client.subscribe(AWS_IOT_SHADOW_ACCEPTED))
        pubSubErr(client.state());
      if (!client.subscribe(AWS_IOT_UPDATE_DELTA))
        pubSubErr(client.state());
      Serial.println("connected!");
    }
    else
    {
      Serial.print("failed, reason -> ");
      pubSubErr(client.state());
      if (!nonBlocking)
      {
        Serial.println(" < try again in 5 seconds");
        delay(5000);
      }
      else
      {
        Serial.println(" <");
      }
    }
    if (nonBlocking)
      break;
  }
}

void connectToWiFi(String init_str)
{
  if (init_str != emptyString)
    Serial.print(init_str);
  while (WiFi.status() != WL_CONNECTED)
  {
    Serial.print(".");
    delay(1000);
  }
  if (init_str != emptyString)
    Serial.println("ok!");
}

void checkWiFiThenMQTT(void)
{
  connectToWiFi("Checking WiFi");
  connectToMqtt();
}

unsigned long previousMillis = 0;
const long interval = 5000;

void checkWiFiThenMQTTNonBlocking(void)
{
  connectToWiFi(emptyString);
  if (millis() - previousMillis >= interval && !client.connected()) {
    previousMillis = millis();
    connectToMqtt(true);
  }
}

void checkWiFiThenReboot(void)
{
  connectToWiFi("Checking WiFi");
  Serial.print("Rebooting");
  ESP.restart();
}

void sendData(String value)
{
  DynamicJsonDocument jsonBuffer(JSON_OBJECT_SIZE(3) + 100);
  JsonObject root = jsonBuffer.to<JsonObject>();
  JsonObject state = root.createNestedObject("state");
  JsonObject state_reported = state.createNestedObject("reported");
  state_reported["value"] = value;
  Serial.printf("Sending  [%s]: ", AWS_IOT_UPDATE);
  serializeJson(root, Serial);
  Serial.println();
  char shadow[measureJson(root) + 1];
  serializeJson(root, shadow, sizeof(shadow));
  if (!client.publish(AWS_IOT_UPDATE, shadow, false))
    pubSubErr(client.state());
}

void getShadow(void)
{
  Serial.printf("Requesting Shadow [%s]: ", AWS_IOT_GET_SHADOW);
  if (!client.publish(AWS_IOT_GET_SHADOW, "", false))
    pubSubErr(client.state());
}

void setup()
{
  Serial.begin(115200);
  delay(5000);
  Serial.println();
  Serial.println();
#ifdef ESP32
  WiFi.setHostname(THINGNAME);
#else
  WiFi.hostname(THINGNAME);
#endif
  WiFi.mode(WIFI_STA);
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  connectToWiFi(String("Attempting to connect to SSID: ") + String(WIFI_SSID));

  net.setCACert(AWS_CERT_CA);
  net.setCertificate(AWS_CERT_CRT);
  net.setPrivateKey(AWS_CERT_PRIVATE);

  client.setServer(AWS_IOT_ENDPOINT, MQTT_PORT);
  client.setCallback(messageReceived);

  connectToMqtt();
  getShadow();
}

void loop()
{
  if (!client.connected())
  {
    checkWiFiThenMQTT();
  }
  else
  {
    client.loop();
    if (millis() - lastMillis > 5000)
    {
      lastMillis = millis();
      sendData("closed");
    }
  }
}

I do not use AWS but instead of subscribing to AWS_IOT_SHADOW_ACCEPTED and AWS_IOT_UPDATE_DELTA what do you get in the serial monitor if you subscribe to just "$aws/things/chicken-door/shadow/name/door/#". The # will match anything that is returned so may help.

Is this the example you followed on aws?

That is exactly the tutorial i followed at first. Now i implemented the solution based on PubSubClient. For this i used this as a base: ESP-MQTT-AWS-IoT-Core/PubSubClient.ino at master · debsahu/ESP-MQTT-AWS-IoT-Core · GitHub

If I subscribe to /# I still get only the messages I publish in the loop (via sendData()). Eventhough on AWS I see everything else.

I also tested with MQTT Explorer using the same credentials I use on the ESP32 (cert and key). Everything works as expected there. Therefor I would think the issue has to be in the implementation on the ESP32...

Any ideas?

Does the example sketch work without alteration (apart from adding secret.h details) or does this also fail?
From your description it does seem there is some problem with the certificate/secure connection on the ESP and AWS is rejecting/ignoring it.
The example sketch implement pubSubErr() that may help in determining the cause of the connection issue, what do you see in the Serial monitor?

The example sketch works (sending and subscribing to the same topic).

I did log if the wifi connection fails. And it looks like the wifi connection is lost everytime i send data to a topic that I am not explicitly subsribed to.
COM output:

Attempting to connect to SSID: dehei.local...ok!
MQTT connecting connected!
Requesting Shadow [$aws/things/chicken-door/shadow/name/door/get]: Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Checking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Connection lostChecking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Connection lostChecking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Connection lostChecking WiFiok!
MQTT connecting connected!
Checking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Checking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Connection lostChecking WiFiok!
MQTT connecting connected!
Checking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Connection lostChecking WiFiok!
MQTT connecting connected!
Checking WiFiok!
MQTT connecting connected!
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Sending test to $aws/things/chicken-door/shadow/name/door/get/accepted: 
Connection lostChecking WiFiok!

The current code:

#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <time.h>
#define emptyString String()

#include "secrets.h"

const int MQTT_PORT = 8883;
const char AWS_IOT_UPDATE[] = "$aws/things/chicken-door/shadow/name/door/update";
const char AWS_IOT_UPDATE_DELTA[] = "$aws/things/chicken-door/shadow/name/door/update/delta";
const char AWS_IOT_GET_SHADOW[] = "$aws/things/chicken-door/shadow/name/door/get";
const char AWS_IOT_SHADOW_ACCEPTED[] = "$aws/things/chicken-door/shadow/name/door/get/accepted";

WiFiClientSecure net;
PubSubClient client(net);

unsigned long lastMillis = 0;

void messageReceived(char *topic, byte *payload, unsigned int length)
{
  Serial.print("Received [");
  Serial.print(topic);
  Serial.print("]: ");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

void pubSubErr(int8_t MQTTErr)
{
  if (MQTTErr == MQTT_CONNECTION_TIMEOUT)
    Serial.print("Connection tiemout");
  else if (MQTTErr == MQTT_CONNECTION_LOST)
    Serial.print("Connection lost");
  else if (MQTTErr == MQTT_CONNECT_FAILED)
    Serial.print("Connect failed");
  else if (MQTTErr == MQTT_DISCONNECTED)
    Serial.print("Disconnected");
  else if (MQTTErr == MQTT_CONNECTED)
    Serial.print("Connected");
  else if (MQTTErr == MQTT_CONNECT_BAD_PROTOCOL)
    Serial.print("Connect bad protocol");
  else if (MQTTErr == MQTT_CONNECT_BAD_CLIENT_ID)
    Serial.print("Connect bad Client-ID");
  else if (MQTTErr == MQTT_CONNECT_UNAVAILABLE)
    Serial.print("Connect unavailable");
  else if (MQTTErr == MQTT_CONNECT_BAD_CREDENTIALS)
    Serial.print("Connect bad credentials");
  else if (MQTTErr == MQTT_CONNECT_UNAUTHORIZED)
    Serial.print("Connect unauthorized");
}

void connectToMqtt(bool nonBlocking = false)
{
  Serial.print("MQTT connecting ");
  while (!client.connected())
  {
    if (client.connect(THINGNAME))
    {
//      if (!client.subscribe(AWS_IOT_SHADOW_ACCEPTED))
//        pubSubErr(client.state());
//      if (!client.subscribe(AWS_IOT_UPDATE_DELTA))
//        pubSubErr(client.state());
      if (!client.subscribe("$aws/things/chicken-door/shadow/name/door/#"))
        pubSubErr(client.state());
      Serial.println("connected!");
    }
    else
    {
      Serial.print("failed, reason -> ");
      pubSubErr(client.state());
      if (!nonBlocking)
      {
        Serial.println(" < try again in 5 seconds");
        delay(5000);
      }
      else
      {
        Serial.println(" <");
      }
    }
    if (nonBlocking)
      break;
  }
}

void connectToWiFi(String init_str)
{
  if (init_str != emptyString)
    Serial.print(init_str);
  while (WiFi.status() != WL_CONNECTED)
  {
    Serial.print(".");
    delay(1000);
  }
  if (init_str != emptyString)
    Serial.println("ok!");
}

void checkWiFiThenMQTT(void)
{
  connectToWiFi("Checking WiFi");
  connectToMqtt();
}

unsigned long previousMillis = 0;
const long interval = 5000;

void checkWiFiThenMQTTNonBlocking(void)
{
  connectToWiFi(emptyString);
  if (millis() - previousMillis >= interval && !client.connected()) {
    previousMillis = millis();
    connectToMqtt(true);
  }
}

void checkWiFiThenReboot(void)
{
  connectToWiFi("Checking WiFi");
  Serial.print("Rebooting");
  ESP.restart();
}

void sendData(String value) 
{
  DynamicJsonDocument jsonBuffer(JSON_OBJECT_SIZE(3) + 100);
  JsonObject root = jsonBuffer.to<JsonObject>();
  JsonObject state = root.createNestedObject("state");
  JsonObject state_reported = state.createNestedObject("reported");
  state_reported["value"] = value;
  Serial.printf("Sending test to %s: ", AWS_IOT_SHADOW_ACCEPTED);
  //serializeJson(root, Serial);
  Serial.println();
  char shadow[measureJson(root) + 1];
  serializeJson(root, shadow, sizeof(shadow));
  if (!client.publish(AWS_IOT_SHADOW_ACCEPTED, "test", false))
    pubSubErr(client.state());
}

void getShadow(void)
{
  Serial.printf("Requesting Shadow [%s]: ", AWS_IOT_GET_SHADOW);
  if (!client.publish(AWS_IOT_GET_SHADOW, "", false))
    pubSubErr(client.state());
}

void setup()
{
  Serial.begin(115200);
  delay(5000);
  Serial.println();
  Serial.println();

  WiFi.setHostname(THINGNAME);
  WiFi.mode(WIFI_STA);
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  connectToWiFi(String("Attempting to connect to SSID: ") + String(WIFI_SSID));

  net.setCACert(AWS_CERT_CA);
  net.setCertificate(AWS_CERT_CRT);
  net.setPrivateKey(AWS_CERT_PRIVATE);

  client.setServer(AWS_IOT_ENDPOINT, MQTT_PORT);
  client.setCallback(messageReceived);

  connectToMqtt();
  getShadow();
}

void loop()
{
  if (!client.connected())
  {
    checkWiFiThenMQTT();
  }
  else
  {
    client.loop();
    if (millis() - lastMillis > 5000)
    {
      lastMillis = millis();
      sendData("closed");
    }
  }
}
"
type or paste code here

I tested the credentials by using them in the desktop tool "MQTT Explorer". There I can publish to the topics and also receive the updates in the get/accepted topic