Commit eb738d62 authored by Ian Craggs's avatar Ian Craggs

Check getPendingTokens == 0

parent d7bf2266
...@@ -509,6 +509,17 @@ int test1(struct Options options) ...@@ -509,6 +509,17 @@ int test1(struct Options options)
while (test1_messages_received < 3 && ++count < 10000) while (test1_messages_received < 3 && ++count < 10000)
MySleep(100); MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
rc = MQTTAsync_disconnect(c, NULL); rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc); assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
...@@ -776,6 +787,17 @@ int test2(struct Options options) ...@@ -776,6 +787,17 @@ int test2(struct Options options)
while (test2_messages_received < 3 && ++count < 10000) while (test2_messages_received < 3 && ++count < 10000)
MySleep(100); MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
rc = MQTTAsync_disconnect(c, NULL); rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc); assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
...@@ -791,6 +813,557 @@ exit: ...@@ -791,6 +813,557 @@ exit:
return failures; return failures;
} }
/*********************************************************************
test3: offline buffering - sending messages while disconnected
1. call connect
2. use proxy to disconnect the client
3. while the client is disconnected, send more messages
4. when the client auto reconnects, check that those messages are sent
*********************************************************************/
int test3_will_message_received = 0;
int test3_messages_received = 0;
int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test3_will_message_received = 1;
else
test3_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int test3Finished = 0;
int test3OnFailureCalled = 0;
void test3cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test3OnFailureCalled++;
test3Finished = 1;
}
void test3dOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test3OnFailureCalled++;
test3Finished = 1;
}
void test3cOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
MQTTAsync c = (MQTTAsync)context;
int rc;
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test3dReady = 0;
char willTopic[100];
char test_topic[50];
void test3donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test3dReady = 1;
}
void test3dOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test3donSubscribe;
opts.context = c;
rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test3Finished = 1;
}
int test3c_connected = 0;
void test3cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test3c_connected = 1;
}
int test3(struct Options options)
{
char* testname = "test3";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
sprintf(willTopic, "paho-test9-3-%s", unique);
sprintf(clientidc, "paho-test9-3-c-%s", unique);
sprintf(clientidd, "paho-test9-3-d-%s", unique);
sprintf(test_topic, "paho-test9-3-test topic %s", unique);
test3Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 3 - messages while disconnected");
fprintf(xml, "<testcase classname=\"test3\" name=\"%s\"", testname);
global_start_time = start_clock();
createOptions.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL, &createOptions);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
//opts.username = "testuser";
//opts.password = "testpassword";
rc = MQTTAsync_setCallbacks(d, d, NULL, test3_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test3dOnConnect;
opts.onFailure = test3dOnFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test3dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test3cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = willTopic;
opts.onSuccess = test3cOnConnect;
opts.onFailure = test3cOnFailure;
opts.context = c;
opts.cleansession = 0;
opts.automaticReconnect = 1;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait for will message */
while (!test3_will_message_received && ++count < 10000)
MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
test3c_connected = 0;
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 3; ++i)
{
char buf[50];
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
sprintf(buf, "QoS %d message", i);
pubmsg.payload = buf;
pubmsg.payloadlen = strlen(pubmsg.payload) + 1;
pubmsg.qos = i;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
}
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
/* wait for client to be reconnected */
while (!test3c_connected == 0 && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (test3_messages_received < 3 && ++count < 10000)
MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
/*********************************************************************
test4: offline buffering - sending messages while disconnected
1. call connect
2. use proxy to disconnect the client
3. while the client is disconnected, send more messages
4. when the client auto reconnects, check that those messages are sent
*********************************************************************/
int test4_will_message_received = 0;
int test4_messages_received = 0;
int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test4_will_message_received = 1;
else
test4_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int test4Finished = 0;
int test4OnFailureCalled = 0;
void test4cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test4OnFailureCalled++;
test4Finished = 1;
}
void test4dOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test4OnFailureCalled++;
test4Finished = 1;
}
void test4cOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
MQTTAsync c = (MQTTAsync)context;
int rc;
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test4dReady = 0;
char willTopic[100];
char test_topic[50];
void test4donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test4dReady = 1;
}
void test4dOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test4donSubscribe;
opts.context = c;
rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test4Finished = 1;
}
int test4c_connected = 0;
void test4cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test4c_connected = 1;
}
int test4(struct Options options)
{
char* testname = "test4";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
char *URIs[2] = {"rubbish", options.proxy_connection};
sprintf(willTopic, "paho-test9-4-%s", unique);
sprintf(clientidc, "paho-test9-4-c-%s", unique);
sprintf(clientidd, "paho-test9-4-d-%s", unique);
sprintf(test_topic, "paho-test9-4-test topic %s", unique);
test4Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 4 - messages while disconnected with serverURIs");
fprintf(xml, "<testcase classname=\"test4\" name=\"%s\"", testname);
global_start_time = start_clock();
createOptions.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL, &createOptions);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
rc = MQTTAsync_setCallbacks(d, d, NULL, test4_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test4dOnConnect;
opts.onFailure = test4dOnFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test4dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test4cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = willTopic;
opts.onSuccess = test4cOnConnect;
opts.onFailure = test4cOnFailure;
opts.context = c;
opts.cleansession = 0;
opts.serverURIs = URIs;
opts.serverURIcount = 2;
opts.automaticReconnect = 1;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait for will message */
while (!test4_will_message_received && ++count < 10000)
MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
test4c_connected = 0;
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 3; ++i)
{
char buf[50];
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
sprintf(buf, "QoS %d message", i);
pubmsg.payload = buf;
pubmsg.payloadlen = strlen(pubmsg.payload) + 1;
pubmsg.qos = i;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
}
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
/* wait for client to be reconnected */
while (!test4c_connected == 0 && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (test4_messages_received < 3 && ++count < 10000)
MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message) void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("%s\n", message); printf("%s\n", message);
...@@ -801,7 +1374,7 @@ int main(int argc, char** argv) ...@@ -801,7 +1374,7 @@ int main(int argc, char** argv)
{ {
int* numtests = &tests; int* numtests = &tests;
int rc = 0; int rc = 0;
int (*tests[])() = { NULL, test1, test2}; int (*tests[])() = { NULL, test1, test2, test3, test4};
sprintf(unique, "%u", rand()); sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique); MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment