Sketch stops sending MQTT messages after many hours

I have a Nano ESP32 in a remote location, sending measurements to an MQTT Broker. It has 2 I2C sensors (an SHT30 and a Melexis light sensor), each generating 2 measurements, and from the SHT measurements I calculate a fifth, the dew point. So every 5 seconds it sends a set of 5 measurements to the MQTT Broker.

The MQTT Broker is Mosquitto running on a Raspberry Pi.

To smooth the measurements I'm using the library RunningAverage, with buffer sizes of 5000, for 3 of the measurements.
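For a sense of scale, the time window each buffer covers can be estimated from the sampling rate. This is a minimal sketch, assuming the rate of about 114 readings per 5 seconds noted in the code comments further down; `windowMinutes` is a hypothetical helper and not part of the RunningAverage library.

```cpp
#include <cassert>

// Hypothetical helper: minutes of data covered by a running-average buffer,
// given how many readings arrive in a measured number of seconds.
double windowMinutes(unsigned bufferLength, double readings, double seconds) {
  assert(readings > 0.0 && seconds > 0.0);  // a rate of zero has no window
  double ratePerSecond = readings / seconds;
  return bufferLength / ratePerSecond / 60.0;
}
```

With 114 readings per 5 seconds, `windowMinutes(5000, 114, 5)` comes to roughly 3.65 minutes and `windowMinutes(10000, 114, 5)` to roughly 7.3 minutes, which matches the figures in the sketch's comments.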

This works well, but after a time the MQTT broker stops receiving data. This time can be 6 hours or up to 2 days. The ESP32 is still alive -- I can see it on a local network scan.

It seems that the sketch keeps running but its messages stop reaching the MQTT Broker: I'm running the same code on a desktop unit with the same sensors (except the buffer sizes are 10000), and in 2 days the output in the serial window has never stopped. But the messages are no longer recorded by the MQTT Broker.

I can fix the problem by power-cycling the Nano ESP32, suggesting that the Nano is the source of the problem, rather than the Mosquitto Broker.

Here is the code of the office version, with longer RunningAverage buffers:

#include <WiFi.h>
#include <MQTTClient.h>

#include <Arduino.h>
#include <Wire.h>
#include "Adafruit_SHT31.h"

#include <Adafruit_MLX90614.h>

#include "temperature.h"
#include "RunningAverage.h"

#define CLIENT_ID "ESP32T"  // CHANGE IT AS YOU DESIRE

const char WIFI_SSID[] = "<my-network-name>";                  // CHANGE TO YOUR WIFI SSID
const char WIFI_PASSWORD[] = "<my-password>";  // CHANGE TO YOUR WIFI PASSWORD
const char MQTT_BROKER_ADRRESS[] = "192.168.0.52";    // CHANGE TO MQTT BROKER'S IP ADDRESS
const int MQTT_PORT = 1883;
const char MQTT_USERNAME[] = "";  // CHANGE IT IF REQUIRED
const char MQTT_PASSWORD[] = "";  // CHANGE IT IF REQUIRED

// The MQTT topics that this device should publish/subscribe
#define PUBLISH_TOPIC_T_INT "esp32t/T_int"
#define PUBLISH_TOPIC_T_DOME "esp32t/T_dome"
#define PUBLISH_TOPIC_T_EXT "esp32t/T_ext"
#define PUBLISH_TOPIC_H_EXT "esp32t/H_ext"
#define PUBLISH_TOPIC_DP_EXT "esp32t/DP_ext"

#define SUBSCRIBE_TOPIC "esp32-001/receive"

#define PUBLISH_INTERVAL 5000  // 5 seconds
#define HEATER_CYCLE 30000     // 30 seconds

WiFiClient network;
MQTTClient mqtt = MQTTClient(256);

unsigned long lastPublishTime = 0;
unsigned long lastHeaterTime = 0;
unsigned long lastHeaterOnTime = 0;

float t_ext;
float h_ext;
float t_int;
float dp_ext;
float t_dome;

// we're going to put the ext-T and ext_RH values into a running average structure
// to remove the oscillation in their values caused by the heater action

bool enableHeater = false;

Adafruit_SHT31 sht31 = Adafruit_SHT31();
Adafruit_MLX90614 mlx = Adafruit_MLX90614();

#define RA_LENGTH 10000                // we read 114 values in 5 sec (according to testing)
RunningAverage extTempRA(RA_LENGTH);   // so RA_LENGTH =  5000 -> 3.6 minutes
RunningAverage extHumRA(RA_LENGTH);    // so RA_LENGTH = 10000 -> 7.3 minutes
RunningAverage extDPRA(RA_LENGTH);     // so RA_LENGTH = 15000 -> 11 minutes
#define RA_MLX_LENGTH 1000             //
RunningAverage intMLX(RA_MLX_LENGTH);  // so about 0.7 minutes

int counter = 0;

void setup() {
  Serial.begin(9600);

  extTempRA.clear();
  extHumRA.clear();
  extDPRA.clear();

  // set the ADC attenuation to 11 dB (up to ~3.3V input)
  analogSetAttenuation(ADC_11db);
  WiFi.mode(WIFI_STA);
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println();

  if (!sht31.begin(0x44)) {  // Set to 0x45 for alternate i2c addr
    Serial.println("Couldn't find SHT31");
    while (1) delay(1);
  }

  if (!mlx.begin()) {
    Serial.println("Error connecting to MLX sensor. Check wiring.");
    while (1)
      ;
  };

  Serial.print("Emissivity = ");
  Serial.println(mlx.readEmissivity());

  connectToMQTT();
}

void loop() {

  float dome;

  float sht_tmp;
  float sht_hum;
  float sht_dew;

  t_int = mlx.readAmbientTempC();
  dome = mlx.readObjectTempC();
  intMLX.addValue(dome);

  sht_tmp = sht31.readTemperature();
  sht_hum = sht31.readHumidity();
  sht_dew = dewPoint(sht_tmp, sht_hum);

  extTempRA.addValue(sht_tmp);
  extHumRA.addValue(sht_hum);
  extDPRA.addValue(sht_dew);

  counter++;

  mqtt.loop();

  if (millis() - lastPublishTime > PUBLISH_INTERVAL) {

    Serial.print("Counter: ");
    Serial.println(counter);

    Serial.print("DP RA elements used: ");
    Serial.println(extDPRA.getCount());

    Serial.print("DP std dev: ");
    Serial.println(extDPRA.getStandardDeviation());

    counter = 0;

    t_dome = intMLX.getAverage();
    h_ext = extHumRA.getAverage();
    t_ext = extTempRA.getAverage();
    dp_ext = extDPRA.getAverage();

    sendToMQTT();
    lastPublishTime = millis();
  }

  if (millis() - lastHeaterTime > HEATER_CYCLE) {
    enableHeater = !enableHeater;
    sht31.heater(enableHeater);

    Serial.print("Heater Enabled State: ");
    if (sht31.isHeaterEnabled())
      Serial.println("ENABLED");
    else
      Serial.println("DISABLED");

    lastHeaterTime = millis();
  }
}

void connectToMQTT() {
  // Connect to the MQTT broker
  mqtt.begin(MQTT_BROKER_ADRRESS, MQTT_PORT, network);

  // Create a handler for incoming messages
  mqtt.onMessage(messageHandler);

  Serial.print("Arduino Nano ESP32 - Connecting to MQTT broker");

  while (!mqtt.connect(CLIENT_ID, MQTT_USERNAME, MQTT_PASSWORD)) {
    Serial.print(".");
    delay(100);
  }
  Serial.println();

  if (!mqtt.connected()) {
    Serial.println("Arduino Nano ESP32 - MQTT broker Timeout!");
    return;
  }

  // Subscribe to a topic, the incoming messages are processed by messageHandler() function
  if (mqtt.subscribe(SUBSCRIBE_TOPIC))
    Serial.print("Arduino Nano ESP32 - Subscribed to the topic: ");
  else
    Serial.print("Arduino Nano ESP32 - Failed to subscribe to the topic: ");

  Serial.println(SUBSCRIBE_TOPIC);
  Serial.println("Arduino Nano ESP32  - MQTT broker Connected!");
}

void sendToMQTT() {
  /*
  StaticJsonDocument<200> message;
  message["timestamp"] = millis();
  message["data"] = analogRead(0);  // Or you can read data from other sensors
  char messageBuffer[512];
  serializeJson(message, messageBuffer);

  mqtt.publish(PUBLISH_TOPIC_T_INT, messageBuffer);
*/
  Serial.println("Arduino Nano ESP32 - sent to MQTT:");

  String T_int = String(t_int, 2);
  mqtt.publish(PUBLISH_TOPIC_T_INT, T_int);
  
  Serial.print("- topic: ");
  Serial.print(PUBLISH_TOPIC_T_INT);
  Serial.print("  - payload:");
  Serial.println(t_int);

  String T_dome = String(t_dome, 2);
  mqtt.publish(PUBLISH_TOPIC_T_DOME, T_dome);
  
  Serial.print("- topic: ");
  Serial.print(PUBLISH_TOPIC_T_DOME);
  Serial.print("  - payload:");
  Serial.println(t_dome);


  String T_ext = String(t_ext, 2);
  mqtt.publish(PUBLISH_TOPIC_T_EXT, T_ext);
  
  Serial.print("- topic: ");
  Serial.print(PUBLISH_TOPIC_T_EXT);
  Serial.print("  - payload:");
  Serial.println(t_ext);


  String H_ext = String(h_ext, 2);
  mqtt.publish(PUBLISH_TOPIC_H_EXT, H_ext);
  
  Serial.print("- topic: ");
  Serial.print(PUBLISH_TOPIC_H_EXT);
  Serial.print("  - payload:");
  Serial.println(h_ext);


  String DP_ext = String(dp_ext, 2);
  mqtt.publish(PUBLISH_TOPIC_DP_EXT, DP_ext);
  
  Serial.print("- topic: ");
  Serial.print(PUBLISH_TOPIC_DP_EXT);
  Serial.print("  - payload:");
  Serial.println(dp_ext);
 
}

void messageHandler(String &topic, String &payload) {
  /*
  Serial.println("Arduino Nano ESP32 - received from MQTT:");
  Serial.println("- topic: " + topic);
  Serial.println("- payload:");
  Serial.println(payload);
*/
}

Is the MQTT account one of the inexpensive sort - where every 5 seconds seems a lot of access - and it's bouncing you out?


What happens when millis() overflows?

Add some kind of timer that can reset the entire board so that millis() goes back to 0.

If a board reset is not wanted, set up a timer interrupt that sets a flag every PUBLISH_INTERVAL. The if (millis() - lastPublishTime > PUBLISH_INTERVAL) test in your code then becomes if (flag).

Your code assumes that the network connection stays up forever.
For long-term operation you need to listen for WiFi events such as SYSTEM_EVENT_STA_DISCONNECTED and then take action to repair the network connection.
Once you receive a SYSTEM_EVENT_STA_GOT_IP event, you have an IP address and can attempt to get MQTT working again.
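The recovery order described above (WiFi first, then MQTT) can be sketched as a small decision helper. This is only an illustration under stated assumptions: it polls the link state instead of reacting to WiFi events, and `LinkSupervisor`, `LinkAction` and the retry interval are hypothetical names that do not come from the WiFi or MQTT libraries.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: decides what the sketch should try next,
// given the current state of the WiFi link and the MQTT session.
enum class LinkAction { None, ReconnectWiFi, ReconnectMqtt };

struct LinkSupervisor {
  uint32_t retryIntervalMs;    // minimum gap between recovery attempts
  uint32_t lastAttemptMs = 0;  // time of the most recent attempt
  bool attempted = false;      // false until an attempt has been made

  explicit LinkSupervisor(uint32_t interval) : retryIntervalMs(interval) {
    assert(interval > 0);
  }

  LinkAction next(bool wifiUp, bool mqttUp, uint32_t nowMs) {
    if (wifiUp && mqttUp) {
      attempted = false;  // healthy again: the next failure is retried at once
      return LinkAction::None;
    }
    // Rate-limit attempts; unsigned subtraction stays valid across roll-over.
    if (attempted && (nowMs - lastAttemptMs) < retryIntervalMs) {
      return LinkAction::None;
    }
    attempted = true;
    lastAttemptMs = nowMs;
    // MQTT cannot reconnect until WiFi is back, so repair WiFi first.
    return wifiUp ? LinkAction::ReconnectMqtt : LinkAction::ReconnectWiFi;
  }
};
```

In the sketch, `wifiUp` could come from `WiFi.status() == WL_CONNECTED` and `mqttUp` from `mqtt.connected()`, with `next()` called from `loop()` using `millis()`. `ReconnectWiFi` would then map to something like `WiFi.reconnect()`, and `ReconnectMqtt` to a single `mqtt.connect(CLIENT_ID, MQTT_USERNAME, MQTT_PASSWORD)` attempt followed by re-subscribing. An event-driven version would register a handler with `WiFi.onEvent()` instead; the event names depend on the ESP32 core version, so check which ones your installed core defines.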


Thanks for the reply.

The MQTT "account" isn't a commercial service. It's an MQTT Broker called Mosquitto running on a local Raspberry Pi.

Thanks for the various suggestions here.

It is true that I have to do something about the overflow of millis(), and I can use your suggestion in the later post.

But it is not the cause of the problem here -- millis() runs about 50 days before overflow, and the various versions of this software have only been running a few days -- indeed the delivery of the Nano ESP32s was only 42 days ago.

Thanks for this suggestion, and you make a good point.

I'm looking into it.

Even though it's on your Pi, I wonder if the routing takes a packet through free testing devices, which can and do get taken down frequently. Try a fee-based service with a published uptime guarantee.

OK, concerning the 'millis()' overflow, or roll-over, I was concerned about it as I wrote the code -- I had a comment in there that I would need to address it.

However, I've been looking more carefully: because millis() and the variables that hold its last value are all unsigned longs, the subtraction as I have it will always yield a sensible result, even if millis() rolls over.

Specifically, if lastPublishTime is 1000 short of the roll-over value, then 5 seconds later, after the roll-over, millis() will return 3999.

The subtraction I have (millis() - lastPublishTime) will still give a result of 5000.
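The roll-over arithmetic can be checked off the board with plain C++. This is a minimal sketch, assuming a 32-bit counter (unsigned long is 32 bits on the ESP32); `elapsedMs` and `intervalElapsed` are hypothetical names and are not part of the Arduino API.

```cpp
#include <cstdint>

// Elapsed milliseconds between two 32-bit tick counts.
// Unsigned subtraction wraps modulo 2^32, so the result is still correct
// when the counter has rolled over once between `last` and `now`.
uint32_t elapsedMs(uint32_t now, uint32_t last) {
  return now - last;
}

// True once at least `interval` ms have passed since `last`.
// (The sketch above uses > rather than >=; both are roll-over safe.)
bool intervalElapsed(uint32_t now, uint32_t last, uint32_t interval) {
  return elapsedMs(now, last) >= interval;
}
```

With `last = UINT32_MAX - 1000` and `now = 3999`, `elapsedMs(now, last)` evaluates to 5000, matching the worked example above. The comparison only stays safe in the subtract-then-compare form; a test such as `now > last + interval` is not roll-over safe.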

Why would it go via another service ? I'm explicitly giving the IP address and port of the service and it's on my local network.

Another datum -- I have 2 systems running this code: one on the remote system and the other on a Nano ESP32 connected to the Arduino IDE on my desk so I can watch it. They have slightly different topic names, to distinguish them, but they are both publishing to the same Mosquitto server.

The desktop one failed to publish this morning, while the other one was happily publishing away, so the problem isn't in the Mosquitto broker.

Well, when you say it that way. Just tossing out ideas; I haven't worked with MQTT directly in a couple of years now, so I may be misremembering the topology. If the path is indeed from your source device to your router to your Mosquitto broker on your Pi, then what you see happening should not be possible, correct? If that is correct, then the path I described isn't correct. Does that make sense?
I just checked your original post and you said the Nano is remote. What is the path from the Nano to the Pi?

Oh wow, the unit on the desk should have a direct path to the Pi. That makes no sense.
My hunch is there must be some sort of external (to your network) addressing lookup but that is just a hunch based on what you are seeing. Try disconnecting your router from the internet and see what happens.


To be clear about the topology. The MQTT Broker is running in an R-Pi on my desk. It is connected to our local network via an ethernet cable, which goes via 2 routers back to the main router.

The "desktop" ESP32 is also on my desk, and that goes through one node of a mesh wifi system which is connected to the same main router.

The "remote" ESP32 is in an enclosure on top of a 5m mast at my observatory, 100m from the house. It's part of a new all-sky camera I'm developing. This one is connected by wifi to another node on this mesh wifi network, which in this case is connected by ethernet to the observatory router, which is connected by optical fibre to the main house router.

So they are all internal to my network.

I'm not about to disconnect my main router from the outside world as that would be far too disruptive -- the drop-out interval can be days! I would probably be evicted by the rest of the family!

In any case I'm now certain that the problem lies in one or the other ESP32's losing connectivity to the MQTT Broker:

  1. if one drops out, the other continues publishing to the MQTT Broker
  2. I've modified the code in the desk one and have seen failures in both the mqtt.loop command and the mqtt.publish command, each of which returns a boolean. These failures coincide with the MQTT Broker no longer receiving the published packets.
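One way to act on those boolean return values is to count consecutive failures and trigger recovery once a threshold is reached. This is a minimal sketch under stated assumptions: `FailureCounter` and the threshold are hypothetical and not part of the MQTT library, and the recovery step itself is left to the sketch.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: tracks consecutive failed MQTT calls and reports
// when enough have accumulated to justify a recovery action.
class FailureCounter {
 public:
  explicit FailureCounter(uint16_t threshold) : threshold_(threshold) {
    assert(threshold > 0);
  }

  // Record the result of one call; returns true when recovery is due.
  bool record(bool ok) {
    if (ok) {
      consecutive_ = 0;  // any success clears the streak
      return false;
    }
    if (consecutive_ < UINT16_MAX) ++consecutive_;  // saturate, never wrap
    return consecutive_ >= threshold_;
  }

  uint16_t consecutive() const { return consecutive_; }

 private:
  uint16_t threshold_;
  uint16_t consecutive_ = 0;
};
```

In the sketch this could wrap the existing calls, for example `if (failures.record(mqtt.loop())) { /* reconnect */ }`, and likewise for each `mqtt.publish(...)`. If reconnecting repeatedly fails, `ESP.restart()` would be the software equivalent of the power cycle that currently restores service.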

Power-cycling the ESP32 always restores the service, so to me it's a no-brainer that the problem lies in the ESP32.

I'm now most of the way through implementing safeguards along the lines suggested by mikb55 above.

Thanks for your ideas, and for thinking about it, in any case!

Cheers,
Richard
