Commit c9c5b9e2 authored by Juergen Kosel's avatar Juergen Kosel

Merge branch '373' of https://github.com/eclipse/paho.mqtt.c.git into 373

* '373' of https://github.com/eclipse/paho.mqtt.c.git:
  Add latest restart test changes
  Add test changes
parents 779ca42a d889b818
...@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}} ...@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3 TEST_FILES_CS = test3
SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}} SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}}
TEST_FILES_A = test4 test9 test_mqtt4async TEST_FILES_A = test4 test6 test9 test_mqtt4async test_issue373
ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}} ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}}
TEST_FILES_AS = test5 TEST_FILES_AS = test5
......
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2011, 2017 IBM Corp. * Copyright (c) 2011, 2014 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -17,11 +17,12 @@ ...@@ -17,11 +17,12 @@
/** /**
* @file * @file
* Async C client program for the MQTT restart/recovery test suite * Async C client program for the MQTT v3 restart/recovery test suite.
*/ */
#include "MQTTAsync.h" #include "MQTTAsync.h"
#define NO_HEAP_TRACKING
#include "Heap.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -44,12 +45,12 @@ static char sub_topic[200]; ...@@ -44,12 +45,12 @@ static char sub_topic[200];
struct struct
{ {
char* connection; /**< connection to system under test. */ char* connection; /**< connection to system under test. */
char** connections; /**< HA connection list */ char** connections; /**< HA connection list */
int connection_count; int connection_count;
char* control_connection; /**< MQTT control connection, for test sync */ char* control_connection; /**< MQTT control connection, for test sync */
char* topic; /**< test message topic */ char* topic;
char* control_topic; /**< topic for control messages */ char* control_topic;
char* clientid; char* clientid;
int slot_no; int slot_no;
int qos; int qos;
...@@ -60,20 +61,20 @@ struct ...@@ -60,20 +61,20 @@ struct
int persistence; int persistence;
} opts = } opts =
{ {
"tcp://localhost:1884", "tcp://localhost:1885",
NULL, NULL,
0, 0,
"tcp://localhost:7777", "tcp://localhost:7777",
"Eclipse/Paho/restart_test", "Eclipse/Paho/restart_test",
"Eclipse/Paho/restart_test/control", "Eclipse/Paho/restart_test/control",
"C_broken_client", "C_broken_client",
1, /* slot_no */ 1,
2, /* QoS */ 2,
0, /* retained */ 0,
NULL, NULL,
NULL, NULL,
0, 0,
1, 0,
}; };
void getopts(int argc, char** argv) void getopts(int argc, char** argv)
...@@ -166,10 +167,6 @@ void getopts(int argc, char** argv) ...@@ -166,10 +167,6 @@ void getopts(int argc, char** argv)
} }
} }
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
#else
#define LOGA_DEBUG 0 #define LOGA_DEBUG 0
#define LOGA_ALWAYS 1 #define LOGA_ALWAYS 1
#define LOGA_INFO 2 #define LOGA_INFO 2
...@@ -202,11 +199,17 @@ void MyLog(int log_level, char* format, ...) ...@@ -202,11 +199,17 @@ void MyLog(int log_level, char* format, ...)
printf("%s\n", msg_buf); printf("%s\n", msg_buf);
fflush(stdout); fflush(stdout);
} }
#endif
void MySleep(long milliseconds)
{
#if defined(WIN32) || defined(WIN64)
Sleep(milliseconds);
#else
usleep(milliseconds*1000);
#endif
}
#if defined(WIN32) || defined(_WINDOWS) #if defined(WIN32) || defined(_WINDOWS)
#define mysleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD #define START_TIME_TYPE DWORD
static DWORD start_time = 0; static DWORD start_time = 0;
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
...@@ -214,7 +217,6 @@ START_TIME_TYPE start_clock(void) ...@@ -214,7 +217,6 @@ START_TIME_TYPE start_clock(void)
return GetTickCount(); return GetTickCount();
} }
#elif defined(AIX) #elif defined(AIX)
#define mysleep sleep
#define START_TIME_TYPE struct timespec #define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
{ {
...@@ -223,9 +225,8 @@ START_TIME_TYPE start_clock(void) ...@@ -223,9 +225,8 @@ START_TIME_TYPE start_clock(void)
return start; return start;
} }
#else #else
#define mysleep sleep
#define START_TIME_TYPE struct timeval #define START_TIME_TYPE struct timeval
static struct timeval start_time; /* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
{ {
struct timeval start_time; struct timeval start_time;
...@@ -234,7 +235,6 @@ START_TIME_TYPE start_clock(void) ...@@ -234,7 +235,6 @@ START_TIME_TYPE start_clock(void)
} }
#endif #endif
#if defined(WIN32) #if defined(WIN32)
long elapsed(START_TIME_TYPE start_time) long elapsed(START_TIME_TYPE start_time)
{ {
...@@ -297,9 +297,13 @@ void control_connectionLost(void* context, char* cause) ...@@ -297,9 +297,13 @@ void control_connectionLost(void* context, char* cause)
*/ */
int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{ {
MyLog(LOGA_INFO, "Control message arrived: %.*s", m->payloadlen, m->payload); MyLog(LOGA_ALWAYS, "Control message arrived: %.*s %s",
if (strcmp(m->payload, "stop") == 0) m->payloadlen, m->payload, (wait_message == NULL) ? "None" : wait_message);
if (strncmp(m->payload, "stop", 4) == 0)
{
MyLog(LOGA_ALWAYS, "Stop message arrived, stopping...");
stopping = 1; stopping = 1;
}
else if (wait_message != NULL && strncmp(wait_message, m->payload, else if (wait_message != NULL && strncmp(wait_message, m->payload,
strlen(wait_message)) == 0) strlen(wait_message)) == 0)
{ {
...@@ -325,8 +329,8 @@ int control_send(char* message) ...@@ -325,8 +329,8 @@ int control_send(char* message)
int rc = 0; int rc = 0;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MyLog(LOGA_ALWAYS, "Sending control message: %s", message);
sprintf(buf, "%s: %s", opts.clientid, message); sprintf(buf, "%s: %s", opts.clientid, message);
MyLog(LOGA_ALWAYS, "Sending control message: %s", message);
rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf), rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf),
buf, 1, 0, &ropts); buf, 1, 0, &ropts);
MyLog(LOGA_DEBUG, "Control message sent: %s", buf); MyLog(LOGA_DEBUG, "Control message sent: %s", buf);
...@@ -345,9 +349,9 @@ int control_wait(char* message) ...@@ -345,9 +349,9 @@ int control_wait(char* message)
wait_message = message; wait_message = message;
sprintf(buf, "waiting for: %s", message); sprintf(buf, "waiting for: %s", message);
MyLog(LOGA_ALWAYS, "%s", buf);
control_send(buf); control_send(buf);
MyLog(LOGA_ALWAYS, "waiting for: %s", message);
while (control_found == 0 && stopping == 0) while (control_found == 0 && stopping == 0)
{ {
if (++count == 300) if (++count == 300)
...@@ -356,10 +360,8 @@ int control_wait(char* message) ...@@ -356,10 +360,8 @@ int control_wait(char* message)
MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message); MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message);
return 0; /* time out and tell the caller the message was not found */ return 0; /* time out and tell the caller the message was not found */
} }
mysleep(1); MySleep(1000);
} }
MyLog(LOGA_ALWAYS, (control_found == 0) ?
"Waited... not found" : "Waited... found %d" , control_found);
return control_found; return control_found;
} }
...@@ -376,7 +378,7 @@ int control_which(char* message1, char* message2) ...@@ -376,7 +378,7 @@ int control_which(char* message1, char* message2)
{ {
if (++count == 300) if (++count == 300)
return 0; /* time out and tell the caller the message was not found */ return 0; /* time out and tell the caller the message was not found */
mysleep(1); MySleep(1000);
} }
return control_found; return control_found;
} }
...@@ -481,13 +483,14 @@ void connectionLost(void* context, char* cause) ...@@ -481,13 +483,14 @@ void connectionLost(void* context, char* cause)
{ {
conn_opts.serverURIcount = opts.connection_count; conn_opts.serverURIcount = opts.connection_count;
conn_opts.serverURIs = opts.connections; conn_opts.serverURIs = opts.connections;
printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
} }
else else
{ {
conn_opts.serverURIcount = 0; conn_opts.serverURIcount = 0;
conn_opts.serverURIs = NULL; conn_opts.serverURIs = NULL;
} }
//printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
MyLog(LOGA_ALWAYS, "Starting reconnect attempt");
rc = MQTTAsync_connect(context, &conn_opts); rc = MQTTAsync_connect(context, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{ {
...@@ -508,6 +511,12 @@ int recreateReconnect(void) ...@@ -508,6 +511,12 @@ int recreateReconnect(void)
MQTTAsync_destroy(&client); /* destroy the client object so that we force persistence to be read on recreate */ MQTTAsync_destroy(&client); /* destroy the client object so that we force persistence to be read on recreate */
heap_info* mqtt_mem = 0;
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
//if (mqtt_mem->current_size > 20)
// HeapScan(5);
rc = MQTTAsync_create(&client, opts.connection, opts.clientid, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL); rc = MQTTAsync_create(&client, opts.connection, opts.clientid, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{ {
...@@ -576,7 +585,10 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -576,7 +585,10 @@ int waitForCompletion(START_TIME_TYPE start_time)
int wait_count = 0; int wait_count = 0;
int limit = 120; int limit = 120;
mysleep(1); MyLog(LOGA_ALWAYS, "Wait for completion");
if (opts.qos == 0)
limit = 30; /* we aren't going to get back all QoS 0 messages anyway */
MySleep(1000);
while (arrivedCount < expectedCount) while (arrivedCount < expectedCount)
{ {
if (arrivedCount > lastreport) if (arrivedCount > lastreport)
...@@ -585,15 +597,18 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -585,15 +597,18 @@ int waitForCompletion(START_TIME_TYPE start_time)
arrivedCount, expectedCount, elapsed(start_time) / 1000); arrivedCount, expectedCount, elapsed(start_time) / 1000);
lastreport = arrivedCount; lastreport = arrivedCount;
} }
mysleep(1); MySleep(1000);
if (opts.persistence && connection_lost) if (opts.persistence && connection_lost)
recreateReconnect(); recreateReconnect();
if (++wait_count > limit || stopping) if (++wait_count > limit || stopping)
break; break;
} }
last_completion_time = elapsed(start_time) / 1000; last_completion_time = elapsed(start_time) / 1000;
MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive"); if (opts.qos > 0)
mysleep(10); /* check if any duplicate messages arrive */ {
MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive");
MySleep(10000); /* check if any duplicate messages arrive */
}
MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds", MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds",
arrivedCount, expectedCount, elapsed(start_time) / 1000); arrivedCount, expectedCount, elapsed(start_time) / 1000);
return success(expectedCount); return success(expectedCount);
...@@ -642,26 +657,32 @@ void one_iteration(void) ...@@ -642,26 +657,32 @@ void one_iteration(void)
recreateReconnect(); recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mysleep(1); MySleep(1000);
rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload, rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
opts.qos, opts.retained, NULL); opts.qos, opts.retained, NULL);
while (seqno - messagesSent > 2000)
MySleep(1000);
} }
} }
MyLog(LOGA_INFO, "Messages sent... waiting for echoes"); MyLog(LOGA_INFO, "Messages sent... waiting for echoes");
while (arrivedCount < test_count) while (arrivedCount < test_count)
{ {
if (opts.persistence && connection_lost)
recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mysleep(1); MySleep(1000);
printf("arrivedCount %d\n", arrivedCount); MyLog(LOGA_ALWAYS, "arrivedCount %d", arrivedCount);
} }
measuring = 0; measuring = 0;
/* Now set a target of 30 seconds total round trip */ /* Now set a target of 30 seconds total round trip */
if (last_completion_time == -1) if (1) //last_completion_time == -1)
{ {
MyLog(LOGA_ALWAYS, "Round trip time for %d messages is %d ms", test_count, roundtrip_time); MyLog(LOGA_ALWAYS, "Round trip time for %d messages is %d ms", test_count, roundtrip_time);
expectedCount = 1000 * test_count * test_interval / roundtrip_time / 2; // test_count messages in 3039 ms: (test_interval * 1000) / roundtrip_time * test_count
//expectedCount = 1000 * test_count * test_interval / roundtrip_time / 2;
expectedCount = (test_interval * 1000) / roundtrip_time * test_count;
} }
else else
{ {
...@@ -689,18 +710,22 @@ void one_iteration(void) ...@@ -689,18 +710,22 @@ void one_iteration(void)
opts.qos, opts.retained, &ropts); opts.qos, opts.retained, &ropts);
while (rc != MQTTASYNC_SUCCESS) while (rc != MQTTASYNC_SUCCESS)
{ {
MyLog(LOGA_DEBUG, "Rc %d from publish with payload %s, retrying", rc, payload); MyLog(LOGA_INFO, "Rc %d from publish with payload %s, retrying", rc, payload);
if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED)) if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED))
recreateReconnect(); recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mysleep(1); MySleep(1000);
rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload, rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
opts.qos, opts.retained, &ropts); opts.qos, opts.retained, &ropts);
} }
//MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload); //MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload);
while (seqno - messagesSent > 2000) //while (seqno - messagesSent > 2000)
mysleep(1); //{
//if (opts.persistence && (connection_lost || rc == MQTTASYNC_DISCONNECTED))
// recreateReconnect();
//}
// MySleep(1000);
} }
MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000); MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000);
...@@ -803,7 +828,11 @@ int sendAndReceive(void) ...@@ -803,7 +828,11 @@ int sendAndReceive(void)
} }
/* wait to know that the controlling process is running before connecting to the SUT */ /* wait to know that the controlling process is running before connecting to the SUT */
control_wait("who is ready?"); if (control_wait("who is ready?") == 0)
{
MyLog(LOGA_ALWAYS, "Wait for controller failed");
goto exit;
}
/* connect cleansession, and then disconnect, to clean up */ /* connect cleansession, and then disconnect, to clean up */
conn_opts.keepAliveInterval = 10; conn_opts.keepAliveInterval = 10;
...@@ -831,7 +860,7 @@ int sendAndReceive(void) ...@@ -831,7 +860,7 @@ int sendAndReceive(void)
} }
while (client_cleaned == 0) while (client_cleaned == 0)
mysleep(1); MySleep(1000);
MyLog(LOGA_ALWAYS, "Client state cleaned up"); MyLog(LOGA_ALWAYS, "Client state cleaned up");
...@@ -849,7 +878,7 @@ int sendAndReceive(void) ...@@ -849,7 +878,7 @@ int sendAndReceive(void)
/* wait until subscribed */ /* wait until subscribed */
while (client_subscribed == 0) while (client_subscribed == 0)
mysleep(1); MySleep(1000);
if (client_subscribed != 1) if (client_subscribed != 1)
goto disconnect_exit; goto disconnect_exit;
...@@ -887,6 +916,7 @@ void control_onSubscribe(void* context, MQTTAsync_successData* response) ...@@ -887,6 +916,7 @@ void control_onSubscribe(void* context, MQTTAsync_successData* response)
MyLog(LOGA_DEBUG, "In control subscribe onSuccess callback %p granted qos %d", c, response->alt.qos); MyLog(LOGA_DEBUG, "In control subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
control_subscribed = 1; control_subscribed = 1;
MyLog(LOGA_ALWAYS, "Connected and subscribed to control connection");
} }
void control_onFailure(void* context, MQTTAsync_failureData* response) void control_onFailure(void* context, MQTTAsync_failureData* response)
...@@ -908,6 +938,7 @@ void control_onConnect(void* context, MQTTAsync_successData* response) ...@@ -908,6 +938,7 @@ void control_onConnect(void* context, MQTTAsync_successData* response)
ropts.onSuccess = control_onSubscribe; ropts.onSuccess = control_onSubscribe;
ropts.onFailure = control_onFailure; ropts.onFailure = control_onFailure;
ropts.context = c; ropts.context = c;
MyLog(LOGA_ALWAYS, "Subscribing to control topic %s", sub_topic);
if ((rc = MQTTAsync_subscribe(c, sub_topic, 2, &ropts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_subscribe(c, sub_topic, 2, &ropts)) != MQTTASYNC_SUCCESS)
{ {
MyLog(LOGA_ALWAYS, "control MQTTAsync_subscribe failed, rc %d", rc); MyLog(LOGA_ALWAYS, "control MQTTAsync_subscribe failed, rc %d", rc);
...@@ -917,7 +948,7 @@ void control_onConnect(void* context, MQTTAsync_successData* response) ...@@ -917,7 +948,7 @@ void control_onConnect(void* context, MQTTAsync_successData* response)
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
if (level == MQTTASYNC_TRACE_ERROR || strstr(message, "Connect") || strstr(message, "failed")) //if (level == MQTTASYNC_TRACE_ERROR || strstr(message, "Connect") || strstr(message, "failed"))
printf("Trace : %d, %s\n", level, message); printf("Trace : %d, %s\n", level, message);
} }
...@@ -950,7 +981,7 @@ int main(int argc, char** argv) ...@@ -950,7 +981,7 @@ int main(int argc, char** argv)
MyLog(LOGA_ALWAYS, "Starting with clientid %s", opts.clientid); MyLog(LOGA_ALWAYS, "Starting with clientid %s", opts.clientid);
//MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_MAXIMUM); MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
MQTTAsync_setTraceCallback(trace_callback); MQTTAsync_setTraceCallback(trace_callback);
rc = MQTTAsync_create(&control_client, opts.control_connection, rc = MQTTAsync_create(&control_client, opts.control_connection,
...@@ -984,7 +1015,7 @@ int main(int argc, char** argv) ...@@ -984,7 +1015,7 @@ int main(int argc, char** argv)
} }
while (control_subscribed == 0) while (control_subscribed == 0)
mysleep(1); MySleep(1000);
if (control_subscribed != 1) if (control_subscribed != 1)
goto destroy_exit; goto destroy_exit;
...@@ -997,5 +1028,12 @@ exit: ...@@ -997,5 +1028,12 @@ exit:
destroy_exit: destroy_exit:
MQTTAsync_destroy(&control_client); MQTTAsync_destroy(&control_client);
#include "Heap.h"
heap_info* mqtt_mem = 0;
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (mqtt_mem->current_size > 0)
/*failures++*/; /* consider any not freed memory as failure */
return 0; return 0;
} }
...@@ -1860,7 +1860,7 @@ void test7cConnected(void* context, char* cause) ...@@ -1860,7 +1860,7 @@ void test7cConnected(void* context, char* cause)
void test7cOnConnectFailure(void* context, MQTTAsync_failureData* response) void test7cOnConnectFailure(void* context, MQTTAsync_failureData* response)
{ {
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
test7OnFailureCalled++; test7OnFailureCalled++;
test7Finished = 1; test7Finished = 1;
...@@ -1990,7 +1990,11 @@ int test7(struct Options options) ...@@ -1990,7 +1990,11 @@ int test7(struct Options options)
/* wait until d is ready: connected and subscribed */ /* wait until d is ready: connected and subscribed */
count = 0; count = 0;
while (!test7dReady && ++count < 10000) while (!test7dReady && ++count < 10000)
{
if (test7Finished)
goto exit;
MySleep(100); MySleep(100);
}
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */ assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test7cConnected); rc = MQTTAsync_setConnected(c, c, test7cConnected);
...@@ -2007,6 +2011,9 @@ int test7(struct Options options) ...@@ -2007,6 +2011,9 @@ int test7(struct Options options)
opts.onFailure = test7cOnConnectFailure; opts.onFailure = test7cOnConnectFailure;
opts.context = c; opts.context = c;
opts.cleansession = 0; opts.cleansession = 0;
/*opts.automaticReconnect = 1;
opts.minRetryInterval = 3;
opts.maxRetryInterval = 6;*/
MyLog(LOGA_DEBUG, "Connecting client c"); MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts); rc = MQTTAsync_connect(c, &opts);
...@@ -2029,21 +2036,26 @@ int test7(struct Options options) ...@@ -2029,21 +2036,26 @@ int test7(struct Options options)
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP"); MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
test7c_connected = 0; test7c_connected = 0;
char buf[20000]; char buf[5000000];
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */ /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 50000; ++i) for (i = 0; i < 50000; ++i)
{ {
MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions pubopts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = 0; /*i % 3;*/ pubmsg.qos = 0; /*i % 3;*/
sprintf(buf, "QoS %d message", pubmsg.qos); sprintf(buf, "QoS %d message", pubmsg.qos);
pubmsg.payload = buf; pubmsg.payload = buf;
pubmsg.payloadlen = 20000; //(int)(strlen(pubmsg.payload) + 1); pubmsg.payloadlen = 5000000; //(int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0; pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts); rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc); assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != 0) if (rc != 0)
{
//MyLog(LOGA_DEBUG, "Connecting client c");
//rc = MQTTAsync_connect(c, &opts);
//MySleep(1000);
break; break;
}
} }
#if 0 #if 0
...@@ -2081,13 +2093,13 @@ int test7(struct Options options) ...@@ -2081,13 +2093,13 @@ int test7(struct Options options)
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i); assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
#endif #endif
exit:
rc = MQTTAsync_disconnect(c, NULL); rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc); assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL); rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc); assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MySleep(200); MySleep(200);
MQTTAsync_destroy(&c); MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d); MQTTAsync_destroy(&d);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment