Commit c6aba7f1 authored by Ian Craggs's avatar Ian Craggs

MQTT 5.0 subscribe packets - #417

parent 14035190
......@@ -1266,7 +1266,7 @@ static int MQTTAsync_processCommand(void)
ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i]));
ListAppend(qoss, &command->command.details.sub.qoss[i], sizeof(int));
}
rc = MQTTProtocol_subscribe(command->client->c, topics, qoss, command->command.token);
rc = MQTTProtocol_subscribe(command->client->c, topics, qoss, command->command.token, NULL, NULL);
ListFreeNoContent(topics);
ListFreeNoContent(qoss);
}
......
......@@ -1412,13 +1412,15 @@ int MQTTClient_isConnected(MQTTClient handle)
}
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic, int* qos,
MQTTSubscribe_options* opts, MQTTProperties* props)
{
MQTTClients* m = handle;
List* topics = NULL;
List* qoss = NULL;
int i = 0;
int rc = MQTTCLIENT_FAILURE;
MQTTResponse resp = {MQTTCLIENT_FAILURE, NULL};
int msgid = 0;
FUNC_ENTRY;
......@@ -1463,7 +1465,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
ListAppend(qoss, &qos[i], sizeof(int));
}
rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid);
rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
ListFreeNoContent(topics);
ListFreeNoContent(qoss);
......@@ -1497,27 +1499,47 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
rc = MQTTCLIENT_SUCCESS;
exit:
resp.reasonCode = rc;
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(subscribe_mutex);
FUNC_EXIT_RC(rc);
return rc;
FUNC_EXIT_RC(resp.reasonCode);
return resp;
}
int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
{
int rc = 0;
MQTTResponse response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
return response.reasonCode;
}
MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
MQTTSubscribe_options* opts, MQTTProperties* props)
{
MQTTResponse rc;
char *const topics[] = {(char*)topic};
FUNC_ENTRY;
rc = MQTTClient_subscribeMany(handle, 1, topics, &qos);
rc = MQTTClient_subscribeMany5(handle, 1, topics, &qos, opts, props);
if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
rc = MQTT_BAD_SUBSCRIBE;
FUNC_EXIT_RC(rc);
rc.reasonCode = MQTT_BAD_SUBSCRIBE;
FUNC_EXIT_RC(rc.reasonCode);
return rc;
}
int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
{
MQTTResponse response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
return response.reasonCode;
}
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
{
MQTTClients* m = handle;
......
......@@ -813,6 +813,24 @@ DLLExport int MQTTClient_isConnected(MQTTClient handle);
*/
DLLExport int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos);
typedef struct MQTTSubscribe_options
{
/** The eyecatcher for this structure. Must be MQSO. */
char struct_id[4];
/** The version number of this structure. Must be 0.
*/
int struct_version;
unsigned char noLocal; /* 0 or 1 */
unsigned char retainAsPublished; /* 0 or 1 */
unsigned char retainHandling; /* 0, 1 or 2 */
} MQTTSubscribe_options;
#define MQTTSubscribe_options_initializer { {'M', 'Q', 'S', 'O'}, 0, 0, 0, 0 }
DLLExport MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
MQTTSubscribe_options* opts, MQTTProperties* props);
/**
* This function attempts to subscribe a client to a list of topics, which may
* contain wildcards (see @ref wildcard). This call also specifies the
......@@ -831,6 +849,9 @@ DLLExport int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos
*/
DLLExport int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos);
DLLExport MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic, int* qos,
MQTTSubscribe_options* opts, MQTTProperties* props);
/**
* This function attempts to remove an existing subscription made by the
* specified client.
......
......@@ -65,7 +65,6 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
len += client->passwordlen+2;
if (MQTTVersion >= 5)
{
if (connectProperties)
len += MQTTProperties_len(connectProperties);
if (client->will && willProperties)
len += MQTTProperties_len(willProperties);
......@@ -187,13 +186,14 @@ int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID)
int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
int msgid, int dup, Clients* client)
{
Header header;
char *data, *ptr;
int rc = -1;
ListElement *elem = NULL, *qosElem = NULL;
int datalen;
int datalen, i = 0;
FUNC_ENTRY;
header.bits.type = SUBSCRIBE;
......@@ -204,18 +204,34 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, netw
datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
while (ListNextElement(topics, &elem))
datalen += (int)strlen((char*)(elem->content));
ptr = data = malloc(datalen);
if (client->MQTTVersion >= 5)
datalen += MQTTProperties_len(props);
ptr = data = malloc(datalen);
writeInt(&ptr, msgid);
if (client->MQTTVersion >= 5)
MQTTProperties_write(&ptr, props);
elem = NULL;
while (ListNextElement(topics, &elem))
{
char subopts = 0;
ListNextElement(qoss, &qosElem);
writeUTF(&ptr, (char*)(elem->content));
subopts = *(int*)(qosElem->content);
if (client->MQTTVersion >= 5 && opts != NULL)
{
subopts |= (opts[i].noLocal << 2); /* 1 bit */
subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
subopts |= (opts[i].retainHandling << 4); /* 2 bits */
}
writeChar(&ptr, *(int*)(qosElem->content));
++i;
}
rc = MQTTPacket_send(net, header, data, datalen, 1);
Log(LOG_PROTOCOL, 22, NULL, net->socket, clientID, msgid, rc);
rc = MQTTPacket_send(&client->net, header, data, datalen, 1);
Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
if (rc != TCPSOCKET_INTERRUPTED)
free(data);
FUNC_EXIT_RC(rc);
......
......@@ -28,7 +28,8 @@ void* MQTTPacket_connack(unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID);
int MQTTPacket_send_subscribe(List* topics, List* qoss, int msgid, int dup, networkHandles* net, const char* clientID);
int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
int msgid, int dup, Clients* client);
void* MQTTPacket_suback(unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID);
......
......@@ -17,8 +17,11 @@
#include "MQTTProperties.h"
#include "MQTTPacket.h"
#include "MQTTProtocolClient.h"
#include "Heap.h"
#include <memory.h>
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
struct nameToType
......@@ -57,6 +60,14 @@ struct nameToType
};
static char* datadup(MQTTLenString* str)
{
char* temp = malloc(str->len);
memcpy(temp, str->data, str->len);
return temp;
}
int MQTTProperty_getType(int identifier)
{
int i, rc = -1;
......@@ -76,7 +87,7 @@ int MQTTProperty_getType(int identifier)
int MQTTProperties_len(MQTTProperties* props)
{
/* properties length is an mbi */
return props->length + MQTTPacket_VBIlen(props->length);
return (props == NULL) ? 1 : props->length + MQTTPacket_VBIlen(props->length);
}
......@@ -84,11 +95,23 @@ int MQTTProperties_add(MQTTProperties* props, MQTTProperty* prop)
{
int rc = 0, type;
if (props->count == props->max_count)
rc = -1; /* max number of properties already in structure */
else if ((type = MQTTProperty_getType(prop->identifier)) < 0)
rc = -2;
else
if ((type = MQTTProperty_getType(prop->identifier)) < 0)
{
rc = MQTT_INVALID_PROPERTY_ID;
goto exit;
}
else if (props->array == NULL)
{
props->max_count = 10;
props->array = malloc(sizeof(MQTTProperty) * props->max_count);
}
else if (props->count == props->max_count)
{
props->max_count += 10;
props->array = realloc(props->array, sizeof(MQTTProperty) * props->max_count);
}
if (props->array)
{
int len = 0;
......@@ -110,16 +133,20 @@ int MQTTProperties_add(MQTTProperties* props, MQTTProperty* prop)
break;
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
len = 2 + prop->value.data.len;
break;
case UTF_8_STRING_PAIR:
len = 2 + prop->value.data.len;
props->array[props->count-1].value.data.data = datadup(&prop->value.data);
if (type == UTF_8_STRING_PAIR)
{
len += 2 + prop->value.value.len;
props->array[props->count-1].value.value.data = datadup(&prop->value.value);
}
break;
}
props->length += len + 1; /* add identifier byte */
}
exit:
return rc;
}
......@@ -149,6 +176,7 @@ int MQTTProperty_write(char** pptr, MQTTProperty* prop)
break;
case VARIABLE_BYTE_INTEGER:
rc = MQTTPacket_encode(*pptr, prop->value.integer4);
*pptr += rc;
break;
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
......@@ -169,7 +197,7 @@ int MQTTProperty_write(char** pptr, MQTTProperty* prop)
/**
* write the supplied properties into a packet buffer
* @param pptr pointer to the buffer - move the pointer as we add data
* @param remlength the max length of the buffer
* @param properties pointer to the property list, can be NULL
* @return whether the write succeeded or not, number of bytes written or < 0
*/
int MQTTProperties_write(char** pptr, MQTTProperties* properties)
......@@ -178,6 +206,13 @@ int MQTTProperties_write(char** pptr, MQTTProperties* properties)
int i = 0, len = 0;
/* write the entire property list length first */
if (properties == NULL)
{
*pptr += MQTTPacket_encode(*pptr, 0);
rc = 1;
}
else
{
*pptr += MQTTPacket_encode(*pptr, properties->length);
len = rc = 1;
for (i = 0; i < properties->count; ++i)
......@@ -190,7 +225,7 @@ int MQTTProperties_write(char** pptr, MQTTProperties* properties)
}
if (rc >= 0)
rc = len;
}
return rc;
}
......@@ -224,11 +259,14 @@ int MQTTProperty_read(MQTTProperty* prop, char** pptr, char* enddata)
break;
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
len = MQTTLenStringRead(&prop->value.data, pptr, enddata);
break;
case UTF_8_STRING_PAIR:
len = MQTTLenStringRead(&prop->value.data, pptr, enddata);
prop->value.data.data = datadup(&prop->value.data);
if (type == UTF_8_STRING_PAIR)
{
len += MQTTLenStringRead(&prop->value.value, pptr, enddata);
prop->value.value.data = datadup(&prop->value.value);
}
break;
}
}
......@@ -321,15 +359,20 @@ DLLExport void MQTTProperties_free(MQTTProperties* props)
for (i = 0; i < props->count; ++i)
{
int id = props->array[i].identifier;
int type = MQTTProperty_getType(id);
switch (MQTTProperty_getType(id))
switch (type)
{
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
break;
case UTF_8_STRING_PAIR:
free(props->array[i].value.data.data);
if (type == UTF_8_STRING_PAIR)
free(props->array[i].value.value.data);
break;
}
}
if (props->array)
free(props->array);
memset(props, '\0', sizeof(MQTTProperties)); /* zero all fields */
}
......@@ -17,11 +17,7 @@
#if !defined(MQTTPROPERTIES_H)
#define MQTTPROPERTIES_H
typedef struct
{
int len;
char* data;
} MQTTLenString;
#define MQTT_INVALID_PROPERTY_ID -2
enum PropertyNames {
PAYLOAD_FORMAT_INDICATOR = 1,
......@@ -75,6 +71,11 @@ enum PropertyTypes {
DLLExport int MQTTProperty_getType(int identifier);
typedef struct
{
int len;
char* data;
} MQTTLenString;
typedef struct
{
......@@ -83,8 +84,10 @@ typedef struct
char byte;
short integer2;
int integer4;
struct {
MQTTLenString data;
MQTTLenString value; /* for user properties */
};
} value;
} MQTTProperty;
......
......@@ -171,15 +171,18 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
* @param client the client structure
* @param topics list of topics
* @param qoss corresponding list of QoSs
* @param opts MQTT 5.0 subscribe options
* @param props MQTT 5.0 subscribe properties
* @return completion code
*/
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID)
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
MQTTSubscribe_options* opts, MQTTProperties* props)
{
int rc = 0;
FUNC_ENTRY;
/* we should stack this up for retry processing too */
rc = MQTTPacket_send_subscribe(topics, qoss, msgID, 0, &client->net, client->clientID);
rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
FUNC_EXIT_RC(rc);
return rc;
}
......
......@@ -41,7 +41,8 @@ int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int MQTTVer
MQTTProperties* connectProperties, MQTTProperties* willProperties);
#endif
int MQTTProtocol_handlePingresps(void* pack, int sock);
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID);
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
MQTTSubscribe_options* opts, MQTTProperties* props);
int MQTTProtocol_handleSubacks(void* pack, int sock);
int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID);
int MQTTProtocol_handleUnsubacks(void* pack, int sock);
......
......@@ -107,7 +107,7 @@ TARGET_LINK_LIBRARIES(
ADD_TEST(
NAME test15-1-single-thread-client
COMMAND "test15" "--test_no" "1" "--connection" ${MQTT_TEST_BROKER}
COMMAND "test15" "--test_no" "1" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES(
......
......@@ -396,6 +396,7 @@ int test1(struct Options options)
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
MQTTProperty property;
MQTTResponse response;
int rc = 0;
char* test_topic = "C client test1";
......@@ -432,10 +433,23 @@ int test1(struct Options options)
opts.will->topicName = "will topic";
opts.will = NULL;
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 30;
MQTTProperties_add(&props, &property);
property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);
MyLog(LOGA_DEBUG, "Connecting");
response = MQTTClient_connect5(c, &opts, &props, &willProps);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
if (rc != MQTTCLIENT_SUCCESS)
MQTTProperties_free(&props);
MQTTProperties_free(&willProps);
if (response.reasonCode != MQTTCLIENT_SUCCESS)
goto exit;
if (response.properties)
......@@ -444,8 +458,18 @@ int test1(struct Options options)
MQTTProperties_free(response.properties);
}
//rc = MQTTClient_subscribe(c, test_topic, subsqos);
//assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
property.identifier = SUBSCRIPTION_IDENTIFIER;
property.value.integer4 = 33;
MQTTProperties_add(&props, &property);
response = MQTTClient_subscribe5(c, test_topic, subsqos, NULL, &props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
MQTTProperties_free(&props);
if (response.properties)
{
logProperties(response.properties);
MQTTProperties_free(response.properties);
}
//test1_sendAndReceive(c, 0, test_topic);
//test1_sendAndReceive(c, 1, test_topic);
......
......@@ -5,8 +5,13 @@ if [ "$TRAVIS_OS_NAME" == "linux" ]; then
sudo service mosquitto stop
# Stop any mosquitto instance which may be still running from previous runs
killall mosquitto
mosquitto -h
mosquitto -c test/tls-testing/mosquitto.conf &
# mosquitto -h
# mosquitto -c test/tls-testing/mosquitto.conf &
git clone https://github.com/eclipse/paho.mqtt.testing.git
cd paho.mqtt.testing/interoperability
python3 startbroker.py -c localhost_testing.conf
cd ../..
fi
if [ "$TRAVIS_OS_NAME" == "osx" ]; then
......@@ -14,5 +19,10 @@ if [ "$TRAVIS_OS_NAME" == "osx" ]; then
brew update
brew install openssl mosquitto
brew services stop mosquitto
/usr/local/sbin/mosquitto -c test/tls-testing/mosquitto.conf &
#/usr/local/sbin/mosquitto -c test/tls-testing/mosquitto.conf &
git clone https://github.com/eclipse/paho.mqtt.testing.git
cd paho.mqtt.testing/interoperability
python3 startbroker.py -c localhost_testing.conf
cd ../..
fi
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment