12king
September 19, 2022, 3:31pm
1
I am trying to understand how to compare messages published by the broker in the following sketch.
The broker can publish any message on the topic TOPIC_NAME_LED, and I want to compare the received payload against the following strings, one by one, in the sketch:
"GREEAN_LED"
"YELLOW_LED"
"RED_LED"
I don't have any idea how to do it in the following sketch:
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
// Update these with values suitable for your network.
byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
IPAddress ip(172, 16, 0, 100);
IPAddress server(172, 16, 0, 2);
// MQTT message handler (installed in setup() via client.setCallback()).
// Prints the topic in brackets, then the payload bytes, then a newline.
//   topic   - topic the message arrived on
//   payload - message bytes; exactly `length` bytes are read, so no
//             terminating '\0' is relied upon
//   length  - number of bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  // The index is unsigned to match `length`; a signed int index mixes
  // signedness in the comparison and overflows for very long payloads.
  for (unsigned int i = 0; i < length; i++) {
    Serial.print(static_cast<char>(payload[i]));
  }
  Serial.println();
}
EthernetClient ethClient;
PubSubClient client(ethClient);
// Blocks until the MQTT client reports a connection.
// A successful connect publishes an announcement and (re)subscribes to the
// topics; a failed attempt prints client.state() and waits 5 seconds.
void reconnect() {
  while (true) {
    if (client.connected()) {
      return;
    }

    Serial.print("Attempting MQTT connection...");
    if (!client.connect("arduinoClient")) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);  // wait 5 seconds before retrying
      continue;
    }

    Serial.println("connected");
    // Announce ourselves, then restore the subscriptions.
    client.publish("outTopic","hello world");
    client.subscribe("inTopic");
    client.subscribe("TOPIC_NAME_LED"); // subscribes to topic TOPIC_NAME_LED
  }
}
// One-time initialisation: serial port, MQTT server and callback, Ethernet.
void setup()
{
  const unsigned long serialBaud = 57600;
  const int mqttPort = 1883;
  const unsigned long settleMs = 1500;

  Serial.begin(serialBaud);
  client.setServer(server, mqttPort);
  client.setCallback(callback);
  Ethernet.begin(mac, ip);
  delay(settleMs);  // allow the hardware to sort itself out
}
// Main loop: reconnect when the client is not connected, then call
// client.loop() on every pass.
void loop()
{
  const bool online = client.connected();
  if (!online) {
    reconnect();
  }
  client.loop();
}
gcjr
September 19, 2022, 3:44pm
2
You may be interested in the following, which I posted for handling named values and commands through a serial interface. loop() processes a comma-separated list of values or commands, which are looked up in tables and handled by varSet() or cmdExec().
// command line processor
// Named integer values; they are reachable by name through varTbl.
int tom   = 0;
int dick  = 0;
int harry = 0;

// Shared scratch buffer used to format a line before it is printed.
char s [80] = {0};

// -------------------------------------
// "sum" command: stores tom + dick in harry.
void sum (void)
{
    const int total = tom + dick;
    harry = total;
}
// -----------------------------------------------------------------------------
// Associates a variable's textual name with the int that stores its value.
struct Var {
    const char *name;   // name used to look the variable up
    int        *pVar;   // address of the variable
};

// Table of known variables; the all-zero entry marks the end of the table.
Var varTbl [] = {
    { "tom",   &tom   },
    { "dick",  &dick  },
    { "harry", &harry },
    { 0,       0      },
};
// -------------------------------------
void varSet (
char *str )
{
char *varName = strtok (str, "=");
char *valStr = strtok (NULL, "=");
if (NULL == valStr) {
sprintf (s, "%s: Error - null value", __func__);
Serial.println (s);
return;
}
for (Var *v = varTbl; v->name; v++) {
if (! strcmp (v->name, varName)) {
*v->pVar = atoi(valStr);
return;
}
}
sprintf (s, "%s: unknown variable - %s", __func__, str);
Serial.println (s);
};
// -------------------------------------
void
varDisp (void)
{
Serial.println ("varList:");
for (Var *v = varTbl; v->name; v++) {
sprintf (s, " %6d %s", *v->pVar, v->name);
Serial.println (s);
}
}
// -----------------------------------------------------------------------------
// Associates a command name with the parameterless function that runs it.
struct Cmd {
    const char *name;        // name used to look the command up
    void (*func) (void);     // handler invoked for the command
};

// Table of known commands; the all-zero entry marks the end of the table.
Cmd cmdTbl [] = {
    { "vars", varDisp },
    { "sum",  sum     },
    { 0,      0       },
};
// -------------------------------------
// Runs the command named by `str`: searches cmdTbl for an exact name match
// and calls its handler. Prints an error on Serial when no entry matches.
//   str - NUL-terminated command name
void cmdExec (
    char *str )
{
    for (Cmd *c = cmdTbl; c->name; c++)  {
        if (! strcmp (c->name, str))  {
            (*c->func) ();
            return;
        }
    }

    // snprintf bounds the write: str is user input and, together with the
    // message prefix, can be longer than s.
    snprintf (s, sizeof(s), "%s: unknown cmd - %s", __func__, str);
    Serial.println (s);
}
// -----------------------------------------------------------------------------
// Reads one line from Serial (up to '\n'), splits it on ',' and hands each
// piece to varSet() when it contains '=' or to cmdExec() otherwise.
void loop ()
{
    char buf [80];

    if (! Serial.available ())
        return;

    int n = Serial.readBytesUntil ('\n', buf, sizeof(buf)-1);

    // A terminal sending CR+LF leaves '\r' at the end of the line, which
    // would make the last name/command never match; drop it.
    if (n > 0 && '\r' == buf [n-1])
        n--;
    buf [n] = 0;        // terminate string

    // strtok_r keeps its own state, so varSet() can use strtok() on a piece
    // without disturbing this scan.
    char *savePtr;
    for (char *req = strtok_r (buf, ",", &savePtr);
               req;
               req = strtok_r (NULL, ",", &savePtr))  {
        Serial.println (req);

        if (strstr (req, "="))
            varSet (req);
        else
            cmdExec (req);
    }
}
// -----------------------------------------------------------------------------
// One-time initialisation: opens the serial port at 9600 baud.
void setup ()
{
    const unsigned long baudRate = 9600;
    Serial.begin (baudRate);
}
1 Like
12king
September 19, 2022, 4:03pm
3
if (!strncmp((char *)payload, "RED_LED"
Does this line compare payload and message?
gcjr
September 19, 2022, 4:18pm
4
yes, in general
i was thinking you might be interested in creating "cmds" for each of your strings, having a sub-function for each. thinking that you may want to do more than just turn LEDs on/off
12king
September 19, 2022, 4:43pm
5
gcjr:
yes, in general
It shows an error in the second if comparison:
too few arguments to function 'int strncmp(const char*, const char*, size_t)'
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
// Update these with values suitable for your network.
byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
IPAddress ip(172, 16, 0, 100);
IPAddress server(172, 16, 0, 2);
// MQTT message handler: prints the topic and payload, then reports which of
// the known LED messages the payload is.
//   topic   - topic the message arrived on
//   payload - message bytes; NOT NUL-terminated, so it must always be
//             handled together with `length`
//   length  - number of bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (unsigned int i = 0; i < length; i++) {
    Serial.print(static_cast<char>(payload[i]));
  }
  Serial.println();

  // NOTE(review): "GREEAN_LED" is kept as written in the sketch; confirm it
  // matches what the publisher actually sends.
  const char* greenMsg = "GREEAN_LED";
  const char* redMsg = "RED_LED";

  // strncmp() needs a third (length) argument and the payload has no
  // terminator, so compare the length first and then the bytes: this is an
  // exact match, not a prefix match.
  if (length == strlen(greenMsg) && memcmp(payload, greenMsg, length) == 0) {
    Serial.println("GREEAN_LED");
  }
  if (length == strlen(redMsg) && memcmp(payload, redMsg, length) == 0) {
    Serial.println("RED LED");
  }
}
EthernetClient ethClient;
PubSubClient client(ethClient);
// Blocks until the MQTT client reports a connection.
// A successful connect publishes an announcement and (re)subscribes to the
// topics; a failed attempt prints client.state() and waits 5 seconds.
void reconnect() {
  while (true) {
    if (client.connected()) {
      return;
    }

    Serial.print("Attempting MQTT connection...");
    if (!client.connect("arduinoClient")) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);  // wait 5 seconds before retrying
      continue;
    }

    Serial.println("connected");
    // Announce ourselves, then restore the subscriptions.
    client.publish("outTopic", "hello world");
    client.subscribe("inTopic");
    client.subscribe("TOPIC_NAME_LED"); // subscribes to topic TOPIC_NAME_LED
  }
}
// One-time initialisation: serial port, MQTT server and callback, Ethernet.
void setup()
{
  const unsigned long serialBaud = 57600;
  const int mqttPort = 1883;
  const unsigned long settleMs = 1500;

  Serial.begin(serialBaud);
  client.setServer(server, mqttPort);
  client.setCallback(callback);
  Ethernet.begin(mac, ip);
  delay(settleMs);  // allow the hardware to sort itself out
}
// Main loop: reconnect when the client is not connected, then call
// client.loop() on every pass.
void loop()
{
  const bool online = client.connected();
  if (!online) {
    reconnect();
  }
  client.loop();
}
12king:
I am trying to understand how to compare messages published by the broker in following sketch
Broker can publish any message in topic TOPIC_NAME_LED and I want to compare following strings one by one in sketch
In my use of MQTT, the broker does not publish anything until something has been published to the broker, or the broker has been set to Persistence=On.
Thus I found the above issue description a bit confusing. Could you restate the issue?
The MQTT callback should not have any print statements in it.
12king
September 19, 2022, 6:43pm
8
gcjr:
use strcmp() instead.
Could you explain where the complete payload is stored after subscribing to the topic in my sketch?
Is it stored by the for loop?
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
// Update these with values suitable for your network.
byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
IPAddress ip(172, 16, 0, 100);
IPAddress server(172, 16, 0, 2);
// MQTT message handler: prints the topic and payload and, for messages on
// TOPIC_NAME_LED, reports which of the known LED messages the payload is.
//   topic   - topic the message arrived on
//   payload - message bytes; NOT NUL-terminated, so it must always be
//             handled together with `length`
//   length  - number of bytes in payload
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (unsigned int i = 0; i < length; i++) {
    Serial.print(static_cast<char>(payload[i]));
  }
  Serial.println();

  // strcmp() returns an int (0 when the strings are equal); it can only be
  // compared with 0, never with another string. It tells us WHICH topic
  // this is; the message text itself is in payload.
  if (strcmp(topic, "TOPIC_NAME_LED") != 0) {
    return;
  }

  // NOTE(review): "GREEAN_LED" is kept as written in the sketch; confirm it
  // matches what the publisher actually sends.
  const char* greenMsg = "GREEAN_LED";
  // The original compared against "RED LED"; the message to detect is
  // "RED_LED" (with an underscore).
  const char* redMsg = "RED_LED";

  // Exact match: same length and same bytes.
  if (length == strlen(greenMsg) && memcmp(payload, greenMsg, length) == 0) {
    Serial.println("GREEAN_LED");
  }
  if (length == strlen(redMsg) && memcmp(payload, redMsg, length) == 0) {
    Serial.println("RED LED");
  }
}
EthernetClient ethClient;
PubSubClient client(ethClient);
// Blocks until the MQTT client reports a connection.
// A successful connect publishes an announcement and (re)subscribes to the
// topics; a failed attempt prints client.state() and waits 5 seconds.
void reconnect() {
  while (true) {
    if (client.connected()) {
      return;
    }

    Serial.print("Attempting MQTT connection...");
    if (!client.connect("arduinoClient")) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);  // wait 5 seconds before retrying
      continue;
    }

    Serial.println("connected");
    // Announce ourselves, then restore the subscriptions.
    client.publish("outTopic", "hello world");
    client.subscribe("inTopic");
    client.subscribe("TOPIC_NAME_LED"); // subscribes to topic TOPIC_NAME_LED
  }
}
// One-time initialisation: serial port, MQTT server and callback, Ethernet.
void setup()
{
  const unsigned long serialBaud = 57600;
  const int mqttPort = 1883;
  const unsigned long settleMs = 1500;

  Serial.begin(serialBaud);
  client.setServer(server, mqttPort);
  client.setCallback(callback);
  Ethernet.begin(mac, ip);
  delay(settleMs);  // allow the hardware to sort itself out
}
// Main loop: reconnect when the client is not connected, then call
// client.loop() on every pass.
void loop()
{
  const bool online = client.connected();
  if (!online) {
    reconnect();
  }
  client.loop();
}
gcjr
September 19, 2022, 6:50pm
9
not familiar with how callback() works. looks like the payload may be stored in an internal buffer.
not sure what you want to do. seems like you want to compare the topic? or do you want to compare the payload?
but please look at description of strcmp()
// strcmp() returns 0 when the two strings are equal, so the negation is
// true on a match. Note that this tests the topic string, not the payload.
if (! strcmp(topic, "GREEAN_LED")) {
Serial.println (" received GREEAN_LED");
}
12king
September 19, 2022, 7:08pm
10
I want to compare payload. For this, first the entire payload has to be stored somewhere and then it has to be compared.
payload[i]
It is an array and can be accessed through a loop
gcjr
September 19, 2022, 7:18pm
11
c-strings are arrays of chars (a char is a 8-bit byte). payload [i] is a single char. if payload is "GREEN_LED", then payload [0] is 'G'.
strcmp() does a char-by-char comparison of chars in each string
1 Like
12king
September 19, 2022, 7:24pm
12
Sorry for asking for so much detail.
I understand string comparison, but the confusing thing is where the payload is stored in the sketch.
Which lines of the sketch show that the payload is being stored?
Look into the PubSubClient code to see where the payload is stored until it is extracted.
What I do is extract the payload from the libraries storage location like so
// Size in bytes of the payload buffer, including room for the terminating '\0'.
const int payloadSize = 100;

// One received MQTT message: the payload text and the topic it arrived on.
struct stu_message
{
  char   payload [payloadSize] = {};
  String topic;
};

// Instance that the MQTT callback fills in.
struct stu_message x_message;
// MQTT callback: copies the topic and payload out of the library's buffer
// into x_message and posts x_message to the xQ_Message queue.
//   topic   - topic the message arrived on
//   payload - message bytes (not NUL-terminated)
//   length  - number of bytes in payload; anything beyond payloadSize - 1
//             bytes is dropped so the copy always fits
void IRAM_ATTR mqttCallback(char* topic, byte * payload, unsigned int length)
{
  // Clamp the copy: an unchecked length >= payloadSize would write past the
  // end of x_message.payload.
  const unsigned int maxChars = payloadSize - 1;
  const unsigned int copyLen = ( length < maxChars ) ? length : maxChars;

  memset( x_message.payload, '\0', payloadSize ); // clear payload char buffer
  memcpy( x_message.payload, payload, copyLen );  // extract payload
  x_message.payload[copyLen] = '\0';

  x_message.topic = topic; // store new topic

  // NOTE(review): stu_message contains a String; a queue that copies items
  // byte-wise would duplicate the String's internal pointer - confirm how
  // xQ_Message is created and consider a fixed char array for the topic.
  xQueueOverwrite( xQ_Message, (void *) &x_message );// send data to queue
} // void mqttCallback(char* topic, byte* payload, unsigned int length)
As one may see I create a structure to hold the payload and topic. Then in the callback the payload and topic are extracted and sent to a message queue. The message queue triggers the parser, where the topic and payload are turned into work.
parser
void fparseMQTT( void *pvParameters )
{
struct stu_message px_message;
for (;;)
{
if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
{
// parse the time from the OK message and update MCU time
if ( String(px_message.topic) == topicOK )
{
if ( !TimeSet)
{
String temp = "";
temp = px_message.payload[0];
temp += px_message.payload[1];
temp += px_message.payload[2];
temp += px_message.payload[3];
int year = temp.toInt();
temp = "";
temp = px_message.payload[5];
temp += px_message.payload[6];
int month = temp.toInt();
temp = "";
temp = px_message.payload[8];
temp += px_message.payload[9];
int day = temp.toInt();
temp = "";
temp = px_message.payload[11];
temp += px_message.payload[12];
int hour = temp.toInt();
temp = "";
temp = px_message.payload[14];
temp += px_message.payload[15];
int min = temp.toInt();
rtc.setTime( 0, min, hour, day, month, year );
log_i( "rtc %s ", rtc.getTime() );
TimeSet = true;
}
}
//
} //if ( xQueueReceive(xQ_Message, &px_message, portMAX_DELAY) == pdTRUE )
xSemaphoreTake( sema_mqttOK, portMAX_DELAY );
mqttOK = 0;
xSemaphoreGive( sema_mqttOK );
}
} // void fparseMQTT( void *pvParameters )#i
1 Like
If you want to see where in the PubSubClient library, the payload is stored take a look at the loop() function as pasted below.
// Services the MQTT connection once: sends a PINGREQ when the keep-alive
// interval has elapsed, reads one incoming packet if data is available, and
// hands PUBLISH packets to the user callback.
// Returns false when the client is not connected, when an outstanding ping
// was never answered (the connection is then stopped), or when readPacket()
// closed the connection; returns true otherwise.
boolean PubSubClient::loop() {
if (connected()) {
unsigned long t = millis();
// keepAlive is multiplied by 1000 to compare it with millis() timestamps.
if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
if (pingOutstanding) {
// The previous ping is still unanswered: flag a timeout and stop the client.
this->_state = MQTT_CONNECTION_TIMEOUT;
_client->stop();
return false;
} else {
// Send a two-byte PINGREQ and remember that a response is pending.
this->buffer[0] = MQTTPINGREQ;
this->buffer[1] = 0;
_client->write(this->buffer,2);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
}
}
if (_client->available()) {
uint8_t llen;
// readPacket() fills this->buffer; len is compared and used below as the packet size.
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
// The upper four bits of the first byte select the packet type.
uint8_t type = this->buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
// The topic and payload handed to the callback both point INTO this->buffer;
// nothing is copied, and the buffer is reused after the callback returns.
uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) this->buffer+llen+2;
// msgId only present for QOS>0
if ((this->buffer[0]&0x06) == MQTTQOS1) {
// QoS 1: the two-byte message id sits between the topic and the payload.
msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
payload = this->buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
// The start of the buffer is now overwritten to build the PUBACK reply.
this->buffer[0] = MQTTPUBACK;
this->buffer[1] = 2;
this->buffer[2] = (msgId >> 8);
this->buffer[3] = (msgId & 0xFF);
_client->write(this->buffer,4);
lastOutActivity = t;
} else {
// No message id: the payload starts right after the topic.
payload = this->buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
// Answer a ping request with a two-byte PINGRESP.
this->buffer[0] = MQTTPINGRESP;
this->buffer[1] = 0;
_client->write(this->buffer,2);
} else if (type == MQTTPINGRESP) {
// Our own ping was answered.
pingOutstanding = false;
}
} else if (!connected()) {
// readPacket has closed the connection
return false;
}
}
return true;
}
return false;
}
And of course anyone can open and view those things on their own.
1 Like
have not thought of that. Hey thanks!
gfvalvo
September 19, 2022, 11:02pm
18
When your callback is invoked, it's provided with a pointer to where the payload data resides and the number of bytes it occupies. What else do you think you need to know?
gcjr
September 20, 2022, 12:16am
19
when the callback is called, it passes (provides) a pointer to where the payload data is.
a buffer could be pre-allocated (it isn't) and provided to the interface, which would use it and possibly pass it back when the callback is invoked
gfvalvo
September 20, 2022, 1:15am
20
It could have been implemented that way. There are many ways the interface could have been implemented. For example, the callback could have been provided with a reference to a 'std::vector<uint8_t>'. But, apparently, it was implemented to pass a pointer to the callback. That pointer likely points to a dynamically-allocated block of memory that will eventually be de-allocated or reused. So, the user best copy the data to their own buffer before returning from the callback.
1 Like