Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
P
paho.mqtt.c
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
eclipse
paho.mqtt.c
Commits
81fba83b
Commit
81fba83b
authored
Feb 13, 2018
by
Ian Craggs
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Remove 373 test from Makefile... add the test source for posterity #416
parent
6eb1daab
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
398 additions
and
2 deletions
+398
-2
Makefile
Makefile
+2
-2
test_issue373.c
test/test_issue373.c
+396
-0
No files found.
Makefile
View file @
81fba83b
#*******************************************************************************
# Copyright (c) 2009, 201
7
IBM Corp.
# Copyright (c) 2009, 201
8
IBM Corp.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
...
...
@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS
=
test3
SYNC_SSL_TESTS
=
${
addprefix
${
blddir
}
/test/,
${
TEST_FILES_CS
}}
TEST_FILES_A
=
test4 test6 test9 test_mqtt4async
test_issue373
TEST_FILES_A
=
test4 test6 test9 test_mqtt4async
ASYNC_TESTS
=
${
addprefix
${
blddir
}
/test/,
${
TEST_FILES_A
}}
TEST_FILES_AS
=
test5
...
...
test/test_issue373.c
0 → 100644
View file @
81fba83b
/*******************************************************************************
* Copyright (c) 2012, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
*******************************************************************************/
/**
* @file
* Test for issues 373, 385: Memory leak and segmentation fault during connection lost and reconnect
*
*/
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <windows.h>
#endif
#include "Heap.h" // for Heap_get_info
// undefine macros from Heap.h:
#undef malloc
#undef realloc
#undef free
char
unique
[
50
];
// unique suffix/prefix to add to clientid/topic etc
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
void
usage
(
void
)
{
printf
(
"help!!
\n
"
);
exit
(
EXIT_FAILURE
);
}
struct
Options
{
char
*
connection
;
/**< connection to system under test. */
char
*
proxy_connection
;
/**< connection to proxy */
int
verbose
;
int
test_no
;
}
options
=
{
"localhost:1883"
,
"localhost:1884"
,
0
,
0
,
};
void
getopts
(
int
argc
,
char
**
argv
)
{
int
count
=
1
;
while
(
count
<
argc
)
{
if
(
strcmp
(
argv
[
count
],
"--test_no"
)
==
0
)
{
if
(
++
count
<
argc
)
options
.
test_no
=
atoi
(
argv
[
count
]);
else
usage
();
}
else
if
(
strcmp
(
argv
[
count
],
"--connection"
)
==
0
)
{
if
(
++
count
<
argc
)
options
.
connection
=
argv
[
count
];
else
usage
();
}
else
if
(
strcmp
(
argv
[
count
],
"--proxy_connection"
)
==
0
)
{
if
(
++
count
<
argc
)
options
.
proxy_connection
=
argv
[
count
];
else
usage
();
}
else
if
(
strcmp
(
argv
[
count
],
"--verbose"
)
==
0
)
options
.
verbose
=
1
;
count
++
;
}
}
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
void
MyLog
(
int
LOGA_level
,
char
*
format
,
...)
{
static
char
msg_buf
[
256
];
va_list
args
;
struct
timeb
ts
;
struct
tm
*
timeinfo
;
if
(
LOGA_level
==
LOGA_DEBUG
&&
options
.
verbose
==
0
)
return
;
ftime
(
&
ts
);
timeinfo
=
localtime
(
&
ts
.
time
);
strftime
(
msg_buf
,
80
,
"%Y%m%d %H%M%S"
,
timeinfo
);
sprintf
(
&
msg_buf
[
strlen
(
msg_buf
)],
".%.3hu "
,
ts
.
millitm
);
va_start
(
args
,
format
);
vsnprintf
(
&
msg_buf
[
strlen
(
msg_buf
)],
sizeof
(
msg_buf
)
-
strlen
(
msg_buf
),
format
,
args
);
va_end
(
args
);
printf
(
"%s
\n
"
,
msg_buf
);
fflush
(
stdout
);
}
void
MySleep
(
long
milliseconds
)
{
#if defined(WIN32) || defined(WIN64)
Sleep
(
milliseconds
);
#else
usleep
(
milliseconds
*
1000
);
#endif
}
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
int
tests
=
0
;
int
failures
=
0
;
int
connected
=
0
;
int
pendingMessageCnt
=
0
;
/* counter of messages which are currently queued for publish */
int
pendingMessageCntMax
=
0
;
int
failedPublishCnt
=
0
;
int
goodPublishCnt
=
0
;
int
connectCnt
=
0
;
int
connecting
=
0
;
void
myassert
(
char
*
filename
,
int
lineno
,
char
*
description
,
int
value
,
char
*
format
,
...)
{
++
tests
;
if
(
!
value
)
{
va_list
args
;
++
failures
;
MyLog
(
LOGA_INFO
,
"Assertion failed, file %s, line %d, description: %s"
,
filename
,
lineno
,
description
);
va_start
(
args
,
format
);
vprintf
(
format
,
args
);
va_end
(
args
);
}
else
MyLog
(
LOGA_DEBUG
,
"Assertion succeeded, file %s, line %d, description: %s"
,
filename
,
lineno
,
description
);
}
void
test1373OnFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
MyLog
(
LOGA_INFO
,
"In connect onFailure callback, context %p"
,
context
);
connecting
=
0
;
}
void
test373OnConnect
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
connected
=
1
;
connecting
=
0
;
connectCnt
++
;
MyLog
(
LOGA_INFO
,
"Established MQTT connection to %s"
,
response
->
alt
.
connect
.
serverURI
);
char
MqttVersion
[
40
];
switch
(
response
->
alt
.
connect
.
MQTTVersion
)
{
case
MQTTVERSION_3_1
:
sprintf
(
MqttVersion
,
" MQTT version 3.1"
);
break
;
case
MQTTVERSION_3_1_1
:
sprintf
(
MqttVersion
,
" MQTT version 3.1.1"
);
break
;
default:
sprintf
(
MqttVersion
,
" MQTT version %d"
,
response
->
alt
.
connect
.
MQTTVersion
);
}
MyLog
(
LOGA_INFO
,
" %s
\n
"
,
MqttVersion
);
MyLog
(
LOGA_INFO
,
"connectCnt %d
\n
"
,
connectCnt
);
}
void
test373ConnectionLost
(
void
*
context
,
char
*
cause
)
{
connected
=
0
;
MyLog
(
LOGA_INFO
,
"Disconnected from MQTT broker reason %s"
,
cause
);
}
void
test373DeliveryComplete
(
void
*
context
,
MQTTAsync_token
token
)
{
}
void
test373_onWriteSuccess
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
pendingMessageCnt
--
;
goodPublishCnt
++
;
}
void
test373_onWriteFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
pendingMessageCnt
--
;
failedPublishCnt
++
;
}
int
test373_messageArrived
(
void
*
context
,
char
*
topicName
,
int
topicLen
,
MQTTAsync_message
*
message
)
{
return
0
;
}
static
char
test373Payload
[]
=
"No one is interested in this payload"
;
int
test373SendPublishMessage
(
MQTTAsync
handle
,
int
id
)
{
int
rc
=
0
;
MQTTAsync_responseOptions
opts
=
MQTTAsync_responseOptions_initializer
;
MQTTAsync_message
pubmsg
=
MQTTAsync_message_initializer
;
char
topic
[
sizeof
(
unique
)
+
40
];
sprintf
(
topic
,
"%s/test373/item_%03d"
,
unique
,
id
);
opts
.
onFailure
=
test373_onWriteFailure
;
opts
.
onSuccess
=
test373_onWriteSuccess
;
pubmsg
.
payload
=
test373Payload
;
pubmsg
.
payloadlen
=
sizeof
(
test373Payload
);
pubmsg
.
qos
=
1
;
rc
=
MQTTAsync_sendMessage
(
handle
,
topic
,
&
pubmsg
,
&
opts
);
if
(
rc
==
MQTTASYNC_SUCCESS
)
{
pendingMessageCnt
++
;
if
(
pendingMessageCnt
>
pendingMessageCntMax
)
pendingMessageCntMax
=
pendingMessageCnt
;
}
return
rc
;
}
int
test_373
(
struct
Options
options
)
{
char
*
testname
=
"test373"
;
MQTTAsync
mqttasyncContext
;
MQTTAsync_connectOptions
opts
=
MQTTAsync_connectOptions_initializer
;
MQTTAsync_willOptions
wopts
=
MQTTAsync_willOptions_initializer
;
int
rc
=
0
;
char
clientid
[
30
+
sizeof
(
unique
)];
heap_info
*
mqtt_mem
=
0
;
sprintf
(
clientid
,
"paho-test373-%s"
,
unique
);
rc
=
MQTTAsync_create
(
&
mqttasyncContext
,
options
.
proxy_connection
,
clientid
,
MQTTCLIENT_PERSISTENCE_NONE
,
NULL
);
assert
(
"good rc from create"
,
rc
==
MQTTASYNC_SUCCESS
,
"rc was %d
\n
"
,
rc
);
if
(
rc
!=
MQTTASYNC_SUCCESS
)
{
goto
exit
;
}
opts
.
connectTimeout
=
2
;
opts
.
keepAliveInterval
=
20
;
opts
.
cleansession
=
0
;
opts
.
MQTTVersion
=
MQTTVERSION_DEFAULT
;
opts
.
onSuccess
=
test373OnConnect
;
opts
.
onFailure
=
test1373OnFailure
;
opts
.
context
=
mqttasyncContext
;
rc
=
MQTTAsync_setCallbacks
(
mqttasyncContext
,
mqttasyncContext
,
test373ConnectionLost
,
test373_messageArrived
,
test373DeliveryComplete
);
if
(
rc
!=
MQTTASYNC_SUCCESS
)
{
goto
exit
;
}
MQTTAsync_setTraceLevel
(
MQTTASYNC_TRACE_ERROR
);
while
(
connectCnt
<
5
)
{
if
(
!
connected
)
{
MyLog
(
LOGA_INFO
,
"Connected %d connectCnt %d
\n
"
,
connected
,
connectCnt
);
MyLog
(
LOGA_INFO
,
"PublishCnt %d, FailedCnt %d, Pending %d maxPending %d"
,
goodPublishCnt
,
failedPublishCnt
,
pendingMessageCnt
,
pendingMessageCntMax
);
#if !defined(_WINDOWS)
mqtt_mem
=
Heap_get_info
();
MyLog
(
LOGA_INFO
,
"MQTT mem current %ld, max %ld"
,
mqtt_mem
->
current_size
,
mqtt_mem
->
max_size
);
#endif
/* (re)connect to the broker */
if
(
connecting
)
{
MySleep
((
1
+
opts
.
connectTimeout
)
*
1000
);
/* but wait for all pending connect attempts to timeout */
}
else
{
rc
=
MQTTAsync_connect
(
mqttasyncContext
,
&
opts
);
if
(
rc
!=
MQTTASYNC_SUCCESS
)
{
failures
++
;
goto
exit
;
}
connecting
=
1
;
}
}
else
{
/* while connected send 1000 message per second */
int
topicId
;
for
(
topicId
=
0
;
topicId
<
1000
;
topicId
++
)
{
rc
=
test373SendPublishMessage
(
mqttasyncContext
,
topicId
);
if
(
rc
!=
MQTTASYNC_SUCCESS
)
break
;
}
MySleep
(
100
);
}
}
MySleep
(
5000
);
MyLog
(
LOGA_INFO
,
"PublishCnt %d, FailedCnt %d, Pending %d maxPending %d"
,
goodPublishCnt
,
failedPublishCnt
,
pendingMessageCnt
,
pendingMessageCntMax
);
#if !defined(_WINDOWS)
mqtt_mem
=
Heap_get_info
();
MyLog
(
LOGA_INFO
,
"MQTT mem current %ld, max %ld"
,
mqtt_mem
->
current_size
,
mqtt_mem
->
max_size
);
#endif
MQTTAsync_disconnect
(
mqttasyncContext
,
NULL
);
MyLog
(
LOGA_INFO
,
"PublishCnt %d, FailedCnt %d, Pending %d maxPending %d"
,
goodPublishCnt
,
failedPublishCnt
,
pendingMessageCnt
,
pendingMessageCntMax
);
#if !defined(_WINDOWS)
mqtt_mem
=
Heap_get_info
();
MyLog
(
LOGA_INFO
,
"MQTT mem current %ld, max %ld"
,
mqtt_mem
->
current_size
,
mqtt_mem
->
max_size
);
#endif
exit:
MQTTAsync_destroy
(
&
mqttasyncContext
);
#if !defined(_WINDOWS)
mqtt_mem
=
Heap_get_info
();
MyLog
(
LOGA_INFO
,
"MQTT mem current %ld, max %ld"
,
mqtt_mem
->
current_size
,
mqtt_mem
->
max_size
);
if
(
mqtt_mem
->
current_size
>
0
)
failures
++
;
/* consider any not freed memory as failure */
#endif
return
failures
;
}
void
handleTrace
(
enum
MQTTASYNC_TRACE_LEVELS
level
,
char
*
message
)
{
printf
(
"%s
\n
"
,
message
);
}
int
main
(
int
argc
,
char
**
argv
)
{
int
*
numtests
=
&
tests
;
int
rc
=
0
;
int
(
*
tests
[])()
=
{
NULL
,
test_373
};
sprintf
(
unique
,
"%u"
,
rand
());
MyLog
(
LOGA_INFO
,
"Random prefix/suffix is %s"
,
unique
);
MQTTAsync_setTraceCallback
(
handleTrace
);
getopts
(
argc
,
argv
);
if
(
options
.
test_no
==
0
)
{
/* run all the tests */
for
(
options
.
test_no
=
1
;
options
.
test_no
<
ARRAY_SIZE
(
tests
);
++
options
.
test_no
)
{
failures
=
0
;
MQTTAsync_setTraceLevel
(
MQTTASYNC_TRACE_ERROR
);
rc
+=
tests
[
options
.
test_no
](
options
);
/* return number of failures. 0 = test succeeded */
}
}
else
{
MQTTAsync_setTraceLevel
(
MQTTASYNC_TRACE_ERROR
);
rc
=
tests
[
options
.
test_no
](
options
);
/* run just the selected test */
}
if
(
rc
==
0
)
MyLog
(
LOGA_INFO
,
"verdict pass"
);
else
MyLog
(
LOGA_INFO
,
"verdict fail"
);
return
rc
;
}
/* Local Variables: */
/* indent-tabs-mode: t */
/* c-basic-offset: 8 */
/* End: */
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment