Commit 6a1f39f7 authored by Ian Craggs's avatar Ian Craggs

MQTT 5.0 connect and connack #417

parent 0e211567
......@@ -90,7 +90,7 @@ SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = paho_c_pub paho_c_sub MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
TEST_FILES_C = test1 test2 sync_client_test test_mqtt4sync
TEST_FILES_C = test1 test15 test2 sync_client_test test_mqtt4sync
SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3
......
......@@ -47,8 +47,7 @@ SET(common_src
SocketBuffer.c
Heap.c
LinkedList.c
MQTTV5Packet.c
MQTTV5Properties.c
MQTTProperties.c
)
IF (WIN32)
......
......@@ -34,6 +34,7 @@
* Ian Craggs - auto reconnect timing fix #218
* Ian Craggs - fix for issue #190
* Ian Craggs - check for NULL SSL options #334
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
/**
......@@ -1237,9 +1238,11 @@ static int MQTTAsync_processCommand(void)
Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->command.details.conn.MQTTVersion);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->command.details.conn.MQTTVersion,
NULL, NULL);
#else
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion,
NULL, NULL);
#endif
if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR;
......@@ -2921,7 +2924,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
{
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, NULL, NULL) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
......@@ -2940,7 +2943,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
{
#endif
m->c->connect_state = 3; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, NULL, NULL)) == SOCKET_ERROR)
goto exit;
#if defined(OPENSSL)
}
......@@ -2956,7 +2959,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, NULL, NULL)) == SOCKET_ERROR)
goto exit;
}
#endif
......
......@@ -33,6 +33,7 @@
* Ian Craggs - binary will message support
* Ian Craggs - waitforCompletion fix #240
* Ian Craggs - check for NULL SSL options #334
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
/**
......@@ -284,11 +285,13 @@ static thread_return_type WINAPI MQTTClient_run(void* n);
static void MQTTClient_stop(void);
static void MQTTClient_closeSession(Clients* client);
static int MQTTClient_cleanSession(Clients* client);
static int MQTTClient_connectURIVersion(
static MQTTResponse MQTTClient_connectURIVersion(
MQTTClient handle, MQTTClient_connectOptions* options,
const char* serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout);
static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI);
START_TIME_TYPE start, long millisecsTimeout,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop);
static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
static void MQTTClient_retry(void);
......@@ -848,12 +851,13 @@ void Protocol_processPublication(Publish* publish, Clients* client)
}
static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout)
static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
int sessionPresent = 0;
MQTTResponse resp = {SOCKET_ERROR, NULL};
FUNC_ENTRY;
if (m->ma && !running)
......@@ -869,9 +873,9 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, MQTTVersion);
rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, MQTTVersion, connectProperties, willProperties);
#else
rc = MQTTProtocol_connect(serverURI, m->c, MQTTVersion);
rc = MQTTProtocol_connect(serverURI, m->c, MQTTVersion, connectProperties, willProperties);
#endif
if (rc == SOCKET_ERROR)
goto exit;
......@@ -923,7 +927,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
{
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
......@@ -942,7 +946,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
{
#endif
m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
......@@ -966,7 +970,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
......@@ -1009,6 +1013,8 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
if (m->c->connected != 1)
rc = MQTTCLIENT_DISCONNECTED;
}
if (m->c->MQTTVersion == MQTTVERSION_5)
resp.properties = &connack->properties;
}
free(connack);
m->pack = NULL;
......@@ -1026,8 +1032,10 @@ exit:
}
else
MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3)); /* don't want to call connection lost */
FUNC_EXIT_RC(rc);
return rc;
resp.reasonCode = rc;
FUNC_EXIT_RC(resp.reasonCode);
return resp;
}
static int retryLoopInterval = 5;
......@@ -1045,12 +1053,13 @@ static void setRetryLoopInterval(int keepalive)
}
static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI)
static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
START_TIME_TYPE start;
long millisecsTimeout = 30000L;
int rc = SOCKET_ERROR;
MQTTResponse rc = {SOCKET_ERROR, NULL};
int MQTTVersion = 0;
FUNC_ENTRY;
......@@ -1061,6 +1070,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o
setRetryLoopInterval(options->keepAliveInterval);
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
m->c->MQTTVersion = options->MQTTVersion;
if (m->c->will)
{
......@@ -1167,21 +1177,36 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o
if (MQTTVersion == MQTTVERSION_DEFAULT)
{
if ((rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout)) != MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout);
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
connectProperties, willProperties);
if (rc.reasonCode != MQTTCLIENT_SUCCESS)
{
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
connectProperties, willProperties);
}
}
else
rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout);
rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
connectProperties, willProperties);
FUNC_EXIT_RC(rc);
FUNC_EXIT_RC(rc.reasonCode);
return rc;
}
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
MQTTResponse response = MQTTClient_connect5(handle, options, NULL, NULL);
return response.reasonCode;
}
MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
MQTTResponse rc = {SOCKET_ERROR, NULL};
FUNC_ENTRY;
Thread_lock_mutex(connect_mutex);
......@@ -1189,20 +1214,20 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
if (options == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 5)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
#if defined(OPENSSL)
if (m->ssl && options->ssl == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
#endif
......@@ -1211,7 +1236,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
......@@ -1222,7 +1247,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 2)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
......@@ -1231,19 +1256,19 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
if ((options->username && !UTF8_validateString(options->username)) ||
(options->password && !UTF8_validateString(options->password)))
{
rc = MQTTCLIENT_BAD_UTF8_STRING;
rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
goto exit;
}
if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
(options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
{
rc = MQTTCLIENT_BAD_MQTT_VERSION;
rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
goto exit;
}
if (options->struct_version < 2 || options->serverURIcount == 0)
rc = MQTTClient_connectURI(handle, options, m->serverURI);
rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
else
{
int i;
......@@ -1261,7 +1286,8 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
m->ssl = 1;
}
#endif
if ((rc = MQTTClient_connectURI(handle, options, serverURI)) == MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
if (rc.reasonCode == MQTTCLIENT_SUCCESS)
break;
}
}
......@@ -1278,7 +1304,7 @@ exit:
}
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(connect_mutex);
FUNC_EXIT_RC(rc);
FUNC_EXIT_RC(rc.reasonCode);
return rc;
}
......
......@@ -122,6 +122,8 @@
/// @endcond
*/
#include "MQTTProperties.h"
#include "MQTTReasonCodes.h"
#if !defined(NO_PERSISTENCE)
#include "MQTTClientPersistence.h"
#endif
......@@ -753,6 +755,16 @@ DLLExport MQTTClient_nameValue* MQTTClient_getVersionInfo(void);
*/
DLLExport int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options);
typedef struct MQTTResponse
{
enum MQTTReasonCodes reasonCode;
MQTTProperties* properties; /* optional */
} MQTTResponse;
DLLExport MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
/**
* This function attempts to disconnect the client from the MQTT
* server. In order to allow the client time to complete handling of messages
......
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -14,6 +14,7 @@
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
/**
......@@ -143,7 +144,7 @@ void* MQTTPacket_Factory(networkHandles* net, int* error)
else
{
if ((pack = (*new_packets[ptype])(header.byte, data, remaining_length)) == NULL)
*error = BAD_MQTT_PACKET;
*error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
#if !defined(NO_PERSISTENCE)
else if (header.bits.type == PUBLISH && header.bits.qos == 2)
{
......@@ -753,3 +754,142 @@ void MQTTPacket_free_packet(MQTTPacket* pack)
free(pack);
FUNC_EXIT;
}
/**
* Writes an integer as 4 bytes to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param anInt the integer to write
*/
void writeInt4(char** pptr, int anInt)
{
**pptr = (char)(anInt / 16777216);
(*pptr)++;
anInt %= 16777216;
**pptr = (char)(anInt / 65536);
(*pptr)++;
anInt %= 65536;
**pptr = (char)(anInt / 256);
(*pptr)++;
**pptr = (char)(anInt % 256);
(*pptr)++;
}
/**
* Calculates an integer from two bytes read from the input buffer
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the integer value calculated
*/
int readInt4(char** pptr)
{
unsigned char* ptr = (unsigned char*)*pptr;
int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
*pptr += 4;
return value;
}
void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
{
writeInt(pptr, lenstring.len);
memcpy(*pptr, lenstring.data, lenstring.len);
*pptr += lenstring.len;
}
int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
{
int len = 0;
/* the first two bytes are the length of the string */
if (enddata - (*pptr) > 1) /* enough length to read the integer? */
{
lenstring->len = readInt(pptr); /* increments pptr to point past length */
if (&(*pptr)[lenstring->len] <= enddata)
{
lenstring->data = (char*)*pptr;
*pptr += lenstring->len;
len = 2 + lenstring->len;
}
}
return len;
}
/*
if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
len = 1;
else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
len = 2;
else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
len = 3;
else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
len = 4;
*/
int MQTTPacket_VBIlen(int rem_len)
{
int rc = 0;
if (rem_len < 128)
rc = 1;
else if (rem_len < 16384)
rc = 2;
else if (rem_len < 2097152)
rc = 3;
else
rc = 4;
return rc;
}
/**
* Decodes the message length according to the MQTT algorithm
* @param getcharfn pointer to function to read the next character from the data source
* @param value the decoded length returned
* @return the number of bytes read from the socket
*/
int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), int* value)
{
char c;
int multiplier = 1;
int len = 0;
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = (*getcharfn)(&c, 1);
if (rc != 1)
goto exit;
*value += (c & 127) * multiplier;
multiplier *= 128;
} while ((c & 128) != 0);
exit:
return len;
}
static char* bufptr;
int bufchar(char* c, int count)
{
int i;
for (i = 0; i < count; ++i)
*c = *bufptr++;
return count;
}
int MQTTPacket_decodeBuf(char* buf, int* value)
{
bufptr = buf;
return MQTTPacket_VBIdecode(bufchar, value);
}
......@@ -28,16 +28,19 @@
#include "LinkedList.h"
#include "Clients.h"
/*BE
include "Socket"
include "LinkedList"
include "Clients"
BE*/
typedef unsigned int bool;
typedef void* (*pf)(unsigned char, char*, size_t);
#define BAD_MQTT_PACKET -4
#include "MQTTProperties.h"
enum errors
{
MQTTPACKET_BAD = -4,
MQTTPACKET_BUFFER_TOO_SHORT = -2,
MQTTPACKET_READ_ERROR = -1,
MQTTPACKET_READ_COMPLETE
};
enum msgTypes
{
......@@ -146,7 +149,8 @@ typedef struct
} bits;
#endif
} flags; /**< connack flags byte */
char rc; /**< connack return code */
char rc; /**< connack reason code */
MQTTProperties properties; /**< MQTT 5.0 properties. Not used for MQTT < 5.0 */
} Connack;
......@@ -256,7 +260,13 @@ int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID
void MQTTPacket_free_packet(MQTTPacket* pack);
void writeInt4(char** pptr, int anInt);
int readInt4(char** pptr);
void writeMQTTLenString(char** pptr, MQTTLenString lenstring);
int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata);
int MQTTPacket_VBIlen(int rem_len);
int MQTTPacket_decodeBuf(char* buf, int* value);
#include "MQTTPacketOut.h"
#include "MQTTV5Packet.h"
#endif /* MQTTPACKET_H */
......@@ -16,6 +16,7 @@
* Ian Craggs - MQTT 3.1.1 support
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - binary password and will payload
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
/**
......@@ -37,19 +38,20 @@
/**
* Send an MQTT CONNECT packet down a socket.
* Send an MQTT CONNECT packet down a socket for V5 or later
* @param client a structure from which to get all the required values
* @param MQTTVersion the MQTT version to connect with
* @param connectProperties MQTT V5 properties for the connect packet
* @param willProperties MQTT V5 properties for the will message, if any
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
char *buf, *ptr;
Connect packet;
int rc = -1, len;
MQTTProperties connectProperties = MQTTProperties_initializer;
FUNC_ENTRY;
packet.header.byte = 0;
packet.header.bits.type = CONNECT;
......@@ -63,10 +65,10 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
len += client->passwordlen+2;
if (MQTTVersion >= 5)
{
//if (connectProperties)
len += MQTTProperties_len(&connectProperties);
/*if (client->will && willProperties)
len += MQTTProperties_len(willProperties);*/
if (connectProperties)
len += MQTTProperties_len(connectProperties);
if (client->will && willProperties)
len += MQTTProperties_len(willProperties);
}
ptr = buf = malloc(len);
......@@ -91,7 +93,6 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
packet.flags.bits.willQoS = client->will->qos;
packet.flags.bits.willRetain = client->will->retained;
}
if (client->username)
packet.flags.bits.username = 1;
if (client->password)
......@@ -100,10 +101,12 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
writeChar(&ptr, packet.flags.all);
writeInt(&ptr, client->keepAliveInterval);
if (MQTTVersion == 5)
MQTTProperties_write(&ptr, &connectProperties);
MQTTProperties_write(&ptr, connectProperties);
writeUTF(&ptr, client->clientID);
if (client->will)
{
if (MQTTVersion == 5)
MQTTProperties_write(&ptr, willProperties);
writeUTF(&ptr, client->will->topic);
writeData(&ptr, client->will->payload, client->will->payloadlen);
}
......@@ -136,8 +139,17 @@ void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen)
FUNC_ENTRY;
pack->header.byte = aHeader;
pack->flags.all = readChar(&curdata);
pack->rc = readChar(&curdata);
pack->flags.all = readChar(&curdata); /* connect flags */
pack->rc = readChar(&curdata); /* reason code */
if (datalen > 2)
{
MQTTProperties props = MQTTProperties_initializer;
pack->properties = props;
pack->properties.max_count = 10;
pack->properties.array = malloc(sizeof(MQTTProperty) * pack->properties.max_count);
if (MQTTProperties_read(&pack->properties, &curdata, curdata + datalen) != 1)
pack = NULL; /* signal protocol error */
}
FUNC_EXIT;
return pack;
}
......
......@@ -14,6 +14,7 @@
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
#if !defined(MQTTPACKETOUT_H)
......@@ -21,7 +22,8 @@
#include "MQTTPacket.h"
int MQTTPacket_send_connect(Clients* client, int MQTTVersion);
int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID);
......
......@@ -14,8 +14,10 @@
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTV5Properties.h"
#include "MQTTProperties.h"
#include "MQTTPacket.h"
#include "Heap.h"
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
......@@ -244,8 +246,13 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
{
*pptr += MQTTPacket_decodeBuf(*pptr, &remlength);
properties->length = remlength;
while (properties->count < properties->max_count && remlength > 0)
while (remlength > 0)
{
if (properties->count == properties->max_count)
{
properties->max_count += 10;
properties->array = realloc(properties->array, sizeof(MQTTProperty) * properties->max_count);
}
remlength -= MQTTProperty_read(&properties->array[properties->count], pptr, enddata);
properties->count++;
}
......@@ -255,3 +262,74 @@ int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata)
return rc;
}
struct {
enum PropertyNames value;
const char* name;
} nameToString[] =
{
{PAYLOAD_FORMAT_INDICATOR, "PAYLOAD_FORMAT_INDICATOR"},
{MESSAGE_EXPIRY_INTERVAL, "MESSAGE_EXPIRY_INTERVAL"},
{CONTENT_TYPE, "CONTENT_TYPE"},
{RESPONSE_TOPIC, "RESPONSE_TOPIC"},
{CORRELATION_DATA, "CORRELATION_DATA"},
{SUBSCRIPTION_IDENTIFIER, "SUBSCRIPTION_IDENTIFIER"},
{SESSION_EXPIRY_INTERVAL, "SESSION_EXPIRY_INTERVAL"},
{ASSIGNED_CLIENT_IDENTIFER, "ASSIGNED_CLIENT_IDENTIFER"},
{SERVER_KEEP_ALIVE, "SERVER_KEEP_ALIVE"},
{AUTHENTICATION_METHOD, "AUTHENTICATION_METHOD"},
{AUTHENTICATION_DATA, "AUTHENTICATION_DATA"},
{REQUEST_PROBLEM_INFORMATION, "REQUEST_PROBLEM_INFORMATION"},
{WILL_DELAY_INTERVAL, "WILL_DELAY_INTERVAL"},
{REQUEST_RESPONSE_INFORMATION, "REQUEST_RESPONSE_INFORMATION"},
{RESPONSE_INFORMATION, "RESPONSE_INFORMATION"},
{SERVER_REFERENCE, "SERVER_REFERENCE"},
{REASON_STRING, "REASON_STRING"},
{RECEIVE_MAXIMUM, "RECEIVE_MAXIMUM"},
{TOPIC_ALIAS_MAXIMUM, "TOPIC_ALIAS_MAXIMUM"},
{TOPIC_ALIAS, "TOPIC_ALIAS"},
{MAXIMUM_QOS, "MAXIMUM_QOS"},
{RETAIN_AVAILABLE, "RETAIN_AVAILABLE"},
{USER_PROPERTY, "USER_PROPERTY"},
{MAXIMUM_PACKET_SIZE, "MAXIMUM_PACKET_SIZE"},
{WILDCARD_SUBSCRIPTION_AVAILABLE, "WILDCARD_SUBSCRIPTION_AVAILABLE"},
{SUBSCRIPTION_IDENTIFIER_AVAILABLE, "SUBSCRIPTION_IDENTIFIER_AVAILABLE"},
{SHARED_SUBSCRIPTION_AVAILABLE, "SHARED_SUBSCRIPTION_AVAILABLE"}
};
const char* MQTTPropertyName(enum PropertyNames value)
{
int i = 0;
const char* result = NULL;
for (i = 0; i < ARRAY_SIZE(nameToString); ++i)
{
if (nameToString[i].value == value)
{
result = nameToString[i].name;
break;
}
}
return result;
}
DLLExport void MQTTProperties_free(MQTTProperties* props)
{
int i = 0;
for (i = 0; i < props->count; ++i)
{
int id = props->array[i].identifier;
switch (MQTTProperty_getType(id))
{
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
break;
case UTF_8_STRING_PAIR:
break;
}
}
free(props->array);
}
......@@ -53,6 +53,16 @@ enum PropertyNames {
SHARED_SUBSCRIPTION_AVAILABLE = 42
};
#if defined(WIN32) || defined(WIN64)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#else
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#endif
DLLExport const char* MQTTPropertyName(enum PropertyNames);
enum PropertyTypes {
BYTE,
TWO_BYTE_INTEGER,
......@@ -63,6 +73,8 @@ enum PropertyTypes {
UTF_8_STRING_PAIR
};
DLLExport int MQTTProperty_getType(int identifier);
typedef struct
{
......@@ -94,10 +106,12 @@ int MQTTProperties_len(MQTTProperties* props);
* @param prop
* @return whether the write succeeded or not, number of bytes written or < 0
*/
int MQTTProperties_add(MQTTProperties* props, MQTTProperty* prop);
DLLExport int MQTTProperties_add(MQTTProperties* props, MQTTProperty* prop);
int MQTTProperties_write(char** pptr, MQTTProperties* properties);
int MQTTProperties_read(MQTTProperties* properties, char** pptr, char* enddata);
DLLExport void MQTTProperties_free(MQTTProperties* properties);
#endif /* MQTTPROPERTIES_H */
......@@ -20,6 +20,7 @@
* Ian Craggs - SNI support
* Ian Craggs - fix for issue #164
* Ian Craggs - fix for issue #179
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
/**
......@@ -94,9 +95,11 @@ char* MQTTProtocol_addressPort(const char* uri, int* port)
* @return return code
*/
#if defined(OPENSSL)
int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int MQTTVersion)
int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int ssl, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
#else
int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersion)
int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
#endif
{
int rc, port;
......@@ -129,7 +132,7 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersi
if (rc == 0)
{
/* Now send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion)) == 0)
if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
aClient->connect_state = 3; /* MQTT Connect sent - wait for CONNACK */
else
aClient->connect_state = 0;
......
/*******************************************************************************
* Copyright (c) 2009, 2017 IBM Corp.
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -15,6 +15,7 @@
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - SNI support
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/
#if !defined(MQTTPROTOCOLOUT_H)
......@@ -33,9 +34,11 @@
char* MQTTProtocol_addressPort(const char* uri, int* port);
void MQTTProtocol_reconnect(const char* ip_address, Clients* client);
#if defined(OPENSSL)
int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int ssl, int MQTTVersion);
int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int ssl, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
#else
int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int MQTTVersion);
int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int MQTTVersion,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
#endif
int MQTTProtocol_handlePingresps(void* pack, int sock);
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID);
......
......@@ -14,7 +14,7 @@
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
enum ReasonCodes {
enum MQTTReasonCodes {
SUCCESS = 0,
NORMAL_DISCONNECTION = 0,
GRANTED_QOS_0 = 0,
......
/*******************************************************************************
* Copyright (c) 2017, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTV5Packet.h"
#include "MQTTPacket.h"
#include <string.h>
/**
* Writes an integer as 4 bytes to an output buffer.
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
* @param anInt the integer to write
*/
void writeInt4(char** pptr, int anInt)
{
**pptr = (char)(anInt / 16777216);
(*pptr)++;
anInt %= 16777216;
**pptr = (char)(anInt / 65536);
(*pptr)++;
anInt %= 65536;
**pptr = (char)(anInt / 256);
(*pptr)++;
**pptr = (char)(anInt % 256);
(*pptr)++;
}
/**
* Calculates an integer from two bytes read from the input buffer
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the integer value calculated
*/
int readInt4(char** pptr)
{
char* ptr = *pptr;
int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
*pptr += 4;
return value;
}
void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
{
writeInt(pptr, lenstring.len);
memcpy(*pptr, lenstring.data, lenstring.len);
*pptr += lenstring.len;
}
int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
{
int len = 0;
/* the first two bytes are the length of the string */
if (enddata - (*pptr) > 1) /* enough length to read the integer? */
{
lenstring->len = readInt(pptr); /* increments pptr to point past length */
if (&(*pptr)[lenstring->len] <= enddata)
{
lenstring->data = (char*)*pptr;
*pptr += lenstring->len;
len = 2 + lenstring->len;
}
}
return len;
}
/*
if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
len = 1;
else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
len = 2;
else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
len = 3;
else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
len = 4;
*/
int MQTTPacket_VBIlen(int rem_len)
{
int rc = 0;
if (rem_len < 128)
rc = 1;
else if (rem_len < 16384)
rc = 2;
else if (rem_len < 2097152)
rc = 3;
else
rc = 4;
return rc;
}
/**
* Decodes the message length according to the MQTT algorithm
* @param getcharfn pointer to function to read the next character from the data source
* @param value the decoded length returned
* @return the number of bytes read from the socket
*/
int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), int* value)
{
char c;
int multiplier = 1;
int len = 0;
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = (*getcharfn)(&c, 1);
if (rc != 1)
goto exit;
*value += (c & 127) * multiplier;
multiplier *= 128;
} while ((c & 128) != 0);
exit:
return len;
}
static char* bufptr;
int bufchar(char* c, int count)
{
int i;
for (i = 0; i < count; ++i)
*c = *bufptr++;
return count;
}
int MQTTPacket_decodeBuf(char* buf, int* value)
{
bufptr = buf;
return MQTTPacket_VBIdecode(bufchar, value);
}
/*******************************************************************************
* Copyright (c) 2017, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTTV5PACKET_H)
#define MQTTV5PACKET_H
#include "MQTTV5Properties.h"
enum errors
{
MQTTPACKET_BUFFER_TOO_SHORT = -2,
MQTTPACKET_READ_ERROR = -1,
MQTTPACKET_READ_COMPLETE
};
void writeInt4(char** pptr, int anInt);
int readInt4(char** pptr);
void writeMQTTLenString(char** pptr, MQTTLenString lenstring);
int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata);
int MQTTPacket_VBIlen(int rem_len);
int MQTTPacket_decodeBuf(char* buf, int* value);
#endif /* MQTTV5PACKET_H */
......@@ -95,6 +95,25 @@ SET_TESTS_PROPERTIES(
PROPERTIES TIMEOUT 540
)
ADD_EXECUTABLE(
test15
test15.c
)
TARGET_LINK_LIBRARIES(
test15
paho-mqtt3c
)
ADD_TEST(
NAME test15-1-single-thread-client
COMMAND "test15" "--test_no" "1" "--connection" ${MQTT_TEST_BROKER}
)
SET_TESTS_PROPERTIES(
test15-1-single-thread-client
PROPERTIES TIMEOUT 540
)
ADD_EXECUTABLE(
test2
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment