Commit cdd7f0e4 authored by Juergen Kosel's avatar Juergen Kosel

Add test for issue #373 and #385

This test connects to a broker up to 10 times.
As long as the connection is established, 100 message should be published
each second.
At the end all memory should be freed and no segmentation occurred.
Signed-off-by: 's avatarJuergen Kosel <juergen.kosel@softing.com>
parent 14270a2b
...@@ -25,13 +25,18 @@ ...@@ -25,13 +25,18 @@
#include "Thread.h" #include "Thread.h"
#if !defined(_WINDOWS) #if !defined(_WINDOWS)
#include <sys/time.h> #include <sys/time.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#else #else
#include <windows.h> #include <windows.h>
#endif #endif
#include "Heap.h" // for Heap_get_info
// undefine macros from Heap.h:
#undef malloc
#undef realloc
#undef free
char unique[50]; // unique suffix/prefix to add to clientid/topic etc char unique[50]; // unique suffix/prefix to add to clientid/topic etc
...@@ -115,7 +120,7 @@ void MyLog(int LOGA_level, char* format, ...) ...@@ -115,7 +120,7 @@ void MyLog(int LOGA_level, char* format, ...)
va_start(args, format); va_start(args, format);
vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
format, args); format, args);
va_end(args); va_end(args);
printf("%s\n", msg_buf); printf("%s\n", msg_buf);
...@@ -131,13 +136,203 @@ void MySleep(long milliseconds) ...@@ -131,13 +136,203 @@ void MySleep(long milliseconds)
#endif #endif
} }
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
int tests = 0; int tests = 0;
int failures = 0; int failures = 0;
int connected = 0;
int pendingMessageCnt = 0; /* counter of messages which are currently queued for publish */
int pendingMessageCntMax = 0;
int failedPublishCnt = 0;
int goodPublishCnt = 0;
int connectCnt = 0;
int connecting = 0;
void myassert(char* filename, int lineno, char* description, int value,
char* format, ...)
{
++tests;
if (!value)
{
va_list args;
++failures;
MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
lineno, description);
va_start(args, format);
vprintf(format, args);
va_end(args);
}
else
MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
filename, lineno, description);
}
void test1373OnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_INFO, "In connect onFailure callback, context %p", context);
connecting = 0;
}
void test373OnConnect(void* context, MQTTAsync_successData* response)
{
connected = 1;
connecting = 0;
connectCnt++;
MyLog(LOGA_INFO, "Established MQTT connection to %s",response->alt.connect.serverURI);
char MqttVersion[40];
switch (response->alt.connect.MQTTVersion)
{
case MQTTVERSION_3_1:
sprintf(MqttVersion," MQTT version 3.1");
break;
case MQTTVERSION_3_1_1:
sprintf(MqttVersion, " MQTT version 3.1.1");
break;
default:
sprintf(MqttVersion, " MQTT version %d",response->alt.connect.MQTTVersion);
}
MyLog(LOGA_INFO, " %s\n",MqttVersion);
MyLog(LOGA_INFO, "connectCnt %d\n",connectCnt);
}
void test373ConnectionLost(void* context, char* cause)
{
connected = 0;
MyLog(LOGA_INFO, "Disconnected from MQTT broker reason %s",cause);
}
void test373DeliveryComplete(void* context, MQTTAsync_token token)
{
pendingMessageCnt--;
}
void test373_onWriteSuccess(void* context, MQTTAsync_successData* response)
{
goodPublishCnt++;
}
void test373_onWriteFailure(void* context, MQTTAsync_failureData* response)
{
failedPublishCnt++;
}
int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
return 0;
}
static char test373Payload[] = "No one is interested in this payload";
int test373SendPublishMessage(MQTTAsync handle,int id)
{
int rc = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
char topic[ sizeof(unique) + 40];
sprintf(topic,"%s/test373/item_%03d",unique,id);
opts.onFailure = test373_onWriteFailure;
opts.onSuccess = test373_onWriteSuccess;
pubmsg.payload = test373Payload;
pubmsg.payloadlen = sizeof(test373Payload);
pubmsg.qos = 0;
rc = MQTTAsync_sendMessage( handle, topic,&pubmsg,&opts);
if (rc == MQTTASYNC_SUCCESS)
{
pendingMessageCnt++;
if (pendingMessageCnt > pendingMessageCntMax) pendingMessageCntMax = pendingMessageCnt;
}
return rc;
}
int test_373(struct Options options) int test_373(struct Options options)
{ {
return 0; char* testname = "test373";
MQTTAsync mqttasyncContext;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char clientid[30 + sizeof(unique)];
heap_info* mqtt_mem = 0;
sprintf(clientid, "paho-test373-%s", unique);
rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid,
MQTTCLIENT_PERSISTENCE_NONE,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
goto exit;
}
opts.connectTimeout = 2;
opts.keepAliveInterval = 20;
opts.cleansession = 0;
opts.MQTTVersion = MQTTVERSION_DEFAULT;
opts.onSuccess = test373OnConnect;
opts.onFailure = test1373OnFailure;
opts.context = mqttasyncContext;
rc = MQTTAsync_setCallbacks(mqttasyncContext,mqttasyncContext,
test373ConnectionLost,
test373_messageArrived,
test373DeliveryComplete);
if (rc != MQTTASYNC_SUCCESS)
{
goto exit;
}
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
while (connectCnt < 10)
{
MyLog(LOGA_INFO, "Connected %d connectCnt %d\n",connected,connectCnt);
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (!connected)
{
/* (re)connect to the broker */
if (connecting)
{
MySleep((1+opts.connectTimeout) * 1000); /* but wait for all pending connect attempts to timeout */
}
else
{
rc = MQTTAsync_connect(mqttasyncContext, &opts);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
connecting = 1;
}
}
else
{
/* while connected send 100 message per second */
int topicId;
for(topicId=0; topicId < 100; topicId++)
{
rc = test373SendPublishMessage(mqttasyncContext,topicId);
if (rc != MQTTASYNC_SUCCESS) break;
}
MySleep(1000);
}
}
MySleep(5000);
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
MQTTAsync_disconnect(mqttasyncContext, NULL);
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
exit:
MQTTAsync_destroy(&mqttasyncContext);
return failures;
} }
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message) void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
...@@ -149,7 +344,7 @@ int main(int argc, char** argv) ...@@ -149,7 +344,7 @@ int main(int argc, char** argv)
{ {
int* numtests = &tests; int* numtests = &tests;
int rc = 0; int rc = 0;
int (*tests[])() = { NULL, test_373}; int (*tests[])() = { NULL, test_373};
sprintf(unique, "%u", rand()); sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique); MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
...@@ -179,3 +374,9 @@ int main(int argc, char** argv) ...@@ -179,3 +374,9 @@ int main(int argc, char** argv)
return rc; return rc;
} }
/* Local Variables: */
/* indent-tabs-mode: t */
/* c-basic-offset: 8 */
/* End: */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment