MQTT - failed, rc=-2

Hi

i am new to working with Arduino and c/++

but i am working on a small project that uses MQTT

i have setup a test MQTT server on the lan, and i can see the connection attempt from the Arduino device, but the result is:

failed, rc=-2

I can use other client's against the same MQTT broker, so i know that it's working.

if I try to use hostname for the MQTT server instead connecting from the Arduino, the connection is not established at all and i can not see the connection attempt on the broker.
Using hostname from other computers on the LAN works

i am using DHCP to assign IP to the Arduino, it's a mega rev3 with ethernet shield 2
network connection is stable, testing with icmp from another computer on the lan.

i have read many forum and blog post's but have yet to find a solution to the problem.

thanks

Can you set your MQTT server to verbose mode to get a better idea what is wrong?

it's already running in verbose mode, all its printing is:

1641314502: mosquitto version 1.6.9 starting
1641314502: Using default config.
1641314502: Opening ipv6 listen socket on port 1883.
1641314502: Opening ipv4 listen socket on port 1883.
1641314504: New connection from 192.168.1.141 on port 1883.
1641314506: New connection from 192.168.1.141 on port 1883.
1641314511: New connection from 192.168.1.141 on port 1883.
1641314516: New connection from 192.168.1.141 on port 1883.
1641314521: New connection from 192.168.1.141 on port 1883.
1641314526: New connection from 192.168.1.141 on port 1883.
1641314531: New connection from 192.168.1.141 on port 1883.
1641314536: New connection from 192.168.1.141 on port 1883.

code:

/*
* Untested fork of https://github.com/Vongraven/Flexit-SL4R-master
* Sending data over MQTT using ESP8266 and MAX3485. Waiting for parts..
*/

#include <Arduino.h>
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
#include "Timer.h"


#define RXen    4
#define TXen    5
#define COM_VCC 6


#define MQTT_HOST  "192.168.1.120"
#define MQTT_PORT  1883
#define MQTT_USER "user1"
#define MQTT_PASS "test"
#define MQTT_CLIENT_ID "FlexitPanel"

#define TOPSZ 60  // Max number of characters in topic string
#define MESSZ 240 // Max number of characters in JSON message string


byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED };


//MQTT command:
#define MQTT_COMMAND_CHANNEL "command/FlexitPanel/#"
//MQTT status: 
//heater element enabled or disabled
#define MQTT_STATUS_HEATER_ENABLE "state/FlexitPanel/heater_enable"
//fan level, 1 (low), 2 (medium), 3 (high)
#define MQTT_STATUS_FAN "state/FlexitPanel/fan_level"
//temperature
#define MQTT_STATUS_TEMP "state/FlexitPanel/temperature"
//heater state, (is the heating element active)
#define MQTT_STATUS_HEATER_STATE "state/FlexitPanel/heater_state"

uint8_t commandBuffer[18] = {195, 4, 0, 199, 81, 193, 4, 8, 32, 15, 0, 'F', 'P', 4, 0, 'T' };
uint8_t processedData [4]   = {};
uint8_t latestSentData [4]   = {};
uint8_t rawData [25]   = {};

uint8_t* rawFanLevel       = &rawData[5];
uint8_t* rawPreheatOnOff   = &rawData[6];
uint8_t* rawHeatExchTemp   = &rawData[9];
uint8_t* rawPreheatActive1 = &rawData[10];
uint8_t* rawPreheatActive2 = &rawData[11];

uint8_t* fanLevel          = &processedData[0];
uint8_t* heatExchTemp      = &processedData[1];
uint8_t* preheatEnable      = &processedData[2];
uint8_t* preheatState     = &processedData[3];

uint8_t* sentFanLevel      = &latestSentData[0];
uint8_t* sentHeatExchTemp  = &latestSentData[1];
uint8_t* sentPreheatEnable  = &latestSentData[2];
uint8_t* sentPreheatState = &latestSentData[3];

//Serial Serial1(2, 3);

EthernetClient ethClient;
PubSubClient mqttClient(ethClient);   // MQTT Client
Timer t;


void initMQTT();
void reconnectMQTT();
void mqttDataCb(char* topic, byte* data, unsigned int data_le);
void publishState();
void reconnect();
void callback(char*, byte*, unsigned int);
void createCommand(uint8_t, uint8_t);
void sendCommand();
void processFlexitData();
void updateFlexitData();
void updateMQTTServer(bool);
void oneLoop();
void fiveLoop();
void tenLoop();


void setup() {
  Serial.begin(115200);
  delay(10);
  Serial.println("Booting");
  Serial.println("Ready");

  // initialize the ethernet device
  Ethernet.begin(mac);
  //print out the IP address
  Serial.print("IP = ");
  Serial.println(Ethernet.localIP());
  Serial.print("DNS = ");
  Serial.println(Ethernet.dnsServerIP());
  delay(3000);

  // Init loop timers
  t.every(1000, tenLoop);
  t.every(5000, fiveLoop);
  t.every(10000, tenLoop);

  pinMode(RXen, OUTPUT);
  pinMode(TXen, OUTPUT);
  pinMode(COM_VCC, OUTPUT);
  
  digitalWrite(RXen, LOW);
  digitalWrite(TXen, LOW);
  digitalWrite(COM_VCC, LOW);
 
  initMQTT();
}

void initMQTT() {
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  if (!mqttClient.connected()) {
    reconnectMQTT();
  }
}

void reconnectMQTT() {
  // Loop until we're reconnected
  Serial.println("Attempting MQTT connection...");
  // Attempt to connect
  if (mqttClient.connect(MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS)) {
    Serial.println("connected");
    mqttClient.setCallback(mqttDataCb);
    mqttClient.subscribe(MQTT_COMMAND_CHANNEL);
  } else {
    Serial.print("failed, rc=");
    Serial.println(mqttClient.state());
  }
}


void loop() {

  mqttClient.loop();
  t.update();
}

void tenLoop() {
  publishState();
}

void fiveLoop() {
  if (!mqttClient.connected()) {
    reconnectMQTT();
  }
}

void oneLoop(){
  updateFlexitData();
}

void mqttDataCb(char* topic, byte* data, unsigned int data_len) {
  char svalue[MESSZ];
  char topicBuf[TOPSZ];
  char dataBuf[data_len+1];

  strncpy(topicBuf, topic, sizeof(topicBuf));
  memcpy(dataBuf, data, sizeof(dataBuf));
  dataBuf[sizeof(dataBuf)-1] = 0;

  snprintf_P(svalue, sizeof(svalue), PSTR("RSLT: Receive topic %s, data size %d, data %s"), topicBuf, data_len, dataBuf);
  Serial.println(svalue);

  // Extract command
  memmove(topicBuf, topicBuf+sizeof(MQTT_COMMAND_CHANNEL)-2, sizeof(topicBuf)-sizeof(MQTT_COMMAND_CHANNEL));

  int16_t payload = atoi(dataBuf);     // -32766 - 32767
  Serial.print("Received payload: ");
  Serial.println(payload);
  if (!strcmp(topicBuf, "heater_enable")) {
    if (payload==0)
      createCommand(15,0);
    else if (payload==1)
      createCommand(15,128);
    else
      Serial.print("heater_enable payload unknown");
  } else if (!strcmp(topicBuf, "fan_level")) {
    if (payload>=0 && (char)payload<=3)
      createCommand(11,payload);
    else
      Serial.print("fan_level payload unknown");
  } else if (!strcmp(topicBuf, "temperature")) {
    if (payload>=15 && (char)payload<=25)
        createCommand(15,payload);
    else
      Serial.print("temperature payload unknown");
  } else {
      Serial.write("unknown topic");
  }
}

void publishState() {
  char message[10];

  if (fanLevel != sentFanLevel) 
  {
    itoa(*fanLevel, message, 10);
    if(mqttClient.publish(MQTT_STATUS_FAN, message))
      sentFanLevel = fanLevel;
  }
  if (heatExchTemp != sentHeatExchTemp)
    {
      itoa(*heatExchTemp, message, 10);
      if(mqttClient.publish(MQTT_STATUS_TEMP, message))
        sentHeatExchTemp = heatExchTemp;
    }
  if (preheatEnable != sentPreheatEnable) {
    itoa(*preheatEnable, message, 10);
    if(mqttClient.publish(MQTT_STATUS_HEATER_ENABLE, message))
      sentPreheatEnable = preheatEnable;
  }
  if (preheatState != sentPreheatState) {
    itoa(*preheatState, message, 10);
    if(mqttClient.publish(MQTT_STATUS_HEATER_STATE, message))
      sentPreheatState = preheatState;
  }
}
          
/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* CS50 control board repeats 16 lines of data over and over. For this purpose only line number 15 is needed. 
* Look for a specific combination of bytes to identify the correct line.
* When combination is matched, read the line into buffer rawData[]
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void updateFlexitData() {
  digitalWrite(COM_VCC, HIGH);      // activate MAX485 
  digitalWrite(RXen, LOW);      // RX enable
  uint8_t Buffer[1000];
  
  for (int i=0; i<1000; ++i) {
    while (!Serial1.available()); 
    Buffer[i] = Serial1.read();
    if (Buffer[i]==22 && Buffer[i-2]==193 && Buffer[i-8]==195) {
      for (int i=0; i<25; ++i) {
        while (!Serial1.available()); 
        rawData[i] = Serial1.read();
      }
    break;
    }
    if (i == 999) Serial.println("StBuffer not updated"); 
  }

  digitalWrite(RXen, HIGH);     // RX disable
  digitalWrite(COM_VCC, LOW);     // deactivate MAX485      
  while (Serial1.available()) Serial1.read();     // empty serial RX buffer
}  

/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////  
* - Fan level has three steps. They are received as 17, 34 or 51. Divide by 17 to get 1, 2 or 3.
* - Preheat state is received as 0 (off) or 128 (on), and is translated to 0/1
* - Heat exchanger temperature is unchanged (15 - 25 degrees) 
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processFlexitData(){
  if (*rawFanLevel == 17 ||*rawFanLevel == 34 ||*rawFanLevel == 51) {
    *fanLevel = *rawFanLevel / 17;
    commandBuffer[11] = *rawFanLevel;      
  }
      
  if (*rawPreheatOnOff == 128) {
    *preheatEnable = 1;
    commandBuffer[12] = 128;  
  }
  else if (*rawPreheatOnOff == 0) {
    *preheatEnable = commandBuffer[12] = *preheatState = 0; 
  }
      
  if (*rawHeatExchTemp >= 15 && *rawHeatExchTemp <= 25) {
    *heatExchTemp = commandBuffer[15] = *rawHeatExchTemp;  
  }

  if (*rawPreheatActive1 > 10 && *preheatState == 0 && *preheatEnable == 1) { 
    *preheatState = 1;
  }
      
  if (*rawPreheatActive2 < 100 && *preheatState == 1 && *preheatEnable == 1) {
    *preheatState = 0;
  }
}

/*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* createCommand() takes two arguments, command and level
* cmd: which value in commandbuffer to change
*     fan level           - cmd=11
*     preheat on/off      - cmd=12
*     heat exchanger temp - cmd=15
* 
* lvl: which value to send    
*     fanlevel    1 > 2   - lvl=18    (17 +1)
*                   > 3   - lvl=35    (34 +1)
'                 3 > 2   - lvl=50    (51 -1)
'                   > 1   - lvl=33    (34 -1)

*     preheat         on  - lvl=128   
*                     off - lvl=0
*                     
*     heat exchanger temp - lvl=15-25
'
* Calculate and append check sum
*
' Important! execute updateFlexitData() and processFlexitData() before each command.   
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void createCommand(uint8_t cmd, uint8_t lvl) {                                    
  commandBuffer[cmd] = lvl;
  int sum1=0, sum2=0;
  for ( int i=5; i<(sizeof(commandBuffer)-2) ; ++i) {
    sum1 = sum1 + commandBuffer[i];
    sum2 = (sum2 + sum1);
  }
  commandBuffer[sizeof(commandBuffer)-2] = sum1%256;  
  commandBuffer[sizeof(commandBuffer)-1] = sum2%256;

  sendCommand();
}

/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* Transmission of the commands must be timed so they dont collide with data from the CS60 control board. 
* Find the length of the incomming line (value number 8 in each line), count the bytes until end of line is reached, jump in and send command   
*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////  
void sendCommand () {  
  digitalWrite(COM_VCC, HIGH);      // activate MAX485
  digitalWrite(RXen, LOW);      // RX enable
  uint8_t data;  
  int Length, repeats=0;
  do {
    while (!Serial1.available());
    data=Serial1.read();
    if (data == 195){
      while (!Serial1.available());
      data=Serial1.read();
      if (data == 1){
        for (int i=0; i<6; ++i) {
          while (!Serial1.available());
          Serial1.read();
        }
    
        while (!Serial1.available());    
        Length = Serial1.read()+2;
        if (3 < Length <33) {
          for (int i=0; i<Length; ++i) {
            while (!Serial1.available());
            Serial1.read();
          }
          digitalWrite(TXen, LOW);      // TX enable
          delay(10);
          Serial1.write(commandBuffer, 18);     // transmit command
          Serial1.flush();
          digitalWrite(TXen, HIGH);     // TX disable
          Serial.println("command sent");
          ++repeats;
        }
      }
    }   
  } while (repeats<5);
  digitalWrite(RXen, HIGH);     // RX disable
  digitalWrite(COM_VCC, LOW);     // deactivate MAX485  
}

Try something simpler. In particular, I'm a bit suspicious of those timer functions with potential to send data before the connection is established.

I've not had much luck with PubSubClient and using IP address for the MQTT Host. The MQTT Broker's server name works well.

used a simpler example with PubSubClient , works with both IP and hostname for the broker.

That helps. Can you splice in your other code bit by bit until you break it?

working on it.

I auto make a client ID based upon the last 5 characters of the MAC address. Makes it unique and I can copy code from one project to the next.

Attempting MQTT connection...failed, rc=-2 try again in 5 seconds · Issue #185 · knolleary/pubsubclient · GitHub

Comment from the author of PubSubCLient: "-2 means it's failing to create a network connection to the broker."

-2 common issue when more than one client logging in with the same client ID.

My MQTT connection function

void connectToMQTT()
{
  byte mac[5]; // create client ID from mac address
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe  ( topicOK );
  MQTTclient.subscribe  ( topicRemainingMoisture_0 );
  MQTTclient.subscribe  ( topicWindSpeed );
  MQTTclient.subscribe  ( topicWindDirection );
  MQTTclient.subscribe  ( topicRainfall );
  MQTTclient.subscribe  ( topicWSVolts );
  MQTTclient.subscribe  ( topicWSCurrent );
  MQTTclient.subscribe  ( topicWSPower );
  MQTTclient.subscribe  ( topicDPnWI );
}

My MQTT keep alive task

void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  TickType_t xLastWakeTime = xTaskGetTickCount();
  const TickType_t xFrequency = 250; //delay for ms
  for (;;)
  {
    //check for a is-connected and if the WiFi 'thinks' its connected, found checking on both is more realible than just a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // whiles MQTTlient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status % s WiFi status % s", String(wifiClient.connected()), String(WiFi.status()) );
      if ( !(wifiClient.connected()) || !(WiFi.status() == WL_CONNECTED) )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    //log_i( " high watermark % d",  uxTaskGetStackHighWaterMark( NULL ) );
    xLastWakeTime = xTaskGetTickCount();
    vTaskDelayUntil( &xLastWakeTime, xFrequency );
  }
  vTaskDelete ( NULL );
}
////

hi, thanks!

got the working now as long as i don't set this:

   pinMode(RXen, OUTPUT);

most likely because pin4 is used by the ethernet shield for the SD-card data

so changed it to:

#define RXen 5
#define TXen 6
#define COM_VCC 7

next step, connect rest of the hardware

/*
* Idea and code from https://github.com/Vongraven/Flexit-SL4R-master and https://github.com/tomas04/Flexit-SL4R-master 
* Sending data over MQTT using AtMega2560, Ethernet shield2, RS485 module. 
*/

#include <Arduino.h>
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
#include "Timer.h"


#define RXen    5
#define TXen    6
#define COM_VCC 7


#define MQTT_HOST  "192.168.1.120"
#define MQTT_PORT  1883
#define MQTT_USER "user1"
#define MQTT_PASS "test"
#define MQTT_CLIENT_ID "FlexitPanel"

#define TOPSZ 60  // Max number of characters in topic string
#define MESSZ 240 // Max number of characters in JSON message string


byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED };


//MQTT command:
#define MQTT_COMMAND_CHANNEL "command/FlexitPanel/#"
//MQTT status: 
//heater element enabled or disabled
#define MQTT_STATUS_HEATER_ENABLE "state/FlexitPanel/heater_enable"
//fan level, 1 (low), 2 (medium), 3 (high)
#define MQTT_STATUS_FAN "state/FlexitPanel/fan_level"
//temperature
#define MQTT_STATUS_TEMP "state/FlexitPanel/temperature"
//heater state, (is the heating element active)
#define MQTT_STATUS_HEATER_STATE "state/FlexitPanel/heater_state"

uint8_t commandBuffer[18] = {195, 4, 0, 199, 81, 193, 4, 8, 32, 15, 0, 'F', 'P', 4, 0, 'T' };
uint8_t processedData [4]   = {};
uint8_t latestSentData [4]   = {};
uint8_t rawData [25]   = {};

uint8_t* rawFanLevel       = &rawData[5];
uint8_t* rawPreheatOnOff   = &rawData[6];
uint8_t* rawHeatExchTemp   = &rawData[9];
uint8_t* rawPreheatActive1 = &rawData[10];
uint8_t* rawPreheatActive2 = &rawData[11];

uint8_t* fanLevel          = &processedData[0];
uint8_t* heatExchTemp      = &processedData[1];
uint8_t* preheatEnable      = &processedData[2];
uint8_t* preheatState     = &processedData[3];

uint8_t* sentFanLevel      = &latestSentData[0];
uint8_t* sentHeatExchTemp  = &latestSentData[1];
uint8_t* sentPreheatEnable  = &latestSentData[2];
uint8_t* sentPreheatState = &latestSentData[3];

//Serial Serial1(2, 3);

EthernetClient ethClient;
PubSubClient mqttClient(ethClient);   // MQTT Client
Timer t;


void initMQTT();
void reconnectMQTT();
void mqttDataCb(char* topic, byte* data, unsigned int data_le);
void publishState();
void reconnect();
void callback(char*, byte*, unsigned int);
void createCommand(uint8_t, uint8_t);
void sendCommand();
void processFlexitData();
void updateFlexitData();
void updateMQTTServer(bool);
void oneLoop();
void fiveLoop();
void tenLoop();


void setup() {
  Serial1.begin(19200, SERIAL_8N1); 

  pinMode(RXen, OUTPUT);
  pinMode(TXen, OUTPUT);
  pinMode(COM_VCC, OUTPUT);

  digitalWrite(RXen, LOW);
  digitalWrite(TXen, LOW);
  digitalWrite(COM_VCC, LOW);
  
  Serial.begin(115200);
   
  delay(100);
  Serial.println("Booting");
  Serial.println("Ready");

  // initialize the ethernet device
  Ethernet.begin(mac);
  //print out the IP address
  Serial.print("IP = ");
  Serial.println(Ethernet.localIP());
  Serial.print("DNS = ");
  Serial.println(Ethernet.dnsServerIP());
  delay(3000);
  
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  mqttClient.setCallback(mqttDataCb);


  // Init loop timers
  t.every(1000, tenLoop);
  t.every(5000, fiveLoop);
  t.every(10000, tenLoop);


  initMQTT();
}

void initMQTT() {
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  if (!mqttClient.connected()) {
    reconnectMQTT();
  }
}

   
void reconnectMQTT() {
  // Loop until we're reconnected
  Serial.println("Attempting MQTT connection...");
  // Attempt to connect
  if (mqttClient.connect(MQTT_CLIENT_ID)) {
    Serial.println("connected");
    mqttClient.setCallback(mqttDataCb);
    mqttClient.subscribe(MQTT_COMMAND_CHANNEL);
  } else {
    Serial.print("failed, rc=");
    Serial.println(mqttClient.state());
  }
}
 

void loop() {

  mqttClient.loop();
  t.update();
}

void tenLoop() {
  publishState();
}

void fiveLoop() {
  if (!mqttClient.connected()) {
    reconnectMQTT();
  }
}

void oneLoop(){
  updateFlexitData();
}

void mqttDataCb(char* topic, byte* data, unsigned int data_len) {
  char svalue[MESSZ];
  char topicBuf[TOPSZ];
  char dataBuf[data_len+1];

  strncpy(topicBuf, topic, sizeof(topicBuf));
  memcpy(dataBuf, data, sizeof(dataBuf));
  dataBuf[sizeof(dataBuf)-1] = 0;

  snprintf_P(svalue, sizeof(svalue), PSTR("RSLT: Receive topic %s, data size %d, data %s"), topicBuf, data_len, dataBuf);
  Serial.println(svalue);

  // Extract command
  memmove(topicBuf, topicBuf+sizeof(MQTT_COMMAND_CHANNEL)-2, sizeof(topicBuf)-sizeof(MQTT_COMMAND_CHANNEL));

  int16_t payload = atoi(dataBuf);     // -32766 - 32767
  Serial.print("Received payload: ");
  Serial.println(payload);
  if (!strcmp(topicBuf, "heater_enable")) {
    if (payload==0)
      createCommand(15,0);
    else if (payload==1)
      createCommand(15,128);
    else
      Serial.print("heater_enable payload unknown");
  } else if (!strcmp(topicBuf, "fan_level")) {
    if (payload>=0 && (char)payload<=3)
      createCommand(11,payload);
    else
      Serial.print("fan_level payload unknown");
  } else if (!strcmp(topicBuf, "temperature")) {
    if (payload>=15 && (char)payload<=25)
        createCommand(15,payload);
    else
      Serial.print("temperature payload unknown");
  } else {
      Serial.write("unknown topic");
  }
}

void publishState() {
  char message[10];

  if (fanLevel != sentFanLevel) 
  {
    itoa(*fanLevel, message, 10);
    if(mqttClient.publish(MQTT_STATUS_FAN, message))
      sentFanLevel = fanLevel;
  }
  if (heatExchTemp != sentHeatExchTemp)
    {
      itoa(*heatExchTemp, message, 10);
      if(mqttClient.publish(MQTT_STATUS_TEMP, message))
        sentHeatExchTemp = heatExchTemp;
    }
  if (preheatEnable != sentPreheatEnable) {
    itoa(*preheatEnable, message, 10);
    if(mqttClient.publish(MQTT_STATUS_HEATER_ENABLE, message))
      sentPreheatEnable = preheatEnable;
  }
  if (preheatState != sentPreheatState) {
    itoa(*preheatState, message, 10);
    if(mqttClient.publish(MQTT_STATUS_HEATER_STATE, message))
      sentPreheatState = preheatState;
  }
}
          
/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* CS50 control board repeats 16 lines of data over and over. For this purpose only line number 15 is needed. 
* Look for a specific combination of bytes to identify the correct line.
* When combination is matched, read the line into buffer rawData[]
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void updateFlexitData() {
  digitalWrite(COM_VCC, HIGH);      // activate MAX485 
  digitalWrite(RXen, LOW);      // RX enable
  uint8_t Buffer[1000];
  
  for (int i=0; i<1000; ++i) {
    while (!Serial1.available()); 
    Buffer[i] = Serial1.read();
    if (Buffer[i]==22 && Buffer[i-2]==193 && Buffer[i-8]==195) {
      for (int i=0; i<25; ++i) {
        while (!Serial1.available()); 
        rawData[i] = Serial1.read();
      }
    break;
    }
    if (i == 999) Serial.println("StBuffer not updated"); 
  }

  digitalWrite(RXen, HIGH);     // RX disable
  digitalWrite(COM_VCC, LOW);     // deactivate MAX485      
  while (Serial1.available()) Serial1.read();     // empty serial RX buffer
}  

/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////  
* - Fan level has three steps. They are received as 17, 34 or 51. Divide by 17 to get 1, 2 or 3.
* - Preheat state is received as 0 (off) or 128 (on), and is translated to 0/1
* - Heat exchanger temperature is unchanged (15 - 25 degrees) 
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processFlexitData(){
  if (*rawFanLevel == 17 ||*rawFanLevel == 34 ||*rawFanLevel == 51) {
    *fanLevel = *rawFanLevel / 17;
    commandBuffer[11] = *rawFanLevel;      
  }
      
  if (*rawPreheatOnOff == 128) {
    *preheatEnable = 1;
    commandBuffer[12] = 128;  
  }
  else if (*rawPreheatOnOff == 0) {
    *preheatEnable = commandBuffer[12] = *preheatState = 0; 
  }
      
  if (*rawHeatExchTemp >= 15 && *rawHeatExchTemp <= 25) {
    *heatExchTemp = commandBuffer[15] = *rawHeatExchTemp;  
  }

  if (*rawPreheatActive1 > 10 && *preheatState == 0 && *preheatEnable == 1) { 
    *preheatState = 1;
  }
      
  if (*rawPreheatActive2 < 100 && *preheatState == 1 && *preheatEnable == 1) {
    *preheatState = 0;
  }
}

/*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* createCommand() takes two arguments, command and level
* cmd: which value in commandbuffer to change
*     fan level           - cmd=11
*     preheat on/off      - cmd=12
*     heat exchanger temp - cmd=15
* 
* lvl: which value to send    
*     fanlevel    1 > 2   - lvl=18    (17 +1)
*                   > 3   - lvl=35    (34 +1)
'                 3 > 2   - lvl=50    (51 -1)
'                   > 1   - lvl=33    (34 -1)

*     preheat         on  - lvl=128   
*                     off - lvl=0
*                     
*     heat exchanger temp - lvl=15-25
'
* Calculate and append check sum
*
' Important! execute updateFlexitData() and processFlexitData() before each command.   
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void createCommand(uint8_t cmd, uint8_t lvl) {                                    
  commandBuffer[cmd] = lvl;
  int sum1=0, sum2=0;
  for ( int i=5; i<(sizeof(commandBuffer)-2) ; ++i) {
    sum1 = sum1 + commandBuffer[i];
    sum2 = (sum2 + sum1);
  }
  commandBuffer[sizeof(commandBuffer)-2] = sum1%256;  
  commandBuffer[sizeof(commandBuffer)-1] = sum2%256;

  sendCommand();
}

/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* Transmission of the commands must be timed so they dont collide with data from the CS60 control board. 
* Find the length of the incomming line (value number 8 in each line), count the bytes until end of line is reached, jump in and send command   
*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////  
void sendCommand () {  
  digitalWrite(COM_VCC, HIGH);      // activate MAX485
  digitalWrite(RXen, LOW);      // RX enable
  uint8_t data;  
  int Length, repeats=0;
  do {
    while (!Serial1.available());
    data=Serial1.read();
    if (data == 195){
      while (!Serial1.available());
      data=Serial1.read();
      if (data == 1){
        for (int i=0; i<6; ++i) {
          while (!Serial1.available());
          Serial1.read();
        }
    
        while (!Serial1.available());    
        Length = Serial1.read()+2;
        if (3 < Length <33) {
          for (int i=0; i<Length; ++i) {
            while (!Serial1.available());
            Serial1.read();
          }
          digitalWrite(TXen, LOW);      // TX enable
          delay(10);
          Serial1.write(commandBuffer, 18);     // transmit command
          Serial1.flush();
          digitalWrite(TXen, HIGH);     // TX disable
          Serial.println("command sent");
          ++repeats;
        }
      }
    }   
  } while (repeats<5);
  digitalWrite(RXen, HIGH);     // RX disable
  digitalWrite(COM_VCC, LOW);     // deactivate MAX485  
}

Hi

anyone here that can give some pointers on how to get the mqtt message in the correct format and with the correct data, looking at the code? (full code in previous post.)

#define TOPSZ 60  // Max number of characters in topic string
#define MESSZ 240 // Max number of characters in JSON message string
void mqttDataCb(char* topic, byte* data, unsigned int data_len) {
  char svalue[MESSZ];
  char topicBuf[TOPSZ];
  char dataBuf[data_len+1];

  strncpy(topicBuf, topic, sizeof(topicBuf));
  memcpy(dataBuf, data, sizeof(dataBuf));
  dataBuf[sizeof(dataBuf)-1] = 0;

  snprintf_P(svalue, sizeof(svalue), PSTR("RSLT: Receive topic %s, data size %d, data %s"), topicBuf, data_len, dataBuf);
  Serial.println(svalue);



  // Extract command
  memmove(topicBuf, topicBuf+sizeof(MQTT_COMMAND_CHANNEL)-2, sizeof(topicBuf)-sizeof(MQTT_COMMAND_CHANNEL));

Here I take multiple floats and then send via MQTT.

void SolarTimeFormat( double h, int i  )
{
  int hours   = 0;
  int minutes = 0;
  if ( h != 0.0f )
  {
    int m = int(round(h * 60));
    hours = (m / 60) % 24;
    minutes = m % 60;
  }
  switch ( i )
  {
    case 0:
      x_eData.SunRiseHr = hours;
      x_eData.SunRiseMin = minutes;
      break;
    case 1:
      x_eData.SunSetHr = hours;
      x_eData.SunSetMin = minutes;
      break;
    case 2:
      x_eData.DawnHr = hours;
      x_eData.DawnMin = minutes;
      break;
    case 3:
      x_eData.DuskHr = hours;
      x_eData.DawnMin = minutes;
      break;
    case 4:
      x_eData.TransitHr = hours;
      x_eData.TransitMin = minutes;
      break;
    case 5:
      String sTopic = "";
      sTopic.reserve( 35 );
      sTopic.concat( String(x_eData.SunRiseHr) + "," );
      sTopic.concat( String(x_eData.SunRiseMin) + "," );
      sTopic.concat( String(x_eData.SunSetHr) + "," );
      sTopic.concat( String(x_eData.SunSetMin) + "," );
      sTopic.concat( String(x_eData.DawnHr) + "," );
      sTopic.concat( String(x_eData.DawnMin) + "," );
      sTopic.concat( String(x_eData.TransitHr) + "," );
      sTopic.concat( String(x_eData.TransitMin) );
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      MQTTclient.publish( topicSRSSDDT, sTopic.c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      sTopic = "";
      sTopic.concat( String(x_eData.azimuth) + "," + String(x_eData.azimuth) );
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY );
      MQTTclient.publish( topicAzEle, sTopic.c_str() );
      xSemaphoreGive( sema_MQTT_KeepAlive );
      sTopic = "";
      break;
  } // switch ( i ) {
} // void SolarTimeFormat( double h, int i  )

My MQTT callback function.

void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  x_message.topic = ""; //clear topic string buffer
  x_message.topic = topic; //store new topic
  int i = 0; // extract payload
  for ( i; i < length; i++)
  {
    x_message.payload[i] = ((char)payload[i]);
  }
  x_message.payload[i] = '\0';
  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)

thank you, waiting for some more parts before i can test if it's working.

1 Like

Hi

i have connected everything, but i have not managed to get it to work for yet

when the correct topic is received, the program seems to stop at this point
could be that the rest of the mqtt message that i send is not correct?

and then this led turn's on on the 485 board

i suspect something in the code, but i have not been able to figure out what it is yet

Have you used a MQTT viewer to see if the data has made it to the Broker?

Yes, your secret code could be the issue but as long as the code remains secret or it's feed out in snippets, troubleshooting the code will be near impossible. Good luck allowing others help with troubleshooting your secret code and snippet's.

yes, it's received by the broker and it's received by the Arduino, code running on the Arduino gives another error if the topic is wrong, so mqtt seems to be ok.

and when the topic is correct the 485 board is activated (led lit), but nothing else happens, and the code seems to stop, Arduino is still replying on ICMP, but does not process new messages from mqtt.

complete code is already posted in this thread. jan4.

the codes is from:

and

/*
* Idea and code from https://github.com/Vongraven/Flexit-SL4R-master and https://github.com/tomas04/Flexit-SL4R-master 
* Sending data over MQTT using AtMega2560, Ethernet shield2, RS485 module. 
*/

#include <Arduino.h>
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
#include "Timer.h"


#define RXen    5
#define TXen    6
#define COM_VCC 7


#define MQTT_HOST  "192.168.1.120"
#define MQTT_PORT  1883
#define MQTT_USER "user1"
#define MQTT_PASS "test"
#define MQTT_CLIENT_ID "FlexitPanel"

#define TOPSZ 60  // Max number of characters in topic string
#define MESSZ 240 // Max number of characters in JSON message string


byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED };


//MQTT command:
#define MQTT_COMMAND_CHANNEL "command/FlexitPanel/#"
//MQTT status: 
//heater element enabled or disabled
#define MQTT_STATUS_HEATER_ENABLE "state/FlexitPanel/heater_enable"
//fan level, 1 (low), 2 (medium), 3 (high)
#define MQTT_STATUS_FAN "state/FlexitPanel/fan_level"
//temperature
#define MQTT_STATUS_TEMP "state/FlexitPanel/temperature"
//heater state, (is the heating element active)
#define MQTT_STATUS_HEATER_STATE "state/FlexitPanel/heater_state"

uint8_t commandBuffer[18] = {195, 4, 0, 199, 81, 193, 4, 8, 32, 15, 0, 'F', 'P', 4, 0, 'T' };
uint8_t processedData [4]   = {};
uint8_t latestSentData [4]   = {};
uint8_t rawData [25]   = {};

uint8_t* rawFanLevel       = &rawData[5];
uint8_t* rawPreheatOnOff   = &rawData[6];
uint8_t* rawHeatExchTemp   = &rawData[9];
uint8_t* rawPreheatActive1 = &rawData[10];
uint8_t* rawPreheatActive2 = &rawData[11];

uint8_t* fanLevel          = &processedData[0];
uint8_t* heatExchTemp      = &processedData[1];
uint8_t* preheatEnable      = &processedData[2];
uint8_t* preheatState     = &processedData[3];

uint8_t* sentFanLevel      = &latestSentData[0];
uint8_t* sentHeatExchTemp  = &latestSentData[1];
uint8_t* sentPreheatEnable  = &latestSentData[2];
uint8_t* sentPreheatState = &latestSentData[3];



EthernetClient ethClient;
PubSubClient mqttClient(ethClient);   // MQTT Client
Timer t;


void initMQTT();
void reconnectMQTT();
void mqttDataCb(char* topic, byte* data, unsigned int data_le);
void publishState();
void reconnect();
void callback(char*, byte*, unsigned int);
void createCommand(uint8_t, uint8_t);
void sendCommand();
void processFlexitData();
void updateFlexitData();
void updateMQTTServer(bool);
void oneLoop();
void fiveLoop();
void tenLoop();


void setup() {
  Serial1.begin(19200, SERIAL_8N1); 

  pinMode(RXen, OUTPUT);
  pinMode(TXen, OUTPUT);
  pinMode(COM_VCC, OUTPUT);

  digitalWrite(RXen, LOW);
  digitalWrite(TXen, LOW);
  digitalWrite(COM_VCC, LOW);
  
  Serial.begin(115200);
   
  delay(100);
  Serial.println("Booting");
  Serial.println("Ready");

  // initialize the ethernet device
  Ethernet.begin(mac);
  //print out the IP address
  Serial.print("IP = ");
  Serial.println(Ethernet.localIP());
  Serial.print("DNS = ");
  Serial.println(Ethernet.dnsServerIP());
  delay(3000);
  
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  mqttClient.setCallback(mqttDataCb);


  // Init loop timers
  t.every(1000, tenLoop);
  t.every(5000, fiveLoop);
  t.every(10000, tenLoop);


  initMQTT();
}

void initMQTT() {
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  if (!mqttClient.connected()) {
    reconnectMQTT();
  }
}

   
void reconnectMQTT() {
  // Loop until we're reconnected
  Serial.println("Attempting MQTT connection...");
  // Attempt to connect
  if (mqttClient.connect(MQTT_CLIENT_ID)) {
    Serial.println("connected");
    mqttClient.setCallback(mqttDataCb);
    mqttClient.subscribe(MQTT_COMMAND_CHANNEL);
  } else {
    Serial.print("failed, rc=");
    Serial.println(mqttClient.state());
  }
}
 

void loop() {

  mqttClient.loop();
  t.update();
}

void tenLoop() {
  publishState();
}

void fiveLoop() {
  if (!mqttClient.connected()) {
    reconnectMQTT();
  }
}

void oneLoop(){
  updateFlexitData();
}

void mqttDataCb(char* topic, byte* data, unsigned int data_len) {
  char svalue[MESSZ];
  char topicBuf[TOPSZ];
  char dataBuf[data_len+1];

  strncpy(topicBuf, topic, sizeof(topicBuf));
  memcpy(dataBuf, data, sizeof(dataBuf));
  dataBuf[sizeof(dataBuf)-1] = 0;

  snprintf_P(svalue, sizeof(svalue), PSTR("RSLT: Receive topic %s, data size %d, data %s"), topicBuf, data_len, dataBuf);
  Serial.println(svalue);

  // Extract command
  memmove(topicBuf, topicBuf+sizeof(MQTT_COMMAND_CHANNEL)-2, sizeof(topicBuf)-sizeof(MQTT_COMMAND_CHANNEL));

  int16_t payload = atoi(dataBuf);     // -32766 - 32767
  Serial.print("Received payload: ");
  Serial.println(payload);
  if (!strcmp(topicBuf, "heater_enable")) {
    if (payload==0)
      createCommand(15,0);
    else if (payload==1)
      createCommand(15,128);
    else
      Serial.print("heater_enable payload unknown");
  } else if (!strcmp(topicBuf, "fan_level")) {
    if (payload>=0 && (char)payload<=3)
      createCommand(11,payload);
    else
      Serial.print("fan_level payload unknown");
  } else if (!strcmp(topicBuf, "temperature")) {
    if (payload>=15 && (char)payload<=25)
        createCommand(15,payload);
    else
      Serial.print("temperature payload unknown");
  } else {
      Serial.write("unknown topic");
  }
}

void publishState() {
  char message[10];

  if (fanLevel != sentFanLevel) 
  {
    itoa(*fanLevel, message, 10);
    if(mqttClient.publish(MQTT_STATUS_FAN, message))
      sentFanLevel = fanLevel;
  }
  if (heatExchTemp != sentHeatExchTemp)
    {
      itoa(*heatExchTemp, message, 10);
      if(mqttClient.publish(MQTT_STATUS_TEMP, message))
        sentHeatExchTemp = heatExchTemp;
    }
  if (preheatEnable != sentPreheatEnable) {
    itoa(*preheatEnable, message, 10);
    if(mqttClient.publish(MQTT_STATUS_HEATER_ENABLE, message))
      sentPreheatEnable = preheatEnable;
  }
  if (preheatState != sentPreheatState) {
    itoa(*preheatState, message, 10);
    if(mqttClient.publish(MQTT_STATUS_HEATER_STATE, message))
      sentPreheatState = preheatState;
  }
}
          
/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* CS50 control board repeats 16 lines of data over and over. For this purpose only line number 15 is needed. 
* Look for a specific combination of bytes to identify the correct line.
* When combination is matched, read the line into buffer rawData[]
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void updateFlexitData() {
  digitalWrite(COM_VCC, HIGH);      // activate MAX485 
  digitalWrite(RXen, LOW);      // RX enable
  uint8_t Buffer[1000];
  
  for (int i=0; i<1000; ++i) {
    while (!Serial1.available()); 
    Buffer[i] = Serial1.read();
    if (Buffer[i]==22 && Buffer[i-2]==193 && Buffer[i-8]==195) {
      for (int i=0; i<25; ++i) {
        while (!Serial1.available()); 
        rawData[i] = Serial1.read();
      }
    break;
    }
    if (i == 999) Serial.println("StBuffer not updated"); 
  }

  digitalWrite(RXen, HIGH);     // RX disable
  digitalWrite(COM_VCC, LOW);     // deactivate MAX485      
  while (Serial1.available()) Serial1.read();     // empty serial RX buffer
}  

/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////  
* - Fan level has three steps. They are received as 17, 34 or 51. Divide by 17 to get 1, 2 or 3.
* - Preheat state is received as 0 (off) or 128 (on), and is translated to 0/1
* - Heat exchanger temperature is unchanged (15 - 25 degrees) 
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processFlexitData(){
  if (*rawFanLevel == 17 ||*rawFanLevel == 34 ||*rawFanLevel == 51) {
    *fanLevel = *rawFanLevel / 17;
    commandBuffer[11] = *rawFanLevel;      
  }
      
  if (*rawPreheatOnOff == 128) {
    *preheatEnable = 1;
    commandBuffer[12] = 128;  
  }
  else if (*rawPreheatOnOff == 0) {
    *preheatEnable = commandBuffer[12] = *preheatState = 0; 
  }
      
  if (*rawHeatExchTemp >= 15 && *rawHeatExchTemp <= 25) {
    *heatExchTemp = commandBuffer[15] = *rawHeatExchTemp;  
  }

  if (*rawPreheatActive1 > 10 && *preheatState == 0 && *preheatEnable == 1) { 
    *preheatState = 1;
  }
      
  if (*rawPreheatActive2 < 100 && *preheatState == 1 && *preheatEnable == 1) {
    *preheatState = 0;
  }
}

/*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* createCommand() takes two arguments, command and level
* cmd: which value in commandbuffer to change
*     fan level           - cmd=11
*     preheat on/off      - cmd=12
*     heat exchanger temp - cmd=15
* 
* lvl: which value to send    
*     fanlevel    1 > 2   - lvl=18    (17 +1)
*                   > 3   - lvl=35    (34 +1)
'                 3 > 2   - lvl=50    (51 -1)
'                   > 1   - lvl=33    (34 -1)

*     preheat         on  - lvl=128   
*                     off - lvl=0
*                     
*     heat exchanger temp - lvl=15-25
'
* Calculate and append check sum
*
' Important! execute updateFlexitData() and processFlexitData() before each command.   
*//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void createCommand(uint8_t cmd, uint8_t lvl) {                                    
  commandBuffer[cmd] = lvl;
  int sum1=0, sum2=0;
  for ( int i=5; i<(sizeof(commandBuffer)-2) ; ++i) {
    sum1 = sum1 + commandBuffer[i];
    sum2 = (sum2 + sum1);
  }
  commandBuffer[sizeof(commandBuffer)-2] = sum1%256;  
  commandBuffer[sizeof(commandBuffer)-1] = sum2%256;

  sendCommand();
}

/*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
* Transmission of the commands must be timed so they dont collide with data from the CS60 control board. 
* Find the length of the incomming line (value number 8 in each line), count the bytes until end of line is reached, jump in and send command   
*/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////  
void sendCommand () {  
  digitalWrite(COM_VCC, HIGH);      // activate MAX485
  digitalWrite(RXen, LOW);      // RX enable
  uint8_t data;  
  int Length, repeats=0;
  do {
    while (!Serial1.available());
    data=Serial1.read();
    if (data == 195){
      while (!Serial1.available());
      data=Serial1.read();
      if (data == 1){
        for (int i=0; i<6; ++i) {
          while (!Serial1.available());
          Serial1.read();
        }
    
        while (!Serial1.available());    
        Length = Serial1.read()+2;
        if (3 < Length <33) {
          for (int i=0; i<Length; ++i) {
            while (!Serial1.available());
            Serial1.read();
          }
          digitalWrite(TXen, LOW);      // TX enable
          delay(10);
          Serial1.write(commandBuffer, 18);     // transmit command
          Serial1.flush();
          digitalWrite(TXen, HIGH);     // TX disable
          Serial.println("command sent");
          ++repeats;
        }
      }
    }   
  } while (repeats<5);
  digitalWrite(RXen, HIGH);     // RX disable
  digitalWrite(COM_VCC, LOW);     // deactivate MAX485  
}

Hello, ESP8266 or ESP32 will be much easier for MQTT related stuffs. You can get some idea from here: ESP32 MQTT - The Engineering Projects

Add some more prints in mqttDataCb so you can see what you got and where execution went.

This topic was automatically closed 180 days after the last reply. New replies are no longer allowed.