Here is some ESP32 MQTT code that runs and runs and runs and so on and so forth.
/*
10/17/2020 Ambrose Campbell
Project to display MQTT data on a OLED screen
*/
#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h"
#include "sdkconfig.h"
#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1351.h>
#include "time.h"
//
hw_timer_t * timer = NULL;
EventGroupHandle_t eg;
#define evtDoDisplay ( 1 << 1 )
#define evtCollectHistory ( 1 << 2 )
#define evtParseMQTT ( 1 << 3 )
// the events in this event groups tasks have been given a delay to avoid a possible race condition.
#define evtGroupBits ( evtCollectHistory | evtDoDisplay )
// CS GPIO_NUM_27, DC GPIO_NUM_12, DIN GPIO_NUM_13, CLK GPIO_NUM_14, RST GPIO_NUM_26
Adafruit_SSD1351 tft = Adafruit_SSD1351( 128, 128, GPIO_NUM_27, GPIO_NUM_12, GPIO_NUM_13, GPIO_NUM_14, GPIO_NUM_26 );
////
WiFiClient wifiClient;
PubSubClient MQTTclient( mqtt_server, mqtt_port, wifiClient );
//////
// black, blue, red, green, cyan, magenta, yellow, white, lightyellow, brown
const int DataCells = 60;
int ColorPalette[10] = { 0x0000, 0x001F, 0xF800, 0x07E0, 0x07FF, 0xF81F, 0xFFE0, 0xFFFF, 0xFFFFD8, 0xFF8040 };
int arrayCellPtr = 0;
float oPressureHistory[DataCells] = {0.0f};
String str_eTopic;
char strPayload [300] = {NULL};
float oTemperature = 0.0f;
float oHumidity = 0.0f;
float oIAQ = 0.0f; // Indexed Air Quality
float oPressure = 0.0f;
byte mac[6];
int mqttOK = 0;
////
////
SemaphoreHandle_t sema_HistoryCompleted;
SemaphoreHandle_t sema_MQTT_Parser;;
SemaphoreHandle_t sema_MQTT_KeepAlive;
SemaphoreHandle_t sema_mqttOK;
////
volatile int iDoTheThing = 0;
////
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
xSemaphoreTake( sema_MQTT_Parser, portMAX_DELAY);
str_eTopic = topic;
int i = 0;
for ( i; i < length; i++)
{
strPayload[i] = ((char)payload[i]);
}
strPayload[i] = NULL;
//log_i( "topic %s payload %s" ,str_eTopicPtr, strPayloadPtr );
xSemaphoreGive ( sema_MQTT_Parser );
xEventGroupSetBits( eg, evtParseMQTT ); // trigger tasks
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
switch (event) {
case SYSTEM_EVENT_STA_CONNECTED:
log_i("Connected to access point");
break;
case SYSTEM_EVENT_STA_DISCONNECTED:
log_i("Disconnected from WiFi access point");
break;
case SYSTEM_EVENT_AP_STADISCONNECTED:
log_i("WiFi client disconnected");
break;
default: break;
}
}
////
void IRAM_ATTR onTimer()
{
BaseType_t xHigherPriorityTaskWoken;
iDoTheThing++;
if ( iDoTheThing == 60000 )
{
xEventGroupSetBitsFromISR( eg, evtGroupBits, &xHigherPriorityTaskWoken );
iDoTheThing = 0;
}
}
////
void setup()
{
/* Use 4th timer of 4.
1 tick 1/(80MHZ/80) = 1us set divider 80 and count up.
Attach onTimer function to timer
Set alarm to call timer ISR, every 1000uS and repeat / reset ISR (true) after each alarm
Start an timer alarm
*/
timer = timerBegin( 3, 80, true );
timerAttachInterrupt( timer, &onTimer, true );
timerAlarmWrite(timer, 1000, true);
timerAlarmEnable(timer);
//
str_eTopic.reserve(300);
//
eg = xEventGroupCreate();
//
tft.begin();
tft.setRotation( 1 );
tft.fillScreen(0x0000);
//
sema_mqttOK = xSemaphoreCreateBinary();
sema_MQTT_Parser = xSemaphoreCreateBinary();
sema_HistoryCompleted = xSemaphoreCreateBinary();
sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
xSemaphoreGive( sema_HistoryCompleted );
xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
xSemaphoreGive( sema_mqttOK );
////
xTaskCreatePinnedToCore( fparseMQTT, "fparseMQTT", 7000, NULL, 5, NULL, 1 ); // assign all to core 1, WiFi in use.
xTaskCreatePinnedToCore( fCollectHistory, "fCollectHistory", 10000, NULL, 4, NULL, 1 );
xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 7000, NULL, 2, NULL, 1 ); //this task makes a WiFi and MQTT connection.
xTaskCreatePinnedToCore( fUpdateDisplay, "fUpdateDisplay", 50000, NULL, 5, NULL, 1 );
xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 ); // assign all to core 1
////
} //setup() END
////
////
void fmqttWatchDog( void * paramater )
{
int maxNonMQTTresponse = 5;
//wait for a mqtt connection
while ( !MQTTclient.connected() )
{
vTaskDelay( 250 );
}
TickType_t xLastWakeTime = xTaskGetTickCount();
const TickType_t xFrequency = 1000; //delay for mS
for (;;)
{
xLastWakeTime = xTaskGetTickCount();
vTaskDelayUntil( &xLastWakeTime, xFrequency );
if ( mqttOK >= maxNonMQTTresponse )
{
ESP.restart();
}
}
vTaskDelete( NULL );
} //void fmqttWatchDog( void * paramater )
////
/*
Collect history information
task triggered by hardware timer once a minute
stores history into PSRAM
Using a semaphore to protect the PSRAM from multiple tasks access the same PSRM locations
*/
void fCollectHistory( void * parameter )
{
int StorageTriggerCount = 119;
bool Tick = true;
int maxStorageTriggerCount = 120;
for (;;)
{
xEventGroupWaitBits (eg, evtCollectHistory, pdTRUE, pdTRUE, portMAX_DELAY );
xSemaphoreTake( sema_HistoryCompleted, portMAX_DELAY );
oPressureHistory[arrayCellPtr] = oPressure;
StorageTriggerCount++; //triggered by the timer isr once a minute
log_i( "storage trigger count %d", StorageTriggerCount );
if ( StorageTriggerCount == maxStorageTriggerCount )
{
arrayCellPtr++;
log_i( "arrayCellPtr %d", arrayCellPtr );
if ( arrayCellPtr == DataCells )
{
arrayCellPtr = 0;
}
StorageTriggerCount = 0;
}
xSemaphoreGive( sema_HistoryCompleted );
// log_i( " high watermark %d", uxTaskGetStackHighWaterMark( NULL ) );
}
vTaskDelete( NULL );
} //void fCollectHistory( void * parameter )
////
// Using a semaphore to protect the PSRAM from multiple tasks access the same PSRM locations
////
void fUpdateDisplay( void * parameter )
{
// Color definitions
// http://www.barth-dev.de/online/rgb565-color-picker/
/*
ColorPalette[0] = 0x0000; //BLACK
ColorPalette[1] = 0x001F; //BLUE
ColorPalette[2] = 0xF800; //RED
ColorPalette[3] = 0x07E0; //GREEN
ColorPalette[4] = 0x07FF; //CYAN
ColorPalette[5] = 0xF81F; //MAGENTA
ColorPalette[6] = 0xFFE0; //YELLOW
ColorPalette[7] = 0xFFFF; //WHITE
ColorPalette[8] = 0xFFFFD8; //LIGHTYELLOW
ColorPalette[9] = 0xFF8040; //BROWN
*/
int rowRef = 80;
int hRef = int( oPressureHistory[0] );
const int nextPoint = 2;
int nextCol = 0;
int16_t aqColor = 216; //minimum value
for (;;)
{
xEventGroupWaitBits (eg, evtDoDisplay, pdTRUE, pdTRUE, portMAX_DELAY ); //
xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
mqttOK ++;
xSemaphoreGive( sema_mqttOK );
vTaskDelay( 1 );
tft.fillScreen( ColorPalette[0] );
tft.setTextSize( 2 );
log_i( "oTemperature %f", oTemperature );
if ( (oTemperature >= 73.0f) && (oTemperature <= 100.0f) )
{
log_i( "if ( (oTemperature >= 73.0f) && (oTemperature <= 100.0f) )" );
// green to decrease and red to increase no blue
int tempDiff = int( 100.0f - oTemperature );
log_i( "oTemperature %f tempDiff %d", oTemperature, tempDiff );
int NewRed = tempDiff << 10;
int NewGreen = 0x07E0 - (tempDiff << 5 );
//int NewRed = ColorPalette[3] - (tempDiff << 5 ) ; // load in green decreased by temperature
log_i( "NewRed %d NewGreen %d", NewRed, NewGreen );
NewRed |= NewGreen ;
log_i( "NewRed | NewGreen %d ", NewRed );
tft.setTextColor( NewRed );
}
if ( oTemperature <= 0.0f )
{
log_i("white");
tft.setTextColor( ColorPalette[7] ); // white
}
if ((oTemperature >= 32.0f) && (oTemperature < 72.0f) )
{
//log_i("if ((oTemperature >= 32.0f) && (oTemperature < 72.0f) )");
//blue decreases, no red, green increases
int tempDiff = int( oTemperature - 32.0f );
if( tempDiff > 31 ) tempDiff = 31;
log_i(" oTemperature %f tempDiff %d", oTemperature, tempDiff );
int NewBlue = ColorPalette[1] - tempDiff; // decrease blue
log_i( "NewBlue %d", NewBlue );
int NewGreen = (tempDiff) << 5 ;// increase green
log_i( "NewGreen %d", NewGreen );
NewGreen |= NewBlue; // combine blue into newgreen
log_i( "NewGreen or'ed %d", NewGreen );
tft.setTextColor( NewGreen );
}
if ((oTemperature >= 72.0f) && (oTemperature < 73.0f) ) tft.setTextColor( ColorPalette[3] ); //green
if ( (oTemperature < 32.0f) && (oTemperature >= 0.0f) )
{
log_i( "if ( (oTemperature < 32.0f) && (oTemperature >= 0.0f) )" );
int NewBlue = ColorPalette[1]; // blue
//blue is max for each step below 32 red and green increase by one
//get steps below 32
int tempDiff = int( 32.0f - oTemperature );
NewBlue = NewBlue + ( (tempDiff) << 5); //add green
NewBlue = NewBlue | (xOff << 10);//add red
tft.setTextColor( NewBlue ); // red
} //if( (oTemperature <= 32.0f) %% (oTemperature >= 0.0f) )
if ( oTemperature >= 100.0f ) tft.setTextColor( ColorPalette[2] ); // red
tft.setCursor( 0, 0 );
if ( oTemperature < 100.0f )
{
tft.print( "Tmp " + String(oTemperature) );
} else
{
tft.print( "Tmp " + String(int(oTemperature)) );
}
tft.setCursor( 0, 20 );
tft.setTextSize( 2 );
//set humidity color
// log_i( "%f", oHumidity );
// if ( oHumidity < 33.0f )
// {
// tft.setTextColor( ColorPalette[6] ); //yellow
// }
// if ( (oHumidity >= 33.0f) && ( oTemperature <= 60.0f) )
// {
// tft.setTextColor( ColorPalette[3] ); //green
// }
// if ( (oTemperature > 60.0f) )
// {
// tft.setTextColor( ColorPalette[2] ); //red
// }
//below 50%
if ( oHumidity < 49.0f )
{
//reduce green component, increase blue component, red @0
//get difference from 50%
int xOff = 2 * ( abs(50 - int(oHumidity)) ); // used to make adjustments to green and blue
//shift xOff to the left by 5 places reduce green by xOff
newGreen = ColorPalette[3] - ( (xOff) << 5); //green falls off by xOff
// increase blue by xOff
newGreen = newGreen | (xOff);
//log_i( "ColorPalette[3] %d newGreen %d", ColorPalette[3], newGreen );
tft.setTextColor( newGreen );
}
//@50%
if ( (oHumidity <= 51.0f) && (oHumidity >= 49.0f) )
{
tft.setTextColor( ColorPalette[3] ); //green
}
// above
if ( oHumidity > 51.0f )
{
//log_i( "humidity %f", oHumidity );
//reduce green component, increase red component, blue @0
//get difference from 50%
int tempDiff = (int)oHumidity - 50;
if( tempDiff > 31 ) tempDiff = 31;
int newGreen = ColorPalette[3] - ( tempDiff << 5 ); //green falls off by xOff
int newRed = tempDiff << 10;
//log_i( "newGreen %d", newGreen );
// increase red by xOff
newGreen = newGreen | tempDiff;
//log_i( "newGreen %d", newGreen );
tft.setTextColor( newGreen );
}
tft.print( "Hum " + String(oHumidity) );
// set AQI text color
if ( (oIAQ * 216) >= (90.0f * 216))
{
int redColor = (int)oIAQ - 90;
redColor *= 216;
int green = ColorPalette[3] - (redColor << 5);
int blue = ColorPalette[1] - redColor;
aqColor &= 0x7FF; //set green to and blue to max
redColor = redColor << 10;
aqColor |= redColor;
tft.setTextColor( aqColor );
}
if ( oIAQ * 216 > 60.0f * 216 && oIAQ * 216 < 90.0f * 216 )
{
int blueColor = (int)oIAQ - 60;
blueColor *= 216;
aqColor &= (0x7E0 - (blueColor << 5) ); //set green reduce green by blue
aqColor |= blueColor;
tft.setTextColor( aqColor );
}
if ( oIAQ * 216 <= 60.0f * 216 )
{
int greenColor = oIAQ * 216;
greenColor = greenColor << 5;
tft.setTextColor( greenColor );
}
// display AQI
tft.setCursor( 0, 40 );
tft.print( "AQI " + String(oIAQ) );
//
tft.setTextSize( 1 );
tft.setTextColor( ColorPalette[1] ); //set graph line color
tft.setCursor( 0, 118 );
tft.print( " Pres: " + String(oPressure) + "mmHg" );
hRef = int( oPressureHistory[0] );
nextCol = 0;
xSemaphoreTake( sema_HistoryCompleted, portMAX_DELAY );
for (int i = 0; i <= DataCells; i++)
{
int hDisplacement = hRef - int( oPressureHistory[i] ); // cell height displacement from baseline
tft.setCursor( nextCol , (rowRef + hDisplacement) );
tft.print( "_" );
nextCol += nextPoint;
//place vertical bar representing a 24 hour time period.
if ( (nextCol % 24) == 0 )
{
//tft.drawLine( col, row, width, height, color );
tft.drawLine( uint16_t(nextCol), uint16_t(rowRef + 5), uint16_t(nextCol), uint16_t(rowRef + 11), ColorPalette[7] );
}
}
// advance cursor to place a begin of graph marker. each letter is 5 spaces wide but graph position marker only advances 2 spaces
// to keep cursor at begining place cursor +2 the nextCol position.
tft.setCursor( (arrayCellPtr * nextPoint), (rowRef + 3) );
tft.print( "I" );
xSemaphoreGive( sema_HistoryCompleted );
// log_i( "fUpdateDisplay MEMORY WATERMARK %d", uxTaskGetStackHighWaterMark(NULL) );
}
vTaskDelete( NULL );
} // void fUpdateDisplay( void * parameter )
/*
Important to not set vTaskDelay to less then 10. Errors begin to develop with the MQTT and network connection.
makes the initial wifi/mqtt connection and works to keeps those connections open.
*/
void MQTTkeepalive( void *pvParameters )
{
// setting must be set before a mqtt connection is made
MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
TickType_t xLastWakeTime = xTaskGetTickCount();
const TickType_t xFrequency = 250; // 250mS
for (;;)
{
//check for a is connected and if the WiFi thinks its connected, found checking on both is more realible than just a single check
if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
{
xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // whiles loop() is running no other mqtt operations should be in process
MQTTclient.loop();
xSemaphoreGive( sema_MQTT_KeepAlive );
}
else {
log_i( "MQTT keep alive found MQTT status %s WiFi status %s", String(wifiClient.connected()), String(WiFi.status()) );
if ( !(WiFi.status() == WL_CONNECTED) )
{
connectToWiFi();
}
connectToMQTT();
}
xLastWakeTime = xTaskGetTickCount();
vTaskDelayUntil( &xLastWakeTime, xFrequency );
}
vTaskDelete ( NULL );
}
////
void connectToMQTT()
{
// create client ID from mac address
byte mac[5];
WiFi.macAddress(mac); // get mac address
String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
log_i( "connect to mqtt as client %s", clientID );
while ( !MQTTclient.connected() )
{
MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password );
log_i( "connecting to MQTT" );
vTaskDelay( 250 );
}
log_i("MQTT Connected");
MQTTclient.setCallback( mqttCallback );
MQTTclient.subscribe( mqtt_topic );
}
//
void connectToWiFi()
{
log_i( "connect to wifi" );
while ( WiFi.status() != WL_CONNECTED )
{
WiFi.disconnect();
WiFi.begin( SSID, PASSWORD );
log_i(" waiting on wifi connection" );
vTaskDelay( 4000 );
}
log_i( "Connected to WiFi" );
WiFi.onEvent( WiFiEvent );
}
////
void fparseMQTT( void *pvParameters )
{
xSemaphoreGive ( sema_MQTT_Parser );
for (;;)
{
xEventGroupWaitBits (eg, evtParseMQTT, pdTRUE, pdTRUE, portMAX_DELAY ); //
xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
mqttOK = 0;
xSemaphoreGive( sema_mqttOK );
xSemaphoreTake( sema_MQTT_Parser, portMAX_DELAY );
if ( str_eTopic == topicOutsideTemperature )
{
oTemperature = String(strPayload).toFloat();
}
if ( (String)str_eTopic == topicOutsideHumidity )
{
oHumidity = String(strPayload).toFloat();
}
if ( (String)str_eTopic == topicAQIndex )
{
oIAQ = String(strPayload).toFloat();
}
if ( String(str_eTopic) == topicOutsidePressure )
{
oPressure = String(strPayload).toFloat();
}
// clear pointer locations
memset( strPayload, '\0', 300 );
str_eTopic = ""; //clear string buffer
xSemaphoreGive( sema_MQTT_Parser );
}
} // void fparseMQTT( void *pvParameters )
////
void loop() { }
Perhaps you can use it as a model.