Commit 2ba53a38 authored by Ian Craggs's avatar Ian Craggs

add MQTTAsync_waitForCompletion and MQTTAsync_isComplete

Bug: 433871
parent 0a045757
...@@ -2647,6 +2647,8 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens) ...@@ -2647,6 +2647,8 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
int rc = MQTTASYNC_SUCCESS; int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle; MQTTAsyncs* m = handle;
*tokens = NULL; *tokens = NULL;
ListElement* current = NULL;
int count = 0;
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
...@@ -2657,19 +2659,87 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens) ...@@ -2657,19 +2659,87 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
goto exit; goto exit;
} }
/* calculate the number of pending tokens - commands plus inflight */
while (ListNextElement(commands, &current))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m)
count++;
}
if (m->c)
count += m->c->outboundMsgs->count;
if (count == 0)
goto exit; /* no tokens to return */
*tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
/* First add the unprocessed commands to the pending tokens */
current = NULL;
count = 0;
while (ListNextElement(commands, &current))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m)
(*tokens)[count++] = cmd->command.token;
}
/* Now add the inflight messages */
if (m->c && m->c->outboundMsgs->count > 0) if (m->c && m->c->outboundMsgs->count > 0)
{ {
current = NULL;
while (ListNextElement(m->c->outboundMsgs, &current))
{
Messages* m = (Messages*)(current->content);
(*tokens)[count++] = m->msgid;
}
}
(*tokens)[count] = -1; /* indicate end of list */
exit:
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
ListElement* current = NULL; ListElement* current = NULL;
int count = 0;
*tokens = malloc(sizeof(MQTTAsync_token) * (m->c->outboundMsgs->count + 1)); FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
/* First check unprocessed commands */
current = NULL;
while (ListNextElement(commands, &current))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m && cmd->command.token == dt)
goto exit;
}
/* Now check the inflight messages */
if (m->c && m->c->outboundMsgs->count > 0)
{
current = NULL;
while (ListNextElement(m->c->outboundMsgs, &current)) while (ListNextElement(m->c->outboundMsgs, &current))
{ {
Messages* m = (Messages*)(current->content); Messages* m = (Messages*)(current->content);
(*tokens)[count++] = m->msgid; if (m->msgid == dt)
goto exit;
} }
(*tokens)[count] = -1;
} }
rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
exit: exit:
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
...@@ -2678,6 +2748,51 @@ exit: ...@@ -2678,6 +2748,51 @@ exit:
} }
int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
{
int rc = MQTTASYNC_FAILURE;
START_TIME_TYPE start = MQTTAsync_start_clock();
unsigned long elapsed = 0L;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
if (MQTTAsync_isComplete(handle, dt) == 1)
{
rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
goto exit;
}
elapsed = MQTTAsync_elapsed(start);
while (elapsed < timeout)
{
MQTTAsync_sleep(100);
if (MQTTAsync_isComplete(handle, dt) == 1)
{
rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
goto exit;
}
elapsed = MQTTAsync_elapsed(start);
}
exit:
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level) void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
{ {
......
...@@ -908,6 +908,11 @@ DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationNam ...@@ -908,6 +908,11 @@ DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationNam
*/ */
DLLExport int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens); DLLExport int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens);
#define MQTTASYNC_TRUE 1
DLLExport int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt);
DLLExport int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout);
/** /**
* This function frees memory allocated to an MQTT message, including the * This function frees memory allocated to an MQTT message, including the
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
/** /**
* @file * @file
* Tests for the MQ Telemetry MQTT C client * Tests for the Paho Asynchronous MQTT C client
*/ */
...@@ -1134,6 +1134,260 @@ exit: ...@@ -1134,6 +1134,260 @@ exit:
/********************************************************************
Test7: Persistence
*********************************************************************/
char* test7_topic = "C client test7";
int test7_messageCount = 0;
void test7_onDisconnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
test_finished = 1;
}
void test7_onUnsubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
opts.onSuccess = test7_onDisconnect;
opts.context = c;
rc = MQTTAsync_disconnect(c, &opts);
assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;
MyLog(LOGA_DEBUG, "Test7: received message id %d", message->msgid);
test7_messageCount++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
static int test7_subscribed = 0;
void test7_onSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
test7_subscribed = 1;
}
void test7_onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess = test7_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test7_topic, 2, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_finished = 1;
}
/*********************************************************************
Test7: Pending tokens
*********************************************************************/
int test7(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_token* tokens = NULL;
int msg_count = 6;
MyLog(LOGA_INFO, "Starting test 7 - pending tokens");
fprintf(xml, "<testcase classname=\"test4\" name=\"asynchronous connect\"");
global_start_time = start_clock();
test_finished = 0;
rc = MQTTAsync_create(&c, options.connection, "async_test7",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 0;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test7_onConnect;
opts.onFailure = NULL;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test7_subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 2;
pubmsg.retained = 0;
rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &ropts);
MyLog(LOGA_DEBUG, "Token was %d", ropts.token);
rc = MQTTAsync_isComplete(c, ropts.token);
assert("0 rc from isComplete", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
rc = MQTTAsync_waitForCompletion(c, ropts.token, 5000L);
assert("Good rc from waitForCompletion", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
rc = MQTTAsync_isComplete(c, ropts.token);
assert("1 rc from isComplete", rc == 1, "rc was %d", rc);
test7_messageCount = 0;
int i = 0;
pubmsg.qos = 2;
for (i = 0; i < msg_count; ++i)
{
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
//pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &ropts);
}
/* disconnect immediately without receiving the incoming messages */
dopts.timeout = 0;
dopts.onSuccess = test7_onDisconnect;
MQTTAsync_disconnect(c, &dopts); /* now there should be "orphaned" publications */
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
test_finished = 0;
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
MQTTAsync_free(tokens);
MQTTAsync_destroy(&c); /* force re-reading persistence on create */
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = MQTTAsync_create(&c, options.connection, "async_test7", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
if (tokens)
{
int i = 0;
while (tokens[i] != -1)
MyLog(LOGA_DEBUG, "Delivery token %d", tokens[i++]);
MQTTAsync_free(tokens);
assert1("no of tokens should be count", i == msg_count, "no of tokens %d count %d", i, msg_count);
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MyLog(LOGA_DEBUG, "Reconnecting");
opts.context = c;
if (MQTTAsync_connect(c, &opts) != 0)
{
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
goto exit;
}
#if defined(WIN32)
Sleep(5000);
#else
usleep(5000000L);
#endif
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
/* assert1("no of messages should be count", test7_messageCount == msg_count, "no of tokens %d count %d",
test7_messageCount, msg_count); fails against Mosquitto - needs testing */
MQTTAsync_disconnect(c, &dopts);
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST7: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); printf("Trace : %d, %s\n", level, message);
...@@ -1145,7 +1399,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) ...@@ -1145,7 +1399,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int rc = 0; int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6}; /* indexed starting from 1 */ int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7}; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
int i; int i;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment