Commit 167278c0 authored by Ian Craggs's avatar Ian Craggs

First pass at MQTTAsync - sending V5 packets and receiving responses

parent 6983ad7f
...@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}} ...@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3 TEST_FILES_CS = test3
SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}} SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}}
TEST_FILES_A = test4 test6 test9 test_mqtt4async TEST_FILES_A = test4 test45 test6 test9 test_mqtt4async
ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}} ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}}
TEST_FILES_AS = test5 TEST_FILES_AS = test5
......
...@@ -32,19 +32,8 @@ ...@@ -32,19 +32,8 @@
#include "MQTTClient.h" #include "MQTTClient.h"
#include "LinkedList.h" #include "LinkedList.h"
#include "MQTTClientPersistence.h" #include "MQTTClientPersistence.h"
/*BE
include "LinkedList"
BE*/
/*BE
def PUBLICATIONS
{
n32 ptr STRING open "topic"
n32 ptr DATA "payload"
n32 dec "payloadlen"
n32 dec "refcount"
}
BE*/
/** /**
* Stored publication data to minimize copying * Stored publication data to minimize copying
*/ */
...@@ -57,28 +46,6 @@ typedef struct ...@@ -57,28 +46,6 @@ typedef struct
int refcount; int refcount;
} Publications; } Publications;
/*BE
// This should get moved to MQTTProtocol, but the includes don't quite work yet
map MESSAGE_TYPES
{
"PUBREC" 5
"PUBREL" .
"PUBCOMP" .
}
def MESSAGES
{
n32 dec "qos"
n32 map bool "retain"
n32 dec "msgid"
n32 ptr PUBLICATIONS "publish"
n32 time "lastTouch"
n8 map MESSAGE_TYPES "nextMessageType"
n32 dec "len"
}
defList(MESSAGES)
BE*/
/** /**
* Client publication message data * Client publication message data
*/ */
...@@ -95,17 +62,6 @@ typedef struct ...@@ -95,17 +62,6 @@ typedef struct
int len; /**> length of the whole structure+data */ int len; /**> length of the whole structure+data */
} Messages; } Messages;
/*BE
def WILLMESSAGES
{
n32 ptr STRING open "topic"
n32 ptr DATA open "msg"
n32 dec "retained"
n32 dec "qos"
}
BE*/
/** /**
* Client will message data * Client will message data
*/ */
...@@ -118,40 +74,6 @@ typedef struct ...@@ -118,40 +74,6 @@ typedef struct
int qos; int qos;
} willMessages; } willMessages;
/*BE
map CLIENT_BITS
{
"cleansession" 1 : .
"connected" 2 : .
"good" 4 : .
"ping_outstanding" 8 : .
}
def CLIENTS
{
n32 ptr STRING open "clientID"
n32 ptr STRING open "username"
n32 ptr STRING open "password"
n32 map CLIENT_BITS "bits"
at 4 n8 bits 7:6 dec "connect_state"
at 8
n32 dec "socket"
n32 ptr "SSL"
n32 dec "msgID"
n32 dec "keepAliveInterval"
n32 dec "maxInflightMessages"
n32 ptr BRIDGECONNECTIONS "bridge_context"
n32 time "lastContact"
n32 ptr WILLMESSAGES "will"
n32 ptr MESSAGESList open "inboundMsgs"
n32 ptr MESSAGESList open "outboundMsgs"
n32 ptr MESSAGESList open "messageQueue"
n32 dec "discardedMsgs"
}
defList(CLIENTS)
BE*/
typedef struct typedef struct
{ {
int socket; int socket;
......
...@@ -56,6 +56,8 @@ static mutex_type heap_mutex = &heap_mutex_store; ...@@ -56,6 +56,8 @@ static mutex_type heap_mutex = &heap_mutex_store;
static heap_info state = {0, 0}; /**< global heap state information */ static heap_info state = {0, 0}; /**< global heap state information */
static int eyecatcher = 0x88888888; static int eyecatcher = 0x88888888;
/*#define HEAP_STACK 1 */
/** /**
* Each item on the heap is recorded with this structure. * Each item on the heap is recorded with this structure.
*/ */
...@@ -65,6 +67,9 @@ typedef struct ...@@ -65,6 +67,9 @@ typedef struct
int line; /**< the line no in the source file where it was allocated */ int line; /**< the line no in the source file where it was allocated */
void* ptr; /**< pointer to the allocated storage */ void* ptr; /**< pointer to the allocated storage */
size_t size; /**< size of the allocated storage */ size_t size; /**< size of the allocated storage */
#if defined(HEAP_STACK)
char* stack;
#endif
} storageElement; } storageElement;
static Tree heap; /**< Tree that holds the allocation records */ static Tree heap; /**< Tree that holds the allocation records */
...@@ -168,6 +173,17 @@ void* mymalloc(char* file, int line, size_t size) ...@@ -168,6 +173,17 @@ void* mymalloc(char* file, int line, size_t size)
} }
space += filenamelen; space += filenamelen;
strcpy(s->file, file); strcpy(s->file, file);
#if defined(HEAP_STACK)
#define STACK_LEN 300
if ((s->stack = malloc(STACK_LEN)) == NULL)
{
Log(LOG_ERROR, 13, errmsg);
free(s->file);
free(s);
return NULL;
}
StackTrace_get(Thread_getid(), s->stack, STACK_LEN);
#endif
s->line = line; s->line = line;
/* Add space for eyecatcher at each end */ /* Add space for eyecatcher at each end */
if ((s->ptr = malloc(size + 2*sizeof(int))) == NULL) if ((s->ptr = malloc(size + 2*sizeof(int))) == NULL)
...@@ -361,6 +377,9 @@ static void HeapScan(enum LOG_LEVELS log_level) ...@@ -361,6 +377,9 @@ static void HeapScan(enum LOG_LEVELS log_level)
storageElement* s = (storageElement*)(current->content); storageElement* s = (storageElement*)(current->content);
Log(log_level, -1, "Heap element size %d, line %d, file %s, ptr %p", s->size, s->line, s->file, s->ptr); Log(log_level, -1, "Heap element size %d, line %d, file %s, ptr %p", s->size, s->line, s->file, s->ptr);
Log(log_level, -1, " Content %*.s", (10 > current->size) ? s->size : 10, (char*)(((int*)s->ptr) + 1)); Log(log_level, -1, " Content %*.s", (10 > current->size) ? s->size : 10, (char*)(((int*)s->ptr) + 1));
#if defined(HEAP_STACK)
Log(log_level, -1, " Stack:\n%s", s->stack);
#endif
} }
Log(log_level, -1, "Heap scan end"); Log(log_level, -1, "Heap scan end");
Thread_unlock_mutex(heap_mutex); Thread_unlock_mutex(heap_mutex);
......
This diff is collapsed.
This diff is collapsed.
...@@ -505,6 +505,7 @@ exit: ...@@ -505,6 +505,7 @@ exit:
void MQTTClient_freeMessage(MQTTClient_message** message) void MQTTClient_freeMessage(MQTTClient_message** message)
{ {
FUNC_ENTRY; FUNC_ENTRY;
MQTTProperties_free(&(*message)->properties);
free((*message)->payload); free((*message)->payload);
free(*message); free(*message);
*message = NULL; *message = NULL;
...@@ -827,10 +828,12 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -827,10 +828,12 @@ void Protocol_processPublication(Publish* publish, Clients* client)
{ {
qEntry* qe = NULL; qEntry* qe = NULL;
MQTTClient_message* mm = NULL; MQTTClient_message* mm = NULL;
MQTTClient_message initialized = MQTTClient_message_initializer;
FUNC_ENTRY; FUNC_ENTRY;
qe = malloc(sizeof(qEntry)); qe = malloc(sizeof(qEntry));
mm = malloc(sizeof(MQTTClient_message)); mm = malloc(sizeof(MQTTClient_message));
memcpy(mm, &initialized, sizeof(MQTTClient_message));
qe->msg = mm; qe->msg = mm;
...@@ -858,6 +861,9 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -858,6 +861,9 @@ void Protocol_processPublication(Publish* publish, Clients* client)
mm->dup = publish->header.bits.dup; mm->dup = publish->header.bits.dup;
mm->msgid = publish->msgId; mm->msgid = publish->msgId;
if (publish->MQTTVersion >= 5)
mm->properties = MQTTProperties_copy(&publish->properties);
ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1); ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
if (client->persistence) if (client->persistence)
...@@ -1781,9 +1787,10 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, ...@@ -1781,9 +1787,10 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message, MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTProperties* props, MQTTClient_deliveryToken* deliveryToken) MQTTClient_deliveryToken* deliveryToken)
{ {
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL}; MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
MQTTProperties* props = NULL;
FUNC_ENTRY; FUNC_ENTRY;
if (message == NULL) if (message == NULL)
...@@ -1792,12 +1799,16 @@ MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName ...@@ -1792,12 +1799,16 @@ MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName
goto exit; goto exit;
} }
if (strncmp(message->struct_id, "MQTM", 4) != 0 || message->struct_version != 0) if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
(message->struct_version != 0 && message->struct_version != 1))
{ {
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE; rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit; goto exit;
} }
if (message->struct_version == 1)
props = &message->properties;
rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload, rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
message->qos, message->retained, props, deliveryToken); message->qos, message->retained, props, deliveryToken);
exit: exit:
...@@ -1809,7 +1820,13 @@ exit: ...@@ -1809,7 +1820,13 @@ exit:
int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message, int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTClient_deliveryToken* deliveryToken) MQTTClient_deliveryToken* deliveryToken)
{ {
MQTTResponse rc = MQTTClient_publishMessage5(handle, topicName, message, NULL, deliveryToken); MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
(message->struct_version != 0 && message->struct_version != 1))
return MQTTCLIENT_BAD_STRUCTURE;
rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
return rc.reasonCode; return rc.reasonCode;
} }
......
...@@ -124,6 +124,7 @@ ...@@ -124,6 +124,7 @@
#include "MQTTProperties.h" #include "MQTTProperties.h"
#include "MQTTReasonCodes.h" #include "MQTTReasonCodes.h"
#include "MQTTSubscribeOpts.h"
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
#include "MQTTClientPersistence.h" #include "MQTTClientPersistence.h"
#endif #endif
...@@ -256,7 +257,8 @@ typedef struct ...@@ -256,7 +257,8 @@ typedef struct
{ {
/** The eyecatcher for this structure. must be MQTM. */ /** The eyecatcher for this structure. must be MQTM. */
char struct_id[4]; char struct_id[4];
/** The version number of this structure. Must be 0 */ /** The version number of this structure. Must be 0 or 1
* 0 indicates no message properties */
int struct_version; int struct_version;
/** The length of the MQTT message payload in bytes. */ /** The length of the MQTT message payload in bytes. */
int payloadlen; int payloadlen;
...@@ -306,9 +308,13 @@ typedef struct ...@@ -306,9 +308,13 @@ typedef struct
* MQTT client and server. * MQTT client and server.
*/ */
int msgid; int msgid;
/**
* The MQTT V5 properties associated with the message.
*/
MQTTProperties properties;
} MQTTClient_message; } MQTTClient_message;
#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 } #define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 1, 0, NULL, 0, 0, 0, 0, MQTTProperties_initializer }
/** /**
* This is a callback function. The client application * This is a callback function. The client application
...@@ -831,20 +837,6 @@ DLLExport int MQTTClient_isConnected(MQTTClient handle); ...@@ -831,20 +837,6 @@ DLLExport int MQTTClient_isConnected(MQTTClient handle);
DLLExport int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos); DLLExport int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos);
typedef struct MQTTSubscribe_options
{
/** The eyecatcher for this structure. Must be MQSO. */
char struct_id[4];
/** The version number of this structure. Must be 0.
*/
int struct_version;
unsigned char noLocal; /* 0 or 1 */
unsigned char retainAsPublished; /* 0 or 1 */
unsigned char retainHandling; /* 0, 1 or 2 */
} MQTTSubscribe_options;
#define MQTTSubscribe_options_initializer { {'M', 'Q', 'S', 'O'}, 0, 0, 0, 0 }
DLLExport MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos, DLLExport MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
MQTTSubscribe_options* opts, MQTTProperties* props); MQTTSubscribe_options* opts, MQTTProperties* props);
...@@ -948,7 +940,7 @@ DLLExport int MQTTClient_publishMessage(MQTTClient handle, const char* topicName ...@@ -948,7 +940,7 @@ DLLExport int MQTTClient_publishMessage(MQTTClient handle, const char* topicName
DLLExport MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* msg, DLLExport MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* msg,
MQTTProperties* properties, MQTTClient_deliveryToken* dt); MQTTClient_deliveryToken* dt);
/** /**
* This function is called by the client application to synchronize execution * This function is called by the client application to synchronize execution
......
...@@ -80,7 +80,7 @@ pf new_packets[] = ...@@ -80,7 +80,7 @@ pf new_packets[] =
NULL, /**< MQTTPacket_subscribe*/ NULL, /**< MQTTPacket_subscribe*/
MQTTPacket_suback, /**< SUBACK */ MQTTPacket_suback, /**< SUBACK */
NULL, /**< MQTTPacket_unsubscribe*/ NULL, /**< MQTTPacket_unsubscribe*/
MQTTPacket_ack, /**< UNSUBACK */ MQTTPacket_unsuback, /**< UNSUBACK */
MQTTPacket_header_only, /**< PINGREQ */ MQTTPacket_header_only, /**< PINGREQ */
MQTTPacket_header_only, /**< PINGRESP */ MQTTPacket_header_only, /**< PINGRESP */
MQTTPacket_header_only /**< DISCONNECT */ MQTTPacket_header_only /**< DISCONNECT */
...@@ -637,6 +637,24 @@ void MQTTPacket_freeSuback(Suback* pack) ...@@ -637,6 +637,24 @@ void MQTTPacket_freeSuback(Suback* pack)
} }
/**
* Free allocated storage for a suback packet.
* @param pack pointer to the suback packet structure
*/
void MQTTPacket_freeUnsuback(Unsuback* pack)
{
FUNC_ENTRY;
if (pack->MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties_free(&pack->properties);
if (pack->reasonCodes != NULL)
ListFree(pack->reasonCodes);
}
free(pack);
FUNC_EXIT;
}
/** /**
* Send an MQTT PUBREC packet down a socket. * Send an MQTT PUBREC packet down a socket.
* @param msgid the MQTT message id to use * @param msgid the MQTT message id to use
......
...@@ -32,6 +32,7 @@ typedef unsigned int bool; ...@@ -32,6 +32,7 @@ typedef unsigned int bool;
typedef void* (*pf)(int, unsigned char, char*, size_t); typedef void* (*pf)(int, unsigned char, char*, size_t);
#include "MQTTProperties.h" #include "MQTTProperties.h"
#include "MQTTReasonCodes.h"
enum errors enum errors
{ {
...@@ -177,6 +178,19 @@ typedef struct ...@@ -177,6 +178,19 @@ typedef struct
} Suback; } Suback;
/**
* Data for an MQTT V5 unsuback packet.
*/
typedef struct
{
Header header; /**< MQTT header byte */
int msgId; /**< MQTT message id */
int MQTTVersion; /**< the version of MQTT */
MQTTProperties properties; /**< MQTT 5.0 properties. Not used for MQTT < 5.0 */
List* reasonCodes; /**< list of reason codes */
} Unsuback;
/** /**
* Data for a publish packet. * Data for a publish packet.
*/ */
...@@ -209,7 +223,6 @@ typedef Ack Puback; ...@@ -209,7 +223,6 @@ typedef Ack Puback;
typedef Ack Pubrec; typedef Ack Pubrec;
typedef Ack Pubrel; typedef Ack Pubrel;
typedef Ack Pubcomp; typedef Ack Pubcomp;
typedef Ack Unsuback;
int MQTTPacket_encode(char* buf, size_t length); int MQTTPacket_encode(char* buf, size_t length);
int MQTTPacket_decode(networkHandles* net, size_t* value); int MQTTPacket_decode(networkHandles* net, size_t* value);
...@@ -237,6 +250,7 @@ int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID) ...@@ -237,6 +250,7 @@ int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID)
void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen); void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
void MQTTPacket_freeSuback(Suback* pack); void MQTTPacket_freeSuback(Suback* pack);
void MQTTPacket_freeUnsuback(Unsuback* pack);
int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID); int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID);
int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID); int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID);
int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID); int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID);
......
...@@ -56,14 +56,14 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion, ...@@ -56,14 +56,14 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
packet.header.byte = 0; packet.header.byte = 0;
packet.header.bits.type = CONNECT; packet.header.bits.type = CONNECT;
len = ((MQTTVersion == 3) ? 12 : 10) + (int)strlen(client->clientID)+2; len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
if (client->will) if (client->will)
len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2; len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
if (client->username) if (client->username)
len += (int)strlen(client->username)+2; len += (int)strlen(client->username)+2;
if (client->password) if (client->password)
len += client->passwordlen+2; len += client->passwordlen+2;
if (MQTTVersion >= 5) if (MQTTVersion >= MQTTVERSION_5)
{ {
len += MQTTProperties_len(connectProperties); len += MQTTProperties_len(connectProperties);
if (client->will && willProperties) if (client->will && willProperties)
...@@ -71,12 +71,12 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion, ...@@ -71,12 +71,12 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
} }
ptr = buf = malloc(len); ptr = buf = malloc(len);
if (MQTTVersion == 3) if (MQTTVersion == MQTTVERSION_3_1)
{ {
writeUTF(&ptr, "MQIsdp"); writeUTF(&ptr, "MQIsdp");
writeChar(&ptr, (char)3); writeChar(&ptr, (char)MQTTVERSION_3_1);
} }
else if (MQTTVersion == 4 || MQTTVersion == 5) else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
{ {
writeUTF(&ptr, "MQTT"); writeUTF(&ptr, "MQTT");
writeChar(&ptr, (char)MQTTVersion); writeChar(&ptr, (char)MQTTVersion);
...@@ -99,12 +99,12 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion, ...@@ -99,12 +99,12 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
writeChar(&ptr, packet.flags.all); writeChar(&ptr, packet.flags.all);
writeInt(&ptr, client->keepAliveInterval); writeInt(&ptr, client->keepAliveInterval);
if (MQTTVersion == 5) if (MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, connectProperties); MQTTProperties_write(&ptr, connectProperties);
writeUTF(&ptr, client->clientID); writeUTF(&ptr, client->clientID);
if (client->will) if (client->will)
{ {
if (MQTTVersion == 5) if (MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, willProperties); MQTTProperties_write(&ptr, willProperties);
writeUTF(&ptr, client->will->topic); writeUTF(&ptr, client->will->topic);
writeData(&ptr, client->will->payload, client->will->payloadlen); writeData(&ptr, client->will->payload, client->will->payloadlen);
...@@ -230,13 +230,13 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o ...@@ -230,13 +230,13 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o
datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */ datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
while (ListNextElement(topics, &elem)) while (ListNextElement(topics, &elem))
datalen += (int)strlen((char*)(elem->content)); datalen += (int)strlen((char*)(elem->content));
if (client->MQTTVersion >= 5) if (client->MQTTVersion >= MQTTVERSION_5)
datalen += MQTTProperties_len(props); datalen += MQTTProperties_len(props);
ptr = data = malloc(datalen); ptr = data = malloc(datalen);
writeInt(&ptr, msgid); writeInt(&ptr, msgid);
if (client->MQTTVersion >= 5) if (client->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, props); MQTTProperties_write(&ptr, props);
elem = NULL; elem = NULL;
...@@ -247,13 +247,13 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o ...@@ -247,13 +247,13 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o
ListNextElement(qoss, &qosElem); ListNextElement(qoss, &qosElem);
writeUTF(&ptr, (char*)(elem->content)); writeUTF(&ptr, (char*)(elem->content));
subopts = *(int*)(qosElem->content); subopts = *(int*)(qosElem->content);
if (client->MQTTVersion >= 5 && opts != NULL) if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
{ {
subopts |= (opts[i].noLocal << 2); /* 1 bit */ subopts |= (opts[i].noLocal << 2); /* 1 bit */
subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */ subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
subopts |= (opts[i].retainHandling << 4); /* 2 bits */ subopts |= (opts[i].retainHandling << 4); /* 2 bits */
} }
writeChar(&ptr, *(int*)(qosElem->content)); writeChar(&ptr, subopts);
++i; ++i;
} }
rc = MQTTPacket_send(&client->net, header, data, datalen, 1); rc = MQTTPacket_send(&client->net, header, data, datalen, 1);
...@@ -333,13 +333,13 @@ int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, ...@@ -333,13 +333,13 @@ int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid,
datalen = 2 + topics->count * 2; /* utf length == 2 */ datalen = 2 + topics->count * 2; /* utf length == 2 */
while (ListNextElement(topics, &elem)) while (ListNextElement(topics, &elem))
datalen += (int)strlen((char*)(elem->content)); datalen += (int)strlen((char*)(elem->content));
if (client->MQTTVersion >= 5) if (client->MQTTVersion >= MQTTVERSION_5)
datalen += MQTTProperties_len(props); datalen += MQTTProperties_len(props);
ptr = data = malloc(datalen); ptr = data = malloc(datalen);
writeInt(&ptr, msgid); writeInt(&ptr, msgid);
if (client->MQTTVersion >= 5) if (client->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_write(&ptr, props); MQTTProperties_write(&ptr, props);
elem = NULL; elem = NULL;
...@@ -352,3 +352,46 @@ int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, ...@@ -352,3 +352,46 @@ int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid,
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
/**
* Function used in the new packets table to create unsuback packets.
* @param MQTTVersion the version of MQTT
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Unsuback* pack = malloc(sizeof(Unsuback));
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->msgId = readInt(&curdata);
pack->reasonCodes = NULL;
if (MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
free(pack->properties.array);
free(pack);
pack = NULL; /* signal protocol error */
}
pack->reasonCodes = ListInitialize();
while ((size_t)(curdata - data) < datalen)
{
enum MQTTReasonCodes* newrc;
newrc = malloc(sizeof(enum MQTTReasonCodes));
*newrc = (enum MQTTReasonCodes)readChar(&curdata);
ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
}
}
FUNC_EXIT;
return pack;
}
...@@ -34,5 +34,6 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o ...@@ -34,5 +34,6 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o
void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen); void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client); int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client);
void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
#endif #endif
...@@ -61,7 +61,7 @@ struct nameToType ...@@ -61,7 +61,7 @@ struct nameToType
}; };
static char* datadup(MQTTLenString* str) static char* datadup(const MQTTLenString* str)
{ {
char* temp = malloc(str->len); char* temp = malloc(str->len);
memcpy(temp, str->data, str->len); memcpy(temp, str->data, str->len);
...@@ -92,12 +92,14 @@ int MQTTProperties_len(MQTTProperties* props) ...@@ -92,12 +92,14 @@ int MQTTProperties_len(MQTTProperties* props)
} }
int MQTTProperties_add(MQTTProperties* props, MQTTProperty* prop) int MQTTProperties_add(MQTTProperties* props, const MQTTProperty* prop)
{ {
int rc = 0, type; int rc = 0, type;
if ((type = MQTTProperty_getType(prop->identifier)) < 0) if ((type = MQTTProperty_getType(prop->identifier)) < 0)
{ {
printf("id %d\n", prop->identifier);
StackTrace_printStack(stdout);
rc = MQTT_INVALID_PROPERTY_ID; rc = MQTT_INVALID_PROPERTY_ID;
goto exit; goto exit;
} }
...@@ -201,7 +203,7 @@ int MQTTProperty_write(char** pptr, MQTTProperty* prop) ...@@ -201,7 +203,7 @@ int MQTTProperty_write(char** pptr, MQTTProperty* prop)
* @param properties pointer to the property list, can be NULL * @param properties pointer to the property list, can be NULL
* @return whether the write succeeded or not, number of bytes written or < 0 * @return whether the write succeeded or not, number of bytes written or < 0
*/ */
int MQTTProperties_write(char** pptr, MQTTProperties* properties) int MQTTProperties_write(char** pptr, const MQTTProperties* properties)
{ {
int rc = -1; int rc = -1;
int i = 0, len = 0; int i = 0, len = 0;
...@@ -281,7 +283,7 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata) ...@@ -281,7 +283,7 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
int remlength = 0; int remlength = 0;
FUNC_ENTRY; FUNC_ENTRY;
properties->count = 0; /* we assume an initialized properties structure */
if (enddata - (*pptr) > 0) /* enough length to read the VBI? */ if (enddata - (*pptr) > 0) /* enough length to read the VBI? */
{ {
*pptr += MQTTPacket_decodeBuf(*pptr, &remlength); *pptr += MQTTPacket_decodeBuf(*pptr, &remlength);
...@@ -303,6 +305,12 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata) ...@@ -303,6 +305,12 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
rc = 1; /* data read successfully */ rc = 1; /* data read successfully */
} }
if (rc != 1 && properties->array != NULL)
{
free(properties->array);
properties->array = NULL;
properties->max_count = properties->count = 0;
}
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -362,6 +370,9 @@ DLLExport void MQTTProperties_free(MQTTProperties* props) ...@@ -362,6 +370,9 @@ DLLExport void MQTTProperties_free(MQTTProperties* props)
{ {
int i = 0; int i = 0;
FUNC_ENTRY;
if (props == NULL)
goto exit;
for (i = 0; i < props->count; ++i) for (i = 0; i < props->count; ++i)
{ {
int id = props->array[i].identifier; int id = props->array[i].identifier;
...@@ -381,34 +392,25 @@ DLLExport void MQTTProperties_free(MQTTProperties* props) ...@@ -381,34 +392,25 @@ DLLExport void MQTTProperties_free(MQTTProperties* props)
if (props->array) if (props->array)
free(props->array); free(props->array);
memset(props, '\0', sizeof(MQTTProperties)); /* zero all fields */ memset(props, '\0', sizeof(MQTTProperties)); /* zero all fields */
exit:
FUNC_EXIT;
} }
MQTTProperties MQTTProperties_copy(MQTTProperties* props) MQTTProperties MQTTProperties_copy(const MQTTProperties* props)
{ {
int i = 0; int i = 0;
MQTTProperties result = MQTTProperties_initializer; MQTTProperties result = MQTTProperties_initializer;
for (i = 0; i > props->count; ++i) FUNC_ENTRY;
for (i = 0; i < props->count; ++i)
{ {
int id = props->array[i].identifier; int rc = 0;
int type = MQTTProperty_getType(id);
if ((rc = MQTTProperties_add(&result, &props->array[i])) != 0)
MQTTProperties_add(&result, &props->array[i]); Log(LOG_ERROR, -1, "Error from MQTTProperties add %d", rc);
switch (type)
{
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
case UTF_8_STRING_PAIR:
result.array[i].value.data.data = malloc(result.array[i].value.data.len);
memcpy(result.array[i].value.data.data, props->array[i].value.data.data, props->array[i].value.data.len);
if (type == UTF_8_STRING_PAIR)
{
result.array[i].value.value.data = malloc(result.array[i].value.value.len);
memcpy(result.array[i].value.value.data, props->array[i].value.value.data, props->array[i].value.value.len);
}
break;
}
} }
FUNC_EXIT;
return result; return result;
} }
...@@ -109,14 +109,14 @@ int MQTTProperties_len(MQTTProperties* props); ...@@ -109,14 +109,14 @@ int MQTTProperties_len(MQTTProperties* props);
* @param prop * @param prop
* @return whether the write succeeded or not, number of bytes written or < 0 * @return whether the write succeeded or not, number of bytes written or < 0
*/ */
DLLExport int MQTTProperties_add(MQTTProperties* props, MQTTProperty* prop); DLLExport int MQTTProperties_add(MQTTProperties* props, const MQTTProperty* prop);
int MQTTProperties_write(char** pptr, MQTTProperties* properties); int MQTTProperties_write(char** pptr, const MQTTProperties* properties);
int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata); int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata);
DLLExport void MQTTProperties_free(MQTTProperties* properties); DLLExport void MQTTProperties_free(MQTTProperties* properties);
MQTTProperties MQTTProperties_copy(MQTTProperties* props); MQTTProperties MQTTProperties_copy(const MQTTProperties* props);
#endif /* MQTTPROPERTIES_H */ #endif /* MQTTPROPERTIES_H */
...@@ -301,11 +301,16 @@ int MQTTProtocol_handlePublishes(void* pack, int sock) ...@@ -301,11 +301,16 @@ int MQTTProtocol_handlePublishes(void* pack, int sock)
m->msgid = publish->msgId; m->msgid = publish->msgId;
m->qos = publish->header.bits.qos; m->qos = publish->header.bits.qos;
m->retain = publish->header.bits.retain; m->retain = publish->header.bits.retain;
m->MQTTVersion = publish->MQTTVersion;
if (m->MQTTVersion >= MQTTVERSION_5)
m->properties = MQTTProperties_copy(&publish->properties);
m->nextMessageType = PUBREL; m->nextMessageType = PUBREL;
if ( ( listElem = ListFindItem(client->inboundMsgs, &(m->msgid), messageIDCompare) ) != NULL ) if ( ( listElem = ListFindItem(client->inboundMsgs, &(m->msgid), messageIDCompare) ) != NULL )
{ /* discard queued publication with same msgID that the current incoming message */ { /* discard queued publication with same msgID that the current incoming message */
Messages* msg = (Messages*)(listElem->content); Messages* msg = (Messages*)(listElem->content);
MQTTProtocol_removePublication(msg->publish); MQTTProtocol_removePublication(msg->publish);
if (msg->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&msg->properties);
ListInsert(client->inboundMsgs, m, sizeof(Messages) + len, listElem); ListInsert(client->inboundMsgs, m, sizeof(Messages) + len, listElem);
ListRemove(client->inboundMsgs, msg); ListRemove(client->inboundMsgs, msg);
} else } else
...@@ -349,6 +354,8 @@ int MQTTProtocol_handlePubacks(void* pack, int sock) ...@@ -349,6 +354,8 @@ int MQTTProtocol_handlePubacks(void* pack, int sock)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, puback->msgId); rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, puback->msgId);
#endif #endif
MQTTProtocol_removePublication(m->publish); MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m); ListRemove(client->outboundMsgs, m);
} }
} }
...@@ -460,10 +467,15 @@ int MQTTProtocol_handlePubrels(void* pack, int sock) ...@@ -460,10 +467,15 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
publish.topiclen = m->publish->topiclen; publish.topiclen = m->publish->topiclen;
publish.payload = m->publish->payload; publish.payload = m->publish->payload;
publish.payloadlen = m->publish->payloadlen; publish.payloadlen = m->publish->payloadlen;
publish.MQTTVersion = m->MQTTVersion;
if (publish.MQTTVersion >= MQTTVERSION_5)
publish.properties = m->properties;
Protocol_processPublication(&publish, client); Protocol_processPublication(&publish, client);
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
rc += MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_RECEIVED, m->qos, pubrel->msgId); rc += MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_RECEIVED, m->qos, pubrel->msgId);
#endif #endif
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(&(state.publications), m->publish); ListRemove(&(state.publications), m->publish);
ListRemove(client->inboundMsgs, m); ListRemove(client->inboundMsgs, m);
++(state.msgs_received); ++(state.msgs_received);
...@@ -515,6 +527,8 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock) ...@@ -515,6 +527,8 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, pubcomp->msgId); rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, pubcomp->msgId);
#endif #endif
MQTTProtocol_removePublication(m->publish); MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m); ListRemove(client->outboundMsgs, m);
(++state.msgs_sent); (++state.msgs_sent);
} }
...@@ -741,9 +755,9 @@ void MQTTProtocol_emptyMessageList(List* msgList) ...@@ -741,9 +755,9 @@ void MQTTProtocol_emptyMessageList(List* msgList)
while (ListNextElement(msgList, &current)) while (ListNextElement(msgList, &current))
{ {
Messages* m = (Messages*)(current->content); Messages* m = (Messages*)(current->content);
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5) if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties); MQTTProperties_free(&m->properties);
MQTTProtocol_removePublication(m->publish);
} }
ListEmpty(msgList); ListEmpty(msgList);
FUNC_EXIT; FUNC_EXIT;
......
...@@ -240,9 +240,7 @@ int MQTTProtocol_handleUnsubacks(void* pack, int sock) ...@@ -240,9 +240,7 @@ int MQTTProtocol_handleUnsubacks(void* pack, int sock)
FUNC_ENTRY; FUNC_ENTRY;
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content); client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId); Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
if (unsuback->MQTTVersion >= MQTTVERSION_5) MQTTPacket_freeUnsuback(unsuback);
MQTTProperties_free(&unsuback->properties);
free(unsuback);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
......
...@@ -14,6 +14,9 @@ ...@@ -14,6 +14,9 @@
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/ *******************************************************************************/
#if !defined(MQTTREASONCODES_H)
#define MQTTREASONCODES_H
enum MQTTReasonCodes { enum MQTTReasonCodes {
SUCCESS = 0, SUCCESS = 0,
NORMAL_DISCONNECTION = 0, NORMAL_DISCONNECTION = 0,
...@@ -61,3 +64,5 @@ enum MQTTReasonCodes { ...@@ -61,3 +64,5 @@ enum MQTTReasonCodes {
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 161, SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = 161,
WILDCARD_SUBSCRIPTION_NOT_SUPPORTED = 162 WILDCARD_SUBSCRIPTION_NOT_SUPPORTED = 162
}; };
#endif
/*******************************************************************************
* Copyright (c) 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(SUBOPTS_H)
#define SUBOPTS_H
typedef struct MQTTSubscribe_options
{
/** The eyecatcher for this structure. Must be MQSO. */
char struct_id[4];
/** The version number of this structure. Must be 0.
*/
int struct_version;
unsigned char noLocal; /* 0 or 1 */
unsigned char retainAsPublished; /* 0 or 1 */
unsigned char retainHandling; /* 0, 1 or 2 */
} MQTTSubscribe_options;
#define MQTTSubscribe_options_initializer { {'M', 'Q', 'S', 'O'}, 0, 0, 0, 0 }
#endif
...@@ -274,6 +274,26 @@ SET_TESTS_PROPERTIES( ...@@ -274,6 +274,26 @@ SET_TESTS_PROPERTIES(
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
ADD_EXECUTABLE(
test45
test45.c
)
TARGET_LINK_LIBRARIES(
test45
paho-mqtt3a
)
ADD_TEST(
NAME test45-1-basic-connect-subscribe-receive
COMMAND test45 "--test_no" "1" "--connection" ${MQTT_TEST_BROKER}
)
SET_TESTS_PROPERTIES(
test45-1-basic-connect-subscribe-receive
PROPERTIES TIMEOUT 540
)
IF (PAHO_WITH_SSL) IF (PAHO_WITH_SSL)
ADD_EXECUTABLE( ADD_EXECUTABLE(
test5 test5
......
...@@ -297,10 +297,9 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic) ...@@ -297,10 +297,9 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
char* topicName = NULL; char* topicName = NULL;
int topicLen; int topicLen;
int i = 0; int i = 0;
int iterations = 50; int iterations = 1; //50;
int rc; int rc;
MQTTResponse resp; MQTTResponse resp;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property; MQTTProperty property;
MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos); MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos);
...@@ -314,15 +313,15 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic) ...@@ -314,15 +313,15 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
property.value.data.len = strlen(property.value.data.data); property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value"; property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data); property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property); MQTTProperties_add(&pubmsg.properties, &property);
for (i = 0; i < iterations; ++i) for (i = 0; i < iterations; ++i)
{ {
if (i % 10 == 0) if (i % 10 == 0)
resp = MQTTClient_publish5(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, resp = MQTTClient_publish5(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained,
&props, &dt); &pubmsg.properties, &dt);
else else
resp = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &props, &dt); resp = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
assert("Good rc from publish", resp.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", resp.reasonCode); assert("Good rc from publish", resp.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", resp.reasonCode);
if (qos > 0) if (qos > 0)
...@@ -330,7 +329,6 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic) ...@@ -330,7 +329,6 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
rc = MQTTClient_waitForCompletion(c, dt, 5000L); rc = MQTTClient_waitForCompletion(c, dt, 5000L);
assert("Good rc from waitforCompletion", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Good rc from waitforCompletion", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
} }
rc = MQTTClient_receive(c, &topicName, &topicLen, &m, 5000); rc = MQTTClient_receive(c, &topicName, &topicLen, &m, 5000);
assert("Good rc from receive", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Good rc from receive", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
if (topicName) if (topicName)
...@@ -343,6 +341,7 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic) ...@@ -343,6 +341,7 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
MyLog(LOGA_INFO, "Error: wrong data - received lengths %d %d", pubmsg.payloadlen, m->payloadlen); MyLog(LOGA_INFO, "Error: wrong data - received lengths %d %d", pubmsg.payloadlen, m->payloadlen);
break; break;
} }
assert("Property count should be > 0", m->properties.count > 0, "property count was %d", m->properties.count);
MQTTClient_free(topicName); MQTTClient_free(topicName);
MQTTClient_freeMessage(&m); MQTTClient_freeMessage(&m);
} }
...@@ -360,7 +359,7 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic) ...@@ -360,7 +359,7 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
MQTTClient_receive(c, &topicName, &topicLen, &m, 2000); MQTTClient_receive(c, &topicName, &topicLen, &m, 2000);
} }
MQTTProperties_free(&props); MQTTProperties_free(&pubmsg.properties);
} }
void logProperties(MQTTProperties *props) void logProperties(MQTTProperties *props)
...@@ -389,11 +388,11 @@ void logProperties(MQTTProperties *props) ...@@ -389,11 +388,11 @@ void logProperties(MQTTProperties *props)
break; break;
case BINARY_DATA: case BINARY_DATA:
case UTF_8_ENCODED_STRING: case UTF_8_ENCODED_STRING:
MyLog(LOGA_INFO, "Property name %s value %*.s", name, MyLog(LOGA_INFO, "Property name %s value %.*s", name,
props->array[i].value.data.len, props->array[i].value.data.data); props->array[i].value.data.len, props->array[i].value.data.data);
break; break;
case UTF_8_STRING_PAIR: case UTF_8_STRING_PAIR:
MyLog(LOGA_INFO, "Property name %s key %*.s value %*.s", name, MyLog(LOGA_INFO, "Property name %s key %.*s value %.*s", name,
props->array[i].value.data.len, props->array[i].value.data.data, props->array[i].value.data.len, props->array[i].value.data.data,
props->array[i].value.value.len, props->array[i].value.value.data); props->array[i].value.value.len, props->array[i].value.value.data);
break; break;
...@@ -410,7 +409,8 @@ int test1(struct Options options) ...@@ -410,7 +409,8 @@ int test1(struct Options options)
MQTTProperties props = MQTTProperties_initializer; MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer; MQTTProperties willProps = MQTTProperties_initializer;
MQTTProperty property; MQTTProperty property;
MQTTResponse response; MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
MQTTResponse response = {SUCCESS, NULL};
int rc = 0; int rc = 0;
char* test_topic = "C client test1"; char* test_topic = "C client test1";
...@@ -472,10 +472,11 @@ int test1(struct Options options) ...@@ -472,10 +472,11 @@ int test1(struct Options options)
MQTTProperties_free(response.properties); MQTTProperties_free(response.properties);
} }
subopts.retainAsPublished = 1;
property.identifier = SUBSCRIPTION_IDENTIFIER; property.identifier = SUBSCRIPTION_IDENTIFIER;
property.value.integer4 = 33; property.value.integer4 = 33;
MQTTProperties_add(&props, &property); MQTTProperties_add(&props, &property);
response = MQTTClient_subscribe5(c, test_topic, subsqos, NULL, &props); response = MQTTClient_subscribe5(c, test_topic, subsqos, &subopts, &props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
MQTTProperties_free(&props); MQTTProperties_free(&props);
......
...@@ -293,6 +293,7 @@ int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync ...@@ -293,6 +293,7 @@ int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
pubmsg.qos = 2; pubmsg.qos = 2;
pubmsg.retained = 0; pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts); rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
} }
else else
{ {
...@@ -324,6 +325,7 @@ void test1_onSubscribe(void* context, MQTTAsync_successData* response) ...@@ -324,6 +325,7 @@ void test1_onSubscribe(void* context, MQTTAsync_successData* response)
pubmsg.retained = 0; pubmsg.retained = 0;
rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL); rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
} }
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment