Commit 7cc0e3fa authored by Ian Craggs's avatar Ian Craggs

MQTT 5.0 publish qos 0 send and receive

parent 31d8ad42
/*******************************************************************************
* Copyright (c) 2009, 2017 IBM Corp.
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -16,6 +16,7 @@
* Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - change will payload to binary
* Ian Craggs - password to binary
* Ian Craggs - MQTT 5 support
*******************************************************************************/
#if !defined(CLIENTS_H)
......@@ -86,6 +87,8 @@ typedef struct
int qos;
int retain;
int msgid;
int MQTTVersion;
MQTTProperties properties;
Publications *publish;
time_t lastTouch; /**> used for retry and expiry */
char nextMessageType; /**> PUBREC, PUBREL, PUBCOMP */
......
......@@ -526,6 +526,7 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const
m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize();
m->c->clientID = MQTTStrdup(clientId);
m->c->MQTTVersion = MQTTVERSION_DEFAULT;
m->shouldBeConnected = 0;
if (options)
......@@ -1293,6 +1294,7 @@ static int MQTTAsync_processCommand(void)
p->payloadlen = command->command.details.pub.payloadlen;
p->topic = command->command.details.pub.destinationName;
p->msgId = command->command.token;
p->MQTTVersion = MQTTVERSION_DEFAULT;
rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
......@@ -3020,7 +3022,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->c->connect_state == 1 || m->c->connect_state == 2)
*rc = MQTTAsync_connecting(m);
else
pack = MQTTPacket_Factory(&m->c->net, rc);
pack = MQTTPacket_Factory(MQTTVERSION_DEFAULT, &m->c->net, rc);
if (m->c->connect_state == 3 && *rc == SOCKET_ERROR)
{
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
......
......@@ -303,8 +303,8 @@ static void MQTTProtocol_checkPendingWrites(void);
static void MQTTClient_writeComplete(int socket, int rc);
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
{
int rc = 0;
MQTTClients *m = NULL;
......@@ -338,6 +338,12 @@ int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* cli
}
}
if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
if (!initialized)
{
#if defined(HEAP_H)
......@@ -374,6 +380,7 @@ int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* cli
m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize();
......@@ -401,6 +408,14 @@ exit:
}
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
{
return MQTTClient_createWithOptions(handle, serverURI, clientId, persistence_type,
persistence_context, NULL);
}
static void MQTTClient_terminate(void)
{
FUNC_ENTRY;
......@@ -1017,7 +1032,7 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
if (m->c->MQTTVersion == MQTTVERSION_5)
resp.properties = &connack->properties;
}
free(connack);
MQTTPacket_freeConnack(connack);
m->pack = NULL;
}
}
......@@ -1628,8 +1643,8 @@ int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
}
int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, void* payload,
int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, void* payload,
int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
{
int rc = MQTTCLIENT_SUCCESS;
MQTTClients* m = handle;
......@@ -1637,6 +1652,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
Publish* p = NULL;
int blocked = 0;
int msgid = 0;
MQTTResponse resp = {MQTTCLIENT_SUCCESS, NULL};
FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex);
......@@ -1647,6 +1663,9 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
rc = MQTTCLIENT_DISCONNECTED;
else if (!UTF8_validateString(topicName))
rc = MQTTCLIENT_BAD_UTF8_STRING;
else if (m->c->MQTTVersion >= MQTTVERSION_5 && properties == NULL)
rc = MQTTCLIENT_NULL_PARAMETER;
if (rc != MQTTCLIENT_SUCCESS)
goto exit;
......@@ -1682,6 +1701,9 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
p->payloadlen = payloadlen;
p->topic = (char*)topicName;
p->msgId = msgid;
p->MQTTVersion = m->c->MQTTVersion;
if (m->c->MQTTVersion >= MQTTVERSION_5)
p->properties = *properties;
rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
......@@ -1715,38 +1737,54 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
exit:
Thread_unlock_mutex(mqttclient_mutex);
FUNC_EXIT_RC(rc);
return rc;
resp.reasonCode = rc;
FUNC_EXIT_RC(resp.reasonCode);
return resp;
}
int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, void* payload,
int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
{
MQTTResponse rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
return rc.reasonCode;
}
int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTClient_deliveryToken* deliveryToken)
MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTProperties* props, MQTTClient_deliveryToken* deliveryToken)
{
int rc = MQTTCLIENT_SUCCESS;
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
FUNC_ENTRY;
if (message == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
if (strncmp(message->struct_id, "MQTM", 4) != 0 || message->struct_version != 0)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
rc = MQTTClient_publish(handle, topicName, message->payloadlen, message->payload,
message->qos, message->retained, deliveryToken);
rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
message->qos, message->retained, props, deliveryToken);
exit:
FUNC_EXIT_RC(rc);
FUNC_EXIT_RC(rc.reasonCode);
return rc;
}
int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTClient_deliveryToken* deliveryToken)
{
MQTTResponse rc = MQTTClient_publishMessage5(handle, topicName, message, NULL, deliveryToken);
return rc.reasonCode;
}
static void MQTTClient_retry(void)
{
static time_t last = 0L;
......@@ -1802,7 +1840,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
*rc = 0; /* waiting for connect state to clear */
else
{
pack = MQTTPacket_Factory(&m->c->net, rc);
pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
if (*rc == TCPSOCKET_INTERRUPTED)
*rc = 0;
}
......
......@@ -453,6 +453,21 @@ DLLExport int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClie
DLLExport int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context);
typedef struct
{
/** The eyecatcher for this structure. must be MQCO. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** Whether the MQTT version is 3 and 4, or 5. To use 5, this must be set. */
int MQTTVersion;
} MQTTClient_createOptions;
#define MQTTClient_createOptions_initializer { {'M', 'Q', 'C', 'O'}, MQTTVERSION_DEFAULT }
DLLExport int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context, MQTTClient_createOptions* options);
/**
* MQTTClient_willOptions defines the MQTT "Last Will and Testament" (LWT) settings for
* the client. In the event that a client unexpectedly loses its connection to
......@@ -901,6 +916,9 @@ DLLExport int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* con
*/
DLLExport int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, void* payload, int qos, int retained,
MQTTClient_deliveryToken* dt);
DLLExport MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, void* payload,
int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* dt);
/**
* This function attempts to publish a message to a given topic (see also
* MQTTClient_publish()). An ::MQTTClient_deliveryToken is issued when
......@@ -923,6 +941,9 @@ DLLExport int MQTTClient_publish(MQTTClient handle, const char* topicName, int p
DLLExport int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* msg, MQTTClient_deliveryToken* dt);
DLLExport MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* msg,
MQTTProperties* properties, MQTTClient_deliveryToken* dt);
/**
* This function is called by the client application to synchronize execution
* of the main thread with completed publication of a message. When called,
......
......@@ -42,13 +42,13 @@
#endif
/**
* List of the predefined MQTT v3 packet names.
* List of the predefined MQTT v3/v5 packet names.
*/
static const char *packet_names[] =
{
"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
"PINGREQ", "PINGRESP", "DISCONNECT"
"PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
};
const char** MQTTClient_packet_names = packet_names;
......@@ -61,7 +61,7 @@ const char** MQTTClient_packet_names = packet_names;
*/
const char* MQTTPacket_name(int ptype)
{
return (ptype >= 0 && ptype <= DISCONNECT) ? packet_names[ptype] : "UNKNOWN";
return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
}
/**
......@@ -96,7 +96,7 @@ static int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net
* @param error pointer to the error code which is completed if no packet is returned
* @return the packet structure or NULL if there was an error
*/
void* MQTTPacket_Factory(networkHandles* net, int* error)
void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
{
char* data = NULL;
static Header header;
......@@ -143,7 +143,7 @@ void* MQTTPacket_Factory(networkHandles* net, int* error)
Log(TRACE_MIN, 2, NULL, ptype);
else
{
if ((pack = (*new_packets[ptype])(header.byte, data, remaining_length)) == NULL)
if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
*error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
#if !defined(NO_PERSISTENCE)
else if (header.bits.type == PUBLISH && header.bits.qos == 2)
......@@ -457,12 +457,13 @@ void writeData(char** pptr, const void* data, int datalen)
/**
* Function used in the new packets table to create packets which have only a header.
* @param MQTTVersion the version of MQTT
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen)
void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
static unsigned char header = 0;
header = aHeader;
......@@ -492,18 +493,20 @@ int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID)
/**
* Function used in the new packets table to create publish packets.
* @param MQTTVersion
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen)
void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Publish* pack = malloc(sizeof(Publish));
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
{
......@@ -515,6 +518,17 @@ void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen)
pack->msgId = readInt(&curdata);
else
pack->msgId = 0;
if (MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
free(pack);
pack = NULL;
goto exit;
}
}
pack->payload = curdata;
pack->payloadlen = (int)(datalen-(curdata-data));
exit:
......@@ -532,6 +546,8 @@ void MQTTPacket_freePublish(Publish* pack)
FUNC_ENTRY;
if (pack->topic != NULL)
free(pack->topic);
if (pack->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pack->properties);
free(pack);
FUNC_EXIT;
}
......@@ -592,6 +608,8 @@ int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID)
void MQTTPacket_freeSuback(Suback* pack)
{
FUNC_ENTRY;
if (pack->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pack->properties);
if (pack->qoss != NULL)
ListFree(pack->qoss);
free(pack);
......@@ -664,14 +682,30 @@ int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen)
void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Ack* pack = malloc(sizeof(Ack));
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->msgId = readInt(&curdata);
if (MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
pack->rc = readChar(&curdata); /* reason code */
pack->properties = props;
pack->properties.max_count = 10;
pack->properties.array = malloc(sizeof(MQTTProperty) * pack->properties.max_count);
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
free(pack);
pack = NULL; /* signal protocol error */
}
}
FUNC_EXIT;
return pack;
}
......@@ -700,20 +734,25 @@ int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, netwo
header.bits.dup = dup;
header.bits.qos = qos;
header.bits.retain = retained;
if (qos > 0)
if (qos > 0 || pack->MQTTVersion >= 5)
{
char *buf = malloc(2);
char *ptr = buf;
char* bufs[4] = {topiclen, pack->topic, buf, pack->payload};
size_t lens[4] = {2, strlen(pack->topic), 2, pack->payloadlen};
int buflen = ((qos > 0) ? 2 : 0) + ((pack->MQTTVersion >= 5) ? MQTTProperties_len(&pack->properties) : 0);
char *ptr = NULL;
char* bufs[4] = {topiclen, pack->topic, NULL, pack->payload};
size_t lens[4] = {2, strlen(pack->topic), buflen, pack->payloadlen};
int frees[4] = {1, 0, 1, 0};
writeInt(&ptr, pack->msgId);
bufs[2] = ptr = malloc(buflen);
if (qos > 0)
writeInt(&ptr, pack->msgId);
if (pack->MQTTVersion >= 5)
MQTTProperties_write(&ptr, &pack->properties);
ptr = topiclen;
writeInt(&ptr, (int)lens[1]);
rc = MQTTPacket_sends(net, header, 4, bufs, lens, frees);
if (rc != TCPSOCKET_INTERRUPTED)
free(buf);
free(bufs[2]);
}
else
{
......
......@@ -29,7 +29,7 @@
#include "Clients.h"
typedef unsigned int bool;
typedef void* (*pf)(unsigned char, char*, size_t);
typedef void* (*pf)(int, unsigned char, char*, size_t);
#include "MQTTProperties.h"
......@@ -46,7 +46,7 @@ enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT
PINGREQ, PINGRESP, DISCONNECT, AUTH
};
#if defined(__linux__)
......@@ -150,6 +150,7 @@ typedef struct
#endif
} flags; /**< connack flags byte */
char rc; /**< connack reason code */
int MQTTVersion; /**< the version of MQTT */
MQTTProperties properties; /**< MQTT 5.0 properties. Not used for MQTT < 5.0 */
} Connack;
......@@ -163,19 +164,6 @@ typedef struct
} MQTTPacket;
/**
* Data for a subscribe packet.
*/
typedef struct
{
Header header; /**< MQTT header byte */
int msgId; /**< MQTT message id */
List* topics; /**< list of topic strings */
List* qoss; /**< list of corresponding QoSs */
int noTopics; /**< topic and qos count */
} Subscribe;
/**
* Data for a suback packet.
*/
......@@ -183,22 +171,12 @@ typedef struct
{
Header header; /**< MQTT header byte */
int msgId; /**< MQTT message id */
List* qoss; /**< list of granted QoSs */
int MQTTVersion; /**< the version of MQTT */
MQTTProperties properties; /**< MQTT 5.0 properties. Not used for MQTT < 5.0 */
List* qoss; /**< list of granted QoSs (MQTT 3/4) / reason codes (MQTT 5) */
} Suback;
/**
* Data for an unsubscribe packet.
*/
typedef struct
{
Header header; /**< MQTT header byte */
int msgId; /**< MQTT message id */
List* topics; /**< list of topic strings */
int noTopics; /**< topic count */
} Unsubscribe;
/**
* Data for a publish packet.
*/
......@@ -210,6 +188,8 @@ typedef struct
int msgId; /**< MQTT message id */
char* payload; /**< binary payload, length delimited */
int payloadlen; /**< payload length */
int MQTTVersion; /**< the version of MQTT */
MQTTProperties properties; /**< MQTT 5.0 properties. Not used for MQTT < 5.0 */
} Publish;
......@@ -220,6 +200,9 @@ typedef struct
{
Header header; /**< MQTT header byte */
int msgId; /**< MQTT message id */
unsigned char rc; /**< MQTT 5 reason code */
int MQTTVersion; /**< the version of MQTT */
MQTTProperties properties; /**< MQTT 5.0 properties. Not used for MQTT < 5.0 */
} Ack;
typedef Ack Puback;
......@@ -240,18 +223,18 @@ void writeData(char** pptr, const void* data, int datalen);
const char* MQTTPacket_name(int ptype);
void* MQTTPacket_Factory(networkHandles* net, int* error);
void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error);
int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int free);
int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees);
void* MQTTPacket_header_only(unsigned char aHeader, char* data, size_t datalen);
void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_disconnect(networkHandles* net, const char* clientID);
void* MQTTPacket_publish(unsigned char aHeader, char* data, size_t datalen);
void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
void MQTTPacket_freePublish(Publish* pack);
int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID);
int MQTTPacket_send_puback(int msgid, networkHandles* net, const char* clientID);
void* MQTTPacket_ack(unsigned char aHeader, char* data, size_t datalen);
void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
void MQTTPacket_freeSuback(Suback* pack);
int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID);
......
......@@ -126,34 +126,60 @@ exit:
/**
* Function used in the new packets table to create connack packets.
* @param MQTTVersion MQTT 5 or less?
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen)
void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Connack* pack = malloc(sizeof(Connack));
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->flags.all = readChar(&curdata); /* connect flags */
pack->rc = readChar(&curdata); /* reason code */
if (datalen > 2)
if (MQTTVersion < MQTTVERSION_5)
{
if (datalen != 2)
{
free(pack);
pack = NULL;
}
}
else if (datalen > 2)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
pack->properties.max_count = 10;
pack->properties.array = malloc(sizeof(MQTTProperty) * pack->properties.max_count);
if (MQTTProperties_read(&pack->properties, &curdata, curdata + datalen) != 1)
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
free(pack);
pack = NULL; /* signal protocol error */
}
}
FUNC_EXIT;
return pack;
}
/**
* Free allocated storage for a connack packet.
* @param pack pointer to the connack packet structure
*/
void MQTTPacket_freeConnack(Connack* pack)
{
FUNC_ENTRY;
if (pack->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pack->properties);
free(pack);
FUNC_EXIT;
}
/**
* Send an MQTT PINGREQ packet down a socket.
* @param socket the open socket to send the data to
......@@ -241,26 +267,40 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o
/**
* Function used in the new packets table to create suback packets.
* @param MQTTVersion the version of MQTT
* @param aHeader the MQTT header byte
* @param data the rest of the packet
* @param datalen the length of the rest of the packet
* @return pointer to the packet structure
*/
void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen)
void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
{
Suback* pack = malloc(sizeof(Suback));
char* curdata = data;
char* enddata = &data[datalen];
FUNC_ENTRY;
pack->MQTTVersion = MQTTVersion;
pack->header.byte = aHeader;
pack->msgId = readInt(&curdata);
if (MQTTVersion >= MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
free(pack->properties.array);
free(pack);
pack = NULL; /* signal protocol error */
}
}
pack->qoss = ListInitialize();
while ((size_t)(curdata - data) < datalen)
{
int* newint;
newint = malloc(sizeof(int));
*newint = (int)readChar(&curdata);
ListAppend(pack->qoss, newint, sizeof(int));
unsigned int* newint;
newint = malloc(sizeof(unsigned int));
*newint = (unsigned int)readChar(&curdata);
ListAppend(pack->qoss, newint, sizeof(unsigned int));
}
FUNC_EXIT;
return pack;
......
......@@ -24,13 +24,14 @@
int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen);
void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
void MQTTPacket_freeConnack(Connack* pack);
int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID);
int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
int msgid, int dup, Clients* client);
void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen);
void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID);
......
......@@ -196,7 +196,7 @@ int MQTTPersistence_restore(Clients *c)
}
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
{
MQTTPacket* pack = MQTTPersistence_restorePacket(buffer, buflen);
MQTTPacket* pack = MQTTPersistence_restorePacket(c->MQTTVersion, buffer, buflen);
if ( pack != NULL )
{
if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_RECEIVED) != NULL )
......@@ -271,7 +271,7 @@ int MQTTPersistence_restore(Clients *c)
* @param buffer the persisted data.
* @param buflen the number of bytes of the data buffer.
*/
void* MQTTPersistence_restorePacket(char* buffer, size_t buflen)
void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
{
void* pack = NULL;
Header header;
......@@ -295,7 +295,7 @@ void* MQTTPersistence_restorePacket(char* buffer, size_t buflen)
{
ptype = header.bits.type;
if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
pack = (*new_packets[ptype])(header.byte, ++buffer, remaining_length);
pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
}
FUNC_EXIT;
......
......@@ -39,7 +39,7 @@ int MQTTPersistence_initialize(Clients* c, const char* serverURI);
int MQTTPersistence_close(Clients* c);
int MQTTPersistence_clear(Clients* c);
int MQTTPersistence_restore(Clients* c);
void* MQTTPersistence_restorePacket(char* buffer, size_t buflen);
void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen);
void MQTTPersistence_insertInOrder(List* list, void* content, size_t size);
int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count,
char** buffers, size_t* buflens, int htype, int msgId, int scr);
......
......@@ -19,6 +19,7 @@
#include "MQTTPacket.h"
#include "MQTTProtocolClient.h"
#include "Heap.h"
#include "StackTrace.h"
#include <memory.h>
......@@ -279,6 +280,7 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
int rc = 0;
int remlength = 0;
FUNC_ENTRY;
properties->count = 0;
if (enddata - (*pptr) > 0) /* enough length to read the VBI? */
{
......@@ -288,7 +290,10 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
{
if (properties->count == properties->max_count)
{
properties->max_count += 10;
properties->max_count += 10;
if (properties->max_count == 10)
properties->array = malloc(sizeof(MQTTProperty) * properties->max_count);
else
properties->array = realloc(properties->array, sizeof(MQTTProperty) * properties->max_count);
}
remlength -= MQTTProperty_read(&properties->array[properties->count], pptr, enddata);
......@@ -298,6 +303,7 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
rc = 1; /* data read successfully */
}
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -373,6 +379,36 @@ DLLExport void MQTTProperties_free(MQTTProperties* props)
}
}
if (props->array)
free(props->array);
free(props->array);
memset(props, '\0', sizeof(MQTTProperties)); /* zero all fields */
}
MQTTProperties MQTTProperties_copy(MQTTProperties* props)
{
int i = 0;
MQTTProperties result = MQTTProperties_initializer;
for (i = 0; i > props->count; ++i)
{
int id = props->array[i].identifier;
int type = MQTTProperty_getType(id);
MQTTProperties_add(&result, &props->array[i]);
switch (type)
{
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
case UTF_8_STRING_PAIR:
result.array[i].value.data.data = malloc(result.array[i].value.data.len);
memcpy(result.array[i].value.data.data, props->array[i].value.data.data, props->array[i].value.data.len);
if (type == UTF_8_STRING_PAIR)
{
result.array[i].value.value.data = malloc(result.array[i].value.value.len);
memcpy(result.array[i].value.value.data, props->array[i].value.value.data, props->array[i].value.value.len);
}
break;
}
}
return result;
}
......@@ -117,4 +117,6 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata);
DLLExport void MQTTProperties_free(MQTTProperties* properties);
MQTTProperties MQTTProperties_copy(MQTTProperties* props);
#endif /* MQTTPROPERTIES_H */
......@@ -161,6 +161,7 @@ int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int
entirely; the socket buffer will use these locations to finish writing the packet */
p.payload = (*mm)->publish->payload;
p.topic = (*mm)->publish->topic;
p.properties = (*mm)->properties;
}
rc = MQTTProtocol_startPublishCommon(pubclient, &p, qos, retained);
FUNC_EXIT_RC(rc);
......@@ -197,6 +198,9 @@ Messages* MQTTProtocol_createMessage(Publish* publish, Messages **mm, int qos, i
m->msgid = publish->msgId;
m->qos = qos;
m->retain = retained;
m->MQTTVersion = publish->MQTTVersion;
if (m->MQTTVersion >= 5)
m->properties = MQTTProperties_copy(&publish->properties);
time(&(m->lastTouch));
if (qos == 2)
m->nextMessageType = PUBREC;
......@@ -723,6 +727,8 @@ void MQTTProtocol_emptyMessageList(List* msgList)
while (ListNextElement(msgList, &current))
{
Messages* m = (Messages*)(current->content);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
MQTTProtocol_removePublication(m->publish);
}
ListEmpty(msgList);
......
......@@ -26,6 +26,7 @@
#include "Log.h"
#include "MQTTProtocol.h"
#include "Messages.h"
#include "MQTTProperties.h"
#define MAX_MSG_ID 65535
#define MAX_CLIENTID_LEN 65535
......
......@@ -299,6 +299,9 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
int i = 0;
int iterations = 50;
int rc;
MQTTResponse resp;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property;
MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos);
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
......@@ -306,13 +309,21 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
pubmsg.qos = qos;
pubmsg.retained = 0;
for (i = 0; i< iterations; ++i)
property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);
for (i = 0; i < iterations; ++i)
{
if (i % 10 == 0)
rc = MQTTClient_publish(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &dt);
resp = MQTTClient_publish5(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained,
&props, &dt);
else
rc = MQTTClient_publishMessage(c, test_topic, &pubmsg, &dt);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
resp = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &props, &dt);
assert("Good rc from publish", resp.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", resp.reasonCode);
if (qos > 0)
{
......@@ -348,6 +359,8 @@ void test1_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
MQTTClient_freeMessage(&m);
MQTTClient_receive(c, &topicName, &topicLen, &m, 2000);
}
MQTTProperties_free(&props);
}
void logProperties(MQTTProperties *props)
......@@ -420,6 +433,7 @@ int test1(struct Options options)
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
printf("test MQTT version %d\n", options.MQTTVersion);
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
......@@ -471,7 +485,7 @@ int test1(struct Options options)
MQTTProperties_free(response.properties);
}
//test1_sendAndReceive(c, 0, test_topic);
test1_sendAndReceive(c, 0, test_topic);
//test1_sendAndReceive(c, 1, test_topic);
//test1_sendAndReceive(c, 2, test_topic);
......@@ -483,11 +497,11 @@ int test1(struct Options options)
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
/* Just to make sure we can connect again */
/*rc = MQTTClient_connect5(c, &opts, &props, &willProps);
assert("Connect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
response = MQTTClient_connect5(c, &opts, NULL, NULL);
assert("Connect successful", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
rc = MQTTClient_disconnect(c, 0);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
*/
MQTTClient_destroy(&c);
exit:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment