Sending long strings via MQTT with one packet

Hi,
I have ESP32-CAM and SIM800L module and I want to send an image via MQTT. But there is an limit for the each package in "PubSubClient.h". I can send the image with 128 bytes chunks, but this process is way more unstable than one packet transfer. Also I tried "setBufferSize()" function to set the max buffer size but it failed eventually.

According to my researchings, there is an AT command that "AT+MQTTPUBRAW" can send longer strings to the server. But I failed with this attempt:

  SERIAL_SIM800L.println("AT+SAPBR=3,1,\"Contype\",\"GPRS\""); // Set connection type to GPRS
  delay(1000);
  SERIAL_SIM800L.println("AT+SAPBR=3,1,\"APN\",\"your.apn\""); // Set APN
  delay(1000);
  SERIAL_SIM800L.println("AT+SAPBR=1,1"); // Bring up wireless connection with the settings
  delay(1000);
  SERIAL_SIM800L.println("AT+CGATT=1"); // Attach to GPRS service
  delay(1000);
  SERIAL_SIM800L.println("AT+CMQTTSTART"); // SIM800L SENDS ERROR MESSAGE
  delay(1000);
  SERIAL_SIM800L.println("AT+CMQTTACCQ=0,\"client_id\""); // SIM800L SENDS ERROR MESSAGE'client_id'
  delay(1000);
  SERIAL_SIM800L.println("AT+CMQTTCONNECT=0,\"mqtt.yourbroker.com\",1883,60");// SIM800L SENDS ERROR MESSAGE
  delay(1000);


//// then finally

  String command = "AT+MQTTPUBRAW=\"";
  command += topic;
  command += "\",";
  command += total_length;
  command += ",";
  command += message;
  command += "\r\n";

  SERIAL_SIM800L.print(command);


So anyone knows that how to send approximately 20kb hex data to server via mqtt? Is there any library for this except "PubSubClient.h"? Or any different feature in "PubSubClient.h" for this purpose?

Any assistance you can provide would be greatly appreciated...

The message size limitation may be being imposed by the broker

Which MQTT broker are you using ?


My broker is Mosquitto, so according to this table my broker is open source?

Hi, @UKHeliBob
As an update, now the limit is aproximately 1400 bytes

Is the broker running locally ?

What have you got the PubSubClient MQTT_MAX_PACKET_SIZE constant set to ?

Hi, @UKHeliBob
I have overrided the max packet size variable with 500000

mqtt.setBufferSize(50000);
#include <TinyGsmClient.h>
#include <PubSubClient.h>
TinyGsm      modem(Serial);
TinyGsmClient client(modem);
PubSubClient  mqtt(client);

This is my photo data

// Take Picture with Camera
  fb = esp_camera_fb_get();  
  if(!fb) 
  {
    
    return;
  }

  String imageHex = "";
  for (uint32_t i = 0; i < fb->len; i++) {
    imageHex += String(fb->buf[i], HEX);
  }
  esp_camera_fb_return(fb);
  

This is the broker setup

 // MQTT Broker setup
  mqtt.setServer(broker, PORT);
  mqtt.setCallback(mqttCallback);

This is the sending process


// Split and send data
for (uint32_t i = 0; i < imageSize; i += CHUNK_SIZE) {
  // Get next chunk of data
  String chunk = imageHex.substring(i, min(i + CHUNK_SIZE, imageSize));

  // Convert chunk to char array
  char chunkChar[CHUNK_SIZE + 1];
  chunk.toCharArray(chunkChar, sizeof(chunkChar));

  // Send chunk over MQTT
  if (!mqtt.publish("sim800", chunkChar)) {
    Serial.println("Failed to send chunk via MQTT");
    break;  // or handle error in another way
  }
  mqtt.loop();
  //delay(100);  
}
  Serial.println("publish");
  mqtt.loop();
}

This: GitHub - knolleary/pubsubclient: A client library for the Arduino Ethernet Shield that provides support for MQTT. ?

So documentation here: https://pubsubclient.knolleary.net/ ?
Arduino Client for MQTT

As the name suggests, that sets only the internal buffer size - not the message size:

https://pubsubclient.knolleary.net/api#setBufferSize

1 Like

Thank you @awneil,
Couldn't I override or change the message size to send the message in one piece?

What does the documentation tell you?

That link does mention MQTT_MAX_MESSAGE_SIZE ...

The link says that the value is constant at the header file. I read that. But are we stuck at this point really? ESP32-CAM has 512kb RAM size :confused:

From Arduino Client for MQTT

boolean setBufferSize (size)

Sets the size, in bytes, of the internal send/receive buffer. This must be large enough to contain the full MQTT packet. When sending or receiving messages, the packet will contain the full topic string, the payload data and a small number of header bytes.

By default, it is set to 256 bytes - as defined by the MQTT_MAX_MESSAGE_SIZE constant in PubSubClient.h.

Note : setBufferSize returns a boolean flag to indicate whether it was able to reallocate the memory to change the buffer size. This means, unlike the other setXYZ functions that return a reference to the client, this function cannot be chained with those functions.

Parameters
  • size uint16_t - the size, in bytes, for the internal buffer
Returns
  • false - the buffer could not be resized
  • true - the buffer was resized

Note that the parameter for the function is an unsigned 16 bit integer so the maximum value is 65,536

What do you see if you print the return value from the function ? Does it report success or failure ?

1 Like

But you can change it?

Hi @UKHeliBob,

I tried to print the function status and I saw the "1" value on the console. That means true I think.
And also I tried to send the whole message and I failed.

AT+CIPSEND=0,12822
failed to send

The 12822 value is the length of my message.

Alright @awneil , I will try when I am hopeless because it sounds like corrupting the library. :smiley:

aka "configuring"

Of course, RTFM first to be aware of all implications ...

Hi @awneil
Thanks but I couldnt find the variable that is MQTT_MAX_MESSAGE_SIZE. Dou you know where it is?

here is the library

/*
 PubSubClient.h - A simple client for MQTT.
  Nick O'Leary
  http://knolleary.net
*/

#ifndef PubSubClient_h
#define PubSubClient_h

#include <Arduino.h>
#include "IPAddress.h"
#include "Client.h"
#include "Stream.h"

#define MQTT_VERSION_3_1      3
#define MQTT_VERSION_3_1_1    4

// MQTT_VERSION : Pick the version
//#define MQTT_VERSION MQTT_VERSION_3_1
#ifndef MQTT_VERSION
#define MQTT_VERSION MQTT_VERSION_3_1_1
#endif

// MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize().
#ifndef MQTT_MAX_PACKET_SIZE
#define MQTT_MAX_PACKET_SIZE 256
#endif

// MQTT_KEEPALIVE : keepAlive interval in Seconds. Override with setKeepAlive()
#ifndef MQTT_KEEPALIVE
#define MQTT_KEEPALIVE 15
#endif

// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds. Override with setSocketTimeout()
#ifndef MQTT_SOCKET_TIMEOUT
#define MQTT_SOCKET_TIMEOUT 15
#endif

// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client
//  in each write call. Needed for the Arduino Wifi Shield. Leave undefined to
//  pass the entire MQTT packet in each write call.
//#define MQTT_MAX_TRANSFER_SIZE 80

// Possible values for client.state()
#define MQTT_CONNECTION_TIMEOUT     -4
#define MQTT_CONNECTION_LOST        -3
#define MQTT_CONNECT_FAILED         -2
#define MQTT_DISCONNECTED           -1
#define MQTT_CONNECTED               0
#define MQTT_CONNECT_BAD_PROTOCOL    1
#define MQTT_CONNECT_BAD_CLIENT_ID   2
#define MQTT_CONNECT_UNAVAILABLE     3
#define MQTT_CONNECT_BAD_CREDENTIALS 4
#define MQTT_CONNECT_UNAUTHORIZED    5

#define MQTTCONNECT     1 << 4  // Client request to connect to Server
#define MQTTCONNACK     2 << 4  // Connect Acknowledgment
#define MQTTPUBLISH     3 << 4  // Publish message
#define MQTTPUBACK      4 << 4  // Publish Acknowledgment
#define MQTTPUBREC      5 << 4  // Publish Received (assured delivery part 1)
#define MQTTPUBREL      6 << 4  // Publish Release (assured delivery part 2)
#define MQTTPUBCOMP     7 << 4  // Publish Complete (assured delivery part 3)
#define MQTTSUBSCRIBE   8 << 4  // Client Subscribe request
#define MQTTSUBACK      9 << 4  // Subscribe Acknowledgment
#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request
#define MQTTUNSUBACK    11 << 4 // Unsubscribe Acknowledgment
#define MQTTPINGREQ     12 << 4 // PING Request
#define MQTTPINGRESP    13 << 4 // PING Response
#define MQTTDISCONNECT  14 << 4 // Client is Disconnecting
#define MQTTReserved    15 << 4 // Reserved

#define MQTTQOS0        (0 << 1)
#define MQTTQOS1        (1 << 1)
#define MQTTQOS2        (2 << 1)

// Maximum size of fixed header and variable length size header
#define MQTT_MAX_HEADER_SIZE 5

#if defined(ESP8266) || defined(ESP32)
#include <functional>
#define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback
#else
#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int)
#endif

#define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;}

class PubSubClient : public Print {
private:
   Client* _client;
   uint8_t* buffer;
   uint16_t bufferSize;
   uint16_t keepAlive;
   uint16_t socketTimeout;
   uint16_t nextMsgId;
   unsigned long lastOutActivity;
   unsigned long lastInActivity;
   bool pingOutstanding;
   MQTT_CALLBACK_SIGNATURE;
   uint32_t readPacket(uint8_t*);
   boolean readByte(uint8_t * result);
   boolean readByte(uint8_t * result, uint16_t * index);
   boolean write(uint8_t header, uint8_t* buf, uint16_t length);
   uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
   // Build up the header ready to send
   // Returns the size of the header
   // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start
   //       (MQTT_MAX_HEADER_SIZE - <returned size>) bytes into the buffer
   size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length);
   IPAddress ip;
   const char* domain;
   uint16_t port;
   Stream* stream;
   int _state;
public:
   PubSubClient();
   PubSubClient(Client& client);
   PubSubClient(IPAddress, uint16_t, Client& client);
   PubSubClient(IPAddress, uint16_t, Client& client, Stream&);
   PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client);
   PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&);
   PubSubClient(uint8_t *, uint16_t, Client& client);
   PubSubClient(uint8_t *, uint16_t, Client& client, Stream&);
   PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client);
   PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&);
   PubSubClient(const char*, uint16_t, Client& client);
   PubSubClient(const char*, uint16_t, Client& client, Stream&);
   PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client);
   PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&);

   ~PubSubClient();

   PubSubClient& setServer(IPAddress ip, uint16_t port);
   PubSubClient& setServer(uint8_t * ip, uint16_t port);
   PubSubClient& setServer(const char * domain, uint16_t port);
   PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE);
   PubSubClient& setClient(Client& client);
   PubSubClient& setStream(Stream& stream);
   PubSubClient& setKeepAlive(uint16_t keepAlive);
   PubSubClient& setSocketTimeout(uint16_t timeout);

   boolean setBufferSize(uint16_t size);
   uint16_t getBufferSize();

   boolean connect(const char* id);
   boolean connect(const char* id, const char* user, const char* pass);
   boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
   boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
   boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession);
   void disconnect();
   boolean publish(const char* topic, const char* payload);
   boolean publish(const char* topic, const char* payload, boolean retained);
   boolean publish(const char* topic, const uint8_t * payload, unsigned int plength);
   boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained);
   boolean publish_P(const char* topic, const char* payload, boolean retained);
   boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained);
   // Start to publish a message.
   // This API:
   //   beginPublish(...)
   //   one or more calls to write(...)
   //   endPublish()
   // Allows for arbitrarily large payloads to be sent without them having to be copied into
   // a new buffer and held in memory at one time
   // Returns 1 if the message was started successfully, 0 if there was an error
   boolean beginPublish(const char* topic, unsigned int plength, boolean retained);
   // Finish off this publish message (started with beginPublish)
   // Returns 1 if the packet was sent successfully, 0 if there was an error
   int endPublish();
   // Write a single byte of payload (only to be used with beginPublish/endPublish)
   virtual size_t write(uint8_t);
   // Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish)
   // Returns the number of bytes written
   virtual size_t write(const uint8_t *buffer, size_t size);
   boolean subscribe(const char* topic);
   boolean subscribe(const char* topic, uint8_t qos);
   boolean unsubscribe(const char* topic);
   boolean loop();
   boolean connected();
   int state();

};


#endif

Anyone knows this command? Is it working? I couldnt run it it gave me error

See MQTT AT Commands - ESP32 - — ESP-AT User Guide latest documentation

Please post an example of how you used it
What error did you get ?