ESP32 PubSubClient MQTT SemaphoreHandle

Hello. In my code, I want to receive MQTT messages using the callback function. Unfortunately, I can't figure out why this doesn't work. No errors are displayed, and everything else works properly. I think I'm not setting up the callback function correctly with respect to the semaphore. Please help me with this.

#include <WiFi.h>
#include <PubSubClient.h>
#include <ezButton.h>
#include <Preferences.h>
#include <GyverPortal.h>
#include "EasyNextionLibrary.h"

////////////////////////////////////////////////
//WiFi_MQTT
// Wi-Fi station settings, edited through the configuration portal and stored
// with Preferences. The text buffers are sized for the protocol maxima plus the
// terminating NUL so that a legal SSID or passphrase always fits.
struct config_wifi_struct {
  char ssid[33];  // SSID: at most 32 bytes + NUL
  char pass[64];  // WPA2 passphrase: at most 63 characters + NUL
  bool DHCP;      // true: use DHCP; false: DHCP_switch() applies the static fields below
  char ip[16];    // static IPv4 address, dotted-quad text
  char gt[16];    // gateway, dotted-quad text
  char mas[16];   // subnet mask, dotted-quad text
  char dns[16];   // primary DNS server, dotted-quad text
};

// MQTT broker connection settings, edited through the configuration portal.
struct config_mqtt_struct {
  char mqtt_server[16];    // broker address as text; 16 bytes fits a dotted-quad IPv4 string plus NUL
  int mqtt_port;           // broker TCP port (read_config_wifi_mqtt() defaults it to 1883)
  char mqtt_username[30];  // user name handed to MQTTclient.connect()
  char mqtt_password[30];  // password handed to MQTTclient.connect()
};

// Live copies of the Wi-Fi / MQTT settings (filled by read_config_wifi_mqtt()
// or by the login portal form).
config_wifi_struct str_wifi;
config_mqtt_struct str_mqtt;

// Network stack objects: TCP client, MQTT client on top of it, Nextion display on Serial2.
WiFiClient wifiClient;
PubSubClient MQTTclient(wifiClient);
EasyNex myNex(Serial2);



// Taken around MQTTclient.loop() in the keep-alive task.
SemaphoreHandle_t sema_MQTT_KeepAlive;

/// Buttons and the LED outputs they toggle
const int BUTTON_PIN_1 = 27;
const int BUTTON_PIN_2 = 14;
const int LED_PIN_1 = 19;
const int LED_PIN_2 = 22;

ezButton button_1(BUTTON_PIN_1);
ezButton button_2(BUTTON_PIN_2);

// Current output level of each LED pin; toggled in loop().
int ledState_1 = LOW;
int ledState_2 = LOW;

/// Preferences (persistent key/value storage for the settings)
Preferences preferences;

/// Nextion

// Pause in milliseconds between the two publishes in trigger21().
int mqtt_delay_send = 500;
// Values read from the display in trigger0(); callback() mirrors a message to
// "controll.bt2_2_1.val" only when the matching index equals 1.
int nextion_tom_index_1 = 1;
int nextion_tom_index_2 = 0;
int nextion_tom_index_3 = 0;

////////////////////////////////////////////
void setup()
{
  
  Serial.begin(115200);

  myNex.begin(115200);
  
  Serial.println(F("Start"));

  pinMode(LED_PIN_1, OUTPUT);
  pinMode(LED_PIN_2, OUTPUT);
  
  pinMode(4, OUTPUT);
  digitalWrite(4, LOW);
  
  button_1.setDebounceTime(50);
  button_2.setDebounceTime(50);
 
  if(!button_1.getStateRaw()) {
    loginPortal();
  }
  read_config_wifi_mqtt();
  DHCP_switch();
  sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  xSemaphoreGive(sema_MQTT_KeepAlive);
  xTaskCreatePinnedToCore(MQTTkeepalive, "MQTTkeepalive", 20000, NULL, 3, NULL, 1);
}

void loop() {
  myNex.NextionListen();
  button_1.loop();
  button_2.loop();
  
  if(button_1.isPressed()) {
    ledState_1 = !ledState_1;
    digitalWrite(LED_PIN_1, ledState_1);
    char val[5];
    itoa(ledState_1, val, 10);
    MQTTclient.publish("esp32/led1_st", val);
  }
  if(button_2.isPressed()) {
    ledState_2 = !ledState_2;
    digitalWrite(LED_PIN_2, ledState_2);
    char val[5]; 
    itoa(ledState_2, val, 10);
    MQTTclient.publish("esp32/led2_st", val);
  }
}

///////////_____MQTT_WIFI_SETTINGS
void MQTTkeepalive( void *pvParameters )
{
  MQTTclient.setServer(str_mqtt.mqtt_server, str_mqtt.mqtt_port);
  MQTTclient.setCallback(callback);
  MQTTclient.setKeepAlive(90);
  
  for (;;)
  {
    if ((wifiClient.connected()) && (WiFi.status() == WL_CONNECTED))
    {
      xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY);
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      Serial.print(F("MQTT keep alive found MQTT status: "));
      Serial.println(String(wifiClient.connected()));
      Serial.print(F("WiFi status: "));
      Serial.println(String(WiFi.status()));
      if (!(WiFi.status() == WL_CONNECTED))
      {
        connectToWiFi();
      }
      
      connectToMQTT();
      main_Portal();
    }
    vTaskDelay( 250 );
  }
  vTaskDelete ( NULL );
}

void connectToMQTT()
{
  String client_id = "esp32-client-";
  client_id += String(WiFi.macAddress());
  Serial.print(F("Connect to MQTT as client: "));
  
  
  
  
  Serial.println(client_id);

  while (!MQTTclient.connected())
  {
    MQTTclient.connect(client_id.c_str(), str_mqtt.mqtt_username, str_mqtt.mqtt_password);
    Serial.println(F("Connecting to MQTT"));
    vTaskDelay(250);
  }
  
  Serial.println(F("MQTT Connected"));
  MQTTclient.publish("esp32/output", "Hi -_-");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_1_1");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_2_1");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_3_1");
}

// Blocks until the station is associated, retrying WiFi.begin() every 4000 ticks.
// The WiFiEvent handler is registered only on the first successful connection;
// the original registered it on every call, so each reconnect added another
// copy of the handler.
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;

  Serial.println(F("Connect to WiFi"));
  while ( WiFi.status() != WL_CONNECTED )
  {
    WiFi.disconnect();
    Serial.println(str_wifi.ssid);
    WiFi.begin(str_wifi.ssid, str_wifi.pass);
    Serial.println(F("waiting on wifi connection"));
    vTaskDelay( 4000 );
  }
  Serial.println(F("Connected to WiFi"));
  Serial.print(F("ip address: "));
  Serial.println(WiFi.localIP());

  if (!eventHandlerRegistered)
  {
    WiFi.onEvent(WiFiEvent);
    eventHandlerRegistered = true;
  }
}

// Wi-Fi event hook: prints a line for the three events of interest and
// ignores every other event.
void WiFiEvent(WiFiEvent_t event)
{
  if (event == SYSTEM_EVENT_STA_CONNECTED) {
    Serial.println(F("Connected to access point"));
    return;
  }
  if (event == SYSTEM_EVENT_STA_DISCONNECTED) {
    Serial.println(F("Disconnected from WiFi access point"));
    return;
  }
  if (event == SYSTEM_EVENT_AP_STADISCONNECTED) {
    Serial.println(F("WiFi client disconnected"));
  }
}

void callback(char* topic, byte* message, unsigned int length) {
  String messageTemp;
  
  for (int i = 0; i < length; i++) {
    messageTemp += (char)message[i];
  }

  Serial.println(messageTemp);
  MQTTclient.publish("esp32/output", "0");

  //info_climate_device
  if (String(topic) == "/devices/panel_test/controls/bt_2_1_1") {
    myNex.writeNum("controll.bt2_2_1_1_st.val", messageTemp.toInt());
    MQTTclient.publish("esp32/output", "1");
    if(nextion_tom_index_1 == 1){
      myNex.writeNum("controll.bt2_2_1.val", messageTemp.toInt());
    }
  }
  
  if (String(topic) == "/devices/panel_test/controls/bt_2_2_1") {
    myNex.writeNum("controll.bt2_2_2_1_st.val", messageTemp.toInt());
    MQTTclient.publish("esp32/output", "2");
    if(nextion_tom_index_2 == 1){
      myNex.writeNum("controll.bt2_2_1.val", messageTemp.toInt());
    }
  }

  if (String(topic) == "/devices/panel_test/controls/bt_2_3_1") {
    myNex.writeNum("controll.bt2_2_3_1_st.val", messageTemp.toInt());
    MQTTclient.publish("esp32/output", "3");
    if(nextion_tom_index_3 == 1){
      myNex.writeNum("controll.bt2_2_1.val", messageTemp.toInt());
    }
  }
}

///////////_____WEB_Portal
void build_login() {
  BUILD_BEGIN();
  GP.THEME(GP_DARK);
  
  GP.FORM_BEGIN("/login");
  GP.TEXT("lg", "Login", str_wifi.ssid);
  GP.BREAK();
  GP.TEXT("ps", "Password", str_wifi.pass);
  GP.BREAK();
  GP.LABEL("DHCP: ");
  GP.BREAK();
  GP.CHECK("DHCP", str_wifi.DHCP);
  GP.BREAK();
  GP.TEXT("ip", "ip", str_wifi.ip);
  GP.BREAK();
  GP.TEXT("gt", "gt", str_wifi.gt);
  GP.BREAK();
  GP.TEXT("mas", "mas", str_wifi.mas);
  GP.BREAK();
  GP.TEXT("dns", "dns", str_wifi.dns);
  GP.BREAK();
  GP.TEXT("mqtt_server", "mqtt_server", str_mqtt.mqtt_server);
  GP.BREAK();
  GP.NUMBER("mqtt_port", "mqtt_port", str_mqtt.mqtt_port);
  GP.BREAK();
  GP.TEXT("mqtt_username", "mqtt_username", str_mqtt.mqtt_username);
  GP.BREAK();
  GP.TEXT("mqtt_password", "mqtt_password", str_mqtt.mqtt_password);
  GP.BREAK();
  GP.SUBMIT("Submit");
  GP.FORM_END();

  BUILD_END();
}

// Page builder for the status portal: a titled list of network captions
// followed by the configured MQTT server and port.
// TODO: the network rows show captions only; the values (local IP, hostname,
// MAC, subnet mask, gateway, DNS) are not rendered yet.
void build_main() {
  BUILD_BEGIN();
  GP.THEME(GP_DARK);

  GP.FORM_BEGIN("/login");
  GP.TITLE("Onyx panel");
  GP.HR();

  static const char *const kNetworkCaptions[] = {
    "IP address: ",
    "Hostname: ",
    "Mac Address: ",
    "Subnet Mask: ",
    "Gateway IP: ",
    "DNS: ",
  };
  for (const char *caption : kNetworkCaptions) {
    GP.LABEL(caption);
    GP.BREAK();
  }

  GP.HR();

  GP.LABEL("MQTT IP: ");
  GP.LABEL(str_mqtt.mqtt_server);
  GP.BREAK();

  GP.LABEL("MQTT PORT: ");
  GP.LABEL(str_mqtt.mqtt_port);
  GP.BREAK();

  GP.FORM_END();

  BUILD_END();
}

// Portal action handler: when the "/login" form is submitted, copies every
// field into str_wifi / str_mqtt, shuts the soft AP down and stores the settings.
void action_login(GyverPortal& p) {
  if (!p.form("/login")) {
    return;  // not our form, nothing to do
  }

  // Wi-Fi section
  p.copyStr("lg", str_wifi.ssid);
  p.copyStr("ps", str_wifi.pass);
  str_wifi.DHCP = p.getCheck("DHCP");
  p.copyStr("ip", str_wifi.ip);
  p.copyStr("gt", str_wifi.gt);
  p.copyStr("mas", str_wifi.mas);
  p.copyStr("dns", str_wifi.dns);

  // MQTT section
  p.copyStr("mqtt_server", str_mqtt.mqtt_server);
  str_mqtt.mqtt_port = p.getInt("mqtt_port");
  p.copyStr("mqtt_username", str_mqtt.mqtt_username);
  p.copyStr("mqtt_password", str_mqtt.mqtt_password);

  WiFi.softAPdisconnect();
  load_config_wifi_mqtt();  // despite its name, this writes the settings to Preferences
}

// Runs the configuration portal: brings up a soft access point named
// "WiFi Config", serves the build_login page and handles submissions with
// action_login. Stays in the tick loop for as long as portal.tick() returns true.
void loginPortal() {
  Serial.println(F("Portal start"));

  // start the access point
  WiFi.mode(WIFI_AP);
  WiFi.softAP("WiFi Config");

  // start the portal
  Serial.println(F("Запуск портала"));
  GyverPortal portal;
  portal.attachBuild(build_login);
  Serial.println(F("Запуск build_login"));
  portal.start(WIFI_AP);
  Serial.println(F("Запуск WIFI_AP"));
  portal.attach(action_login);
  Serial.println(F("Запуск action_login"));

  // run the portal
  while (portal.tick());
}

// Runs the status portal (build_main page) with OTA and the portal log enabled.
// NOTE(review): the loop below only ends when portal.tick() returns false and
// nothing visible here stops the portal, so this call presumably never returns
// to its caller - confirm against the GyverPortal documentation.
void main_Portal() {
  GyverPortal portal;
  portal.attachBuild(build_main);
  portal.start();
  portal.enableOTA();
  portal.log.start();

  // run the portal
  while (portal.tick());
}

///////////_____JSON_load_read
// Writes the current str_wifi / str_mqtt settings to the "wifi_mqtt"
// Preferences namespace (despite the "load" in its name, this is the save step).
void load_config_wifi_mqtt(){
  preferences.begin("wifi_mqtt", false);

  // Wi-Fi section
  preferences.putString("ssid", str_wifi.ssid);
  preferences.putString("pass", str_wifi.pass);
  preferences.putBool("DHCP", str_wifi.DHCP);
  preferences.putString("ip", str_wifi.ip);
  preferences.putString("gt", str_wifi.gt);
  preferences.putString("mas", str_wifi.mas);
  preferences.putString("dns", str_wifi.dns);

  // MQTT section
  preferences.putString("mqtt_server", str_mqtt.mqtt_server);
  preferences.putInt("mqtt_port", str_mqtt.mqtt_port);
  preferences.putString("mqtt_username", str_mqtt.mqtt_username);
  preferences.putString("mqtt_password", str_mqtt.mqtt_password);

  preferences.end();
}

void read_config_wifi_mqtt(){
  preferences.begin("wifi_mqtt", false);

  String ssid = preferences.getString("ssid", "0");
  String pass = preferences.getString("pass", "0");
  str_wifi.DHCP = preferences.getBool("DHCP", true);
  String ip = preferences.getString("ip", "0");
  String gt = preferences.getString("gt", "0");
  String mas = preferences.getString("mas", "0");
  String dns = preferences.getString("dns", "0");

  String mqtt_server = preferences.getString("mqtt_server", "0");
  str_mqtt.mqtt_port = preferences.getInt("mqtt_port", 1883);
  String mqtt_username = preferences.getString("mqtt_username", "0");
  String mqtt_password = preferences.getString("mqtt_password", "0");

  preferences.end();

  strcpy(str_wifi.ssid, ssid.c_str());
  strcpy(str_wifi.pass, pass.c_str());
  strcpy(str_wifi.ip, ip.c_str());
  strcpy(str_wifi.gt, gt.c_str());
  strcpy(str_wifi.mas, mas.c_str());
  strcpy(str_wifi.dns, dns.c_str());

  strcpy(str_mqtt.mqtt_server, mqtt_server.c_str());
  strcpy(str_mqtt.mqtt_username, mqtt_username.c_str());
  strcpy(str_mqtt.mqtt_password, mqtt_password.c_str());
}

// Applies the static IP settings from str_wifi when DHCP is disabled.
// Returns false only if WiFi.config() rejects the static configuration;
// returns true when it was applied or when DHCP is enabled (nothing to do).
// The original had no return statement on the DHCP-enabled path, which is
// undefined behaviour for a function returning bool.
bool DHCP_switch (){
  if (str_wifi.DHCP) {
    return true;
  }

  IPAddress local_IP(ipaddr_addr(str_wifi.ip));
  IPAddress gateway(ipaddr_addr(str_wifi.gt));
  IPAddress subnet(ipaddr_addr(str_wifi.mas));
  IPAddress primaryDNS(ipaddr_addr(str_wifi.dns));

  if (WiFi.config(local_IP, gateway, subnet, primaryDNS) == false) {
    Serial.println("Configuration failed.");
    return false;
  }
  Serial.println("Configuration set.");
  return true;
}


////nextion
// Nextion trigger 0: refreshes the three nextion_tom_index_* values from the
// display components bt2_1_1 .. bt2_1_3.
void trigger0(){
  struct Selector {
    int *target;            // global that receives the value
    const char *component;  // Nextion attribute to read
  };
  const Selector selectors[] = {
    {&nextion_tom_index_1, "controll.bt2_1_1.val"},
    {&nextion_tom_index_2, "controll.bt2_1_2.val"},
    {&nextion_tom_index_3, "controll.bt2_1_3.val"},
  };
  for (const Selector &sel : selectors) {
    *sel.target = myNex.readNumber(sel.component);
  }
}
// Nextion trigger 21: publishes "0", waits mqtt_delay_send milliseconds, then
// publishes "1" on the bt_2_1_1 "on" topic.
void trigger21(){
  const char *const topic = "/devices/panel_test/controls/bt_2_1_1/on";
  MQTTclient.publish(topic, "0");
  delay(mqtt_delay_send);
  MQTTclient.publish(topic, "1");
}
// Nextion trigger 22: publishes "0" on the bt_2_1_1 "on" topic.
void trigger22(){
  const char *const topic = "/devices/panel_test/controls/bt_2_1_1/on";
  MQTTclient.publish(topic, "0");
}
// Nextion trigger 23: publishes "1" on the bt_2_1_1 "on" topic.
void trigger23(){
  const char *const topic = "/devices/panel_test/controls/bt_2_1_1/on";
  MQTTclient.publish(topic, "1");
}

What exactly does this mean ?

My advice would be not to use Strings

Here is a callback function that I know works

// Minimal PubSubClient callback: prints the payload as text.
// The payload is not NUL-terminated, so it is copied into a local buffer
// first. The copy is capped at the buffer size; the original copied `length`
// bytes unconditionally and overflowed the 30-byte buffer on longer messages.
void callback(char* requestTopic, byte* payload, unsigned int length)
{
  char messageBuffer[30];
  const size_t maxCopy = sizeof(messageBuffer) - 1; // keep one byte for the NUL
  const size_t copyLen = (length < maxCopy) ? length : maxCopy;
  memcpy(messageBuffer, payload, copyLen);
  messageBuffer[copyLen] = '\0';
  Serial.printf("%s\n", messageBuffer);
}

Where is MQTTclient.loop() run?

Without a pubsubclient.loop() the callback function will not work.

Your code does too much parsing in the MQTT callback.

I like to do it this way.


// Queue that carries the most recent MQTT message from mqttCallback() to fparseMQTT().
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
const int payloadSize = 100; // capacity of stu_message::payload, terminating NUL included
// One received MQTT message.
// NOTE(review): FreeRTOS queues copy items byte-wise, and `topic` is a String
// object, not plain data - confirm that copying it through the queue is safe
// on the core version in use.
struct stu_message
{
  char payload [payloadSize] = {'\0'};
  String topic ;
} x_message; // staging instance filled by mqttCallback()


// PubSubClient callback: copies topic and payload into x_message and hands it
// to the parser task through xQ_Message; no parsing is done here.
// The payload copy is capped at payloadSize - 1 so a message longer than the
// buffer is truncated; the original copied `length` bytes unconditionally and
// wrote past the end of x_message.payload.
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  x_message.topic = topic; // store new topic (replaces the previous one)
  const unsigned int maxCopy = payloadSize - 1; // keep room for the terminating NUL
  const unsigned int copyLen = ( length < maxCopy ) ? length : maxCopy;
  memcpy( x_message.payload, payload, copyLen ); // extract payload
  x_message.payload[copyLen] = '\0';
  xQueueOverwrite( xQ_Message, (void *) &x_message ); // send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)

voiding setup()
{
x_message.topic.reserve(100);
}

void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {
      // parse the time from the OK message and update MCU time
      if ( String(px_message.topic) == topicOK )
      {
        if ( !TimeSet)
        {
          String temp = "";
          temp =  px_message.payload[0];
          temp += px_message.payload[1];
          temp += px_message.payload[2];
          temp += px_message.payload[3];
          int year =  temp.toInt();
          temp = "";
          temp =  px_message.payload[5];
          temp += px_message.payload[6];
          int month =  temp.toInt();
          temp =  "";
          temp =  px_message.payload[8];
          temp += px_message.payload[9];
          int day =  temp.toInt();
          temp = "";
          temp = px_message.payload[11];
          temp += px_message.payload[12];
          int hour =  temp.toInt();
          temp = "";
          temp = px_message.payload[14];
          temp += px_message.payload[15];
          int min =  temp.toInt();
          rtc.setTime( 0, min, hour, day, month, year );
          log_i( "rtc  %s ", rtc.getTime() );
          TimeSet = true;
        }
      }
      //
    } //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK = 0;
    xSemaphoreGive( sema_mqttOK );
  }
} // void fparseMQTT( void *pvParameters )#include <ESP32Time.h>

As you can see, receiving the MQTT message is kept separate from parsing it.

Where is MQTTclient.loop() run?

void MQTTkeepalive( void *pvParameters )
{
  MQTTclient.setServer(str_mqtt.mqtt_server, str_mqtt.mqtt_port);
  MQTTclient.setCallback(callback);
  MQTTclient.setKeepAlive(90);
  
  for (;;)
  {
    if ((wifiClient.connected()) && (WiFi.status() == WL_CONNECTED))
    {
      xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY);
      MQTTclient.loop();              ///////////////////here
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      Serial.print(F("MQTT keep alive found MQTT status: "));
      Serial.println(String(wifiClient.connected()));
      Serial.print(F("WiFi status: "));
      Serial.println(String(WiFi.status()));
      if (!(WiFi.status() == WL_CONNECTED))
      {
        connectToWiFi();
      }
      
      connectToMQTT();
      main_Portal();
    }
    vTaskDelay( 250 );
  }
  vTaskDelete ( NULL );
}

My callback function is very simple, and I don't really care about the Strings because they are freed from RAM anyway. In other ESP32 projects I use the same callback function and it works there, but I don't use multithreading there as I do here. I think the problem is that I'm calling .setCallback in the wrong place. Please help me with this.

If you want to multithread using freeRTOS then the loop() function must NOT contain any code. The loop() function is NOT guaranteed to run in a timely fashion with other tasks declared. Also, code in the loop() function stops the ESP32's OS from doing memory cleanup and other house keeping chores.

Once you've moved all the code out of loop() post the revised code here, please.

Where should I put all the code from the loop() function?

tasks.

see no loop() code:

/*
   Chappie Weather upgrade/addition
   process wind speed direction and rain fall.
*/
#include "esp32/ulp.h"
//#include "ulptool.h"
#include "driver/rtc_io.h"
#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h"
#include "sdkconfig.h"
#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"
#include <driver/pcnt.h>
#include <driver/adc.h>
#include <SimpleKalmanFilter.h>
#include <ESP32Time.h>
////
// Real-time clock, set from MQTT time messages in fparseMQTT().
ESP32Time rtc;
WiFiClient wifiClient;
// MQTT client; mqtt_server and mqtt_port are not declared in the visible part of this sketch.
PubSubClient MQTTclient(mqtt_server, mqtt_port, wifiClient);
////
// Latest measurements, written by the sensor tasks and read when publishing.
float CalculatedVoltage = 0.0f; // battery voltage from fReadBattery(), guarded by sema_CalculatedVoltage
float kph = 0.0f;               // wind speed from fAnemometer()
float rain  = 0.0f;             // accumulated rainfall in mm from fRainFall()
/*
   PCNT PCNT_UNIT_0, PCNT_CHANNEL_0 GPIO_NUM_15 = pulse input pin
   PCNT PCNT_UNIT_1, PCNT_CHANNEL_0 GPIO_NUM_4 = pulse input pin
*/
pcnt_unit_t pcnt_unit00 = PCNT_UNIT_0; //pcnt unit 0 channel 0
pcnt_unit_t pcnt_unit10 = PCNT_UNIT_1; //pcnt unit 1 channel 0
//
//
// Hardware timer handle; setup() arms it and onTimer() is its interrupt handler.
hw_timer_t * timer = NULL;
//
// Event-group bits used to wake the measurement tasks.
#define evtAnemometer  ( 1 << 0 )
#define evtRainFall    ( 1 << 1 )
#define evtParseMQTT   ( 1 << 2 )
EventGroupHandle_t eg;
// Bits set together by onTimer().
#define OneMinuteGroup ( evtAnemometer | evtRainFall )
////
// Queue that carries the most recent MQTT message from mqttCallback() to fparseMQTT().
QueueHandle_t xQ_Message; // payload and topic queue of MQTT payload and topic
const int payloadSize = 100; // capacity of stu_message::payload, terminating NUL included
// One received MQTT message.
// NOTE(review): FreeRTOS queues copy items byte-wise, and `topic` is a String
// object, not plain data - confirm that copying it through the queue is safe
// on the core version in use.
struct stu_message
{
  char payload [payloadSize] = {'\0'};
  String topic ;
} x_message; // staging instance filled by mqttCallback()
////
// Semaphores guarding state shared between tasks.
SemaphoreHandle_t sema_MQTT_KeepAlive; // used to stop all other MQTT thing do's
SemaphoreHandle_t sema_mqttOK; // protect the mqttOK variable.
SemaphoreHandle_t sema_CalculatedVoltage; // protects the CalculatedVoltage variable.
////
// Incremented once per second by fmqttWatchDog() and reset by fparseMQTT().
int mqttOK = 0; // stores a count value that is used to cause an esp reset
// false requests a fresh RTC update from the next topicOK message.
volatile bool TimeSet = false;
////
/*
   Topic topicOK has been subscribed to, the mqtt broker sends out "OK" messages if the client receives an OK message the mqttOK value is set back to zero.
   If the mqttOK count reaches a set point the ESP32 will reset.
*/
////
// PubSubClient callback: copies topic and payload into x_message and hands it
// to the parser task through xQ_Message; no parsing is done here.
// The payload copy is capped at payloadSize - 1 so a message longer than the
// buffer is truncated; the original copied `length` bytes unconditionally and
// wrote past the end of x_message.payload.
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  x_message.topic = topic; // store new topic (replaces the previous one)
  const unsigned int maxCopy = payloadSize - 1; // keep room for the terminating NUL
  const unsigned int copyLen = ( length < maxCopy ) ? length : maxCopy;
  memcpy( x_message.payload, payload, copyLen ); // extract payload
  x_message.payload[copyLen] = '\0';
  xQueueOverwrite( xQ_Message, (void *) &x_message ); // send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
// interrupt service routine for WiFi events put into IRAM
// Wi-Fi event hook: logs station and soft-AP client disconnects; every other
// event, including a successful station connect, is ignored.
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  if ( event == SYSTEM_EVENT_STA_DISCONNECTED )
  {
    log_i("Disconnected from WiFi access point");
  }
  else if ( event == SYSTEM_EVENT_AP_STADISCONNECTED )
  {
    log_i("WiFi client disconnected");
  }
} // void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
////
// Hardware-timer interrupt handler: sets the OneMinuteGroup bits, waking the
// anemometer and rain-fall tasks.
// xHigherPriorityTaskWoken has to start as pdFALSE (the FreeRTOS call only
// ever sets it to pdTRUE); the original left it uninitialised. When it comes
// back set, a context switch is requested so the woken task runs right away.
void IRAM_ATTR onTimer()
{
  BaseType_t xHigherPriorityTaskWoken = pdFALSE;
  xEventGroupSetBitsFromISR(eg, OneMinuteGroup, &xHigherPriorityTaskWoken);
  if ( xHigherPriorityTaskWoken == pdTRUE )
  {
    portYIELD_FROM_ISR();
  }
} // void IRAM_ATTR onTimer()
////
void setup()
{
  eg = xEventGroupCreate(); // get an event group handle
  x_message.topic.reserve(100);
  adc1_config_width(ADC_WIDTH_12Bit);
  adc1_config_channel_atten(ADC1_CHANNEL_6, ADC_ATTEN_DB_11);// using GPIO 34 wind direction
  adc1_config_channel_atten(ADC1_CHANNEL_3, ADC_ATTEN_DB_11);// using GPIO 39 current
  adc1_config_channel_atten(ADC1_CHANNEL_0, ADC_ATTEN_DB_11);// using GPIO 36 battery volts

  // hardware timer 4 set for one minute alarm
  timer = timerBegin( 3, 80, true );
  timerAttachInterrupt( timer, &onTimer, true );
  timerAlarmWrite(timer, 60000000, true);
  timerAlarmEnable(timer);
  /* Initialize PCNT's counter */
  int PCNT_H_LIM_VAL         = 3000;
  int PCNT_L_LIM_VAL         = -10;
  // 1st PCNT counter
  // Anemometer
  pcnt_config_t pcnt_config  = {};
  pcnt_config.pulse_gpio_num = GPIO_NUM_15;// Set PCNT input signal and control GPIOs
  pcnt_config.ctrl_gpio_num  = PCNT_PIN_NOT_USED;
  pcnt_config.channel        = PCNT_CHANNEL_0;
  pcnt_config.unit           = PCNT_UNIT_0;
  // What to do on the positive / negative edge of pulse input?
  pcnt_config.pos_mode       = PCNT_COUNT_INC;   // Count up on the positive edge
  pcnt_config.neg_mode       = PCNT_COUNT_DIS;   // Count down disable
  // What to do when control input is low or high?
  pcnt_config.lctrl_mode     = PCNT_MODE_KEEP; // Keep the primary counter mode if low
  pcnt_config.hctrl_mode     = PCNT_MODE_KEEP;    // Keep the primary counter mode if high
  // Set the maximum and minimum limit values to watch
  pcnt_config.counter_h_lim  = PCNT_H_LIM_VAL;
  pcnt_config.counter_l_lim  = PCNT_L_LIM_VAL;
  pcnt_unit_config(&pcnt_config); // Initialize PCNT unit
  // 12.5ns is one APB_CLK cycle 12.5*500, debounce time
  pcnt_set_filter_value( PCNT_UNIT_0, 500); //Configure and enable the input filter, debounce
  pcnt_filter_enable( PCNT_UNIT_0 );
  pcnt_counter_pause( PCNT_UNIT_0 );
  pcnt_counter_clear( PCNT_UNIT_0 );
  pcnt_counter_resume( PCNT_UNIT_0); // start the show
  // setup 2nd PCNT
  pcnt_config = {};
  pcnt_config.pulse_gpio_num = GPIO_NUM_4;
  pcnt_config.ctrl_gpio_num  = PCNT_PIN_NOT_USED;
  pcnt_config.channel        = PCNT_CHANNEL_0;
  pcnt_config.unit           = PCNT_UNIT_1;
  pcnt_config.pos_mode       = PCNT_COUNT_INC;
  pcnt_config.neg_mode       = PCNT_COUNT_DIS;
  pcnt_config.lctrl_mode     = PCNT_MODE_KEEP;
  pcnt_config.hctrl_mode     = PCNT_MODE_KEEP;
  pcnt_config.counter_h_lim  = PCNT_H_LIM_VAL;
  pcnt_config.counter_l_lim  = PCNT_L_LIM_VAL;
  pcnt_unit_config(&pcnt_config);
  pcnt_set_filter_value( PCNT_UNIT_1, 500 );
  pcnt_filter_enable  ( PCNT_UNIT_1 );
  pcnt_counter_pause  ( PCNT_UNIT_1 );
  pcnt_counter_clear  ( PCNT_UNIT_1 );
  pcnt_counter_resume ( PCNT_UNIT_1 );
  //
  xQ_Message = xQueueCreate( 1, sizeof(stu_message) );
  //
  sema_CalculatedVoltage = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_CalculatedVoltage );
  sema_mqttOK = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_mqttOK );
  sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  ///
  xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 10000, NULL, 5, NULL, 1 );
  xTaskCreatePinnedToCore( fparseMQTT, "fparseMQTT", 10000, NULL, 5, NULL, 1 ); // assign all to core 1, WiFi in use.
  xTaskCreatePinnedToCore( fReadBattery, "fReadBattery", 4000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fReadCurrent, "fReadCurrent", 4000, NULL, 3, NULL, 1 );
  xTaskCreatePinnedToCore( fWindDirection, "fWindDirection", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( fAnemometer, "fAnemometer", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( fRainFall, "fRainFall", 10000, NULL, 4, NULL, 1 );
  xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 ); // assign all to core 1
} //void setup()
// Placeholder: the author deliberately left the ULP initialisation out of the
// posted sketch, so this function is an empty stub here.
static void init_ulp_program()
{
// not sharing this code.
}
////
// FreeRTOS task: samples the wind-vane ADC channel every 100 ticks, smooths it
// with a Kalman filter and maps the scaled reading to a direction letter.
// On every 31st pass (count starts at 0) it publishes "kph,direction,rain" on
// topicWSWDRF, then every 30th pass after that.
//
// Changes from the original: range checks use logical && in place of bitwise
// &, and the unused locals `high` / `low` are gone. Behaviour is unchanged.
void fWindDirection( void *pvParameters )
// read the wind direction sensor, return heading in degrees
{
  SimpleKalmanFilter KF_ADC( 1.0f, 1.0f, .01f );
  const TickType_t xFrequency = 100; //delay for mS
  const float ADscale = 3.3f / 4096.0f; // scales a raw 12-bit reading by 3.3 / 4096
  float    adcValue = 0.0f;
  uint64_t TimePastKalman  = esp_timer_get_time();
  int      count = 0;
  String   windDirection;
  String   MQTTinfo = "";
  windDirection.reserve(20);
  MQTTinfo.reserve( 150 );
  TickType_t xLastWakeTime = xTaskGetTickCount();
  // Do nothing until the MQTT session is up.
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  for (;;)
  {
    windDirection = "";
    adcValue = float( adc1_get_raw(ADC1_CHANNEL_6) ); //take a raw ADC reading
    KF_ADC.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f ); //get time, in microseconds, since last readings
    adcValue = KF_ADC.updateEstimate( adcValue ); // apply simple Kalman filter
    TimePastKalman = esp_timer_get_time(); // time of update complete
    adcValue = adcValue * ADscale;
    // Readings outside the four ranges below leave the direction empty.
    if ( (adcValue >= 0.0f) && (adcValue <= .25f) )
    {
      windDirection.concat( "N" );
    }
    if ( (adcValue > .25f) && (adcValue <= .6f) )
    {
      windDirection.concat( "E" );
    }
    if ( (adcValue > 2.0f) && (adcValue < 3.3f) )
    {
      windDirection.concat( "S");
    }
    if ( (adcValue >= 1.7f) && (adcValue < 2.0f) )
    {
      windDirection.concat( "W" );
    }
    if ( count >= 30 )
    {
      MQTTinfo.concat( String(kph, 2) );
      MQTTinfo.concat( ",");
      MQTTinfo.concat( windDirection );
      MQTTinfo.concat( ",");
      MQTTinfo.concat( String(rain, 2) );
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      MQTTclient.publish( topicWSWDRF, MQTTinfo.c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      MQTTinfo = "";
      count = 0;
    }
    count++;
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
// read rainfall
// FreeRTOS task: woken by the evtRainFall bit, it adds the tipping-bucket
// clicks counted on PCNT unit 1 to `rain`. When the RTC reads 23:59 the
// running total is reset to zero.
void fRainFall( void *pvParemeters )
{
  const float mmPerClick = 0.2794f; // 0.2794mm of rain per click
  int16_t tips = 0;                 // bucket tips since the last read

  pcnt_counter_pause( PCNT_UNIT_1 );
  pcnt_counter_clear( PCNT_UNIT_1 );
  pcnt_counter_resume( PCNT_UNIT_1 );

  for  (;;)
  {
    xEventGroupWaitBits (eg, evtRainFall, pdTRUE, pdTRUE, portMAX_DELAY);
    const bool lastMinuteOfDay = (rtc.getHour(true) == 23) && (rtc.getMinute() == 59);

    // The counter is paused while it is read or cleared.
    pcnt_counter_pause( PCNT_UNIT_1 );
    if ( lastMinuteOfDay )
    {
      rain = 0.0f;
      pcnt_counter_clear( PCNT_UNIT_1 );
    }
    else
    {
      pcnt_get_counter_value( PCNT_UNIT_1, &tips );
      if ( tips != 0 )
      {
        rain = rain + (mmPerClick * (float)tips);
        pcnt_counter_clear( PCNT_UNIT_1 );
        log_i( "count %d, rain rain = %f mm", tips, rain );
      }
    }
    pcnt_counter_resume( PCNT_UNIT_1 );
    tips = 0;
  }
  vTaskDelete ( NULL );
}
////
// FreeRTOS task: woken by the evtAnemometer bit, it converts the pulses
// counted on PCNT unit 0 into km/h and restarts the count.
void fAnemometer( void *pvParameters )
{
  const float kphPerPulsePerSecond = 2.4f; // A wind speed of 2.4km/h causes the switch to close once per second
  const float windowSeconds = 60.0f;       // divisor that turns the pulse count into pulses per second
  int16_t pulses = 0;

  pcnt_counter_clear(PCNT_UNIT_0);
  pcnt_counter_resume(PCNT_UNIT_0);

  for (;;)
  {
    xEventGroupWaitBits (eg, evtAnemometer, pdTRUE, pdTRUE, portMAX_DELAY);

    // Pause, read and clear so no pulse is counted twice.
    pcnt_counter_pause( PCNT_UNIT_0 );
    pcnt_get_counter_value( PCNT_UNIT_0, &pulses);
    kph = kphPerPulsePerSecond * ((float)pulses / windowSeconds);
    pcnt_counter_clear( PCNT_UNIT_0 );
    pcnt_counter_resume( PCNT_UNIT_0 );
  }
  vTaskDelete ( NULL );
}
//////
// FreeRTOS task: once per 1000 ticks it increments mqttOK, which fparseMQTT()
// resets whenever a message arrives, and restarts the ESP32 once the count
// reaches maxNonMQTTresponse. It also clears TimeSet after 86400 passes so the
// RTC is refreshed from the next time message.
//
// Change from the original: mqttOK is now compared under sema_mqttOK as well;
// before, only the increment was protected.
void fmqttWatchDog( void * paramater )
{
  const int UpdateImeTrigger = 86400; //seconds in a day
  int UpdateTimeInterval = 86300; // 1st time update in 100 counts
  const int maxNonMQTTresponse = 60;
  for (;;)
  {
    vTaskDelay( 1000 );

    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    const bool brokerSilent = ( mqttOK >= maxNonMQTTresponse );
    if ( !brokerSilent )
    {
      mqttOK++;
    }
    xSemaphoreGive( sema_mqttOK );

    if ( brokerSilent )
    {
      ESP.restart();
    }

    UpdateTimeInterval++; // trigger new time get
    if ( UpdateTimeInterval >= UpdateImeTrigger )
    {
      TimeSet = false; // request an updated time after a day's count of seconds
      UpdateTimeInterval = 0;
    }
  }
  vTaskDelete( NULL );
}
//////
void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {
      // parse the time from the OK message and update MCU time
      if ( String(px_message.topic) == topicOK )
      {
        if ( !TimeSet)
        {
          String temp = "";
          temp =  px_message.payload[0];
          temp += px_message.payload[1];
          temp += px_message.payload[2];
          temp += px_message.payload[3];
          int year =  temp.toInt();
          temp = "";
          temp =  px_message.payload[5];
          temp += px_message.payload[6];
          int month =  temp.toInt();
          temp =  "";
          temp =  px_message.payload[8];
          temp += px_message.payload[9];
          int day =  temp.toInt();
          temp = "";
          temp = px_message.payload[11];
          temp += px_message.payload[12];
          int hour =  temp.toInt();
          temp = "";
          temp = px_message.payload[14];
          temp += px_message.payload[15];
          int min =  temp.toInt();
          rtc.setTime( 0, min, hour, day, month, year );
          log_i( "rtc  %s ", rtc.getTime() );
          TimeSet = true;
        }
      }
      //
    } //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK = 0;
    xSemaphoreGive( sema_mqttOK );
  }
} // void fparseMQTT( void *pvParameters )#include <ESP32Time.h>
//////
void fReadCurrent( void * parameter )
{
  const TickType_t xFrequency = 1000; //delay for mS
  const float mVperAmp        = 185.0f;
  float    ADbits             = 4096.0f;
  float    ref_voltage        = 3.3f;
  float    mA                 = 0.0f;
  float    adcValue           = 0.0f;
  float    Voltage            = 0.0f;
  float    Power              = 0.0f;
  float    offSET             = 0.0f;
  int      printCount         = 0;
  uint64_t TimePastKalman     = esp_timer_get_time(); // used by the Kalman filter UpdateProcessNoise, time since last kalman calculation
  SimpleKalmanFilter KF_I( 1.0f, 1.0f, .01f );
  /*
     185mv/A = 5 AMP MODULE
     100mv/A = 20 amp module
     66mv/A = 30 amp module
  */
  String powerInfo = "";
  powerInfo.reserve( 150 );
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  TickType_t xLastWakeTime = xTaskGetTickCount();
  for (;;)
  {
    adc1_get_raw(ADC1_CHANNEL_3); // read once discard reading
    adcValue = ( (float)adc1_get_raw(ADC1_CHANNEL_3) );
    //log_i( "adcValue I = %f", adcValue );
    Voltage = ( (adcValue * ref_voltage) / ADbits ) + offSET; // Gets you mV
    mA = Voltage / mVperAmp; // get amps
    KF_I.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f ); //get time, in microsecods, since last readings
    mA = KF_I.updateEstimate( mA ); // apply simple Kalman filter
    TimePastKalman = esp_timer_get_time(); // time of update complete
    printCount++;
    if ( printCount == 60 )
    {
      xSemaphoreTake( sema_CalculatedVoltage, portMAX_DELAY);
      Power = CalculatedVoltage * mA;
      //log_i( "Voltage=%f mA=%f Power=%f", CalculatedVoltage, mA, Power );
      printCount = 0;
      powerInfo.concat( String(CalculatedVoltage, 2) );
      xSemaphoreGive( sema_CalculatedVoltage );
      powerInfo.concat( ",");
      powerInfo.concat( String(mA, 4) );
      powerInfo.concat( ",");
      powerInfo.concat( String(Power, 4) );
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      MQTTclient.publish( topicPower, powerInfo.c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      powerInfo = "";
    }
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete( NULL );
} //void fReadCurrent( void * parameter )
////
void fReadBattery( void * parameter )
{
  const float r1 = 50500.0f; // R1 in ohm, 50K
  const float r2 = 10000.0f; // R2 in ohm, 10k potentiometer
  const TickType_t xFrequency = 1000; //delay for mS
  float    adcValue = 0.0f;
  float    Vbatt = 0.0f;
  int      printCount = 0;
  float    vRefScale = (3.3f / 4096.0f) * ((r1 + r2) / r2);
  uint64_t TimePastKalman  = esp_timer_get_time(); // used by the Kalman filter UpdateProcessNoise, time since last kalman calculation
  SimpleKalmanFilter KF_ADC_b( 1.0f, 1.0f, .01f );
  TickType_t xLastWakeTime = xTaskGetTickCount();
  for (;;)
  {
    adc1_get_raw(ADC1_CHANNEL_0); //read and discard
    adcValue = float( adc1_get_raw(ADC1_CHANNEL_0) ); //take a raw ADC reading
    KF_ADC_b.setProcessNoise( (esp_timer_get_time() - TimePastKalman) / 1000000.0f ); //get time, in microsecods, since last readings
    adcValue = KF_ADC_b.updateEstimate( adcValue ); // apply simple Kalman filter
    Vbatt = adcValue * vRefScale;
    xSemaphoreTake( sema_CalculatedVoltage, portMAX_DELAY );
    CalculatedVoltage = Vbatt;
    xSemaphoreGive( sema_CalculatedVoltage );
    
      printCount++;
      if ( printCount == 3 )
      {
      //log_i( "Vbatt %f", Vbatt );
      printCount = 0;
      }
    
    TimePastKalman = esp_timer_get_time(); // time of update complete
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
    //log_i( "fReadBattery %d",  uxTaskGetStackHighWaterMark( NULL ) );
  }
  vTaskDelete( NULL );
}
////
// Task body: creates the MQTT semaphore, then every 250 ticks either services
// the MQTT client (MQTTclient.loop(), which is what delivers incoming
// messages to the callback) or re-establishes the WiFi/MQTT connection.
void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  // setting must be set before a mqtt connection is made
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  for (;;)
  {
    //check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more reliable than just a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      // Log the two states as integers: passing String objects to a printf-style
      // "%s" is undefined behaviour.
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d",
             static_cast<int>( wifiClient.connected() ), static_cast<int>( WiFi.status() ) );
      // In this branch at least one of the two checks above failed, so the WiFi
      // helper is always invoked; it returns immediately when WiFi is already up.
      connectToWiFi();
      connectToMQTT();
    }
    vTaskDelay( 250 ); //task runs approx every 250 mS
  }
  vTaskDelete ( NULL );
}
////
// Blocks until WiFi.status() reports WL_CONNECTED, retrying WiFi.begin()
// every 4000 ticks and restarting the chip on the 10th attempt.
// The WiFiEvent handler is registered only on the first successful call so
// that reconnects do not stack up duplicate handlers.
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;
  int TryCount = 0;
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  if ( !eventHandlerRegistered )
  {
    WiFi.onEvent( WiFiEvent );
    eventHandlerRegistered = true;
  }
} // void connectToWiFi()
////
// Connects to the MQTT broker (retrying every 250 ticks until connected),
// then installs the message callback and subscribes to topicOK.
void connectToMQTT()
{
  MQTTclient.setKeepAlive( 90 ); // needs be made before connecting
  byte mac[6]; // WiFi.macAddress() fills 6 bytes; a 5-byte buffer would be overrun
  WiFi.macAddress(mac);
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    // boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password, NULL , 1, true, NULL );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
} // void connectToMQTT()
////
// Intentionally empty: the functions above are written as task bodies
// (presumably started from setup(), which is not shown here — TODO confirm).
void loop() {}
#include <WiFi.h>
#include <PubSubClient.h>
#include <ezButton.h>
#include <Preferences.h>
#include <GyverPortal.h>
#include "EasyNextionLibrary.h"

////////////////////////////////////////////////
//WiFi_MQTT
// Wi-Fi settings: filled from the login form, persisted through Preferences
// and consumed by connectToWiFi() and DHCP_switch().
struct config_wifi_struct {
  char ssid[30];  // network name handed to WiFi.begin()
  char pass[30];  // network password handed to WiFi.begin()
  bool DHCP;      // when false, DHCP_switch() applies the static addresses below
  char ip[16];    // static IP address as text (parsed with ipaddr_addr)
  char gt[16];    // gateway address as text
  char mas[16];   // subnet mask as text
  char dns[16];   // primary DNS server address as text
};

// MQTT broker settings: filled from the login form, persisted through
// Preferences and used by MQTTkeepalive() / connectToMQTT().
struct config_mqtt_struct {
  char mqtt_server[16];    // broker address passed to MQTTclient.setServer()
  int mqtt_port;           // broker port passed to MQTTclient.setServer()
  char mqtt_username[30];  // user name passed to MQTTclient.connect()
  char mqtt_password[30];  // password passed to MQTTclient.connect()
};

// Live copies of the Wi-Fi and MQTT configuration.
config_wifi_struct str_wifi;
config_mqtt_struct str_mqtt;

// Network client, the MQTT client layered on top of it, and the Nextion
// display link on Serial2.
WiFiClient wifiClient;
PubSubClient MQTTclient(wifiClient);
EasyNex myNex(Serial2);



// Taken around MQTTclient.loop() in MQTTkeepalive().
SemaphoreHandle_t sema_MQTT_KeepAlive;
// Created and given in setup(); not taken anywhere in the code shown.
SemaphoreHandle_t sema_button_loop;

/// Buttons and the LED outputs they toggle
const int BUTTON_PIN_1 = 27;
const int BUTTON_PIN_2 = 14;
const int LED_PIN_1 = 19;
const int LED_PIN_2 = 22;

ezButton button_1(BUTTON_PIN_1);
ezButton button_2(BUTTON_PIN_2);

// Current output level of each LED, flipped on every button press.
int ledState_1 = LOW;
int ledState_2 = LOW;

/// Preferences storage used to persist the configuration
Preferences preferences;

/// Nextion display state

int mqtt_delay_send = 500;    // delay() argument between the two publishes in trigger21()
int nextion_tom_index_1 = 1;  // refreshed from the display in trigger0()
int nextion_tom_index_2 = 0;  // refreshed from the display in trigger0()
int nextion_tom_index_3 = 0;  // refreshed from the display in trigger0()

////////////////////////////////////////////
// One-time initialisation: serial links, GPIO, optional configuration portal
// (when button 1 reads low at boot), stored settings, semaphores and the two
// worker tasks.
void setup()
{
  // Debug console and Nextion link.
  Serial.begin(115200);
  myNex.begin(115200);
  Serial.println(F("Start"));

  // Both LEDs and GPIO 4 are outputs; GPIO 4 starts low.
  const int outputPins[] = { LED_PIN_1, LED_PIN_2, 4 };
  for (const int pin : outputPins) {
    pinMode(pin, OUTPUT);
  }
  digitalWrite(4, LOW);

  button_1.setDebounceTime(50);
  button_2.setDebounceTime(50);

  // Button 1 held (raw level low) at boot opens the configuration portal.
  if (button_1.getStateRaw() == LOW) {
    loginPortal();
  }

  read_config_wifi_mqtt();
  DHCP_switch();

  // Binary semaphores start "available".
  sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  xSemaphoreGive(sema_MQTT_KeepAlive);
  sema_button_loop = xSemaphoreCreateBinary();
  xSemaphoreGive(sema_button_loop);

  // MQTT housekeeping on core 1 (priority 3), button polling on core 0 (priority 1).
  xTaskCreatePinnedToCore(MQTTkeepalive, "MQTTkeepalive", 20000, NULL, 3, NULL, 1);
  xTaskCreatePinnedToCore(button_loop, "button_loop", 10000, NULL, 1, NULL, 0);
}

// Intentionally empty: all work happens in the tasks created in setup().
void loop() {}

void button_loop( void *pvParameters ){
    for(;;){
        myNex.NextionListen();
        button_1.loop();
        button_2.loop();
  
        if(button_1.isPressed()) {
          ledState_1 = !ledState_1;
          digitalWrite(LED_PIN_1, ledState_1);
          char val[5];
          itoa(ledState_1, val, 10);
          MQTTclient.publish("esp32/led1_st", val);
        }
        if(button_2.isPressed()) {
          ledState_2 = !ledState_2;
          digitalWrite(LED_PIN_2, ledState_2);
          char val[5]; 
          itoa(ledState_2, val, 10);
          MQTTclient.publish("esp32/led2_st", val);
        }  
    }
    vTaskDelete ( NULL );
}

///////////_____MQTT_WIFI_SETTINGS
void MQTTkeepalive( void *pvParameters )
{
  MQTTclient.setServer(str_mqtt.mqtt_server, str_mqtt.mqtt_port);
  MQTTclient.setCallback(callback);
  MQTTclient.setKeepAlive(90);
  
  for (;;)
  {
    if ((wifiClient.connected()) && (WiFi.status() == WL_CONNECTED))
    {
      xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY);
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      Serial.print(F("MQTT keep alive found MQTT status: "));
      Serial.println(String(wifiClient.connected()));
      Serial.print(F("WiFi status: "));
      Serial.println(String(WiFi.status()));
      if (!(WiFi.status() == WL_CONNECTED))
      {
        connectToWiFi();
      }
      
      connectToMQTT();
      main_Portal();
    }
    vTaskDelay( 250 );
  }
  vTaskDelete ( NULL );
}

void connectToMQTT()
{
  String client_id = "esp32-client-";
  client_id += String(WiFi.macAddress());
  Serial.print(F("Connect to MQTT as client: "));
  
  
  
  
  Serial.println(client_id);

  while (!MQTTclient.connected())
  {
    MQTTclient.connect(client_id.c_str(), str_mqtt.mqtt_username, str_mqtt.mqtt_password);
    Serial.println(F("Connecting to MQTT"));
    vTaskDelay(250);
  }
  
  Serial.println(F("MQTT Connected"));
  MQTTclient.publish("esp32/output", "Hi -_-");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_1_1");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_2_1");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_3_1");
}

// Blocks until WiFi.status() reports WL_CONNECTED, retrying WiFi.begin() with
// the stored credentials every 4000 ticks, then prints the assigned address.
// The WiFiEvent handler is registered only on the first call so that
// reconnects do not stack up duplicate handlers (and duplicate log lines).
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;

  Serial.println(F("Connect to WiFi"));
  while ( WiFi.status() != WL_CONNECTED )
  {
    WiFi.disconnect();
    Serial.println(str_wifi.ssid);
    WiFi.begin(str_wifi.ssid, str_wifi.pass);
    Serial.println(F("waiting on wifi connection"));
    vTaskDelay( 4000 );
  }
  Serial.println(F("Connected to WiFi"));
  Serial.print(F("ip address: "));
  Serial.println(WiFi.localIP());
  if (!eventHandlerRegistered)
  {
    WiFi.onEvent(WiFiEvent);
    eventHandlerRegistered = true;
  }
}

// WiFi event handler: prints one line for the three events of interest and
// ignores every other event.
void WiFiEvent(WiFiEvent_t event)
{
  const __FlashStringHelper* note = nullptr;

  if (event == SYSTEM_EVENT_STA_CONNECTED) {
    note = F("Connected to access point");
  } else if (event == SYSTEM_EVENT_STA_DISCONNECTED) {
    note = F("Disconnected from WiFi access point");
  } else if (event == SYSTEM_EVENT_AP_STADISCONNECTED) {
    note = F("WiFi client disconnected");
  }

  if (note != nullptr) {
    Serial.println(note);
  }
}

// MQTT message handler: prints the payload of an incoming message as text.
//   requestTopic - topic the message arrived on (not used)
//   payload      - raw message bytes, NOT NUL-terminated
//   length       - number of bytes in payload
// The payload is copied into a fixed 30-byte buffer; anything longer than
// 29 bytes is truncated instead of overflowing the stack buffer.
void callback(char* requestTopic, byte* payload, unsigned int length)
{
  char messageBuffer[30];
  const size_t capacity = sizeof(messageBuffer) - 1;  // keep room for the terminator
  const size_t copyLength = (length < capacity) ? length : capacity;
  memcpy(messageBuffer, payload, copyLength);
  messageBuffer[copyLength] = '\0';
  Serial.printf("%s\n", messageBuffer);
}

///////////_____WEB_Portal
void build_login() {
  BUILD_BEGIN();
  GP.THEME(GP_DARK);
  
  GP.FORM_BEGIN("/login");
  GP.TEXT("lg", "Login", str_wifi.ssid);
  GP.BREAK();
  GP.TEXT("ps", "Password", str_wifi.pass);
  GP.BREAK();
  GP.LABEL("DHCP: ");
  GP.BREAK();
  GP.CHECK("DHCP", str_wifi.DHCP);
  GP.BREAK();
  GP.TEXT("ip", "ip", str_wifi.ip);
  GP.BREAK();
  GP.TEXT("gt", "gt", str_wifi.gt);
  GP.BREAK();
  GP.TEXT("mas", "mas", str_wifi.mas);
  GP.BREAK();
  GP.TEXT("dns", "dns", str_wifi.dns);
  GP.BREAK();
  GP.TEXT("mqtt_server", "mqtt_server", str_mqtt.mqtt_server);
  GP.BREAK();
  GP.NUMBER("mqtt_port", "mqtt_port", str_mqtt.mqtt_port);
  GP.BREAK();
  GP.TEXT("mqtt_username", "mqtt_username", str_mqtt.mqtt_username);
  GP.BREAK();
  GP.TEXT("mqtt_password", "mqtt_password", str_mqtt.mqtt_password);
  GP.BREAK();
  GP.SUBMIT("Submit");
  GP.FORM_END();

  BUILD_END();
}

// Page builder for the status portal: a title, six network captions and the
// configured MQTT server and port.
// TODO: the network captions are printed without values; the actual
// IP/hostname/MAC/mask/gateway/DNS strings are not filled in yet.
void build_main() {
  static const char* const networkCaptions[] = {
    "IP address: ",
    "Hostname: ",
    "Mac Address: ",
    "Subnet Mask: ",
    "Gateway IP: ",
    "DNS: ",
  };

  BUILD_BEGIN();
  GP.THEME(GP_DARK);

  GP.FORM_BEGIN("/login");
  GP.TITLE("Onyx panel");
  GP.HR();

  // Each caption sits on its own line.
  for (const char* caption : networkCaptions) {
    GP.LABEL(caption);
    GP.BREAK();
  }

  GP.HR();
  GP.LABEL("MQTT IP: ");
  GP.LABEL(str_mqtt.mqtt_server);
  GP.BREAK();
  GP.LABEL("MQTT PORT: ");
  GP.LABEL(str_mqtt.mqtt_port);
  GP.BREAK();
  GP.FORM_END();

  BUILD_END();
}

// Portal action handler: when the "/login" form is submitted, copies every
// field into str_wifi / str_mqtt, shuts the soft AP down and persists the
// settings. Any other request is ignored.
void action_login(GyverPortal& p) {
  if (!p.form("/login")) {
    return;
  }

  // Wi-Fi section
  p.copyStr("lg", str_wifi.ssid);
  p.copyStr("ps", str_wifi.pass);
  str_wifi.DHCP = p.getCheck("DHCP");
  p.copyStr("ip", str_wifi.ip);
  p.copyStr("gt", str_wifi.gt);
  p.copyStr("mas", str_wifi.mas);
  p.copyStr("dns", str_wifi.dns);

  // MQTT section
  p.copyStr("mqtt_server", str_mqtt.mqtt_server);
  str_mqtt.mqtt_port = p.getInt("mqtt_port");
  p.copyStr("mqtt_username", str_mqtt.mqtt_username);
  p.copyStr("mqtt_password", str_mqtt.mqtt_password);

  WiFi.softAPdisconnect();
  load_config_wifi_mqtt();
}

void loginPortal() {
  Serial.println(F("Portal start"));

  // запускаем точку доступа
  WiFi.mode(WIFI_AP);
  WiFi.softAP("WiFi Config");

  // запускаем портал
  Serial.println(F("Запуск портала"));
  GyverPortal portal;
  portal.attachBuild(build_login);
  Serial.println(F("Запуск build_login"));
  portal.start(WIFI_AP);
  Serial.println(F("Запуск WIFI_AP"));
  portal.attach(action_login);
  Serial.println(F("Запуск action_login"));

  // работа портала
  while (portal.tick());
}

void main_Portal() {
  GyverPortal portal;
  portal.attachBuild(build_main);
  portal.start();
  portal.enableOTA();
  portal.log.start();

  // работа портала
  while (portal.tick());
}

///////////_____JSON_load_read
// Persists the current str_wifi / str_mqtt values into the "wifi_mqtt"
// Preferences namespace (despite the name, this function writes).
void load_config_wifi_mqtt(){
  preferences.begin("wifi_mqtt", false);

  // Wi-Fi settings
  preferences.putString("ssid", str_wifi.ssid);
  preferences.putString("pass", str_wifi.pass);
  preferences.putBool("DHCP", str_wifi.DHCP);
  preferences.putString("ip", str_wifi.ip);
  preferences.putString("gt", str_wifi.gt);
  preferences.putString("mas", str_wifi.mas);
  preferences.putString("dns", str_wifi.dns);

  // MQTT settings
  preferences.putString("mqtt_server", str_mqtt.mqtt_server);
  preferences.putInt("mqtt_port", str_mqtt.mqtt_port);
  preferences.putString("mqtt_username", str_mqtt.mqtt_username);
  preferences.putString("mqtt_password", str_mqtt.mqtt_password);

  preferences.end();
}

void read_config_wifi_mqtt(){
  preferences.begin("wifi_mqtt", false);

  String ssid = preferences.getString("ssid", "0");
  String pass = preferences.getString("pass", "0");
  str_wifi.DHCP = preferences.getBool("DHCP", true);
  String ip = preferences.getString("ip", "0");
  String gt = preferences.getString("gt", "0");
  String mas = preferences.getString("mas", "0");
  String dns = preferences.getString("dns", "0");

  String mqtt_server = preferences.getString("mqtt_server", "0");
  str_mqtt.mqtt_port = preferences.getInt("mqtt_port", 1883);
  String mqtt_username = preferences.getString("mqtt_username", "0");
  String mqtt_password = preferences.getString("mqtt_password", "0");

  preferences.end();

  strcpy(str_wifi.ssid, ssid.c_str());
  strcpy(str_wifi.pass, pass.c_str());
  strcpy(str_wifi.ip, ip.c_str());
  strcpy(str_wifi.gt, gt.c_str());
  strcpy(str_wifi.mas, mas.c_str());
  strcpy(str_wifi.dns, dns.c_str());

  strcpy(str_mqtt.mqtt_server, mqtt_server.c_str());
  strcpy(str_mqtt.mqtt_username, mqtt_username.c_str());
  strcpy(str_mqtt.mqtt_password, mqtt_password.c_str());
}

// Applies the static IP configuration from str_wifi when DHCP is disabled.
// Returns false only when WiFi.config() rejects the static settings; returns
// true when they were applied or when DHCP is enabled and there is nothing
// to configure (this path previously fell off the end of the function
// without returning a value).
bool DHCP_switch (){
  if(!str_wifi.DHCP){
    IPAddress local_IP(ipaddr_addr(str_wifi.ip));
    IPAddress gateway(ipaddr_addr(str_wifi.gt));
    IPAddress subnet(ipaddr_addr(str_wifi.mas));
    IPAddress primaryDNS(ipaddr_addr(str_wifi.dns));

    if (WiFi.config(local_IP, gateway, subnet, primaryDNS) == false) {
      Serial.println("Configuration failed.");
      return false;
    }
    Serial.println("Configuration set.");
    return true;
  }
  return true;
}


////nextion
// Nextion trigger 0: refreshes the three nextion_tom_index_* values from the
// corresponding "controll" page attributes on the display.
void trigger0(){
  struct Binding {
    const char* attribute;  // display attribute to read
    int* destination;       // variable that receives the value
  };
  const Binding bindings[] = {
    { "controll.bt2_1_1.val", &nextion_tom_index_1 },
    { "controll.bt2_1_2.val", &nextion_tom_index_2 },
    { "controll.bt2_1_3.val", &nextion_tom_index_3 },
  };
  for (const Binding& binding : bindings) {
    *binding.destination = myNex.readNumber(binding.attribute);
  }
}
void trigger21(){
  char mqtt_val[5];
  String val = "0";
  val.toCharArray(mqtt_val,5);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
  val = "1";
  val.toCharArray(mqtt_val,5);
  delay(mqtt_delay_send);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
}
void trigger22(){
  char mqtt_val[5];
  String val = "0";
  val.toCharArray(mqtt_val,5);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
}
void trigger23(){
  char mqtt_val[5];
  String val = "1";
  val.toCharArray(mqtt_val,5);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
}

I receive:

ets Jun  8 2016 00:22:57

rst:0x10 (RTCWDT_RTC_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:1
load:0x3fff0018,len:4
load:0x3fff001c,len:1044
load:0x40078000,len:10124
load:0x40080400,len:5856
entry 0x400806a8
Start
Configuration set.
MQTT keep alive found MQTT status: 0
WiFi status: 6
Connect to WiFi
KSI
waiting on wifi connection
Connected to WiFi
ip address: 192.168.82.205
Connect to MQTT as client: esp32-client-AC:67:B2:31:87:B4
Connecting to MQTT
MQTT Connected
E (5401) task_wdt: Task watchdog got triggered. The following tasks did not reset the watchdog in time:
E (5401) task_wdt:  - IDLE0 (CPU 0)
E (5401) task_wdt: Tasks currently running:
E (5401) task_wdt: CPU 0: button_loop
E (5401) task_wdt: CPU 1: loopTask
E (5401) task_wdt: Aborting.
abort() was called at PC 0x401445a4 on core 0

ELF file SHA256: 0000000000000000

Backtrace: 0x400887b8:0x3ffbf890 0x40088a35:0x3ffbf8b0 0x401445a4:0x3ffbf8d0 0x40086f2d:0x3ffbf8f0 0x40084272:0x3ffd3720 0x40084eab:0x3ffd3740 0x400811e3:0x3ffd3760 0x400d63d4:0x3ffd3780 0x400d0e51:0x3ffd37a0 0x40089a46:0x3ffd37d0
xTaskCreatePinnedToCore(button_loop, "button_loop", 10000, NULL, 1, NULL, 0);

Using core0 with WiFi requires skills. Put all the tasks on core1 when WiFi is in use.

Sorry, I didn't understand.
I just need to fix the callback.

I changed the cores, but the callback still doesn't work. No errors.

Post the most recent code being used.

Did you use a 3rd party software to connect to your MQTT Broker to confirm your connection parameters are properly working?

I use MQTT.fx to confirm my MQTT settings.

#include <WiFi.h>
#include <PubSubClient.h>
#include <ezButton.h>
#include <Preferences.h>
#include <GyverPortal.h>
#include "EasyNextionLibrary.h"

////////////////////////////////////////////////
//WiFi_MQTT
// Wi-Fi settings: filled from the login form, persisted through Preferences
// and consumed by connectToWiFi() and DHCP_switch().
struct config_wifi_struct {
  char ssid[30];  // network name handed to WiFi.begin()
  char pass[30];  // network password handed to WiFi.begin()
  bool DHCP;      // when false, DHCP_switch() applies the static addresses below
  char ip[16];    // static IP address as text (parsed with ipaddr_addr)
  char gt[16];    // gateway address as text
  char mas[16];   // subnet mask as text
  char dns[16];   // primary DNS server address as text
};

// MQTT broker settings: filled from the login form, persisted through
// Preferences and used by MQTTkeepalive() / connectToMQTT().
struct config_mqtt_struct {
  char mqtt_server[16];    // broker address passed to MQTTclient.setServer()
  int mqtt_port;           // broker port passed to MQTTclient.setServer()
  char mqtt_username[30];  // user name passed to MQTTclient.connect()
  char mqtt_password[30];  // password passed to MQTTclient.connect()
};

// Live copies of the Wi-Fi and MQTT configuration.
config_wifi_struct str_wifi;
config_mqtt_struct str_mqtt;

// Network client, the MQTT client layered on top of it, and the Nextion
// display link on Serial2.
WiFiClient wifiClient;
PubSubClient MQTTclient(wifiClient);
EasyNex myNex(Serial2);



// Taken around MQTTclient.loop() in MQTTkeepalive().
SemaphoreHandle_t sema_MQTT_KeepAlive;
// Created and given in setup(); not taken anywhere in the code shown.
SemaphoreHandle_t sema_button_loop;

/// Buttons and the LED outputs they toggle
const int BUTTON_PIN_1 = 27;
const int BUTTON_PIN_2 = 14;
const int LED_PIN_1 = 19;
const int LED_PIN_2 = 22;

ezButton button_1(BUTTON_PIN_1);
ezButton button_2(BUTTON_PIN_2);

// Current output level of each LED, flipped on every button press.
int ledState_1 = LOW;
int ledState_2 = LOW;

/// Preferences storage used to persist the configuration
Preferences preferences;

/// Nextion display state

int mqtt_delay_send = 500;    // delay() argument between the two publishes in trigger21()
int nextion_tom_index_1 = 1;  // refreshed from the display in trigger0()
int nextion_tom_index_2 = 0;  // refreshed from the display in trigger0()
int nextion_tom_index_3 = 0;  // refreshed from the display in trigger0()

////////////////////////////////////////////
// One-time initialisation: serial links, GPIO, optional configuration portal
// (when button 1 reads low at boot), stored settings, semaphores and the two
// worker tasks.
void setup()
{
  // Debug console and Nextion link.
  Serial.begin(115200);
  myNex.begin(115200);
  Serial.println(F("Start"));

  // Both LEDs and GPIO 4 are outputs; GPIO 4 starts low.
  const int outputPins[] = { LED_PIN_1, LED_PIN_2, 4 };
  for (const int pin : outputPins) {
    pinMode(pin, OUTPUT);
  }
  digitalWrite(4, LOW);

  button_1.setDebounceTime(50);
  button_2.setDebounceTime(50);

  // Button 1 held (raw level low) at boot opens the configuration portal.
  if (button_1.getStateRaw() == LOW) {
    loginPortal();
  }

  read_config_wifi_mqtt();
  DHCP_switch();

  // Binary semaphores start "available".
  sema_MQTT_KeepAlive = xSemaphoreCreateBinary();
  xSemaphoreGive(sema_MQTT_KeepAlive);
  sema_button_loop = xSemaphoreCreateBinary();
  xSemaphoreGive(sema_button_loop);

  // MQTT housekeeping on core 0 (priority 3), button polling on core 1 (priority 1).
  xTaskCreatePinnedToCore(MQTTkeepalive, "MQTTkeepalive", 20000, NULL, 3, NULL, 0);
  xTaskCreatePinnedToCore(button_loop, "button_loop", 10000, NULL, 1, NULL, 1);
}

// Intentionally empty: all work happens in the tasks created in setup().
void loop() {}

void button_loop( void *pvParameters ){
    for(;;){
        myNex.NextionListen();
        button_1.loop();
        button_2.loop();
  
        if(button_1.isPressed()) {
          ledState_1 = !ledState_1;
          digitalWrite(LED_PIN_1, ledState_1);
          char val[5];
          itoa(ledState_1, val, 10);
          MQTTclient.publish("esp32/led1_st", val);
        }
        if(button_2.isPressed()) {
          ledState_2 = !ledState_2;
          digitalWrite(LED_PIN_2, ledState_2);
          char val[5]; 
          itoa(ledState_2, val, 10);
          MQTTclient.publish("esp32/led2_st", val);
        }  
    }
    vTaskDelete ( NULL );
}

///////////_____MQTT_WIFI_SETTINGS
void MQTTkeepalive( void *pvParameters )
{
  MQTTclient.setServer(str_mqtt.mqtt_server, str_mqtt.mqtt_port);
  MQTTclient.setCallback(callback);
  MQTTclient.setKeepAlive(90);
  
  for (;;)
  {
    if ((wifiClient.connected()) && (WiFi.status() == WL_CONNECTED))
    {
      xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY);
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      Serial.print(F("MQTT keep alive found MQTT status: "));
      Serial.println(String(wifiClient.connected()));
      Serial.print(F("WiFi status: "));
      Serial.println(String(WiFi.status()));
      if (!(WiFi.status() == WL_CONNECTED))
      {
        connectToWiFi();
      }
      
      connectToMQTT();
      main_Portal();
    }
    vTaskDelay( 250 );
  }
  vTaskDelete ( NULL );
}

void connectToMQTT()
{
  String client_id = "esp32-client-";
  client_id += String(WiFi.macAddress());
  Serial.print(F("Connect to MQTT as client: "));
  
  
  
  
  Serial.println(client_id);

  while (!MQTTclient.connected())
  {
    MQTTclient.connect(client_id.c_str(), str_mqtt.mqtt_username, str_mqtt.mqtt_password);
    Serial.println(F("Connecting to MQTT"));
    vTaskDelay(250);
  }
  
  Serial.println(F("MQTT Connected"));
  MQTTclient.publish("esp32/output", "Hi -_-");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_1_1");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_2_1");
  MQTTclient.subscribe("/devices/panel_test/controls/bt_2_3_1");
}

// Blocks until WiFi.status() reports WL_CONNECTED, retrying WiFi.begin() with
// the stored credentials every 4000 ticks, then prints the assigned address.
// The WiFiEvent handler is registered only on the first call so that
// reconnects do not stack up duplicate handlers (and duplicate log lines).
void connectToWiFi()
{
  static bool eventHandlerRegistered = false;

  Serial.println(F("Connect to WiFi"));
  while ( WiFi.status() != WL_CONNECTED )
  {
    WiFi.disconnect();
    Serial.println(str_wifi.ssid);
    WiFi.begin(str_wifi.ssid, str_wifi.pass);
    Serial.println(F("waiting on wifi connection"));
    vTaskDelay( 4000 );
  }
  Serial.println(F("Connected to WiFi"));
  Serial.print(F("ip address: "));
  Serial.println(WiFi.localIP());
  if (!eventHandlerRegistered)
  {
    WiFi.onEvent(WiFiEvent);
    eventHandlerRegistered = true;
  }
}

// WiFi event handler: prints one line for the three events of interest and
// ignores every other event.
void WiFiEvent(WiFiEvent_t event)
{
  const __FlashStringHelper* note = nullptr;

  if (event == SYSTEM_EVENT_STA_CONNECTED) {
    note = F("Connected to access point");
  } else if (event == SYSTEM_EVENT_STA_DISCONNECTED) {
    note = F("Disconnected from WiFi access point");
  } else if (event == SYSTEM_EVENT_AP_STADISCONNECTED) {
    note = F("WiFi client disconnected");
  }

  if (note != nullptr) {
    Serial.println(note);
  }
}

// MQTT message handler: prints the payload of an incoming message as text.
//   requestTopic - topic the message arrived on (not used)
//   payload      - raw message bytes, NOT NUL-terminated
//   length       - number of bytes in payload
// The payload is copied into a fixed 30-byte buffer; anything longer than
// 29 bytes is truncated instead of overflowing the stack buffer.
void callback(char* requestTopic, byte* payload, unsigned int length)
{
  char messageBuffer[30];
  const size_t capacity = sizeof(messageBuffer) - 1;  // keep room for the terminator
  const size_t copyLength = (length < capacity) ? length : capacity;
  memcpy(messageBuffer, payload, copyLength);
  messageBuffer[copyLength] = '\0';
  Serial.printf("%s\n", messageBuffer);
}

///////////_____WEB_Portal
void build_login() {
  BUILD_BEGIN();
  GP.THEME(GP_DARK);
  
  GP.FORM_BEGIN("/login");
  GP.TEXT("lg", "Login", str_wifi.ssid);
  GP.BREAK();
  GP.TEXT("ps", "Password", str_wifi.pass);
  GP.BREAK();
  GP.LABEL("DHCP: ");
  GP.BREAK();
  GP.CHECK("DHCP", str_wifi.DHCP);
  GP.BREAK();
  GP.TEXT("ip", "ip", str_wifi.ip);
  GP.BREAK();
  GP.TEXT("gt", "gt", str_wifi.gt);
  GP.BREAK();
  GP.TEXT("mas", "mas", str_wifi.mas);
  GP.BREAK();
  GP.TEXT("dns", "dns", str_wifi.dns);
  GP.BREAK();
  GP.TEXT("mqtt_server", "mqtt_server", str_mqtt.mqtt_server);
  GP.BREAK();
  GP.NUMBER("mqtt_port", "mqtt_port", str_mqtt.mqtt_port);
  GP.BREAK();
  GP.TEXT("mqtt_username", "mqtt_username", str_mqtt.mqtt_username);
  GP.BREAK();
  GP.TEXT("mqtt_password", "mqtt_password", str_mqtt.mqtt_password);
  GP.BREAK();
  GP.SUBMIT("Submit");
  GP.FORM_END();

  BUILD_END();
}

// Page builder for the status portal: a title, six network captions and the
// configured MQTT server and port.
// TODO: the network captions are printed without values; the actual
// IP/hostname/MAC/mask/gateway/DNS strings are not filled in yet.
void build_main() {
  static const char* const networkCaptions[] = {
    "IP address: ",
    "Hostname: ",
    "Mac Address: ",
    "Subnet Mask: ",
    "Gateway IP: ",
    "DNS: ",
  };

  BUILD_BEGIN();
  GP.THEME(GP_DARK);

  GP.FORM_BEGIN("/login");
  GP.TITLE("Onyx panel");
  GP.HR();

  // Each caption sits on its own line.
  for (const char* caption : networkCaptions) {
    GP.LABEL(caption);
    GP.BREAK();
  }

  GP.HR();
  GP.LABEL("MQTT IP: ");
  GP.LABEL(str_mqtt.mqtt_server);
  GP.BREAK();
  GP.LABEL("MQTT PORT: ");
  GP.LABEL(str_mqtt.mqtt_port);
  GP.BREAK();
  GP.FORM_END();

  BUILD_END();
}

// Portal action handler: when the "/login" form is submitted, copies every
// field into str_wifi / str_mqtt, shuts the soft AP down and persists the
// settings. Any other request is ignored.
void action_login(GyverPortal& p) {
  if (!p.form("/login")) {
    return;
  }

  // Wi-Fi section
  p.copyStr("lg", str_wifi.ssid);
  p.copyStr("ps", str_wifi.pass);
  str_wifi.DHCP = p.getCheck("DHCP");
  p.copyStr("ip", str_wifi.ip);
  p.copyStr("gt", str_wifi.gt);
  p.copyStr("mas", str_wifi.mas);
  p.copyStr("dns", str_wifi.dns);

  // MQTT section
  p.copyStr("mqtt_server", str_mqtt.mqtt_server);
  str_mqtt.mqtt_port = p.getInt("mqtt_port");
  p.copyStr("mqtt_username", str_mqtt.mqtt_username);
  p.copyStr("mqtt_password", str_mqtt.mqtt_password);

  WiFi.softAPdisconnect();
  load_config_wifi_mqtt();
}

void loginPortal() {
  Serial.println(F("Portal start"));

  // запускаем точку доступа
  WiFi.mode(WIFI_AP);
  WiFi.softAP("WiFi Config");

  // запускаем портал
  Serial.println(F("Запуск портала"));
  GyverPortal portal;
  portal.attachBuild(build_login);
  Serial.println(F("Запуск build_login"));
  portal.start(WIFI_AP);
  Serial.println(F("Запуск WIFI_AP"));
  portal.attach(action_login);
  Serial.println(F("Запуск action_login"));

  // работа портала
  while (portal.tick());
}

void main_Portal() {
  GyverPortal portal;
  portal.attachBuild(build_main);
  portal.start();
  portal.enableOTA();
  portal.log.start();

  // работа портала
  while (portal.tick());
}

///////////_____JSON_load_read
// Persists the current str_wifi / str_mqtt values into the "wifi_mqtt"
// Preferences namespace (despite the name, this function writes).
void load_config_wifi_mqtt(){
  preferences.begin("wifi_mqtt", false);

  // Wi-Fi settings
  preferences.putString("ssid", str_wifi.ssid);
  preferences.putString("pass", str_wifi.pass);
  preferences.putBool("DHCP", str_wifi.DHCP);
  preferences.putString("ip", str_wifi.ip);
  preferences.putString("gt", str_wifi.gt);
  preferences.putString("mas", str_wifi.mas);
  preferences.putString("dns", str_wifi.dns);

  // MQTT settings
  preferences.putString("mqtt_server", str_mqtt.mqtt_server);
  preferences.putInt("mqtt_port", str_mqtt.mqtt_port);
  preferences.putString("mqtt_username", str_mqtt.mqtt_username);
  preferences.putString("mqtt_password", str_mqtt.mqtt_password);

  preferences.end();
}

void read_config_wifi_mqtt(){
  preferences.begin("wifi_mqtt", false);

  String ssid = preferences.getString("ssid", "0");
  String pass = preferences.getString("pass", "0");
  str_wifi.DHCP = preferences.getBool("DHCP", true);
  String ip = preferences.getString("ip", "0");
  String gt = preferences.getString("gt", "0");
  String mas = preferences.getString("mas", "0");
  String dns = preferences.getString("dns", "0");

  String mqtt_server = preferences.getString("mqtt_server", "0");
  str_mqtt.mqtt_port = preferences.getInt("mqtt_port", 1883);
  String mqtt_username = preferences.getString("mqtt_username", "0");
  String mqtt_password = preferences.getString("mqtt_password", "0");

  preferences.end();

  strcpy(str_wifi.ssid, ssid.c_str());
  strcpy(str_wifi.pass, pass.c_str());
  strcpy(str_wifi.ip, ip.c_str());
  strcpy(str_wifi.gt, gt.c_str());
  strcpy(str_wifi.mas, mas.c_str());
  strcpy(str_wifi.dns, dns.c_str());

  strcpy(str_mqtt.mqtt_server, mqtt_server.c_str());
  strcpy(str_mqtt.mqtt_username, mqtt_username.c_str());
  strcpy(str_mqtt.mqtt_password, mqtt_password.c_str());
}

// Applies the static IP configuration from str_wifi when DHCP is disabled.
// Returns false only when WiFi.config() rejects the static settings; returns
// true when they were applied or when DHCP is enabled and there is nothing
// to configure (this path previously fell off the end of the function
// without returning a value).
bool DHCP_switch (){
  if(!str_wifi.DHCP){
    IPAddress local_IP(ipaddr_addr(str_wifi.ip));
    IPAddress gateway(ipaddr_addr(str_wifi.gt));
    IPAddress subnet(ipaddr_addr(str_wifi.mas));
    IPAddress primaryDNS(ipaddr_addr(str_wifi.dns));

    if (WiFi.config(local_IP, gateway, subnet, primaryDNS) == false) {
      Serial.println("Configuration failed.");
      return false;
    }
    Serial.println("Configuration set.");
    return true;
  }
  return true;
}


////nextion
// Nextion trigger 0: refreshes the three nextion_tom_index_* values from the
// corresponding "controll" page attributes on the display.
void trigger0(){
  struct Binding {
    const char* attribute;  // display attribute to read
    int* destination;       // variable that receives the value
  };
  const Binding bindings[] = {
    { "controll.bt2_1_1.val", &nextion_tom_index_1 },
    { "controll.bt2_1_2.val", &nextion_tom_index_2 },
    { "controll.bt2_1_3.val", &nextion_tom_index_3 },
  };
  for (const Binding& binding : bindings) {
    *binding.destination = myNex.readNumber(binding.attribute);
  }
}
void trigger21(){
  char mqtt_val[5];
  String val = "0";
  val.toCharArray(mqtt_val,5);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
  val = "1";
  val.toCharArray(mqtt_val,5);
  delay(mqtt_delay_send);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
}
void trigger22(){
  char mqtt_val[5];
  String val = "0";
  val.toCharArray(mqtt_val,5);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
}
void trigger23(){
  char mqtt_val[5];
  String val = "1";
  val.toCharArray(mqtt_val,5);
  MQTTclient.publish("/devices/panel_test/controls/bt_2_1_1/on", mqtt_val);
}

I use mqtt.gs 1.7.1
I can receive messages sent from the ESP, but the ESP can't receive messages.

ets Jun 8 2016 00:22:57

rst:0x1 (POWERON_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)
flash read err, 1000
ets_main.c 371
ets Jun 8 2016 00:22:57

rst:0x10 (RTCWDT_RTC_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:1
load:0x3fff0018,len:4
load:0x3fff001c,len:1044
load:0x40078000,len:10124
load:0x40080400,len:5856
entry 0x400806a8
Start
Configuration set.
MQTT keep alive found MQTT status: 0
WiFi status: 6
Connect to WiFi
KSI
waiting on wifi connection
Connected to WiFi
ip address: 192.168.82.205
Connect to MQTT as client: esp32-client-AC:67:B2:31:87:B4
Connecting to MQTT
MQTT Connected

Next, did you try to see if this task is actually running by using a serial print?

if ((wifiClient.connected()) && (WiFi.status() == WL_CONNECTED))
    {
      xSemaphoreTake(sema_MQTT_KeepAlive, portMAX_DELAY);


Serial.println( "I do mqtt client loopies");      


MQTTclient.loop();


      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      Serial.print(F("MQTT keep alive found MQTT status: "));
      Serial.println(String(wifiClient.connected()));
      Serial.print(F("WiFi status: "));
      Serial.println(String(WiFi.status()));
      if (!(WiFi.status() == WL_CONNECTED))
      {
        connectToWiFi();
      }
      
      connectToMQTT();
      main_Portal();
    }
    vTaskDelay( 250 );
  }
  vTaskDelete ( NULL );
}


Unless mqtt.loop() is actually being run, the callback will not work. In order to receive a message from the broker, client.loop() MUST run.

Oh, you're right! mqtt.loop() isn't running.
I didn't receive the debug message from the loop.

Now put in some more serial prints to see what is running and what is not.

Sorry for my stupidity, but I don't know where I should put the debug messages. Could you suggest where?