Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
P
paho.mqtt.c
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
eclipse
paho.mqtt.c
Commits
d889b818
Commit
d889b818
authored
Dec 07, 2017
by
Ian Craggs
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add latest restart test changes
parent
b4278276
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
92 additions
and
92 deletions
+92
-92
Makefile
Makefile
+1
-1
test6.c
test/test6.c
+91
-91
No files found.
Makefile
View file @
d889b818
...
...
@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS
=
test3
SYNC_SSL_TESTS
=
${
addprefix
${
blddir
}
/test/,
${
TEST_FILES_CS
}}
TEST_FILES_A
=
test4 test
9 test_mqtt4async
TEST_FILES_A
=
test4 test
6 test9 test_mqtt4async test_issue373
ASYNC_TESTS
=
${
addprefix
${
blddir
}
/test/,
${
TEST_FILES_A
}}
TEST_FILES_AS
=
test5
...
...
test/test6.c
View file @
d889b818
/*******************************************************************************
* Copyright (c) 2011, 201
7
IBM Corp.
* Copyright (c) 2011, 201
4
IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
...
...
@@ -17,11 +17,12 @@
/**
* @file
* Async C client program for the MQTT
restart/recovery test suite
* Async C client program for the MQTT
v3 restart/recovery test suite.
*/
#include "MQTTAsync.h"
#define NO_HEAP_TRACKING
#include "Heap.h"
#include <string.h>
#include <stdlib.h>
...
...
@@ -44,12 +45,12 @@ static char sub_topic[200];
struct
{
char
*
connection
;
/**< connection to system under test. */
char
*
connection
;
/**< connection to system under test. */
char
**
connections
;
/**< HA connection list */
int
connection_count
;
char
*
control_connection
;
/**< MQTT control connection, for test sync */
char
*
topic
;
/**< test message topic */
char
*
control_topic
;
/**< topic for control messages */
char
*
control_connection
;
/**< MQTT control connection, for test sync */
char
*
topic
;
char
*
control_topic
;
char
*
clientid
;
int
slot_no
;
int
qos
;
...
...
@@ -58,24 +59,22 @@ struct
char
*
password
;
int
verbose
;
int
persistence
;
int
payload_len
;
}
opts
=
{
"tcp://localhost:188
4
"
,
"tcp://localhost:188
5
"
,
NULL
,
0
,
"tcp://localhost:7777"
,
"Eclipse/Paho/restart_test"
,
"Eclipse/Paho/restart_test/control"
,
"C_broken_client"
,
1
,
/* slot_no */
0
,
/* QoS */
0
,
/* retained */
1
,
2
,
0
,
NULL
,
NULL
,
0
,
1
,
1000
0
,
};
void
getopts
(
int
argc
,
char
**
argv
)
...
...
@@ -168,10 +167,6 @@ void getopts(int argc, char** argv)
}
}
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
#else
#define LOGA_DEBUG 0
#define LOGA_ALWAYS 1
#define LOGA_INFO 2
...
...
@@ -204,11 +199,17 @@ void MyLog(int log_level, char* format, ...)
printf
(
"%s
\n
"
,
msg_buf
);
fflush
(
stdout
);
}
#endif
void
MySleep
(
long
milliseconds
)
{
#if defined(WIN32) || defined(WIN64)
Sleep
(
milliseconds
);
#else
usleep
(
milliseconds
*
1000
);
#endif
}
#if defined(WIN32) || defined(_WINDOWS)
#define mysleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD
static
DWORD
start_time
=
0
;
START_TIME_TYPE
start_clock
(
void
)
...
...
@@ -216,7 +217,6 @@ START_TIME_TYPE start_clock(void)
return
GetTickCount
();
}
#elif defined(AIX)
#define mysleep sleep
#define START_TIME_TYPE struct timespec
START_TIME_TYPE
start_clock
(
void
)
{
...
...
@@ -225,9 +225,8 @@ START_TIME_TYPE start_clock(void)
return
start
;
}
#else
#define mysleep sleep
#define START_TIME_TYPE struct timeval
static
struct
timeval
start_time
;
/* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE
start_clock
(
void
)
{
struct
timeval
start_time
;
...
...
@@ -236,41 +235,6 @@ START_TIME_TYPE start_clock(void)
}
#endif
int
tests
=
0
;
int
failures
=
0
;
FILE
*
xml
;
START_TIME_TYPE
global_start_time
;
char
output
[
3000
];
char
*
cur_output
=
output
;
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
#define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
void
myassert
(
char
*
filename
,
int
lineno
,
char
*
description
,
int
value
,
char
*
format
,
...)
{
++
tests
;
if
(
!
value
)
{
va_list
args
;
++
failures
;
MyLog
(
LOGA_INFO
,
"Assertion failed, file %s, line %d, description: %s"
,
filename
,
lineno
,
description
);
va_start
(
args
,
format
);
vprintf
(
format
,
args
);
va_end
(
args
);
cur_output
+=
sprintf
(
cur_output
,
"<failure type=
\"
%s
\"
>file %s, line %d </failure>
\n
"
,
description
,
filename
,
lineno
);
}
else
MyLog
(
LOGA_DEBUG
,
"Assertion succeeded, file %s, line %d, description: %s"
,
filename
,
lineno
,
description
);
}
#if defined(WIN32)
long
elapsed
(
START_TIME_TYPE
start_time
)
{
...
...
@@ -333,9 +297,13 @@ void control_connectionLost(void* context, char* cause)
*/
int
control_messageArrived
(
void
*
context
,
char
*
topicName
,
int
topicLen
,
MQTTAsync_message
*
m
)
{
MyLog
(
LOGA_INFO
,
"Control message arrived: %.*s"
,
m
->
payloadlen
,
m
->
payload
);
if
(
strcmp
(
m
->
payload
,
"stop"
)
==
0
)
MyLog
(
LOGA_ALWAYS
,
"Control message arrived: %.*s %s"
,
m
->
payloadlen
,
m
->
payload
,
(
wait_message
==
NULL
)
?
"None"
:
wait_message
);
if
(
strncmp
(
m
->
payload
,
"stop"
,
4
)
==
0
)
{
MyLog
(
LOGA_ALWAYS
,
"Stop message arrived, stopping..."
);
stopping
=
1
;
}
else
if
(
wait_message
!=
NULL
&&
strncmp
(
wait_message
,
m
->
payload
,
strlen
(
wait_message
))
==
0
)
{
...
...
@@ -361,8 +329,8 @@ int control_send(char* message)
int
rc
=
0
;
MQTTAsync_responseOptions
ropts
=
MQTTAsync_responseOptions_initializer
;
MyLog
(
LOGA_ALWAYS
,
"Sending control message: %s"
,
message
);
sprintf
(
buf
,
"%s: %s"
,
opts
.
clientid
,
message
);
MyLog
(
LOGA_ALWAYS
,
"Sending control message: %s"
,
message
);
rc
=
MQTTAsync_send
(
control_client
,
pub_topic
,
(
int
)
strlen
(
buf
),
buf
,
1
,
0
,
&
ropts
);
MyLog
(
LOGA_DEBUG
,
"Control message sent: %s"
,
buf
);
...
...
@@ -381,9 +349,9 @@ int control_wait(char* message)
wait_message
=
message
;
sprintf
(
buf
,
"waiting for: %s"
,
message
);
MyLog
(
LOGA_ALWAYS
,
"%s"
,
buf
);
control_send
(
buf
);
MyLog
(
LOGA_ALWAYS
,
"waiting for: %s"
,
message
);
while
(
control_found
==
0
&&
stopping
==
0
)
{
if
(
++
count
==
300
)
...
...
@@ -392,10 +360,8 @@ int control_wait(char* message)
MyLog
(
LOGA_ALWAYS
,
"Failed to receive message %s, stopping "
,
message
);
return
0
;
/* time out and tell the caller the message was not found */
}
mysleep
(
1
);
MySleep
(
1000
);
}
MyLog
(
LOGA_ALWAYS
,
(
control_found
==
0
)
?
"Waited... not found"
:
"Waited... found %d"
,
control_found
);
return
control_found
;
}
...
...
@@ -412,7 +378,7 @@ int control_which(char* message1, char* message2)
{
if
(
++
count
==
300
)
return
0
;
/* time out and tell the caller the message was not found */
mysleep
(
1
);
MySleep
(
1000
);
}
return
control_found
;
}
...
...
@@ -517,13 +483,14 @@ void connectionLost(void* context, char* cause)
{
conn_opts
.
serverURIcount
=
opts
.
connection_count
;
conn_opts
.
serverURIs
=
opts
.
connections
;
printf
(
"reconnecting to first serverURI %s
\n
"
,
conn_opts
.
serverURIs
[
0
]);
}
else
{
conn_opts
.
serverURIcount
=
0
;
conn_opts
.
serverURIs
=
NULL
;
}
//printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
MyLog
(
LOGA_ALWAYS
,
"Starting reconnect attempt"
);
rc
=
MQTTAsync_connect
(
context
,
&
conn_opts
);
if
(
rc
!=
MQTTASYNC_SUCCESS
)
{
...
...
@@ -544,6 +511,12 @@ int recreateReconnect(void)
MQTTAsync_destroy
(
&
client
);
/* destroy the client object so that we force persistence to be read on recreate */
heap_info
*
mqtt_mem
=
0
;
mqtt_mem
=
Heap_get_info
();
MyLog
(
LOGA_INFO
,
"MQTT mem current %ld, max %ld"
,
mqtt_mem
->
current_size
,
mqtt_mem
->
max_size
);
//if (mqtt_mem->current_size > 20)
// HeapScan(5);
rc
=
MQTTAsync_create
(
&
client
,
opts
.
connection
,
opts
.
clientid
,
MQTTCLIENT_PERSISTENCE_DEFAULT
,
NULL
);
if
(
rc
!=
MQTTASYNC_SUCCESS
)
{
...
...
@@ -612,7 +585,10 @@ int waitForCompletion(START_TIME_TYPE start_time)
int
wait_count
=
0
;
int
limit
=
120
;
mysleep
(
1
);
MyLog
(
LOGA_ALWAYS
,
"Wait for completion"
);
if
(
opts
.
qos
==
0
)
limit
=
30
;
/* we aren't going to get back all QoS 0 messages anyway */
MySleep
(
1000
);
while
(
arrivedCount
<
expectedCount
)
{
if
(
arrivedCount
>
lastreport
)
...
...
@@ -621,15 +597,18 @@ int waitForCompletion(START_TIME_TYPE start_time)
arrivedCount
,
expectedCount
,
elapsed
(
start_time
)
/
1000
);
lastreport
=
arrivedCount
;
}
mysleep
(
1
);
MySleep
(
1000
);
if
(
opts
.
persistence
&&
connection_lost
)
recreateReconnect
();
if
(
++
wait_count
>
limit
||
stopping
)
break
;
}
last_completion_time
=
elapsed
(
start_time
)
/
1000
;
MyLog
(
LOGA_ALWAYS
,
"Extra wait to see if any duplicates arrive"
);
mysleep
(
10
);
/* check if any duplicate messages arrive */
if
(
opts
.
qos
>
0
)
{
MyLog
(
LOGA_ALWAYS
,
"Extra wait to see if any duplicates arrive"
);
MySleep
(
10000
);
/* check if any duplicate messages arrive */
}
MyLog
(
LOGA_ALWAYS
,
"%d messages arrived out of %d expected, in %d seconds"
,
arrivedCount
,
expectedCount
,
elapsed
(
start_time
)
/
1000
);
return
success
(
expectedCount
);
...
...
@@ -652,7 +631,6 @@ void one_iteration(void)
START_TIME_TYPE
start_time
;
int
last_expected_count
=
expectedCount
;
int
test_interval
=
30
;
char
*
payload
=
malloc
(
opts
.
payload_len
);
if
(
control_wait
(
"start_measuring"
)
==
0
)
goto
exit
;
...
...
@@ -667,9 +645,11 @@ void one_iteration(void)
global_start_time
=
start_clock
();
for
(
i
=
1
;
i
<=
test_count
;
++
i
)
{
char
payload
[
128
];
sprintf
(
payload
,
"message number %d"
,
i
);
rc
=
MQTTAsync_send
(
client
,
opts
.
topic
,
opts
.
payload_len
,
payload
,
rc
=
MQTTAsync_send
(
client
,
opts
.
topic
,
(
int
)(
strlen
(
payload
)
+
1
)
,
payload
,
opts
.
qos
,
opts
.
retained
,
NULL
);
while
(
rc
!=
MQTTASYNC_SUCCESS
)
{
...
...
@@ -677,26 +657,32 @@ void one_iteration(void)
recreateReconnect
();
if
(
stopping
)
goto
exit
;
mysleep
(
1
);
MySleep
(
1000
);
rc
=
MQTTAsync_send
(
client
,
opts
.
topic
,
(
int
)(
strlen
(
payload
)
+
1
),
payload
,
opts
.
qos
,
opts
.
retained
,
NULL
);
while
(
seqno
-
messagesSent
>
2000
)
MySleep
(
1000
);
}
}
MyLog
(
LOGA_INFO
,
"Messages sent... waiting for echoes"
);
while
(
arrivedCount
<
test_count
)
{
if
(
opts
.
persistence
&&
connection_lost
)
recreateReconnect
();
if
(
stopping
)
goto
exit
;
mysleep
(
1
);
printf
(
"arrivedCount %d
\n
"
,
arrivedCount
);
MySleep
(
1000
);
MyLog
(
LOGA_ALWAYS
,
"arrivedCount %d
"
,
arrivedCount
);
}
measuring
=
0
;
/* Now set a target of 30 seconds total round trip */
if
(
last_completion_time
==
-
1
)
if
(
1
)
//
last_completion_time == -1)
{
MyLog
(
LOGA_ALWAYS
,
"Round trip time for %d messages is %d ms"
,
test_count
,
roundtrip_time
);
expectedCount
=
1000
*
test_count
*
test_interval
/
roundtrip_time
/
2
;
// test_count messages in 3039 ms: (test_interval * 1000) / roundtrip_time * test_count
//expectedCount = 1000 * test_count * test_interval / roundtrip_time / 2;
expectedCount
=
(
test_interval
*
1000
)
/
roundtrip_time
*
test_count
;
}
else
{
...
...
@@ -722,22 +708,24 @@ void one_iteration(void)
sprintf
(
payload
,
"message number %d"
,
seqno
);
rc
=
MQTTAsync_send
(
client
,
opts
.
topic
,
(
int
)(
strlen
(
payload
)
+
1
),
payload
,
opts
.
qos
,
opts
.
retained
,
&
ropts
);
assert
(
"Good rc from send"
,
rc
==
MQTTASYNC_SUCCESS
,
"rc was %d "
,
rc
);
while
(
rc
!=
MQTTASYNC_SUCCESS
)
{
MyLog
(
LOGA_
DEBUG
,
"Rc %d from publish with payload %s, retrying"
,
rc
,
payload
);
MyLog
(
LOGA_
INFO
,
"Rc %d from publish with payload %s, retrying"
,
rc
,
payload
);
if
(
opts
.
persistence
&&
(
connection_lost
||
rc
==
MQTTASYNC_DISCONNECTED
))
recreateReconnect
();
if
(
stopping
)
goto
exit
;
mysleep
(
1
);
MySleep
(
1000
);
rc
=
MQTTAsync_send
(
client
,
opts
.
topic
,
(
int
)(
strlen
(
payload
)
+
1
),
payload
,
opts
.
qos
,
opts
.
retained
,
&
ropts
);
assert
(
"Good rc from send"
,
rc
==
MQTTASYNC_SUCCESS
,
"rc was %d "
,
rc
);
}
//MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload);
while
(
seqno
-
messagesSent
>
2000
)
mysleep
(
1
);
//while (seqno - messagesSent > 2000)
//{
//if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED))
// recreateReconnect();
//}
// MySleep(1000);
}
MyLog
(
LOGA_ALWAYS
,
"%d messages sent in %d seconds"
,
expectedCount
,
elapsed
(
start_time
)
/
1000
);
...
...
@@ -840,11 +828,14 @@ int sendAndReceive(void)
}
/* wait to know that the controlling process is running before connecting to the SUT */
control_wait
(
"who is ready?"
);
if
(
control_wait
(
"who is ready?"
)
==
0
)
{
MyLog
(
LOGA_ALWAYS
,
"Wait for controller failed"
);
goto
exit
;
}
/* connect cleansession, and then disconnect, to clean up */
conn_opts
.
keepAliveInterval
=
10
;
conn_opts
.
maxInflight
=
test_count
;
conn_opts
.
username
=
opts
.
username
;
conn_opts
.
password
=
opts
.
password
;
conn_opts
.
cleansession
=
1
;
...
...
@@ -869,7 +860,7 @@ int sendAndReceive(void)
}
while
(
client_cleaned
==
0
)
mysleep
(
1
);
MySleep
(
1000
);
MyLog
(
LOGA_ALWAYS
,
"Client state cleaned up"
);
...
...
@@ -887,7 +878,7 @@ int sendAndReceive(void)
/* wait until subscribed */
while
(
client_subscribed
==
0
)
mysleep
(
1
);
MySleep
(
1000
);
if
(
client_subscribed
!=
1
)
goto
disconnect_exit
;
...
...
@@ -925,6 +916,7 @@ void control_onSubscribe(void* context, MQTTAsync_successData* response)
MyLog
(
LOGA_DEBUG
,
"In control subscribe onSuccess callback %p granted qos %d"
,
c
,
response
->
alt
.
qos
);
control_subscribed
=
1
;
MyLog
(
LOGA_ALWAYS
,
"Connected and subscribed to control connection"
);
}
void
control_onFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
...
...
@@ -946,6 +938,7 @@ void control_onConnect(void* context, MQTTAsync_successData* response)
ropts
.
onSuccess
=
control_onSubscribe
;
ropts
.
onFailure
=
control_onFailure
;
ropts
.
context
=
c
;
MyLog
(
LOGA_ALWAYS
,
"Subscribing to control topic %s"
,
sub_topic
);
if
((
rc
=
MQTTAsync_subscribe
(
c
,
sub_topic
,
2
,
&
ropts
))
!=
MQTTASYNC_SUCCESS
)
{
MyLog
(
LOGA_ALWAYS
,
"control MQTTAsync_subscribe failed, rc %d"
,
rc
);
...
...
@@ -955,7 +948,7 @@ void control_onConnect(void* context, MQTTAsync_successData* response)
void
trace_callback
(
enum
MQTTASYNC_TRACE_LEVELS
level
,
char
*
message
)
{
if
(
level
==
MQTTASYNC_TRACE_ERROR
||
strstr
(
message
,
"Connect"
)
||
strstr
(
message
,
"failed"
))
//
if (level == MQTTASYNC_TRACE_ERROR || strstr(message, "Connect") || strstr(message, "failed"))
printf
(
"Trace : %d, %s
\n
"
,
level
,
message
);
}
...
...
@@ -988,7 +981,7 @@ int main(int argc, char** argv)
MyLog
(
LOGA_ALWAYS
,
"Starting with clientid %s"
,
opts
.
clientid
);
//MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_MAXIMUM
);
MQTTAsync_setTraceLevel
(
MQTTASYNC_TRACE_ERROR
);
MQTTAsync_setTraceCallback
(
trace_callback
);
rc
=
MQTTAsync_create
(
&
control_client
,
opts
.
control_connection
,
...
...
@@ -1022,7 +1015,7 @@ int main(int argc, char** argv)
}
while
(
control_subscribed
==
0
)
mysleep
(
1
);
MySleep
(
1000
);
if
(
control_subscribed
!=
1
)
goto
destroy_exit
;
...
...
@@ -1035,5 +1028,12 @@ exit:
destroy_exit:
MQTTAsync_destroy
(
&
control_client
);
#include "Heap.h"
heap_info
*
mqtt_mem
=
0
;
mqtt_mem
=
Heap_get_info
();
MyLog
(
LOGA_INFO
,
"MQTT mem current %ld, max %ld"
,
mqtt_mem
->
current_size
,
mqtt_mem
->
max_size
);
if
(
mqtt_mem
->
current_size
>
0
)
/*failures++*/
;
/* consider any not freed memory as failure */
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment