Here is the code for my solar-powered weather station, which has held its MQTT connection continuously for 8+ months:
/*
Project: use solar cells to generate power
2/2/2020
*/
#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h"
#include "sdkconfig.h"
#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"
#include <SPI.h>
#include <Adafruit_Sensor.h>
#include "Adafruit_BME680.h"
#include <ESP32Time.h>
////
// Event-group bit (bit 1). fDoBME sets it after every publish; no code in this
// file waits on it.
#define evtDoMQTTwd ( 1 << 1 )
EventGroupHandle_t eg; // variable for the event group handle
////
// TCP client used as the transport for the MQTT client below.
WiFiClient wifiClient;
// MQTT client; mqtt_server / mqtt_port are not defined in this file
// (presumably they come from certs.h -- confirm).
PubSubClient MQTTclient(mqtt_server, mqtt_port, wifiClient);
// Real-time clock wrapper; set by fparseMQTT from a received MQTT payload.
ESP32Time rtc;
//////
// BME680 sensor object constructed with GPIO 5
// (presumably the SPI chip-select pin -- confirm against the wiring).
Adafruit_BME680 bme( GPIO_NUM_5 );
///
/*
This semaphore is used to stop or prevent a publish from happening during client.loop().
It is created by the MQTTkeepalive task and taken around both MQTTclient.loop()
and MQTTclient.publish().
*/
SemaphoreHandle_t sema_MQTT_KeepAlive;
SemaphoreHandle_t sema_mqttOK; // protects the int mqttOK
////
// Single-slot queue (created in setup) carrying the latest received message
// from mqttCallback to fparseMQTT.
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
// Size in bytes of stu_message::payload; also used to reserve the topic String.
const int payloadSize = 300;
// true once the RTC has been set from MQTT; cleared by fmqttWatchDog to force a re-sync.
bool TimeSet = false;
// One received MQTT message: the raw payload as a NUL-terminated character
// buffer plus the topic it arrived on. x_message is the single global instance
// filled by mqttCallback and copied into xQ_Message.
// NOTE(review): the queue is created with sizeof(stu_message), so the struct is
// copied as raw bytes; the String member presumably owns a heap buffer that the
// copy would then share with x_message -- confirm this is safe.
struct stu_message
{
char payload [payloadSize] = {'\0'}; // payload text, zero-initialised
String topic ; // topic name of the message
} x_message;
////
// Missed-message counter: fmqttWatchDog increments it on every wake and restarts
// the ESP once it reaches its limit; fparseMQTT resets it to 0 whenever a
// message is received. Guarded by sema_mqttOK.
int mqttOK = 0; // stores a count value that is used to cause an esp reset
////
// MQTT receive callback (installed with MQTTclient.setCallback).
//
// Copies the incoming topic and payload into the global x_message and posts it
// to the single-slot queue xQ_Message, replacing any message still waiting.
//
//   topic   - NUL-terminated topic name of the received message
//   payload - raw payload bytes (not NUL-terminated)
//   length  - number of bytes in payload
//
// The payload is truncated to payloadSize - 1 bytes so that the copy and its
// terminating NUL always fit inside x_message.payload.
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  // Largest number of payload bytes that still leaves room for the terminator.
  const unsigned int maxCopy = (unsigned int)( payloadSize - 1 );
  const unsigned int copyLen = ( length < maxCopy ) ? length : maxCopy;

  // Zero the whole buffer first; this also supplies the terminating NUL.
  memset( x_message.payload, '\0', payloadSize );
  if ( payload != NULL )
  {
    memcpy( x_message.payload, payload, copyLen );
  }
  x_message.topic = topic;

  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
////
// WiFi event handler, registered through WiFi.onEvent() and placed in IRAM.
// Logs a message when the station loses its access point or when a client
// leaves the soft-AP; every other event is ignored.
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  if ( event == SYSTEM_EVENT_STA_DISCONNECTED )
  {
    log_i("Disconnected from WiFi access point");
    return;
  }
  if ( event == SYSTEM_EVENT_AP_STADISCONNECTED )
  {
    log_i("WiFi client disconnected");
    return;
  }
  // SYSTEM_EVENT_STA_CONNECTED and all remaining events: nothing to do.
} // void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
////
////
void setup()
{
x_message.topic.reserve( payloadSize );
xQ_Message = xQueueCreate( 1, sizeof(stu_message) );
sema_mqttOK = xSemaphoreCreateBinary();
xSemaphoreGive( sema_mqttOK );
eg = xEventGroupCreate(); // get an event group handle
xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 20000, NULL, 5, NULL, 1 );
xTaskCreatePinnedToCore( fDoBME, "fDoBME", 20000, NULL, 3, NULL, 1 ); // assigned to core
xTaskCreatePinnedToCore( fparseMQTT, "fparseMQTT", 10000, NULL, 4, NULL, 1 ); // assign all to core 1, WiFi in use.
xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 ); // assign all to core 1
} //void setup()
////
////
void fmqttWatchDog( void * paramater )
{
int UpdateImeTrigger = 86400; //seconds in a day
int UpdateTimeInterval = 85000; // get another reading when = UpdateTimeTrigger
int maxNonMQTTresponse = 3;
TickType_t xLastWakeTime = xTaskGetTickCount();
const TickType_t xFrequency = 60000; //delay for mS
for (;;)
{
xLastWakeTime = xTaskGetTickCount();
vTaskDelayUntil( &xLastWakeTime, xFrequency );
xSemaphoreTake( sema_mqttOK, portMAX_DELAY ); // update mqttOK
mqttOK++;
xSemaphoreGive( sema_mqttOK );
if ( mqttOK >= maxNonMQTTresponse )
{
log_i( "mqtt watchdog rest" );
vTaskDelay( 200 );
ESP.restart();
}
UpdateTimeInterval++; // trigger new time get
if ( UpdateTimeInterval >= UpdateImeTrigger )
{
TimeSet = false; // sets doneTime to false to get an updated time after a days count of seconds
UpdateTimeInterval = 0;
}
}
vTaskDelete( NULL );
} //void fmqttWatchDog( void * paramater )
////
// Connects MQTTclient to the broker, retrying every 250 ticks until it
// succeeds, then installs the receive callback and subscribes to topicOK.
// The client ID is built from bytes 0 and 4 of the WiFi MAC address.
void connectToMQTT()
{
  MQTTclient.setKeepAlive( 90 ); // needs be made before connecting
  // A MAC address is 6 bytes and WiFi.macAddress() fills all of them; the
  // buffer must be 6 bytes long or the call writes past its end.
  byte mac[6] = {0};
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    // boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password, NULL , 1, true, NULL );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
  log_i("MQTT Connected");
} // void connectToMQTT()
////
// Sensor task: initialises the BME680, waits for an MQTT connection, then every
// xFrequency ticks publishes one comma-separated reading to topicOutside:
//   temperature (deg F), pressure (mmHg), humidity, gas reading
//
//   pvParameters - unused task argument
//
// If the sensor cannot be initialised the task logs the problem and halts.
void fDoBME ( void *pvParameters )
{
  if (!bme.begin()) {
    log_i("Could not find a valid BME680 sensor, check wiring!");
    // Halt this task, but block while doing so: a bare `while (1);` never
    // yields and would keep the CPU busy at this task's priority forever.
    for ( ;; )
    {
      vTaskDelay( 1000 );
    }
  }
  // Set up oversampling and filter initialization
  bme.setTemperatureOversampling(BME680_OS_8X);
  bme.setHumidityOversampling(BME680_OS_2X);
  bme.setPressureOversampling(BME680_OS_4X);
  bme.setIIRFilterSize(BME680_FILTER_SIZE_3);
  bme.setGasHeater(320, 150); // 320*C for 150 ms
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 1000 * 15;
  //wait for a mqtt connection
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  String bmeInfo = "";
  bmeInfo.reserve( 100 );
  int Count = 0;
  for ( ;; )
  {
    bmeInfo.concat( String((bme.readTemperature() * 1.8f) + 32.0f, 4) ); // (Celsius x 1.8) + 32
    bmeInfo.concat( "," );
    bmeInfo.concat( String( bme.readPressure() / 133.3223684f,4) ); //mmHg
    bmeInfo.concat( "," );
    bmeInfo.concat( String(bme.readHumidity(),3) );
    bmeInfo.concat( "," );
    bmeInfo.concat( String(bme.readGas()) );
    // Hold the keep-alive semaphore so the publish never overlaps MQTTclient.loop().
    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
    MQTTclient.publish( topicOutside, bmeInfo.c_str() );
    xSemaphoreGive( sema_MQTT_KeepAlive );
    xEventGroupSetBits( eg, evtDoMQTTwd );
    Count++;
    log_i( "Tick from line 142 count is %d", Count );
    bmeInfo = "";
    // Restart the interval from now, so the pause is measured from the end of
    // this reading rather than from a fixed schedule.
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  } // for loop
  vTaskDelete ( NULL );
} // void fDoBME ( void *pvParameters )
////
/*
  Makes the initial WiFi/MQTT connection and works to keep those connections open.

  Creates sema_MQTT_KeepAlive, then about every 250 ticks either services the
  MQTT client (MQTTclient.loop()) while holding that semaphore, or, when the TCP
  client or WiFi reports it is not connected, reconnects WiFi and MQTT.

  Important: do not set the vTaskDelay to less than 10. Errors begin to develop
  with the MQTT and network connection.

  pvParameters - unused task argument
*/
void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  // setting must be set before a mqtt connection is made
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  for (;;)
  {
    //check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more reliable than just a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      // Log the two status values as integers. A String object must not be
      // passed to a printf-style "%s" argument.
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d", (int) wifiClient.connected(), (int) WiFi.status() );
      // The statuses are re-read here, so they may differ from the check above.
      if ( !(wifiClient.connected()) || !(WiFi.status() == WL_CONNECTED) )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    vTaskDelay( 250 ); //task runs approx every 250 mS
  }
  vTaskDelete ( NULL );
}
////
// Connects to the WiFi network SSID/PASSWORD, retrying every 4000 ticks until
// WiFi.status() reports WL_CONNECTED. After the 10th failed attempt the ESP is
// restarted. The WiFiEvent handler is registered the first time a connection
// has been established.
void connectToWiFi()
{
  // Register the event handler only once: this function runs on every
  // reconnect, and each WiFi.onEvent() call would add another handler.
  static bool eventHandlerRegistered = false;
  int TryCount = 0;
  //log_i( "connect to wifi" );
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  if ( !eventHandlerRegistered )
  {
    WiFi.onEvent( WiFiEvent );
    eventHandlerRegistered = true;
  }
} // void connectToWiFi()
////
void fparseMQTT( void *pvParameters )
{
struct stu_message px_message;
for (;;)
{
if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
{
xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
mqttOK = 0;
xSemaphoreGive( sema_mqttOK );
if ( !TimeSet)
{
if ( String(px_message.topic) == topicOK )
{
String temp = "";
temp = px_message.payload[0];
temp += px_message.payload[1];
temp += px_message.payload[2];
temp += px_message.payload[3];
int year = temp.toInt();
temp = "";
temp = px_message.payload[5];
temp += px_message.payload[6];
int month = temp.toInt();
temp = "";
temp = px_message.payload[8];
temp += px_message.payload[9];
int day = temp.toInt();
temp = "";
temp = px_message.payload[11];
temp += px_message.payload[12];
int hour = temp.toInt();
temp = "";
temp = px_message.payload[14];
temp += px_message.payload[15];
int min = temp.toInt();
rtc.setTime( 0, min, hour, day, month, year );
log_i( "%s rtc %s ", px_message.payload, rtc.getTime() );
TimeSet = true;
}
}
}
} //for(;;)
vTaskDelete( NULL );
} // void fparseMQTT( void *pvParameters )
////
// Arduino main loop: intentionally empty, all work is done by the tasks created in setup().
void loop() {}
////
The code is for ESP32s and uses WiFi.