Commit 7c34e20a authored by Juergen Kosel's avatar Juergen Kosel

Merge branch 'test4issue373' into 373

* test4issue373:
  Add log output for failed message queueing in test due to disconnect
  Add trace output to verify that reducing the tcp send buffer forces TCPSOCKET_INTERRUPTED
  Reduce the tcp snd buffer to provocate issue #385
  Move the evaluation of the not freed memory _after_ call of MQTTAsync_destroy()
  Fix Windows build of test_issue373 and increase number of iterations
  Consider any not freed memory as test failure
  Modified number of connect iterations and messages send per second
  Moved the decrement of pendingMessageCnt to another call back function
  Add test for issue #373 and #385
  Add skeleton of test_issue373
parents f5aeca49 1f36a313
......@@ -532,6 +532,12 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
}
}
exit:
#if 1
if (rc == TCPSOCKET_INTERRUPTED)
{
Log(LOG_ERROR, -1, "Socket_putdatas: TCPSOCKET_INTERRUPTED");
}
#endif
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -707,6 +713,13 @@ int Socket_new(char* addr, int port, int* sock)
if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
#endif
#if 1
{
int optsend = 2 * 1440;
if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (void*)&optsend, sizeof(optsend)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
}
#endif
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
if (Socket_addSocket(*sock) == SOCKET_ERROR)
......
......@@ -449,3 +449,13 @@ SET_TESTS_PROPERTIES(
test9-6-offline-buffering-max-buffered-binary-will
PROPERTIES TIMEOUT 540
)
ADD_EXECUTABLE(
test_issue373
test_issue373.c
)
TARGET_LINK_LIBRARIES(
test_issue373
paho-mqtt3a
)
/*******************************************************************************
* Copyright (c) 2012, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
*******************************************************************************/
/**
* @file
* Test for issues 373, 385: Memory leak and segmentation fault during connection lost and reconnect
*
*/
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <windows.h>
#endif
#include "Heap.h" // for Heap_get_info
// undefine macros from Heap.h:
#undef malloc
#undef realloc
#undef free
char unique[50]; // unique suffix/prefix to add to clientid/topic etc
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
void usage(void)
{
printf("help!!\n");
exit(EXIT_FAILURE);
}
struct Options
{
char* connection; /**< connection to system under test. */
char* proxy_connection; /**< connection to proxy */
int verbose;
int test_no;
} options =
{
"iot.eclipse.org:1883",
"localhost:1883",
0,
0,
};
void getopts(int argc, char** argv)
{
int count = 1;
while (count < argc)
{
if (strcmp(argv[count], "--test_no") == 0)
{
if (++count < argc)
options.test_no = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--connection") == 0)
{
if (++count < argc)
options.connection = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--proxy_connection") == 0)
{
if (++count < argc)
options.proxy_connection = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--verbose") == 0)
options.verbose = 1;
count++;
}
}
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
void MyLog(int LOGA_level, char* format, ...)
{
static char msg_buf[256];
va_list args;
struct timeb ts;
struct tm *timeinfo;
if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
return;
ftime(&ts);
timeinfo = localtime(&ts.time);
strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
va_start(args, format);
vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
format, args);
va_end(args);
printf("%s\n", msg_buf);
fflush(stdout);
}
void MySleep(long milliseconds)
{
#if defined(WIN32) || defined(WIN64)
Sleep(milliseconds);
#else
usleep(milliseconds*1000);
#endif
}
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
int tests = 0;
int failures = 0;
int connected = 0;
int pendingMessageCnt = 0; /* counter of messages which are currently queued for publish */
int pendingMessageCntMax = 0;
int failedPublishCnt = 0;
int goodPublishCnt = 0;
int connectCnt = 0;
int connecting = 0;
void myassert(char* filename, int lineno, char* description, int value,
char* format, ...)
{
++tests;
if (!value)
{
va_list args;
++failures;
MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
lineno, description);
va_start(args, format);
vprintf(format, args);
va_end(args);
}
else
MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
filename, lineno, description);
}
void test1373OnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_INFO, "In connect onFailure callback, context %p", context);
connecting = 0;
}
void test373OnConnect(void* context, MQTTAsync_successData* response)
{
connected = 1;
connecting = 0;
connectCnt++;
MyLog(LOGA_INFO, "Established MQTT connection to %s",response->alt.connect.serverURI);
char MqttVersion[40];
switch (response->alt.connect.MQTTVersion)
{
case MQTTVERSION_3_1:
sprintf(MqttVersion," MQTT version 3.1");
break;
case MQTTVERSION_3_1_1:
sprintf(MqttVersion, " MQTT version 3.1.1");
break;
default:
sprintf(MqttVersion, " MQTT version %d",response->alt.connect.MQTTVersion);
}
MyLog(LOGA_INFO, " %s\n",MqttVersion);
MyLog(LOGA_INFO, "connectCnt %d\n",connectCnt);
}
void test373ConnectionLost(void* context, char* cause)
{
connected = 0;
MyLog(LOGA_INFO, "Disconnected from MQTT broker reason %s",cause);
}
void test373DeliveryComplete(void* context, MQTTAsync_token token)
{
}
void test373_onWriteSuccess(void* context, MQTTAsync_successData* response)
{
pendingMessageCnt--;
goodPublishCnt++;
}
void test373_onWriteFailure(void* context, MQTTAsync_failureData* response)
{
pendingMessageCnt--;
failedPublishCnt++;
}
int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
return 0;
}
static char test373Payload[] = "No one is interested in this payload";
int test373SendPublishMessage(MQTTAsync handle,int id)
{
int rc = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
char topic[ sizeof(unique) + 40];
sprintf(topic,"%s/test373/item_%03d",unique,id);
opts.onFailure = test373_onWriteFailure;
opts.onSuccess = test373_onWriteSuccess;
pubmsg.payload = test373Payload;
pubmsg.payloadlen = sizeof(test373Payload);
pubmsg.qos = 0;
rc = MQTTAsync_sendMessage( handle, topic,&pubmsg,&opts);
if (rc == MQTTASYNC_SUCCESS)
{
pendingMessageCnt++;
if (pendingMessageCnt > pendingMessageCntMax) pendingMessageCntMax = pendingMessageCnt;
}
else
{
MyLog(LOGA_INFO, "Failed to queue message for send with retvalue %d",rc);
}
return rc;
}
int test_373(struct Options options)
{
char* testname = "test373";
MQTTAsync mqttasyncContext;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char clientid[30 + sizeof(unique)];
heap_info* mqtt_mem = 0;
sprintf(clientid, "paho-test373-%s", unique);
rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid,
MQTTCLIENT_PERSISTENCE_NONE,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
goto exit;
}
opts.connectTimeout = 2;
opts.keepAliveInterval = 20;
opts.cleansession = 0;
opts.MQTTVersion = MQTTVERSION_DEFAULT;
opts.onSuccess = test373OnConnect;
opts.onFailure = test1373OnFailure;
opts.context = mqttasyncContext;
rc = MQTTAsync_setCallbacks(mqttasyncContext,mqttasyncContext,
test373ConnectionLost,
test373_messageArrived,
test373DeliveryComplete);
if (rc != MQTTASYNC_SUCCESS)
{
goto exit;
}
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
while (connectCnt < 10)
{
if (!connected)
{
MyLog(LOGA_INFO, "Connected %d connectCnt %d\n",connected,connectCnt);
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
#if !defined(_WINDOWS)
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
#endif
/* (re)connect to the broker */
if (connecting)
{
MySleep((1+opts.connectTimeout) * 1000); /* but wait for all pending connect attempts to timeout */
}
else
{
rc = MQTTAsync_connect(mqttasyncContext, &opts);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
connecting = 1;
}
}
else
{
/* while connected send 1000 message per second */
int topicId;
for(topicId=0; topicId < 1000; topicId++)
{
rc = test373SendPublishMessage(mqttasyncContext,topicId);
if (rc != MQTTASYNC_SUCCESS) break;
}
MySleep(100);
}
}
MySleep(5000);
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
#if !defined(_WINDOWS)
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
#endif
MQTTAsync_disconnect(mqttasyncContext, NULL);
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
#if !defined(_WINDOWS)
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
#endif
exit:
MQTTAsync_destroy(&mqttasyncContext);
#if !defined(_WINDOWS)
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (mqtt_mem->current_size > 0) failures++; /* consider any not freed memory as failure */
#endif
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
printf("%s\n", message);
}
int main(int argc, char** argv)
{
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test_373};
sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
MQTTAsync_setTraceCallback(handleTrace);
getopts(argc, argv);
if (options.test_no == 0)
{ /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{
failures = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
}
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
}
if (rc == 0)
MyLog(LOGA_INFO, "verdict pass");
else
MyLog(LOGA_INFO, "verdict fail");
return rc;
}
/* Local Variables: */
/* indent-tabs-mode: t */
/* c-basic-offset: 8 */
/* End: */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment