@@ -222,6 +222,32 @@ int8_t Adafruit_MQTT::connect(const char *user, const char *pass) {
222222 return connect ();
223223}
224224
225+ void Adafruit_MQTT::processSubscriptionPacket (Adafruit_MQTT_Subscribe *sub) {
226+ if (sub->callback_uint32t != NULL ) {
227+ // execute callback in integer mode
228+ uint32_t data = 0 ;
229+ data = atoi ((char *)sub->lastread );
230+ sub->callback_uint32t (data);
231+ } else if (sub->callback_double != NULL ) {
232+ // execute callback in doublefloat mode
233+ double data = 0 ;
234+ data = atof ((char *)sub->lastread );
235+ sub->callback_double (data);
236+ } else if (sub->callback_buffer != NULL ) {
237+ // execute callback in buffer mode
238+ sub->callback_buffer ((char *)sub->lastread , sub->datalen );
239+ } else if (sub->callback_io != NULL ) {
240+ // execute callback in io mode
241+ ((sub->io_mqtt )->*(sub->callback_io ))((char *)sub->lastread , sub->datalen );
242+ } else {
243+ DEBUG_PRINTLN (
244+ " ERROR: Subscription packet did not have an associated callback" );
245+ return ;
246+ }
247+ // mark subscription message as "read""
248+ sub->new_message = false ;
249+ }
250+
225251uint16_t Adafruit_MQTT::processPacketsUntil (uint8_t *buffer,
226252 uint8_t waitforpackettype,
227253 uint16_t timeout) {
@@ -239,7 +265,9 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer,
239265 return len;
240266 } else {
241267 if (packetType == MQTT_CTRL_PUBLISH) {
242- handleSubscriptionPacket (len);
268+ Adafruit_MQTT_Subscribe *sub = handleSubscriptionPacket (len);
269+ if (sub)
270+ processSubscriptionPacket (sub);
243271 } else {
244272 ERROR_PRINTLN (F (" Dropped a packet" ));
245273 }
@@ -478,27 +506,8 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
478506
479507 while (elapsed < (uint32_t )timeout) {
480508 Adafruit_MQTT_Subscribe *sub = readSubscription (timeout - elapsed);
481- if (sub) {
482- if (sub->callback_uint32t != NULL ) {
483- // huh lets do the callback in integer mode
484- uint32_t data = 0 ;
485- data = atoi ((char *)sub->lastread );
486- sub->callback_uint32t (data);
487- } else if (sub->callback_double != NULL ) {
488- // huh lets do the callback in doublefloat mode
489- double data = 0 ;
490- data = atof ((char *)sub->lastread );
491- sub->callback_double (data);
492- } else if (sub->callback_buffer != NULL ) {
493- // huh lets do the callback in buffer mode
494- sub->callback_buffer ((char *)sub->lastread , sub->datalen );
495- } else if (sub->callback_io != NULL ) {
496- // huh lets do the callback in io mode
497- ((sub->io_mqtt )->*(sub->callback_io ))((char *)sub->lastread ,
498- sub->datalen );
499- }
500- }
501-
509+ if (sub)
510+ processSubscriptionPacket (sub);
502511 // keep track over elapsed time
503512 endtime = millis ();
504513 if (endtime < starttime) {
0 commit comments