Commit 35080b11 authored by Ian Craggs's avatar Ian Craggs

Report reason codes and properties back from un/subscribe #467

parent 2aebba99
......@@ -277,6 +277,7 @@ typedef struct
char** topics;
int* qoss;
MQTTSubscribe_options opts;
MQTTSubscribe_options* optlist;
} sub;
struct
{
......@@ -1378,11 +1379,16 @@ static int MQTTAsync_processCommand(void)
if (command->client->c->MQTTVersion >= MQTTVERSION_5)
{
props = &command->command.properties;
subopts = &command->command.details.sub.opts;
if (command->command.details.sub.count > 1)
subopts = command->command.details.sub.optlist;
else
subopts = &command->command.details.sub.opts;
}
rc = MQTTProtocol_subscribe(command->client->c, topics, qoss, command->command.token, subopts, props);
ListFreeNoContent(topics);
ListFreeNoContent(qoss);
if (command->command.details.sub.count > 1)
free(command->command.details.sub.optlist);
}
else if (command->command.type == UNSUBSCRIBE)
{
......@@ -2171,14 +2177,14 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
else if (command->command.onSuccess5)
{
MQTTAsync_successData5 data;
int* array = NULL;
enum MQTTReasonCodes* array = NULL;
if (sub->qoss->count == 1)
data.alt.qos = *(int*)(sub->qoss->first->content);
else if (sub->qoss->count > 1)
data.reasonCode = *(int*)(sub->qoss->first->content);
data.alt.sub.reasonCodeCount = sub->qoss->count;
if (sub->qoss->count > 1)
{
ListElement* cur_qos = NULL;
int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
enum MQTTReasonCodes* element = array = data.alt.sub.reasonCodes = malloc(sub->qoss->count * sizeof(enum MQTTReasonCodes));
while (ListNextElement(sub->qoss, &cur_qos))
*element++ = *(int*)(cur_qos->content);
}
......@@ -2252,9 +2258,9 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
MQTTAsync_successData5 data;
enum MQTTReasonCodes* array = NULL;
if (unsub->reasonCodes->count == 1)
data.alt.unsub.reasonCode = *(enum MQTTReasonCodes*)(unsub->reasonCodes->first->content);
else if (unsub->reasonCodes->count > 1)
data.reasonCode = *(enum MQTTReasonCodes*)(unsub->reasonCodes->first->content);
data.alt.unsub.reasonCodeCount = unsub->reasonCodes->count;
if (unsub->reasonCodes->count > 1)
{
ListElement* cur_rc = NULL;
enum MQTTReasonCodes* element = array = data.alt.unsub.reasonCodes = malloc(unsub->reasonCodes->count * sizeof(enum MQTTReasonCodes));
......@@ -3066,6 +3072,11 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && count != response->subscribe_options_count)
{
rc = MQTTASYNC_BAD_MQTT_OPTIONS;
goto exit;
}
/* Add subscribe request to operation queue */
sub = malloc(sizeof(MQTTAsync_queuedCommand));
......@@ -3084,6 +3095,12 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
{
sub->command.properties = MQTTProperties_copy(&response->properties);
sub->command.details.sub.opts = response->subscribe_options;
if (count > 1)
{
sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count);
for (i = 0; i < count; ++i)
sub->command.details.sub.optlist[i] = response->subscribe_options_list[i];
}
}
}
sub->command.type = SUBSCRIBE;
......
......@@ -499,10 +499,12 @@ typedef struct
/** A union of the different values that can be returned for subscribe, unsubscribe and publish. */
union
{
/** For subscribe, the granted QoS of the subscription returned by the server. */
int qos;
/** For subscribeMany, the list of granted QoSs of the subscriptions returned by the server. */
int* qosList;
/** For subscribeMany, the list of reasonCodes returned by the server. */
struct
{
int reasonCodeCount;
enum MQTTReasonCodes* reasonCodes;
} sub;
/** For publish, the message being sent to the server. */
struct
{
......@@ -516,9 +518,10 @@ typedef struct
int MQTTVersion;
int sessionPresent;
} connect;
/** For unsubscribeMany, the list of reasonCodes returned by the server. */
struct
{
enum MQTTReasonCodes reasonCode;
int reasonCodeCount;
enum MQTTReasonCodes* reasonCodes;
} unsub;
} alt;
......@@ -603,9 +606,11 @@ typedef struct MQTTAsync_responseOptions
*/
MQTTProperties properties;
MQTTSubscribe_options subscribe_options;
int subscribe_options_count;
MQTTSubscribe_options* subscribe_options_list;
} MQTTAsync_responseOptions;
#define MQTTAsync_responseOptions_initializer { {'M', 'Q', 'T', 'R'}, 1, NULL, NULL, 0, 0, NULL, NULL, MQTTProperties_initializer, MQTTSubscribe_options_initializer }
#define MQTTAsync_responseOptions_initializer { {'M', 'Q', 'T', 'R'}, 1, NULL, NULL, 0, 0, NULL, NULL, MQTTProperties_initializer, MQTTSubscribe_options_initializer, 0, NULL}
typedef struct MQTTAsync_responseOptions MQTTAsync_callOptions;
#define MQTTAsync_callOptions_initializer MQTTAsync_responseOptions_initializer
......
......@@ -564,6 +564,8 @@ DLLExport void MQTTResponse_free(MQTTResponse response)
FUNC_ENTRY;
if (response.properties)
{
if (response.reasonCodeCount > 0 && response.reasonCodes)
free(response.reasonCodes);
MQTTProperties_free(response.properties);
free(response.properties);
}
......@@ -1044,9 +1046,10 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
int sessionPresent = 0;
MQTTResponse resp = {SOCKET_ERROR, NULL};
MQTTResponse resp = MQTTResponse_initializer;
FUNC_ENTRY;
resp.reasonCode = SOCKET_ERROR;
if (m->ma && !running)
{
Thread_start(MQTTClient_run, handle);
......@@ -1292,10 +1295,11 @@ static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectO
MQTTClients* m = handle;
START_TIME_TYPE start;
long millisecsTimeout = 30000L;
MQTTResponse rc = {SOCKET_ERROR, NULL};
MQTTResponse rc = MQTTResponse_initializer;
int MQTTVersion = 0;
FUNC_ENTRY;
rc.reasonCode = SOCKET_ERROR;
millisecsTimeout = options->connectTimeout * 1000;
start = MQTTClient_start_clock();
......@@ -1455,12 +1459,13 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
MQTTResponse rc = {SOCKET_ERROR, NULL};
MQTTResponse rc = MQTTResponse_initializer;
FUNC_ENTRY;
Thread_lock_mutex(connect_mutex);
Thread_lock_mutex(mqttclient_mutex);
rc.reasonCode = SOCKET_ERROR;
if (options == NULL)
{
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
......@@ -1714,21 +1719,22 @@ int MQTTClient_isConnected(MQTTClient handle)
}
MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic, int* qos,
MQTTSubscribe_options* opts, MQTTProperties* props)
MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
{
MQTTClients* m = handle;
List* topics = NULL;
List* qoss = NULL;
int i = 0;
int rc = MQTTCLIENT_FAILURE;
MQTTResponse resp = {MQTTCLIENT_FAILURE, NULL};
MQTTResponse resp = MQTTResponse_initializer;
int msgid = 0;
FUNC_ENTRY;
Thread_lock_mutex(subscribe_mutex);
Thread_lock_mutex(mqttclient_mutex);
resp.reasonCode = MQTTCLIENT_FAILURE;
if (m == NULL || m->c == NULL)
{
rc = MQTTCLIENT_FAILURE;
......@@ -1747,7 +1753,7 @@ MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const
goto exit;
}
if(qos[i] < 0 || qos[i] > 2)
if (qos[i] < 0 || qos[i] > 2)
{
rc = MQTTCLIENT_BAD_QOS;
goto exit;
......@@ -1781,12 +1787,36 @@ MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const
if (pack != NULL)
{
Suback* sub = (Suback*)pack;
ListElement* current = NULL;
i = 0;
while (ListNextElement(sub->qoss, &current))
if (m->c->MQTTVersion == MQTTVERSION_5)
{
int* reqqos = (int*)(current->content);
qos[i++] = *reqqos;
if (sub->properties.count > 0)
{
resp.properties = malloc(sizeof(MQTTProperties));
*resp.properties = MQTTProperties_copy(&sub->properties);
}
resp.reasonCodeCount = sub->qoss->count;
resp.reasonCode = *(int*)sub->qoss->first->content;
if (sub->qoss->count > 1)
{
ListElement* current = NULL;
int count = 0;
resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count));
while (ListNextElement(sub->qoss, &current))
(resp.reasonCodes)[count++] = *(enum MQTTReasonCodes*)(current->content);
}
}
else
{
ListElement* current = NULL;
i = 0;
while (ListNextElement(sub->qoss, &current))
{
int* reqqos = (int*)(current->content);
qos[i++] = *reqqos;
}
resp.reasonCode = rc;
}
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
m->pack = NULL;
......@@ -1801,7 +1831,8 @@ MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const
rc = MQTTCLIENT_SUCCESS;
exit:
resp.reasonCode = rc;
if (rc < 0)
resp.reasonCode = rc;
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(subscribe_mutex);
FUNC_EXIT_RC(resp.reasonCode);
......@@ -1813,7 +1844,7 @@ exit:
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
{
MQTTClients* m = handle;
MQTTResponse response = {MQTTCLIENT_SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
if (m->c->MQTTVersion >= MQTTVERSION_5)
response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
......@@ -1843,7 +1874,7 @@ MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos
int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
{
MQTTClients* m = handle;
MQTTResponse response = {MQTTCLIENT_SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
if (m->c->MQTTVersion >= MQTTVERSION_5)
response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
......@@ -1860,13 +1891,14 @@ MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* con
List* topics = NULL;
int i = 0;
int rc = SOCKET_ERROR;
MQTTResponse resp = {MQTTCLIENT_FAILURE, NULL};
MQTTResponse resp = MQTTResponse_initializer;
int msgid = 0;
FUNC_ENTRY;
Thread_lock_mutex(unsubscribe_mutex);
Thread_lock_mutex(mqttclient_mutex);
resp.reasonCode = MQTTCLIENT_FAILURE;
if (m == NULL || m->c == NULL)
{
rc = MQTTCLIENT_FAILURE;
......@@ -1906,6 +1938,29 @@ MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* con
Thread_lock_mutex(mqttclient_mutex);
if (pack != NULL)
{
Unsuback* unsub = (Unsuback*)pack;
if (m->c->MQTTVersion == MQTTVERSION_5)
{
if (unsub->properties.count > 0)
{
resp.properties = malloc(sizeof(MQTTProperties));
*resp.properties = MQTTProperties_copy(&unsub->properties);
}
resp.reasonCodeCount = unsub->reasonCodes->count;
resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
if (unsub->reasonCodes->count > 1)
{
ListElement* current = NULL;
int count = 0;
resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count));
while (ListNextElement(unsub->reasonCodes, &current))
(resp.reasonCodes)[count++] = *(enum MQTTReasonCodes*)(current->content);
}
}
else
resp.reasonCode = rc;
rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
m->pack = NULL;
}
......@@ -1917,7 +1972,8 @@ MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* con
MQTTClient_disconnect_internal(handle, 0);
exit:
resp.reasonCode = rc;
if (rc < 0)
resp.reasonCode = rc;
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(unsubscribe_mutex);
FUNC_EXIT_RC(resp.reasonCode);
......@@ -1960,7 +2016,7 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p
Publish* p = NULL;
int blocked = 0;
int msgid = 0;
MQTTResponse resp = {MQTTCLIENT_SUCCESS, NULL};
MQTTResponse resp = MQTTResponse_initializer;
FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex);
......@@ -2067,7 +2123,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
{
MQTTClients* m = handle;
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
MQTTResponse rc = MQTTResponse_initializer;
if (m->c->MQTTVersion >= MQTTVERSION_5)
rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
......@@ -2080,7 +2136,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTClient_deliveryToken* deliveryToken)
{
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
MQTTResponse rc = MQTTResponse_initializer;
MQTTProperties* props = NULL;
FUNC_ENTRY;
......@@ -2112,7 +2168,7 @@ int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClie
MQTTClient_deliveryToken* deliveryToken)
{
MQTTClients* m = handle;
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
MQTTResponse rc = MQTTResponse_initializer;
if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
(message->struct_version != 0 && message->struct_version != 1))
......
......@@ -858,10 +858,15 @@ DLLExport int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* o
typedef struct MQTTResponse
{
int version;
enum MQTTReasonCodes reasonCode;
MQTTProperties* properties; /* optional */
int reasonCodeCount; /* used for subscribeMany5 and unsubscribeMany5 */
enum MQTTReasonCodes* reasonCodes; /* used for subscribeMany5 and unsubscribeMany5 */
MQTTProperties* properties; /* optional */
} MQTTResponse;
#define MQTTResponse_initializer {1, SUCCESS, 0, NULL, NULL}
DLLExport void MQTTResponse_free(MQTTResponse response);
DLLExport MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
......@@ -939,8 +944,8 @@ DLLExport MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topi
*/
DLLExport int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos);
DLLExport MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic, int* qos,
MQTTSubscribe_options* opts, MQTTProperties* props);
DLLExport MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
int* qos, MQTTSubscribe_options* opts, MQTTProperties* props);
/**
* This function attempts to remove an existing subscription made by the
......
......@@ -385,7 +385,7 @@ int test_client_topic_aliases(struct Options options)
MQTTProperty property;
MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
MQTTClient_deliveryToken dt;
int rc = 0;
int count = 0;
......@@ -481,7 +481,7 @@ int test_client_topic_aliases(struct Options options)
/* subscribe to a topic */
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d", response.reasonCode);
/* then publish to the topic */
MQTTProperties_free(&pubmsg.properties);
......@@ -639,7 +639,7 @@ int test_server_topic_aliases(struct Options options)
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperty property;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
MQTTClient_deliveryToken dt;
int rc = 0;
int count = 0;
......@@ -696,7 +696,7 @@ int test_server_topic_aliases(struct Options options)
/* subscribe to a topic */
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d", response.reasonCode);
messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
......@@ -777,7 +777,7 @@ int test_subscription_ids(struct Options options)
MQTTProperties subs_props = MQTTProperties_initializer;
MQTTProperty property;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
MQTTClient_deliveryToken dt;
int rc = 0;
int count = 0;
......@@ -832,13 +832,13 @@ int test_subscription_ids(struct Options options)
property.value.integer4 = 1;
MQTTProperties_add(&subs_props, &property);
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &subs_props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d", response.reasonCode);
/* now to an overlapping topic */
property.value.integer4 = 2;
subs_props.array[0].value.integer4 = 2;
response = MQTTClient_subscribe5(c, "+", 2, NULL, &subs_props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d", response.reasonCode);
messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
......@@ -913,7 +913,7 @@ int test_flow_control(struct Options options)
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperty property;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
MQTTClient_deliveryToken dt;
int rc = 0, i = 0, count = 0;
char* test_topic = "test_flow_control";
......@@ -962,7 +962,7 @@ int test_flow_control(struct Options options)
}
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d", response.reasonCode);
messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
......@@ -1002,6 +1002,95 @@ exit:
}
int test_error_reporting(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property;
MQTTResponse response = MQTTResponse_initializer;
MQTTClient_deliveryToken dt;
int rc = 0, i = 0, count = 0;
char* test_topic = "test_error_reporting";
int receive_maximum = 65535;
fprintf(xml, "<testcase classname=\"test_error_reporting\" name=\"error reporting\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test - error reporting");
//MQTTClient_setTraceCallback(test_flow_control_trace_callback);
rc = MQTTClient_create(&c, options.connection, "error_reporting",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS)
goto exit;
rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
MyLog(LOGA_DEBUG, "Connecting");
response = MQTTClient_connect5(c, &opts, NULL, NULL);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
if (response.reasonCode != MQTTCLIENT_SUCCESS)
goto exit;
if (response.properties)
{
if (MQTTProperties_hasProperty(response.properties, RECEIVE_MAXIMUM))
receive_maximum = MQTTProperties_getNumericValue(response.properties, RECEIVE_MAXIMUM);
logProperties(response.properties);
MQTTResponse_free(response);
}
property.identifier = USER_PROPERTY;
property.value.data.data = "unsub user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "unsub user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &props);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d", response.reasonCode);
assert("Properties should exist", response.properties != NULL, "props was %p", response.properties);
if (response.properties)
{
logProperties(response.properties);
MQTTResponse_free(response);
}
response = MQTTClient_unsubscribe5(c, test_topic, &props);
assert("Good rc from unsubscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Properties should exist", response.properties != NULL, "props was %p", response.properties);
if (response.properties)
{
logProperties(response.properties);
MQTTResponse_free(response);
}
rc = MQTTClient_disconnect5(c, 1000, SUCCESS, NULL);
exit:
MQTTClient_setTraceCallback(NULL);
MQTTProperties_free(&props);
MQTTClient_destroy(&c);
MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
int main(int argc, char** argv)
{
int rc = 0,
......@@ -1011,6 +1100,7 @@ int main(int argc, char** argv)
test_server_topic_aliases,
test_subscription_ids,
test_flow_control,
test_error_reporting
};
xml = fopen("TEST-test1.xml", "w");
......
......@@ -901,7 +901,7 @@ Test: flow control
struct
{
char* test_topic;
char * test_topic;
int test_finished;
int messages_arrived;
int receive_maximum;
......@@ -1056,6 +1056,177 @@ exit:
return failures;
}
struct
{
char* test_topic;
int test_finished;
int messages_arrived;
} test_error_reporting_globals =
{
"error reporting topic", 0, 0
};
void test_error_reporting_onUnsubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int i = 0;
MyLog(LOGA_INFO, "Unsuback properties:");
logProperties(&response->properties);
assert("Reason code count should be 2", response->alt.unsub.reasonCodeCount == 2,
"Reason code count was %d\n", response->alt.unsub.reasonCodeCount);
if (response->alt.unsub.reasonCodeCount == 1)
MyLog(LOGA_INFO, "reason code %d", response->reasonCode);
else if (response->alt.unsub.reasonCodeCount > 1)
{
for (i = 0; i < response->alt.unsub.reasonCodeCount; ++i)
{
MyLog(LOGA_INFO, "Unsubscribe reason code %d", response->alt.unsub.reasonCodes[i]);
}
}
test_error_reporting_globals.test_finished = 1;
}
void test_error_reporting_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty property;
char* topics[2] = {test_error_reporting_globals.test_topic, "+"};
int rc;
int i = 0;
MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->properties);
assert("Reason code count should be 2", response->alt.sub.reasonCodeCount == 2,
"Reason code count was %d\n", response->alt.sub.reasonCodeCount);
if (response->alt.sub.reasonCodeCount == 1)
MyLog(LOGA_INFO, "reason code %d", response->reasonCode);
else if (response->alt.sub.reasonCodeCount > 1)
{
for (i = 0; i < response->alt.sub.reasonCodeCount; ++i)
{
MyLog(LOGA_INFO, "Subscribe reason code %d", response->alt.sub.reasonCodes[i]);
}
}
opts.onSuccess5 = test_error_reporting_onUnsubscribe;
opts.context = c;
property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&opts.properties, &property);
rc = MQTTAsync_unsubscribeMany(c, 2, topics, &opts);
assert("Good rc from unsubscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_flow_control_globals.test_finished = 1;
MQTTProperties_free(&opts.properties);
}
void test_error_reporting_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
char* topics[2] = {test_error_reporting_globals.test_topic, "+"};
int qoss[2] = {2, 2};
MQTTSubscribe_options subopts[2] = {MQTTSubscribe_options_initializer, MQTTSubscribe_options_initializer};
MQTTProperty property;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->properties);
opts.onSuccess5 = test_error_reporting_onSubscribe;
opts.context = c;
property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&opts.properties, &property);
opts.subscribe_options_count = 2;
opts.subscribe_options_list = subopts;
rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_flow_control_globals.test_finished = 1;
MQTTProperties_free(&opts.properties);
}
int test_error_reporting(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0;
MyLog(LOGA_INFO, "Starting V5 test - error reporting");
fprintf(xml, "<testcase classname=\"test11\" name=\"error reporting\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "error reporting",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test_flow_control_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
opts.onSuccess5 = test_error_reporting_onConnect;
opts.context = c;
opts.cleanstart = 1;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (test_error_reporting_globals.test_finished == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST5: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
......@@ -1065,12 +1236,13 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
int main(int argc, char** argv)
{
int rc = 0;
int rc = -1;
int (*tests[])() = {NULL,
test_client_topic_aliases,
test_server_topic_aliases,
test_subscription_ids,
test_flow_control,
test_error_reporting,
}; /* indexed starting from 1 */
MQTTAsync_nameValue* info;
int i;
......@@ -1102,8 +1274,13 @@ int main(int argc, char** argv)
}
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
if (options.test_no >= ARRAY_SIZE(tests))
MyLog(LOGA_INFO, "No test number %d", options.test_no);
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
}
}
}
......
......@@ -404,7 +404,7 @@ int test1(struct Options options)
MQTTProperties willProps = MQTTProperties_initializer;
MQTTProperty property;
MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
int rc = 0;
char* test_topic = "C client test1";
......@@ -471,7 +471,7 @@ int test1(struct Options options)
property.value.integer4 = 33;
MQTTProperties_add(&props, &property);
response = MQTTClient_subscribe5(c, test_topic, subsqos, &subopts, &props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == subsqos, "rc was %d", response.reasonCode);
MQTTProperties_free(&props);
if (response.properties)
......@@ -496,6 +496,7 @@ int test1(struct Options options)
response = MQTTClient_unsubscribe5(c, test_topic, &props);
assert("Unsubscribe successful", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
MQTTResponse_free(response);
MQTTProperties_free(&props);
property.identifier = SESSION_EXPIRY_INTERVAL;
......@@ -571,7 +572,7 @@ void test2_sendAndReceive(MQTTClient* c, int qos, char* test_topic)
MQTTClient_deliveryToken dt;
int i = 0;
int iterations = 50;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
int wait_seconds = 0;
test2_deliveryCompleted = 0;
......@@ -642,7 +643,7 @@ int test2(struct Options options)
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
int rc = 0;
char* test_topic = "C client test2";
......@@ -678,7 +679,7 @@ int test2(struct Options options)
goto exit;
response = MQTTClient_subscribe5(c, test_topic, subsqos, &subopts, &props);
assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
assert("Good rc from subscribe", response.reasonCode == subsqos, "rc was %d", rc);
test2_sendAndReceive(c, 0, test_topic);
test2_sendAndReceive(c, 1, test_topic);
......@@ -805,7 +806,7 @@ int test4_run(int qos)
int count = 3;
MQTTProperty property;
MQTTProperties props = MQTTProperties_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
int i, rc;
failures = 0;
......@@ -836,7 +837,7 @@ int test4_run(int qos)
/* subscribe so we can get messages back */
response = MQTTClient_subscribe5(c, topic, subsqos, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == subsqos, "rc was %d", response.reasonCode);
/* send messages so that we can receive the same ones */
for (i = 0; i < count; ++i)
......@@ -967,7 +968,7 @@ int test5(struct Options options)
int count = 5;
MQTTProperty property;
MQTTProperties props = MQTTProperties_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
int i, rc;
fprintf(xml, "<testcase classname=\"test1\" name=\"disconnect with quiesce timeout should allow exchanges to complete\"");
......@@ -1001,7 +1002,7 @@ int test5(struct Options options)
}
response = MQTTClient_subscribe5(c, topic, subsqos, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == subsqos, "rc was %d", response.reasonCode);
for (i = 0; i < count; ++i)
{
......@@ -1072,7 +1073,7 @@ int test6(struct Options options)
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer5;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
int rc, count;
char* mqttsas_topic = "MQTTSAS topic";
......@@ -1130,7 +1131,7 @@ int test6(struct Options options)
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d\n", response.reasonCode);
response = MQTTClient_subscribe5(test6_c2, test6_will_topic, 2, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d\n", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d\n", response.reasonCode);
/* now send the command which will break the connection and cause the will message to be sent */
response = MQTTClient_publish5(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL, NULL);
......@@ -1183,7 +1184,7 @@ int test6a(struct Options options)
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer5;
int rc, count;
MQTTResponse response = {SUCCESS, NULL};
MQTTResponse response = MQTTResponse_initializer;
char* mqttsas_topic = "MQTTSAS topic";
failures = 0;
......@@ -1242,7 +1243,7 @@ int test6a(struct Options options)
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d\n", response.reasonCode);
response = MQTTClient_subscribe5(test6_c2, test6_will_topic, 2, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d\n", response.reasonCode);
assert("Good rc from subscribe", response.reasonCode == GRANTED_QOS_2, "rc was %d\n", response.reasonCode);
/* now send the command which will break the connection and cause the will message to be sent */
response = MQTTClient_publish5(test6_c1, mqttsas_topic, (int)strlen("TERMINATE"), "TERMINATE", 0, 0, NULL, NULL);
......
......@@ -316,6 +316,8 @@ void test1_onUnsubscribe(void* context, MQTTAsync_successData5* response)
MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
MyLog(LOGA_INFO, "Unsuback properties:");
logProperties(&response->properties);
assert("A property should exist", response->properties.count > 0,
"Property count was %d\n", response->properties);
opts.onSuccess = test1_onDisconnect;
opts.context = c;
......@@ -407,7 +409,9 @@ void test1_onSubscribe(void* context, MQTTAsync_successData5* response)
MQTTProperties props = MQTTProperties_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->reasonCode);
assert("Subscribe response should be 2", response->reasonCode == GRANTED_QOS_2,
"response was %d", response->reasonCode);
MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->properties);
......@@ -918,11 +922,23 @@ int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
else
{
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property;
opts.onSuccess5 = test1_onUnsubscribe;
opts.context = c;
property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);
opts.properties = props;
rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MQTTProperties_free(&props);
}
MQTTAsync_freeMessage(&message);
......@@ -1325,7 +1341,7 @@ void test7_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->reasonCode);
test7_subscribed = 1;
}
......@@ -1595,7 +1611,7 @@ void test8_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->reasonCode);
test8_subscribed = 1;
}
......@@ -1810,7 +1826,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
int main(int argc, char** argv)
{
int rc = 0;
int rc = -1;
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7, test8}; /* indexed starting from 1 */
MQTTAsync_nameValue* info;
int i;
......@@ -1842,8 +1858,13 @@ int main(int argc, char** argv)
}
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
if (options.test_no >= ARRAY_SIZE(tests))
MyLog(LOGA_INFO, "No test number %d", options.test_no);
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment