Commit d0f87d58 authored by Ian Craggs's avatar Ian Craggs

MQTT V5 async tests for shared subs, subs options and request response #470 471 487

parent 7dc676e5
...@@ -3073,7 +3073,8 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int ...@@ -3073,7 +3073,8 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit; goto exit;
} }
if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && count != response->subscribe_options_count) if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribe_options_count
&& response->subscribe_options_count != 0))
{ {
rc = MQTTASYNC_BAD_MQTT_OPTIONS; rc = MQTTASYNC_BAD_MQTT_OPTIONS;
goto exit; goto exit;
...@@ -3099,8 +3100,17 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int ...@@ -3099,8 +3100,17 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
if (count > 1) if (count > 1)
{ {
sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count); sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count);
for (i = 0; i < count; ++i) if (response->subscribe_options_count == 0)
sub->command.details.sub.optlist[i] = response->subscribe_options_list[i]; {
MQTTSubscribe_options initialized = MQTTSubscribe_options_initializer;
for (i = 0; i < count; ++i)
sub->command.details.sub.optlist[i] = initialized;
}
else
{
for (i = 0; i < count; ++i)
sub->command.details.sub.optlist[i] = response->subscribe_options_list[i];
}
} }
} }
} }
......
...@@ -672,6 +672,21 @@ ADD_TEST( ...@@ -672,6 +672,21 @@ ADD_TEST(
COMMAND "test11" "--test_no" "6" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY} COMMAND "test11" "--test_no" "6" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
) )
ADD_TEST(
NAME test11-7-request_response
COMMAND "test11" "--test_no" "7" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test11-8-subscribe_options
COMMAND "test11" "--test_no" "8" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test11-9-shared_subscriptions
COMMAND "test11" "--test_no" "9" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test11-1-client_topic_aliases test11-1-client_topic_aliases
test11-2-server_topic_aliases test11-2-server_topic_aliases
...@@ -679,6 +694,9 @@ SET_TESTS_PROPERTIES( ...@@ -679,6 +694,9 @@ SET_TESTS_PROPERTIES(
test11-4-flow_control test11-4-flow_control
test11-5-error_handling test11-5-error_handling
test11-6-qos_1_2_errors test11-6-qos_1_2_errors
test11-7-request_response
test11-8-subscribe_options
test11-9-shared_subscriptions
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
......
...@@ -1628,7 +1628,7 @@ struct ...@@ -1628,7 +1628,7 @@ struct
} test_shared_subscriptions_globals = } test_shared_subscriptions_globals =
{ {
"$share/share_test/#", "$share/share_test/#",
"#", "a",
0, 0,
}; };
...@@ -1756,7 +1756,7 @@ int test_shared_subscriptions(struct Options options) ...@@ -1756,7 +1756,7 @@ int test_shared_subscriptions(struct Options options)
test_shared_subscriptions_globals.messages_arrived = 0; test_shared_subscriptions_globals.messages_arrived = 0;
for (i = 0; i < 10; ++i) for (i = 0; i < 10; ++i)
{ {
response = MQTTClient_publishMessage5(c, test_subscribe_options_globals.topic, &pubmsg, &dt); response = MQTTClient_publishMessage5(c, test_shared_subscriptions_globals.topic, &pubmsg, &dt);
assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
/* should get the request */ /* should get the request */
......
...@@ -1431,6 +1431,638 @@ exit: ...@@ -1431,6 +1431,638 @@ exit:
return failures; return failures;
} }
struct
{
int test_finished;
char* response_topic;
char* request_topic;
char* correlation_id;
int messages_arrived;
} test_request_response_globals =
{
0,
"test_7_request",
"test_7_response",
"test_7_correlation_id",
0
};
int test_request_response_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
int rc;
test_request_response_globals.messages_arrived++;
assert("Message structure version should be 1", message->struct_version == 1,
"message->struct_version was %d", message->struct_version);
MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
topicName, message->payloadlen, (char*)(message->payload));
if (message->struct_version == 1)
logProperties(&message->properties);
if (test_request_response_globals.messages_arrived == 1)
{
/* this is the request */
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty *corr_prop = NULL, *response_topic_prop = NULL;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTProperty property;
char myTopicName[100];
assert("Topic should be request",
strcmp(test_request_response_globals.request_topic, topicName) == 0,
"topic was %s\n", topicName);
if (MQTTProperties_hasProperty(&message->properties, RESPONSE_TOPIC))
response_topic_prop = MQTTProperties_getProperty(&message->properties, RESPONSE_TOPIC);
assert("Topic should be response",
strncmp(test_request_response_globals.response_topic, response_topic_prop->value.data.data,
response_topic_prop->value.data.len) == 0,
"topic was %.4s\n", response_topic_prop->value.data.data);
if (MQTTProperties_hasProperty(&message->properties, CORRELATION_DATA))
corr_prop = MQTTProperties_getProperty(&message->properties, CORRELATION_DATA);
assert("Correlation data should be",
strncmp(test_request_response_globals.correlation_id, corr_prop->value.data.data,
corr_prop->value.data.len) == 0,
"Correlation data was %.4s\n", corr_prop->value.data.data);
/* send response */
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.retained = 0;
pubmsg.qos = 1;
property.identifier = CORRELATION_DATA;
property.value.data.data = test_request_response_globals.correlation_id;
property.value.data.len = strlen(property.value.data.data);
MQTTProperties_add(&pubmsg.properties, &property);
memcpy(myTopicName, response_topic_prop->value.data.data, response_topic_prop->value.data.len);
myTopicName[response_topic_prop->value.data.len] = '\0';
rc = MQTTAsync_sendMessage(c, myTopicName, &pubmsg, &opts);
if (rc != SUCCESS)
test_request_response_globals.test_finished = 1;
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MQTTProperties_free(&pubmsg.properties);
}
else if (test_request_response_globals.messages_arrived == 2)
{
MQTTProperty *corr_prop = NULL;
/* this should be response */
assert("Topic should be response",
strcmp(test_request_response_globals.response_topic, topicName) == 0,
"topic was %s\n", topicName);
if (MQTTProperties_hasProperty(&message->properties, CORRELATION_DATA))
corr_prop = MQTTProperties_getProperty(&message->properties, CORRELATION_DATA);
assert("Correlation data should be",
strncmp(test_request_response_globals.correlation_id, corr_prop->value.data.data,
corr_prop->value.data.len) == 0,
"Correlation data was %.4s\n", corr_prop->value.data.data);
test_request_response_globals.test_finished = 1;
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test_request_response_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTProperty property;
char* topics[2] = {test_error_reporting_globals.test_topic, "+"};
int rc;
int i = 0;
MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->properties);
assert("Reason code count should be 2", response->alt.sub.reasonCodeCount == 2,
"Reason code count was %d\n", response->alt.sub.reasonCodeCount);
if (response->alt.sub.reasonCodeCount == 1)
MyLog(LOGA_INFO, "reason code %d", response->reasonCode);
else if (response->alt.sub.reasonCodeCount > 1)
{
for (i = 0; i < response->alt.sub.reasonCodeCount; ++i)
{
MyLog(LOGA_INFO, "Subscribe reason code %d", response->alt.sub.reasonCodes[i]);
assert("Reason code should be 2", response->alt.sub.reasonCodes[i] == GRANTED_QOS_2,
"Reason code was %d\n", response->alt.sub.reasonCodes[i]);
}
}
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.retained = 0;
pubmsg.qos = 1;
property.identifier = RESPONSE_TOPIC;
property.value.data.data = test_request_response_globals.response_topic;
property.value.data.len = strlen(property.value.data.data);
MQTTProperties_add(&pubmsg.properties, &property);
property.identifier = CORRELATION_DATA;
property.value.data.data = test_request_response_globals.correlation_id;
property.value.data.len = strlen(property.value.data.data);
MQTTProperties_add(&pubmsg.properties, &property);
rc = MQTTAsync_sendMessage(c, test_request_response_globals.request_topic, &pubmsg, &opts);
if (rc != SUCCESS)
test_request_response_globals.test_finished = 1;
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MQTTProperties_free(&pubmsg.properties);
}
void test_request_response_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
char* topics[2] = {test_request_response_globals.request_topic,
test_request_response_globals.response_topic};
int qos[2] = {2, 2};
MyLog(LOGA_DEBUG, "In request response connect onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->properties);
opts.onSuccess5 = test_request_response_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribeMany(c, 2, topics, qos, &opts);
if (rc != SUCCESS)
test_request_response_globals.test_finished = 1;
assert("Good rc from subscribeMany", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test_request_response(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0;
MyLog(LOGA_INFO, "Starting V5 test - request response");
fprintf(xml, "<testcase classname=\"test11\" name=\"request response\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "request response",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test_request_response_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
opts.onSuccess5 = test_request_response_onConnect;
opts.context = c;
opts.cleanstart = 1;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (test_request_response_globals.test_finished == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST7: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
struct
{
int test_finished;
char* topic1;
char* topic2;
int messages_arrived;
} test_subscribe_options_globals =
{
0,
"subscribe options topic",
"+",
0
};
int test_subscribe_options_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
int rc;
test_subscribe_options_globals.messages_arrived++;
assert("Message structure version should be 1", message->struct_version == 1,
"message->struct_version was %d", message->struct_version);
MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
topicName, message->payloadlen, (char*)(message->payload));
if (message->struct_version == 1)
logProperties(&message->properties);
if (test_subscribe_options_globals.messages_arrived == 1)
{
int subsidcount, subsid;
subsidcount = MQTTProperties_propertyCount(&message->properties, SUBSCRIPTION_IDENTIFIER);
assert("Subsidcount is i", subsidcount == 1, "subsidcount is not correct %d\n", subsidcount);
subsid = MQTTProperties_getNumericValueAt(&message->properties, SUBSCRIPTION_IDENTIFIER, 0);
assert("Subsid is 2", subsid == 2, "subsid is not correct %d\n", subsid);
test_subscribe_options_globals.test_finished = 1;
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test_subscribe_options_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
static int called = 0;
MyLog(LOGA_DEBUG, "In subscribe options connect onSuccess callback, context %p", context);
called++;
assert("Reason code should be 0", response->reasonCode == GRANTED_QOS_2,
"Reason code was %d\n", response->reasonCode);
if (response->properties.count > 0)
{
MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->properties);
}
if (called == 1)
{
MQTTProperty property;
property.identifier = SUBSCRIPTION_IDENTIFIER;
property.value.integer4 = 2;
MQTTProperties_add(&opts.properties, &property);
opts.onSuccess5 = test_subscribe_options_onSubscribe;
opts.context = c;
opts.subscribe_options.retainHandling = 2;
opts.subscribe_options.retainAsPublished = 1;
rc = MQTTAsync_subscribe(c, test_subscribe_options_globals.topic2, 2, &opts);
if (rc != SUCCESS)
test_subscribe_options_globals.test_finished = 1;
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MQTTProperties_free(&opts.properties);
}
else if (called == 2)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.retained = 0;
pubmsg.qos = 2;
rc = MQTTAsync_sendMessage(c, test_subscribe_options_globals.topic1, &pubmsg, &opts);
assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_subscribe_options_globals.test_finished = 1;
}
}
void test_subscribe_options_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty property;
int rc;
MyLog(LOGA_DEBUG, "In subscribe options connect onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->properties);
property.identifier = SUBSCRIPTION_IDENTIFIER;
property.value.integer4 = 1;
MQTTProperties_add(&opts.properties, &property);
opts.subscribe_options.noLocal = 1;
opts.onSuccess5 = test_subscribe_options_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test_subscribe_options_globals.topic1, 2, &opts);
if (rc != SUCCESS)
test_request_response_globals.test_finished = 1;
assert("Good rc from subscribeMany", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MQTTProperties_free(&opts.properties);
}
int test_subscribe_options(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0;
MyLog(LOGA_INFO, "Starting V5 test - subscribe options");
fprintf(xml, "<testcase classname=\"test11\" name=\"subscribe options\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "subscribe options",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test_subscribe_options_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
opts.onSuccess5 = test_subscribe_options_onConnect;
opts.context = c;
opts.cleanstart = 1;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (test_subscribe_options_globals.test_finished == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST8: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
struct
{
char* shared_topic;
char* topic;
int messages_arrived;
int test_finished;
int message_count;
} test_shared_subscriptions_globals =
{
"$share/share_test/any",
"any",
0,
0,
10,
};
int test_shared_subscriptions_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
int rc;
test_shared_subscriptions_globals.messages_arrived++;
assert("Message structure version should be 1", message->struct_version == 1,
"message->struct_version was %d", message->struct_version);
MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
topicName, message->payloadlen, (char*)(message->payload));
if (message->struct_version == 1)
logProperties(&message->properties);
if (test_shared_subscriptions_globals.message_count == test_shared_subscriptions_globals.messages_arrived)
test_shared_subscriptions_globals.test_finished = 1;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test_shared_subscriptions_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
static int called = 0;
called++;
MyLog(LOGA_DEBUG, "In subscribe options connect onSuccess callback, context %p, called %d", context, called);
assert("Reason code should be 0", response->reasonCode == GRANTED_QOS_2,
"Reason code was %d\n", response->reasonCode);
if (response->properties.count > 0)
{
MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->properties);
}
if (called == 2) /* both clients now connected and subscribed */
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int i = 0;
char buf[100];
/* for each message we publish, only one of the subscribers should get it, not both */
sprintf(buf, "shared subscriptions sequence number %d", i);
pubmsg.payload = buf;
pubmsg.payloadlen = strlen(buf);
pubmsg.retained = 0;
pubmsg.qos = 2;
for (i = 0; i < test_shared_subscriptions_globals.message_count; ++i)
{
rc = MQTTAsync_sendMessage(c, test_shared_subscriptions_globals.topic, &pubmsg, &opts);
assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
{
test_subscribe_options_globals.test_finished = 1;
break;
}
}
}
}
void test_shared_subscriptions_onConnectd(void* context, MQTTAsync_successData5* response)
{
MQTTAsync d = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty property;
int rc;
MyLog(LOGA_DEBUG, "In shared subscriptions connect d onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
opts.onSuccess5 = test_shared_subscriptions_onSubscribe;
opts.context = d;
rc = MQTTAsync_subscribe(d, test_shared_subscriptions_globals.shared_topic, 2, &opts);
assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
if (rc != SUCCESS)
test_shared_subscriptions_globals.test_finished = 1;
}
void test_shared_subscriptions_onConnectc(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty property;
int rc;
MyLog(LOGA_DEBUG, "In shared subscriptions connect c onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
opts.onSuccess5 = test_shared_subscriptions_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test_shared_subscriptions_globals.shared_topic, 2, &opts);
assert("Good rc from subscribe", rc == SUCCESS, "rc was %d", rc);
if (rc != SUCCESS)
test_shared_subscriptions_globals.test_finished = 1;
}
int test_shared_subscriptions(struct Options options)
{
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0, count = 0;
MyLog(LOGA_INFO, "Starting V5 test - shared subscriptions");
fprintf(xml, "<testcase classname=\"test11\" name=\"shared subscriptions\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "shared subscriptions c",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, "shared subscriptions d",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test_shared_subscriptions_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
rc = MQTTAsync_setCallbacks(d, d, NULL, test_shared_subscriptions_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
opts.onSuccess5 = test_shared_subscriptions_onConnectc;
opts.context = c;
opts.cleanstart = 1;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
opts.onSuccess5 = test_shared_subscriptions_onConnectd;
opts.context = d;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (test_shared_subscriptions_globals.test_finished == 0 && ++count < 1000)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
/* sleep a bit more to see if any other messages arrive */
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
assert("Correct number of messages arrived",
test_shared_subscriptions_globals.message_count == test_shared_subscriptions_globals.messages_arrived,
"Actual number of messages received %d\n", test_shared_subscriptions_globals.messages_arrived);
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
exit:
MyLog(LOGA_INFO, "TEST9: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
...@@ -1448,6 +2080,9 @@ int main(int argc, char** argv) ...@@ -1448,6 +2080,9 @@ int main(int argc, char** argv)
test_flow_control, test_flow_control,
test_error_reporting, test_error_reporting,
test_qos_1_2_errors, test_qos_1_2_errors,
test_request_response,
test_subscribe_options,
test_shared_subscriptions
}; /* indexed starting from 1 */ }; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
int i; int i;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment