Commit e18db33e authored by Ian Craggs's avatar Ian Craggs

Add another continuous write test

parent 6bd9c691
...@@ -1820,6 +1820,284 @@ exit: ...@@ -1820,6 +1820,284 @@ exit:
} }
/*********************************************************************
Test7: Fill up TCP buffer with QoS 0 messages
*********************************************************************/
int test7c_connected = 0;
int test7_will_message_received = 0;
int test7_messages_received = 0;
int test7Finished = 0;
int test7OnFailureCalled = 0;
int test7dReady = 0;
int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test7_will_message_received = 1;
else
test7_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test7cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test7c_connected = 1;
}
void test7cOnConnectFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test7OnFailureCalled++;
test7Finished = 1;
}
void test7cOnConnectSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = (int)strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
//rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
//assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
void test7dOnConnectFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test7OnFailureCalled++;
test7Finished = 1;
}
void test7donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test7dReady = 1;
}
void test7dOnConnectSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test7donSubscribe;
opts.context = c;
//rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
//assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
//if (rc != MQTTASYNC_SUCCESS)
// test5Finished = 1;
test7dReady = 1;
}
int test7(struct Options options)
{
char* testname = "test7";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
//MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
test7_will_message_received = 0;
test7_messages_received = 0;
test7Finished = 0;
test7OnFailureCalled = 0;
test7c_connected = 0;
sprintf(willTopic, "paho-test9-7-%s", unique);
sprintf(clientidc, "paho-test9-7-c-%s", unique);
sprintf(clientidd, "paho-test9-7-d-%s", unique);
sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
test7Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 7 - fill TCP buffer");
fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
rc = MQTTAsync_setCallbacks(d, d, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test7dOnConnectSuccess;
opts.onFailure = test7dOnConnectFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test7dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test7cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->payload.data = "will message";
opts.will->payload.len = strlen(opts.will->payload.data) + 1;
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = willTopic;
opts.onSuccess = test7cOnConnectSuccess;
opts.onFailure = test7cOnConnectFailure;
opts.context = c;
opts.cleansession = 0;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
count = 0;
while (!test7c_connected && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
/* wait for will message */
//while (test7_will_message_received == 0 && ++count < 10000)
// MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
test7c_connected = 0;
char buf[20000];
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 50000; ++i)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = 0; /*i % 3;*/
sprintf(buf, "QoS %d message", pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = 20000; //(int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != 0)
break;
}
#if 0
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
rc = MQTTAsync_reconnect(c);
assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
/* wait for client to be reconnected */
while (!test5c_connected && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (test5_messages_received < 3 && ++count < 10000)
MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
#endif
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MySleep(200);
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message) void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
...@@ -1831,8 +2109,10 @@ int main(int argc, char** argv) ...@@ -1831,8 +2109,10 @@ int main(int argc, char** argv)
{ {
int* numtests = &tests; int* numtests = &tests;
int rc = 0; int rc = 0;
int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6}; int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6, test7};
time_t randtime;
srand((unsigned) time(&randtime));
sprintf(unique, "%u", rand()); sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique); MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment