Compare messages published by the broker in sketch

I am trying to understand how to compare messages published by the broker in the following sketch.

The broker can publish any message on the topic TOPIC_NAME_LED, and I want to compare the received message against each of the following strings, one by one, in the sketch:

"GREEAN_LED"
"YELLOW_LED"
"RED_LED"

I don't have any idea how to do this in the following sketch:


#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>

// Update these with values suitable for your network.
// mac    - hardware address handed to Ethernet.begin() in setup()
// ip     - address handed to Ethernet.begin() in setup()
// server - address handed to client.setServer() in setup(), i.e. the MQTT broker
byte mac[]    = {  0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
IPAddress ip(172, 16, 0, 100);
IPAddress server(172, 16, 0, 2);

// Invoked by PubSubClient for every message received on a subscribed topic.
//   topic   - NUL-terminated name of the topic the message arrived on
//   payload - message bytes; NOT NUL-terminated, only `length` bytes are valid
//   length  - number of bytes in payload
// Echoes the topic and the raw payload to the serial port.
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  // Index with the same unsigned type as `length` so the loop condition does
  // not mix signed and unsigned values.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print(static_cast<char>(payload[i]));
  }
  Serial.println();
}

// Network transport handed to the MQTT client below.
EthernetClient ethClient;
// MQTT client; it sends and receives through ethClient.
PubSubClient client(ethClient);

// Blocks until the MQTT client is connected. After each successful connect it
// publishes a greeting and (re)subscribes to the topics this sketch listens on;
// after a failed attempt it reports the client state and waits 5 s.
void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    if (!client.connect("arduinoClient")) {
      // Report why the attempt failed, pause, then go round again.
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("connected");
    // Announce ourselves ...
    client.publish("outTopic","hello world");
    // ... and renew the subscriptions.
    client.subscribe("inTopic");
    client.subscribe("TOPIC_NAME_LED");  // LED messages arrive on this topic
  }
}

// One-time initialisation: serial console, MQTT client configuration, Ethernet.
void setup()
{
  Serial.begin(57600);

  // Register the message handler and point the client at the broker (port 1883).
  client.setCallback(callback);
  client.setServer(server, 1883);

  // Bring the interface up, then give the hardware 1.5 s to sort itself out.
  Ethernet.begin(mac, ip);
  delay(1500);
}

// Main loop: re-establish the MQTT connection when it is down, then let the
// client process its traffic (this is what ends up invoking callback()).
void loop()
{
  const bool linkUp = client.connected();
  if (!linkUp) {
    reconnect();
  }
  client.loop();
}

You may be interested in the following, which I posted for handling named values and commands through a serial interface. loop() processes a comma-separated list of values or commands, which are looked up in tables and handled by varSet() or cmdExec().

// command line processor

// Integer variables that varTbl exposes by name.
int tom;
int dick;
int harry;

// Scratch buffer that varSet(), varDisp() and cmdExec() format their serial
// output into.
char s [80];

// -------------------------------------
// Command handler: stores tom + dick in harry.
void
sum (void)
{
    const int total = tom + dick;
    harry = total;
}

// -----------------------------------------------------------------------------
// Associates a textual variable name with the int it refers to.
struct Var {
    const char *name;   // name compared against the request text
    int        *pVar;   // storage the name maps to
};

// Lookup table scanned by varSet() and varDisp(); the all-null entry ends it.
Var varTbl [] = {
    { "tom",   &tom },
    { "dick",  &dick },
    { "harry", &harry },
    { nullptr, nullptr },
};

// -------------------------------------
void varSet (
    char *str )
{
    char *varName = strtok (str, "=");
    char *valStr  = strtok (NULL, "=");

    if (NULL == valStr) {
        sprintf (s, "%s: Error - null value", __func__);
        Serial.println (s);
        return;
    }

    for (Var *v = varTbl; v->name; v++)  {
        if (! strcmp (v->name, varName))  {
            *v->pVar = atoi(valStr);
            return;
        }
    }

    sprintf (s, "%s: unknown variable - %s", __func__, str);
    Serial.println (s);
};

// -------------------------------------
void
varDisp (void)
{
    Serial.println ("varList:");

    for (Var *v = varTbl; v->name; v++)  {
        sprintf (s, "  %6d %s", *v->pVar, v->name);
        Serial.println (s);
    }
}

// -----------------------------------------------------------------------------
// Associates a command name with the function that carries it out.
struct Cmd {
    const char *name;           // name compared against the request text
    void       (*func) (void);  // handler run for that name
};

// Lookup table scanned by cmdExec(); the all-null entry ends it.
Cmd cmdTbl [] = {
    { "vars",  varDisp },
    { "sum",   sum },
    { nullptr, nullptr },
};

// -------------------------------------
// Runs the command named by `str`: the first cmdTbl entry whose name matches
// exactly has its handler called.
//   str - command name
// Reports on Serial when no entry matches.
void cmdExec (
    char *str )
{
    for (Cmd *c = cmdTbl; c->name; c++)  {
        if (! strcmp (c->name, str))  {
            (*c->func) ();
            return;
        }
    }

    // str can be almost as long as s itself, so the write must be bounded:
    // an unbounded sprintf() would run past the end of s for a long name.
    snprintf (s, sizeof (s), "%s: unknown cmd - %s", __func__, str);
    Serial.println (s);
}

// -----------------------------------------------------------------------------
// Reads one line from Serial when input is pending, splits it at commas and
// dispatches each piece: text containing '=' goes to varSet(), anything else
// to cmdExec(). Each piece is echoed before it is handled.
void loop ()
{
    char buf [80];
    if (Serial.available ())  {
        int n = Serial. readBytesUntil ('\n', buf, sizeof(buf)-1);

        // A CR+LF line ending leaves '\r' as the last character read; drop it
        // so it does not become part of the final command or value.
        if (n > 0 && '\r' == buf [n-1])
            n--;
        buf [n] = 0;      // terminate string

        char *savePtr;
        char *req = strtok_r (buf, ",", &savePtr);
        while (req)  {
            Serial.println (req);
            if (strstr (req, "="))
                varSet (req);
            else
                cmdExec (req);

            req = strtok_r (NULL, ",", &savePtr);
        }
    }
}

// -----------------------------------------------------------------------------
// One-time initialisation: open the serial port the command lines arrive on.
void setup ()
{
    const unsigned long baudRate = 9600;
    Serial.begin (baudRate);
}
1 Like

if (!strncmp((char *)payload, "RED_LED"

Does this line compare payload and message?

yes, in general

I was thinking you might be interested in creating "cmds" for each of your strings, with a sub-function for each, on the assumption that you may want to do more than just turn LEDs on/off.

It shows an error in the second if comparison:

too few arguments to function 'int strncmp(const char*, const char*, size_t)'
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>

// Update these with values suitable for your network.
// mac    - hardware address handed to Ethernet.begin() in setup()
// ip     - address handed to Ethernet.begin() in setup()
// server - address handed to client.setServer() in setup(), i.e. the MQTT broker
byte mac[]    = {  0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
IPAddress ip(172, 16, 0, 100);
IPAddress server(172, 16, 0, 2);

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);

  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  if (!strncmp((char *)payload, "GREEAN_LED")) {
  Serial.println("GREEAN_LED");
  }
  if (!strncmp((char *)payload, "RED_LED")) {
  Serial.println("RED LED");
  }
}

// Network transport handed to the MQTT client below.
EthernetClient ethClient;
// MQTT client; it sends and receives through ethClient.
PubSubClient client(ethClient);

// Blocks until the MQTT client is connected. After each successful connect it
// publishes a greeting and (re)subscribes to the topics this sketch listens on;
// after a failed attempt it reports the client state and waits 5 s.
void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    if (!client.connect("arduinoClient")) {
      // Report why the attempt failed, pause, then go round again.
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("connected");
    // Announce ourselves ...
    client.publish("outTopic", "hello world");
    // ... and renew the subscriptions.
    client.subscribe("inTopic");
    client.subscribe("TOPIC_NAME_LED");  // LED messages arrive on this topic
  }
}

// One-time initialisation: serial console, MQTT client configuration, Ethernet.
void setup()
{
  Serial.begin(57600);

  // Register the message handler and point the client at the broker (port 1883).
  client.setCallback(callback);
  client.setServer(server, 1883);

  // Bring the interface up, then give the hardware 1.5 s to sort itself out.
  Ethernet.begin(mac, ip);
  delay(1500);
}

// Main loop: re-establish the MQTT connection when it is down, then let the
// client process its traffic (this is what ends up invoking callback()).
void loop()
{
  const bool linkUp = client.connected();
  if (!linkUp) {
    reconnect();
  }
  client.loop();
}

In my use of MQTT, the broker does not publish anything until something has been published to the broker, or the broker has been set to Persistence=On.

Thus I found the above issue description a bit confusing. Could you restate the issue?

The MQTT callback should not have any print statements in it.

use strcmp() instead.

Could you explain where the complete payload is stored after subscribing to the topic in my sketch?

Is it stored by the for loop?

#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>

// Update these with values suitable for your network.
// mac    - hardware address handed to Ethernet.begin() in setup()
// ip     - address handed to Ethernet.begin() in setup()
// server - address handed to client.setServer() in setup(), i.e. the MQTT broker
byte mac[]    = {  0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
IPAddress ip(172, 16, 0, 100);
IPAddress server(172, 16, 0, 2);

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);

  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
  if (strcmp(topic, "TOPIC_NAME_LED") == "GREEAN_LED") {
    Serial.println("GREEAN_LED");
  }
  if (strcmp(topic, "TOPIC_NAME_LED") == "RED LED") {
    Serial.println("RED LED");
  }
}

// Network transport handed to the MQTT client below.
EthernetClient ethClient;
// MQTT client; it sends and receives through ethClient.
PubSubClient client(ethClient);

// Blocks until the MQTT client is connected. After each successful connect it
// publishes a greeting and (re)subscribes to the topics this sketch listens on;
// after a failed attempt it reports the client state and waits 5 s.
void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");

    if (!client.connect("arduinoClient")) {
      // Report why the attempt failed, pause, then go round again.
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("connected");
    // Announce ourselves ...
    client.publish("outTopic", "hello world");
    // ... and renew the subscriptions.
    client.subscribe("inTopic");
    client.subscribe("TOPIC_NAME_LED");  // LED messages arrive on this topic
  }
}

// One-time initialisation: serial console, MQTT client configuration, Ethernet.
void setup()
{
  Serial.begin(57600);

  // Register the message handler and point the client at the broker (port 1883).
  client.setCallback(callback);
  client.setServer(server, 1883);

  // Bring the interface up, then give the hardware 1.5 s to sort itself out.
  Ethernet.begin(mac, ip);
  delay(1500);
}

// Main loop: re-establish the MQTT connection when it is down, then let the
// client process its traffic (this is what ends up invoking callback()).
void loop()
{
  const bool linkUp = client.connected();
  if (!linkUp) {
    reconnect();
  }
  client.loop();
}

I'm not familiar with how callback() works. It looks like the payload may be stored in an internal buffer.

not sure what you want to do. seems like you want to compare the topic? or do you want to compare the payload?

but please look at description of strcmp()

  // strcmp() returns 0 when both NUL-terminated strings are identical, so the
  // body runs only on an exact match. Note that this tests `topic`; the
  // payload is not examined here.
  if (! strcmp(topic,  "GREEAN_LED")) {
    Serial.println (" received GREEAN_LED");
  }

I want to compare payload. For this, first the entire payload has to be stored somewhere and then it has to be compared.

payload[i]

It is an array and can be accessed through a loop

c-strings are arrays of chars (a char is a 8-bit byte). payload [i] is a single char. if payload is "GREEN_LED", then payload [0] is 'G'.

strcmp() does a char-by-char comparison of chars in each string

1 Like

Sorry for asking for so much detail.

I understand string comparison, but the confusing thing is where the payload is stored in the sketch.

On which lines of the sketch can one see the payload being stored?

Look into the PubSubClient code to see where the payload is stored until it is extracted.

What I do is extract the payload from the libraries storage location like so

// Capacity of stu_message::payload, including the terminating NUL.
const int payloadSize = 100;

// One received MQTT message: the payload as text plus the topic it came in on.
// NOTE(review): mqttCallback() hands this struct to xQueueOverwrite(); the
// String member manages its own storage, so confirm it survives the queue's
// copy - a fixed char array may be the safer choice.
struct stu_message
{
  char payload [payloadSize] = {};
  String topic ;
};

// Instance that mqttCallback() fills in and posts to the queue.
stu_message x_message;

// MQTT receive callback: copies the topic and payload out of the library's
// storage into x_message and posts that to xQ_Message.
//   topic   - name of the topic the message arrived on
//   payload - message bytes; only `length` bytes are valid
//   length  - number of bytes in payload
// A payload longer than payloadSize - 1 bytes is truncated so that the copy
// and its terminating NUL always fit in x_message.payload.
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  const unsigned int maxLen = payloadSize - 1;
  const unsigned int copyLen = ( length < maxLen ) ? length : maxLen;

  // Zero the whole buffer first; at most payloadSize - 1 bytes are copied in
  // afterwards, so the result is always NUL-terminated.
  memset( x_message.payload, '\0', payloadSize );
  memcpy( x_message.payload, payload, copyLen );
  x_message.topic = topic; // store new topic
  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)

As one may see I create a structure to hold the payload and topic. Then in the callback the payload and topic are extracted and sent to a message queue. The message queue triggers the parser, where the topic and payload are turned into work.

parser


void fparseMQTT( void *pvParameters )
{
  struct stu_message px_message;
  for (;;)
  {
    if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    {
      // parse the time from the OK message and update MCU time
      if ( String(px_message.topic) == topicOK )
      {
        if ( !TimeSet)
        {
          String temp = "";
          temp =  px_message.payload[0];
          temp += px_message.payload[1];
          temp += px_message.payload[2];
          temp += px_message.payload[3];
          int year =  temp.toInt();
          temp = "";
          temp =  px_message.payload[5];
          temp += px_message.payload[6];
          int month =  temp.toInt();
          temp =  "";
          temp =  px_message.payload[8];
          temp += px_message.payload[9];
          int day =  temp.toInt();
          temp = "";
          temp = px_message.payload[11];
          temp += px_message.payload[12];
          int hour =  temp.toInt();
          temp = "";
          temp = px_message.payload[14];
          temp += px_message.payload[15];
          int min =  temp.toInt();
          rtc.setTime( 0, min, hour, day, month, year );
          log_i( "rtc  %s ", rtc.getTime() );
          TimeSet = true;
        }
      }
      //
    } //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
    xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
    mqttOK = 0;
    xSemaphoreGive( sema_mqttOK );
  }
} // void fparseMQTT( void *pvParameters )#i
1 Like

If you want to see where in the PubSubClient library, the payload is stored take a look at the loop() function as pasted below.

// Services the MQTT connection once: sends a keep-alive ping when the link has
// been idle, then, if data is waiting, reads one packet into this->buffer and
// handles it (PUBLISH -> user callback, PINGREQ -> PINGRESP, PINGRESP -> clear
// the outstanding-ping flag).
// Returns false when not connected, when a ping went unanswered, or when
// readPacket() closed the connection; true otherwise.
// The topic and payload pointers passed to the callback point INTO
// this->buffer; nothing is copied for the caller.
boolean PubSubClient::loop() {
    if (connected()) {
        unsigned long t = millis();
        // Keep-alive: no traffic in either direction for keepAlive seconds.
        if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
            if (pingOutstanding) {
                // The previous ping was never answered: give up on the link.
                this->_state = MQTT_CONNECTION_TIMEOUT;
                _client->stop();
                return false;
            } else {
                // Build and send a 2-byte PINGREQ using the shared buffer.
                this->buffer[0] = MQTTPINGREQ;
                this->buffer[1] = 0;
                _client->write(this->buffer,2);
                lastOutActivity = t;
                lastInActivity = t;
                pingOutstanding = true;
            }
        }
        if (_client->available()) {
            uint8_t llen;
            // Packet is read into this->buffer; llen is filled in by readPacket()
            // and is used below as an offset into the buffer.
            uint16_t len = readPacket(&llen);
            uint16_t msgId = 0;
            uint8_t *payload;
            if (len > 0) {
                lastInActivity = t;
                // Packet type is the high nibble of the first byte.
                uint8_t type = this->buffer[0]&0xF0;
                if (type == MQTTPUBLISH) {
                    if (callback) {
                        uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
                        memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
                        this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
                        char *topic = (char*) this->buffer+llen+2;
                        // msgId only present for QOS>0
                        if ((this->buffer[0]&0x06) == MQTTQOS1) {
                            msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
                            // Payload starts after the 2-byte message id; it is
                            // not NUL-terminated, its size is passed separately.
                            payload = this->buffer+llen+3+tl+2;
                            callback(topic,payload,len-llen-3-tl-2);
                            // The start of the buffer is reused for the PUBACK
                            // right after the callback returns.
this->buffer[0] = MQTTPUBACK;
                            this->buffer[1] = 2;
                            this->buffer[2] = (msgId >> 8);
                            this->buffer[3] = (msgId & 0xFF);
                            _client->write(this->buffer,4);
                            lastOutActivity = t;

                        } else {
                            // No message id: payload follows the topic directly.
                            payload = this->buffer+llen+3+tl;
                            callback(topic,payload,len-llen-3-tl);
                        }
                    }
                } else if (type == MQTTPINGREQ) {
                    // Answer the peer's ping.
                    this->buffer[0] = MQTTPINGRESP;
                    this->buffer[1] = 0;
                    _client->write(this->buffer,2);
                } else if (type == MQTTPINGRESP) {
                    // Our ping was answered.
                    pingOutstanding = false;
                }
            } else if (!connected()) {
                // readPacket has closed the connection
                return false;
            }
        }
        return true;
    }
    return false;
}

And of course anyone can open and view those things on their own.

1 Like

Why not just memcpy()?

1 Like

have not thought of that. Hey thanks!

within the library

When your callback is invoked, it's provided with a pointer to where the payload data resides and the number of bytes it occupies. What else do you think you need to know?

when the callback is called, it passes (provides) a pointer to where the payload data is.

A buffer could be pre-allocated (it isn't) and provided to the interface, which would then use it and possibly pass it back when the callback is invoked.

It could have been implemented that way. There are many ways the interface could have been implemented. For example, the callback could have been provided with a reference to a 'std::vector<uint8_t>'. But, apparently, it was implemented to pass a pointer to the callback. That pointer likely points to a dynamically-allocated block of memory that will eventually be de-allocated or reused. So, the user best copy the data to their own buffer before returning from the callback.

1 Like