Subscribe to MQTT topic

I'm using two boards (an Uno and a Nano), both connected over Ethernet (ENC28J60). I publish messages to a topic (loop/) from the Uno using MQTT, and publishing works fine. I've written code to subscribe to that topic (loop/) and, when a message arrives, publish it on a second topic (dmd/). When the subscribe code runs on its own (with messages sent manually from MQTTX), it works fine, but as soon as I run both sketches simultaneously (on different PCs), one of them gets disconnected while publishing. I even tried running both boards from the same PC using PlatformIO, and the subscribing code still fails.
Please help. Thank you in advance.
P.S. I forgot to take screenshots while running both simultaneously.

#include <SPI.h>
#include <PubSubClient.h>
#include <UIPEthernet.h>

const char* mqtt_server = "***";
const int mqtt_port = 1883;
const char* mqttUser = "***";
const char* mqttPassword = "***";

EthernetClient ethClient;
PubSubClient client(ethClient);
byte mac[]    = {  0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
unsigned long lastMsg = 0;
const char* fetch = "loop/";
const char*  post = "dmd/";

// Forward declaration: publishMessage() is defined after callback(), which calls it
void publishMessage(const char* topic, String payload, boolean retained);
void callback(char* topic, byte* payload, unsigned int length)
{
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  String Car;
  for (unsigned int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
    Car += (char)payload[i];
  }
  publishMessage(post, Car, true);
}
void setup()
{
  Serial.begin(9600);
  while (!Serial) delay(1);
  Ethernet.begin(mac);
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void reconnect()
{
  if (client.connect("arduinoClientSuperior", mqttUser, mqttPassword))
  {
    Serial.println("connected");
    client.subscribe(fetch);
  }
  else
  {
    Serial.print("failed, rc=");
    Serial.print(client.state());
    Serial.println(" try again in 5 seconds");
    delay(1000);
  }
}
void loop()
{
  if (!client.connected())
  {
    reconnect();
  }
  client.loop();
  delay(1000);
}

void publishMessage(const char* topic, String payload , boolean retained)
{
  if (client.publish(topic, payload.c_str(), retained))
    Serial.println("Message published [" + String(topic) + "]: " + payload);
}


Publishing in the callback might be tricky (I have never tried). Could you try setting a flag to true in the callback, catching that flag in the main loop(), and publishing there, then resetting the flag?

That will let the callback complete its work before calling the MQTT API again.

Side note: using a String to store the incoming message, only to send it again as a C string, is unnecessary. Just pass along the same bytes and length you received.
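The flag approach described above could look roughly like the sketch below. It has not been tested on the hardware in this thread. The callback only copies the received bytes and length into a fixed buffer and raises a flag, and a helper called from loop() does the actual publish. The names PendingMessage, kMaxPayload and forwardPending are made up for illustration, and the 64-byte limit is an assumption that should be sized to the real messages.

```cpp
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Assumed upper bound on payload size; longer payloads are truncated.
const size_t kMaxPayload = 64;

// Hypothetical holder for one message waiting to be republished.
struct PendingMessage {
  uint8_t data[kMaxPayload];
  unsigned int length;
  bool ready;
};

PendingMessage pending = {{0}, 0, false};

// Same signature as a PubSubClient callback. It does no MQTT work:
// it copies the bytes and count, then raises the flag.
void callback(char* topic, uint8_t* payload, unsigned int length) {
  (void)topic;
  if (length > kMaxPayload) length = kMaxPayload;
  memcpy(pending.data, payload, length);
  pending.length = length;
  pending.ready = true;
}

// Call from loop() after client.loop(). `publish` is any callable taking
// (const uint8_t*, unsigned int) and returning bool, for example:
//   forwardPending([](const uint8_t* d, unsigned int n) {
//     return client.publish("dmd/", d, n, true);
//   });
// The flag is cleared before publishing, so a failed publish drops the message.
template <typename PublishFn>
bool forwardPending(PublishFn publish) {
  if (!pending.ready) return false;
  pending.ready = false;
  return publish(pending.data, pending.length);
}
```

Only one message is buffered here, so a second message arriving before loop() runs would overwrite the first.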

The PubSubClient library has an example of publishing in the callback function.

Whether it is implemented in a sensible way is a matter for debate:

// Callback function
void callback(char* topic, byte* payload, unsigned int length) {
  // In order to republish this payload, a copy must be made
  // as the original payload buffer will be overwritten whilst
  // constructing the PUBLISH packet.

  // Allocate the correct amount of memory for the payload copy
  byte* p = (byte*)malloc(length);
  // Copy the payload to the new buffer
  memcpy(p,payload,length);
  client.publish("outTopic", p, length);
  // Free the memory
  free(p);
}

Something like this?
Publishing in the callback works absolutely fine when run on a single board.

def on_connect(client, userdata, flags, rc):
     logging.info("Connected flags"+str(flags)+"result code "\
     +str(rc)+"client1_id ")
     client.connected_flag=True

I did try client.publish(), but it doesn't take the count; it just requires the data as a string in "". I'll try the byte method you suggested.

I tried this and it worked, but the main problem of subscribing and publishing together still remains.

As a note, client.loop() should be run a lot more often than once a second. I recommend at least 4 times a second.
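One way to call client.loop() on every pass while still spacing out reconnect attempts is to replace delay() with a millis()-based check. The following is a minimal sketch of that idea. Interval is a hypothetical helper, not part of PubSubClient, and the intervals shown in the comments are only examples.

```cpp
#include <stdint.h>

// Hypothetical helper: due() returns true at most once per period,
// given the current millis() value.
struct Interval {
  uint32_t periodMs;
  uint32_t last;

  bool due(uint32_t now) {
    // Unsigned subtraction keeps working when millis() rolls over.
    if (now - last < periodMs) return false;
    last = now;
    return true;
  }
};

// Intended use in an Arduino sketch (shown as comments, not compiled here):
//
//   Interval retry = {5000, 0};   // space reconnect attempts 5 s apart
//
//   void loop() {
//     if (!client.connected() && retry.due(millis())) reconnect();
//     client.loop();              // runs on every pass, with no delay()
//   }
```

With this arrangement the delay(1000) calls in reconnect() and loop() would no longer be needed.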

As a note, the MQTT broker likes to disconnect from its clients and should be encouraged to keep the connection open. I find that this command, MQTTclient.setKeepAlive( 90 ); (which needs to be called before connecting), also helps keep the MQTT broker interested in maintaining a connection to its clients.

I found that the network connection is also prone to disconnecting, and that just testing for an MQTT broker connection was futile if the network was disconnected.

Here is how I connect and maintain a network/MQTT connection:


void MQTTkeepalive( void *pvParameters )
{
  sema_MQTT_KeepAlive   = xSemaphoreCreateBinary();
  xSemaphoreGive( sema_MQTT_KeepAlive ); // found keep alive can mess with a publish, stop keep alive during publish
  // setting must be set before a mqtt connection is made
  MQTTclient.setKeepAlive( 90 ); // setting keep alive to 90 seconds makes for a very reliable connection, must be set before the 1st connection is made.
  for (;;)
  {
    // check both the client connection and whether WiFi 'thinks' it is connected; checking both proved more reliable than a single check
    if ( (wifiClient.connected()) && (WiFi.status() == WL_CONNECTED) )
    {
      xSemaphoreTake( sema_MQTT_KeepAlive, portMAX_DELAY ); // while MQTTclient.loop() is running, no other MQTT operations should be in progress
      MQTTclient.loop();
      xSemaphoreGive( sema_MQTT_KeepAlive );
    }
    else {
      log_i( "MQTT keep alive found MQTT status %d WiFi status %d", (int)wifiClient.connected(), (int)WiFi.status() );
      if ( !(wifiClient.connected()) || !(WiFi.status() == WL_CONNECTED) )
      {
        connectToWiFi();
      }
      connectToMQTT();
    }
    vTaskDelay( 250 ); //task runs approx every 250 mS
  }
  vTaskDelete ( NULL );
}
////
void connectToWiFi()
{
  int TryCount = 0;
  while ( WiFi.status() != WL_CONNECTED )
  {
    TryCount++;
    WiFi.disconnect();
    WiFi.begin( SSID, PASSWORD );
    vTaskDelay( 4000 );
    if ( TryCount == 10 )
    {
      ESP.restart();
    }
  }
  WiFi.onEvent( WiFiEvent );
} // void connectToWiFi()
////
void connectToMQTT()
{
  MQTTclient.setKeepAlive( 90 ); // needs be made before connecting
  byte mac[6]; // a MAC address is 6 bytes; WiFi.macAddress() fills all 6
  WiFi.macAddress(mac);
  String clientID = String(mac[0]) + String(mac[4]) ; // use mac address to create clientID
  while ( !MQTTclient.connected() )
  {
    // boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
    MQTTclient.connect( clientID.c_str(), mqtt_username, mqtt_password, NULL , 1, true, NULL );
    vTaskDelay( 250 );
  }
  MQTTclient.setCallback( mqttCallback );
  MQTTclient.subscribe( topicOK );
} // void connectToMQTT()

Hope the code gives you a few ideas.

Could you please restate the issue?

What does that mean?

I did refer to this code, as you have mentioned it in someone else's project as well. I've added client.setKeepAlive(90); to the reconnect function, but it isn't making any noteworthy change to the output (as far as I can tell).

To restate my issue: I want to publish a message from the Arduino Uno and receive it on the Arduino Nano. My publish test code and receive test code work fine when run individually, but when I run them simultaneously I don't get the desired output. When I publish a message from the Uno, the message is published and I get the acknowledgement as well, but the Uno gets disconnected shortly after publishing. On the receiving side, after the Uno is disconnected, the Nano connects, publishes the acknowledgement of receipt, and stays connected for a while. After receiving the acknowledgement of the first message, when I try to publish a second message, the Uno is still disconnected, and I have to wait until it reconnects before I can publish. Basically, I want both devices to stay connected to the broker all the time, and as soon as I publish a message I should get the acknowledgement on the receiving side.

I tried placing client.setKeepAlive() in both reconnect() and loop():

void reconnect()
{
  if (client.connect("arduinoClientSuperior", mqttUser, mqttPassword))
  {
    Serial.println("connected");
    client.subscribe(fetch );
    client.setKeepAlive(90);
  }
  else
  {
    Serial.print("failed, rc=");
    Serial.print(client.state());
    Serial.println(" try again in 5 seconds");
    delay(1000);
  }
}
void loop()
{
  if (!client.connected())
  {
    reconnect();
    // client.setKeepAlive(90);
  }
  client.loop();
  delay(1000);
}
