Arduino MQTT Client

Hi.

I have a RAK7249 gateway installed and working using the in-built LoRa server. Backhaul from the server is Ethernet to my router.

I have a very simple Arduino UNO + Dragino LoRa Shield end-node. Its payload is "Hello, Ryan". It is currently the only node in operation. It too appears to be working perfectly. I can see the packet/payload/message on the LoRa packet logger on the management software of the gateway. I can see uplinks and downlinks.

I am running MQTT.fx on a Windows 10 laptop. I am connected to the node via ethernet. I am subscribed to the in-built server. The topic is "application/3/device/+/+". I have replaced the specific EUI etc. with wildcards. MQTT.fx shows all the messages, e.g. "join" and "rx".
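
For anyone unsure how the wildcards behave: in an MQTT topic filter, `+` stands for exactly one topic level, so "application/3/device/+/+" should cover both the "join" and the "rx" topics of any device in application 3. The following is a minimal sketch of that matching rule in plain C++ rather than Arduino code. The function names `splitLevels` and `topicMatches` are made up for illustration, and the multi-level `#` wildcard is deliberately left out.

```cpp
#include <string>
#include <vector>

// Split a topic or filter into its '/'-separated levels.
static std::vector<std::string> splitLevels(const std::string& s) {
    std::vector<std::string> levels;
    std::string::size_type start = 0;
    while (true) {
        std::string::size_type pos = s.find('/', start);
        if (pos == std::string::npos) {
            levels.push_back(s.substr(start));
            break;
        }
        levels.push_back(s.substr(start, pos - start));
        start = pos + 1;
    }
    return levels;
}

// Returns true if 'topic' matches 'filter', where '+' in the filter
// matches exactly one level. The '#' wildcard is not handled here.
bool topicMatches(const std::string& filter, const std::string& topic) {
    const std::vector<std::string> f = splitLevels(filter);
    const std::vector<std::string> t = splitLevels(topic);
    if (f.size() != t.size()) return false;
    for (std::size_t i = 0; i < f.size(); ++i) {
        if (f[i] != "+" && f[i] != t[i]) return false;
    }
    return true;
}
```

With this rule, a topic ending in "/rx" under application 3 matches the filter, while the same topic under application 4 does not.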

I have to say it's taken me many months to get this far. Don't laugh.

However, the intent of getting this to work is eventually to parse farm management information out of the message/payload and operate something, e.g. a pump. To do that, I have built an Arduino MQTT client.

Here is the code -
/*

MQTT Basic v1.0

Using -

Arduino Mega + Arduino Ethernet2 Shield
MAC address is A8:61:0A:AE:6A:0B
Static IP address 192, 168, 1, 51

RAK7249/4240 in-built LoRa server
IP address 192, 168, 1, 227

This sketch connects to an MQTT server then -

  • subscribes to the topic "application/+/device/+/+"
  • prints out any messages it receives.
  • it assumes the received payloads are strings not binary
  • it will reconnect to the server if the connection is lost using a blocking reconnect function.

*/

#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>

// Update these with values suitable for your network.
byte mac[] = { 0xA8, 0x61, 0x0A, 0xAE, 0x6A, 0x0B };
IPAddress ip(192, 168, 1, 51); // Static IP address Arduino MQTT client
IPAddress server(192, 168, 1, 227); // Static IP address RAK7249 built-in LoRa server

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (unsigned int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

EthernetClient ethClient;
PubSubClient client(ethClient);

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect("arduinoClient")) {
      Serial.println("connected");
      // Once connected, publish an announcement...
      // client.publish("outTopic","hello world");
      // ... and resubscribe
      client.subscribe("application/3/device/+/+");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup()
{
  Serial.begin(57600);
  client.setServer(server, 1883);
  client.setCallback(callback);
  Ethernet.begin(mac, ip);
  // Allow the hardware to sort itself out
  delay(1500);
}

void loop()
{
  if (!client.connected()) {
    reconnect();
  }
  client.loop();
}

The introductory comments detail hardware used and network details. It also references where I got the code. Please note that I have commented out the "publish" facility in the sketch. I don't require this.

The Serial Monitor shows -

Attempting MQTT connection...connected
Message arrived [application/3/device/00033bbcff12a631/join] {"applicationID":"3","applicationName":"draginonode2","deviceName":"dev-00033bbcff12a631","devEUI":"00033bbcff12a631","devAddr":"021ec28b"}

However, it shows nothing after that .... forever! MQTT.fx, on the other hand, shows heaps of messages ending in "rx" and shows the whole payload. I can even get the topic up on my iPad and the continuous messages are there. That suggests to me that the node, gateway and MQTT are working. There is something not right in the Arduino MQTT client.

Now having said all that, I have tried several hardware combinations ostensibly running the same code and I get exactly the same result on the Arduino client. And to make matters worse, I have had this Arduino MQTT client working in the past. The only thing that has changed is that I upgraded the firmware in the RAK7249. However, under that firmware, the MQTT.fx laptop still works perfectly.

Any help would be gratefully appreciated. Thank you. Regards
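
One thing that might be worth ruling out, although nothing in this thread confirms it: PubSubClient receives each message into a fixed-size buffer (256 bytes by default in version 2.8, 128 bytes in older releases), and a message that does not fit is dropped without the callback being called. The "join" message above comes to roughly 190 bytes on the wire, so it would fit in 256 bytes, whereas an "rx" message that carries radio metadata as well as the payload may not. The sketch below is plain C++, not Arduino code, and only estimates the size of a QoS 0 PUBLISH packet. The helper names are made up for illustration, and any "rx" size fed into it is a guess until it is measured, for example from the payload shown in MQTT.fx.

```cpp
#include <cstddef>

// Number of bytes MQTT 3.1.1 uses to encode the "remaining length" field.
std::size_t remainingLengthBytes(std::size_t remaining) {
    std::size_t n = 1;
    while (remaining > 127) {
        remaining /= 128;
        ++n;
    }
    return n;
}

// Estimated size of a QoS 0 PUBLISH packet:
// 1 fixed-header byte + remaining-length field
// + 2-byte topic length + topic + payload (no packet id at QoS 0).
std::size_t publishPacketSize(std::size_t topicLen, std::size_t payloadLen) {
    const std::size_t remaining = 2 + topicLen + payloadLen;
    return 1 + remainingLengthBytes(remaining) + remaining;
}

// Rough check against a receive buffer of the given size.
// The library's own comparison may differ by a few bytes.
bool fitsInBuffer(std::size_t topicLen, std::size_t payloadLen,
                  std::size_t bufferSize) {
    return publishPacketSize(topicLen, payloadLen) <= bufferSize;
}
```

If the "rx" messages do turn out to be larger than the buffer, PubSubClient 2.8 and later offer `setBufferSize()`, and older versions use the `MQTT_MAX_PACKET_SIZE` define in PubSubClient.h.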

Hello, again

I need to start with a little digression. In the early to mid-70s I did a lot of assembler coding working on a development contract for British Telecom. Prior to that I did a lot of Fortran programming. I was trained in a discipline of software development which I have tended to maintain.

It is my view that the problem that I am experiencing with my Arduino MQTT Client has something to do with the callback not working, and I don't know how to fix that.

However, I have looked at the layout of the C code in the Arduino IDE and thought that this layout does not follow my ingrained discipline, so I changed the layout of the code as follows -

  1. establishing variables, setting up "includes", defining the IPs etc for the network

  2. void setup()

  3. void loop()

  4. void reconnect()

  5. void callback()

I verified, recompiled, and executed the code. The outcome was exactly the same as before, no messages other than the "join".

I then read this thread, "Arduino MQTT Client" (Networking, Protocols, and Devices, Arduino Forum),
which says that the callback function must precede the void setup() like this -

void callback(char* topic, byte* message, unsigned int length);
void setup()
{
...
}

I returned the callback function to its rightful place and in terms of messages, nothing changed.
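
As a general C++ point, a prototype only tells the compiler that the function exists before its first use; it does not change what happens at run time. The Arduino IDE also normally generates such prototypes for .ino sketches automatically, which may be why moving the function made no difference. A minimal sketch in plain C++, where `registerCallback`, `deliver` and `setupLike` are hypothetical stand-ins for the library's `setCallback()`, its internal dispatch, and `setup()`:

```cpp
// Prototype: declares the callback before its definition appears.
void callback(char* topic, unsigned char* payload, unsigned int length);

typedef void (*MessageHandler)(char*, unsigned char*, unsigned int);

static MessageHandler handler = nullptr;  // hypothetical stand-in for library state
static unsigned int lastLength = 0;       // records what the callback saw

// Hypothetical stand-in for PubSubClient::setCallback().
void registerCallback(MessageHandler h) { handler = h; }

// Hypothetical stand-in for the library delivering a received message.
void deliver(char* topic, unsigned char* payload, unsigned int length) {
    if (handler != nullptr) handler(topic, payload, length);
}

// Uses the callback before it is defined; legal thanks to the prototype.
void setupLike() { registerCallback(callback); }

// Definition placed at the bottom of the file.
void callback(char* topic, unsigned char* payload, unsigned int length) {
    (void)topic;
    (void)payload;
    lastLength = length;
}
```

Wherever the definition sits, the registered function is the one that runs, so the position of the callback in the file is unlikely to explain missing messages.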

As I understand it, when I shifted the callback function to the bottom of the code, the functionality was recognized. The message that I received was still solely the "join" message.

I moved it back. Still solely the join message.

That suggests to me that my Arduino MQTT client is not seeing/accessing the callback. The callback function is not working.

Any ideas as to what to do next, please? Thank you.

Regards John

... was NOT recognised.

Can you run your MQTT server in verbose mode and see what it's saying?

Also, can you get it working with an MQTT client sending manual messages?

You have serial prints in the MQTT callback, which is a bad idea.
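
A common alternative to printing inside the callback is to copy the message into a buffer, set a flag, and do the slow work from loop(). The following is a minimal sketch of that pattern in plain C++ so it can be tried off-device. The buffer size and the names `onMessage`, `takeMessage` and `kMaxPayload` are illustrative assumptions, not part of PubSubClient.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

const std::size_t kMaxPayload = 64;           // assumed limit for this sketch

static char pendingPayload[kMaxPayload + 1];  // +1 for the terminating '\0'
static volatile bool messagePending = false;

// Same shape as a PubSubClient callback: keep it short, just copy and flag.
void onMessage(char* topic, std::uint8_t* payload, unsigned int length) {
    (void)topic;                               // topic unused in this sketch
    const std::size_t n = length < kMaxPayload ? length : kMaxPayload;
    std::memcpy(pendingPayload, payload, n);   // payload is not null-terminated
    pendingPayload[n] = '\0';
    messagePending = true;
}

// Called from the main loop: copies the waiting message into 'out'
// and returns true, or returns false if nothing is waiting.
bool takeMessage(char* out, std::size_t outSize) {
    if (!messagePending || outSize == 0) return false;
    std::strncpy(out, pendingPayload, outSize - 1);
    out[outSize - 1] = '\0';
    messagePending = false;
    return true;
}
```

On the Arduino, loop() would call `takeMessage()` after `client.loop()` and print the text there. Payloads longer than the assumed limit are truncated in this sketch.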

Which topics are you subscribed to? The way you posted the code it's difficult to see what's going on. Please. Please. Repost your code in code tags.

This code, which both subscribes and publishes, contains a lot of hints.

/*
   Project, use solar cells to generate power
   2/2/2020

*/
#include <WiFi.h>
#include <PubSubClient.h>
#include "certs.h"
#include "sdkconfig.h"
#include "esp_system.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/timers.h"
#include "freertos/event_groups.h"
#include "esp_sleep.h"
//#include <Wire.h>
#include <SPI.h>
#include <Adafruit_Sensor.h>
#include "Adafruit_BME680.h"
////
#define evtDoMQTTwd          ( 1 << 1 )
EventGroupHandle_t eg; // variable for the event group handle
////
WiFiClient wifiClient;
PubSubClient MQTTclient(mqtt_server, mqtt_port, wifiClient);
//////
Adafruit_BME680 bme( GPIO_NUM_5 );
///
/*
   This semaphore is used to stop or prevent a publish from happening during client.loop()
*/
SemaphoreHandle_t sema_MQTT_KeepAlive;
SemaphoreHandle_t sema_mqttOK; // protects the int mqttOK
////
int mqttOK = 0; // stores a count value that is used to cause an esp reset 
////
/*
 * A single topic has been subscribed to. The MQTT broker sends out "OK" messages; if the client receives an OK message, the mqttOK value is set back to zero.
 */
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
  mqttOK = 0;
  xSemaphoreGive( sema_mqttOK );
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
////
// interrupt service routine for WiFi events put into IRAM
void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
{
  switch (event) {
    case SYSTEM_EVENT_STA_CONNECTED:
      break;
    case SYSTEM_EVENT_STA_DISCONNECTED:
      log_i("Disconnected from WiFi access point");
      break;
    case SYSTEM_EVENT_AP_STADISCONNECTED:
      log_i("WiFi client disconnected");
      break;
    default: break;
  }
} // void IRAM_ATTR WiFiEvent(WiFiEvent_t event)
////
////
void setup()
{
  sema_mqttOK = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_mqttOK );
  eg = xEventGroupCreate(); // get an event group handle
  xTaskCreatePinnedToCore( MQTTkeepalive, "MQTTkeepalive", 20000, NULL, 5, NULL, 1 );
  xTaskCreatePinnedToCore( fDoBME, "fDoBME", 20000, NULL, 3, NULL, 1 ); // assigned to core
  xTaskCreatePinnedToCore( fmqttWatchDog, "fmqttWatchDog", 3000, NULL, 3, NULL, 1 ); // assign all to core 1
} //void setup()
////
////
void fmqttWatchDog( void * paramater )
{
  int maxNonMQTTresponse = 5;
  for (;;)
  {
    xEventGroupWaitBits (eg, evtDoMQTTwd, pdTRUE, pdTRUE, portMAX_DELAY );
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK++;
    xSemaphoreGive( sema_mqttOK );
    if ( mqttOK >= maxNonMQTTresponse )
    {
      ESP.restart();
    }
  }
  vTaskDelete( NULL );
} //void fmqttWatchDog( void * paramater )
////
void connectToMQTT()
{
  MQTTclient.setKeepAlive( 90 ); // needs be made before connecting
  // create client ID from mac address
  byte mac[6]; // a MAC address is 6 bytes; WiFi.macAddress() fills all 6
  WiFi.macAddress(mac); // get mac address
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    // boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password, NULL , 1, true, NULL );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
  log_i("MQTT Connected");
} // void connectToMQTT()
////
void fDoBME ( void *pvParameters )
{
  if (!bme.begin()) {
    log_i("Could not find a valid BME680 sensor, check wiring!");
    while (1);
  }
  // Set up oversampling and filter initialization
  bme.setTemperatureOversampling(BME680_OS_8X);
  bme.setHumidityOversampling(BME680_OS_2X);
  bme.setPressureOversampling(BME680_OS_4X);
  bme.setIIRFilterSize(BME680_FILTER_SIZE_3);
  bme.setGasHeater(320, 150); // 320*C for 150 ms
  float fTemperature = 0.0f;
  float fPressure = 0.0f;
  float fHumidity = 0.0f;
  uint32_t iGasResistance = 0;
  TickType_t DelayTime = 1000 * 30;
  //wait for a mqtt connection
  while ( !MQTTclient.connected() )
  {
    vTaskDelay( 250 );
  }
  for ( ;; )
  {
    fTemperature = bme.readTemperature();
    fTemperature = (fTemperature * 1.8f) + 32.0f; // (Celsius x 1.8) + 32
    fPressure = bme.readPressure();
    fPressure = fPressure / 133.3223684f; //converts to mmHg
    fHumidity = bme.readHumidity();
    iGasResistance = bme.readGas();
    xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
    log_i( "Temperature %f F, Pressure %f mmHg, Humidity %f, gasResistance %d", fTemperature, fPressure, fHumidity, iGasResistance ); // temperature was converted to Fahrenheit above
    MQTTclient.publish( topicOutsideTemp, String(fTemperature).c_str() );
    vTaskDelay( 5 ); // gives the Raspberry Pi 4 time to receive the message and process
    MQTTclient.publish( topicOutsideHumidity, String(fHumidity).c_str() );
    vTaskDelay( 5 ); // no delay and RPi is still processing previous message
    MQTTclient.publish( topicOutsideGasResistance, String(iGasResistance).c_str() );
    vTaskDelay( 5 );
    MQTTclient.publish( topicOutsidePressure, String(fPressure).c_str() );
    vTaskDelay( 5 );
    xSemaphoreGive( sema_MQTT_KeepAlive );
    xEventGroupSetBits( eg, evtDoMQTTwd );
    vTaskDelay( DelayTime );
  } // for loop
  vTaskDelete ( NULL );
} // void fDoBME ( void *pvParameters )
////
/*
    Important to not set vTaskDelay to less than 10. Errors begin to develop with the MQTT and network connection.
    Makes the initial WiFi/MQTT connection and works to keep those connections open.
*/
void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  // setting must be set before a mqtt connection is made
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  for (;;)
  {
    //check for an is-connected and if the WiFi 'thinks' it's connected; found checking on both is more reliable than just a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running no other mqtt operations should be in process
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d", wifiClient.connected(), WiFi.status() ); // print as integers; a String object cannot be passed to %s
      if ( !(wifiClient.connected()) || !(WiFi.status() == WL_CONNECTED) )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    vTaskDelay( 250 ); //task runs approx every 250 mS
  }
  vTaskDelete ( NULL );
}
////
void connectToWiFi()
{
  int TryCount = 0;
  //log_i( "connect to wifi" );
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PWD );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  WiFi.onEvent( WiFiEvent );
} // void connectToWiFi()
////
void loop() {}
////

You may notice the line MQTTclient.setKeepAlive( 90 ); in void MQTTkeepalive( void *pvParameters ). That was, for me, the ticket to maintaining a 24/7 open MQTT connection.
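
For context, the keep-alive value is the longest period the client may stay silent. Under MQTT 3.1.1 the broker is expected to drop a connection once one and a half times that period passes without any packet from the client, and PubSubClient's default is 15 seconds. A minimal sketch of the arithmetic in plain C++ follows; the function names are illustrative, and PubSubClient's own timing logic may differ in detail.

```cpp
#include <cstdint>

// True once the client has been silent for a full keep-alive period,
// i.e. the point by which a PINGREQ should have been sent.
bool pingDue(std::uint32_t nowMs, std::uint32_t lastSentMs,
             std::uint16_t keepAliveSec) {
    return (nowMs - lastSentMs) >=
           static_cast<std::uint32_t>(keepAliveSec) * 1000UL;
}

// True once the broker may treat the client as gone (1.5 x keep-alive).
bool brokerMayDisconnect(std::uint32_t nowMs, std::uint32_t lastSeenMs,
                         std::uint16_t keepAliveSec) {
    return (nowMs - lastSeenMs) >
           static_cast<std::uint32_t>(keepAliveSec) * 1500UL;
}
```

With a 90 second keep-alive, a task that only gets round to calling loop() every few seconds still has plenty of margin before the broker gives up on it.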

Hi Idahowalker

Thank you for your responses.

Here is the code -

/*

Version Mega + genuine Arduino Ethernet Shield2 v3.1

1.	Subscribes to topic "application/+/device/+/+"

2.	IPaddress server has been changed to RAK7249 as a client of Linksys LAPAP in the barn, ie 192.168.1.227

3.      MAC address is A8:61:0A:AE:6A:0B

HiveMQ -

Arduino Mega + Arduino Ethernet Shield2 connects to RAK7249 in-built LoRa server and subscribes 
to topic "application/+/device/+/+". Whenever a message is received it is printed to the Serial console. 

https://www.hivemq.com/blog/mqtt-client-library-encyclopedia-arduino-pubsubclient/ 

*/

#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>

byte mac[] = { 0xA8, 0x61, 0x0A, 0xAE, 0x6A, 0x0B };                            //  MAC address Arduino Ethernet Shield MQTT client
IPAddress  ip(192, 168, 1, 51);                                                 //  Static IP address Arduino MQTT client
IPAddress  server(192, 168, 1, 227);                                            //  Static IP address RAK7249 built-in LoRa server


void callback(char* topic, byte* payload, unsigned int length) 
{
  Serial.print("\nMessage arrived\nTopic\n  [");
  Serial.print(topic);
  Serial.print("]\n");
  Serial.print("Payload\n  ");
  for (int i=0;i<length;i++) 
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}


EthernetClient ethClient;
PubSubClient mqttClient(ethClient);


void reconnect() 
{
  while (!mqttClient.connected())                                             // Loop until we're reconnected
  {
    Serial.print("\nAttempting MQTT connection...");
    
    if (mqttClient.connect("arduino3"))                                       // Attempt to connect
    {
      Serial.println("\nThe client is connected.");
      mqttClient.subscribe("application/+/device/+/+");                       // resubscribe
    } 
    else 
    {
      Serial.print("\nFailed, rc = ");
      Serial.print(mqttClient.state());
      Serial.print("\nTry again in 5 seconds.");
      delay(5000);                                                            // Wait 5 seconds before retrying
    }
  }
}


void setup()
{
  Serial.begin(57600);

  mqttClient.setServer(server, 1883);
  mqttClient.setCallback(callback);

  Ethernet.begin(mac, ip);
  
  delay(5000);                                                                // Allow the hardware to sort itself out
}


void loop()
{
  if (!mqttClient.connected()) 
  {
    reconnect();
  }
  mqttClient.loop();
}

I am using Arduino IDE Web App Version - 6.2.1 and also Arduino Pro IDE 0.1.3 Alpha. Regrettably, they both compile differently.

I am not able to get a verbose output.

Any ideas, please. Thank you.

void setup()
{
  Serial.begin(57600);


 // Ethernet.begin(mac, ip);


 
  //delay(5000);                                                                // Allow the hardware to sort itself out


//  mqttClient.setServer(server, 1883);
//  mqttClient.setCallback(callback);
}


void loop()
{
Serial.println( " verbose " );
delay(100);
}

Does that give an output on the IDE's monitor?

Hello, again. Firstly, many thanks for your help to date.

Can I recap a little?

I have had 4 Arduino UNO/Mega + Dragino LoRa shields working perfectly on AU915. I have been using the ubiquitous "thomaslaurenson" code and his LMIC fork. The nodes have been using DHT22 and PIR sensors, and also "Hello World!" payloads. All 4 have worked well over the past 12 months.

I built some gateways - RAK and Dragino - using RPi hats. Got those to go perfectly, and could see the nodes on the TTN console (AU915). All this didn't happen overnight. It took a lot of research, blood, sweat and tears. Learnt heaps, worked well.

Then I purchased a 16-channel RAK7249 gateway and spent quite a bit of time making that go. I was able to activate 16 AU915 frequencies and see the nodes on the TTN console.

I then got into building an Arduino MQTT client with a view to subscribing to the sensor data that the gateway was receiving. I was using the in-built LoRa server of the RAK7249. That too eventually worked really well. I then spent time developing code to unpack the payload to manage other gear: sensor node says a tank is nearly empty, so turn on a pump.
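
The "tank nearly empty, turn on a pump" rule can be sketched as a small decision function. This is only an illustration in plain C++: the thresholds, the percentage scale and the name `updatePump` are assumptions. Using two separate thresholds is a design choice that stops the pump switching on and off repeatedly when the level hovers near a single set point.

```cpp
// Assumed thresholds, in percent of tank capacity.
const int kPumpOnBelow = 20;   // start filling when the tank drops below this
const int kPumpOffAbove = 90;  // stop filling once the tank rises above this

// Returns the new pump state given the current state and the latest reading.
bool updatePump(bool pumpOn, int levelPercent) {
    if (levelPercent < kPumpOnBelow) return true;
    if (levelPercent > kPumpOffAbove) return false;
    return pumpOn;  // between thresholds: keep the current state
}
```

The MQTT client would call this each time a level reading is unpacked from a payload and drive the relay from the returned value.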

I dismantled the nodes - can't remember why, but, regrettably, didn't have any discipline around archiving/storing the code and project details; I do now!

I rebuilt the nodes. I accidentally damaged my RAK7249 gateway. Replaced it, then started again. Rebuilt the nodes, but they never worked again. Couldn't see anything on the TTN, couldn't see any messages on the Arduino MQTT client beyond "join". Then I asked for help.

Pretty sure what I have discovered is that LMIC libraries do not operate on AU915 and that is the problem.

Following "thomaslaurenson" and utilising the LMIC libraries embedded in the particular GitHub repositories, I have managed to get a "Hello World!" node to be seen by the gateway and the "join" to be seen by the Arduino MQTT client, but no "rx" messages, and the monitor is not showing the detail that it is capable of showing, i.e. the code is not cycling through all the stages.

So, I'm pretty sure the bottom line is the absence of a non-forked LMIC library that operates in the AU915 frequency range.

Wondered if anyone can point me in the direction of such code. Many thanks.
