Commit 23aebfa5 authored by Ian Craggs's avatar Ian Craggs

Add function and test for max buffered messages

parent eb738d62
...@@ -2574,6 +2574,22 @@ int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_respons ...@@ -2574,6 +2574,22 @@ int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_respons
} }
int MQTTAsync_countBufferedMessages(MQTTAsyncs* m)
{
ListElement* current = NULL;
int count = 0;
while (ListNextElement(commands, &current))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m && cmd->command.type == PUBLISH)
count++;
}
return count;
}
int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, void* payload, int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, void* payload,
int qos, int retained, MQTTAsync_responseOptions* response) int qos, int retained, MQTTAsync_responseOptions* response)
{ {
...@@ -2594,6 +2610,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen ...@@ -2594,6 +2610,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
rc = MQTTASYNC_BAD_QOS; rc = MQTTASYNC_BAD_QOS;
else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0) else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
else if (m->createOptions && (MQTTAsync_countBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
rc = MQTTASYNC_MAX_BUFFERED_MESSAGES;
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
......
...@@ -155,6 +155,10 @@ ...@@ -155,6 +155,10 @@
* Return code: the request is being discarded when not complete * Return code: the request is being discarded when not complete
*/ */
#define MQTTASYNC_OPERATION_INCOMPLETE -11 #define MQTTASYNC_OPERATION_INCOMPLETE -11
/**
* Return code: the request is being discarded when not complete
*/
#define MQTTASYNC_MAX_BUFFERED_MESSAGES -12
/** /**
* Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1 * Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1
......
...@@ -1364,6 +1364,287 @@ exit: ...@@ -1364,6 +1364,287 @@ exit:
} }
/*********************************************************************
test5: offline buffering - check max buffered
1. call connect
2. use proxy to disconnect the client
3. while the client is disconnected, send more messages
4. when the client reconnects, check that those messages are sent
*********************************************************************/
int test5_will_message_received = 0;
int test5_messages_received = 0;
int test5_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test5_will_message_received = 1;
else
test5_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int test5Finished = 0;
int test5OnFailureCalled = 0;
void test5cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test5OnFailureCalled++;
test5Finished = 1;
}
void test5dOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test5OnFailureCalled++;
test5Finished = 1;
}
void test5cOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
MQTTAsync c = (MQTTAsync)context;
int rc;
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test5dReady = 0;
char willTopic[100];
char test_topic[50];
void test5donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test5dReady = 1;
}
void test5dOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test5donSubscribe;
opts.context = c;
rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test5Finished = 1;
}
int test5c_connected = 0;
void test5cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test5c_connected = 1;
}
int test5(struct Options options)
{
char* testname = "test5";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
sprintf(willTopic, "paho-test9-5-%s", unique);
sprintf(clientidc, "paho-test9-5-c-%s", unique);
sprintf(clientidd, "paho-test9-5-d-%s", unique);
sprintf(test_topic, "paho-test9-5-test topic %s", unique);
test5Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 5 - max buffered");
fprintf(xml, "<testcase classname=\"test5\" name=\"%s\"", testname);
global_start_time = start_clock();
createOptions.sendWhileDisconnected = 1;
createOptions.maxBufferedMessages = 3;
rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL, &createOptions);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
//opts.username = "testuser";
//opts.password = "testpassword";
rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test5dOnConnect;
opts.onFailure = test5dOnFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test5dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test5cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = willTopic;
opts.onSuccess = test5cOnConnect;
opts.onFailure = test5cOnFailure;
opts.context = c;
opts.cleansession = 0;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait for will message */
while (!test5_will_message_received && ++count < 10000)
MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
test5c_connected = 0;
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 5; ++i)
{
char buf[50];
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
sprintf(buf, "QoS %d message", i);
pubmsg.payload = buf;
pubmsg.payloadlen = strlen(pubmsg.payload) + 1;
pubmsg.qos = i % 3;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
if (i <= 2)
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
else
assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
}
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
rc = MQTTAsync_reconnect(c);
assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
/* wait for client to be reconnected */
while (!test5c_connected == 0 && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (test5_messages_received < 3 && ++count < 10000)
MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message) void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("%s\n", message); printf("%s\n", message);
...@@ -1374,7 +1655,7 @@ int main(int argc, char** argv) ...@@ -1374,7 +1655,7 @@ int main(int argc, char** argv)
{ {
int* numtests = &tests; int* numtests = &tests;
int rc = 0; int rc = 0;
int (*tests[])() = { NULL, test1, test2, test3, test4}; int (*tests[])() = { NULL, test1, test2, test3, test4, test5};
sprintf(unique, "%u", rand()); sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique); MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment