Commit 6bd9c691 authored by Ian Craggs's avatar Ian Craggs

Add continuous send test

parent d5b74003
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2011, 2014 IBM Corp. * Copyright (c) 2011, 2017 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -17,14 +17,9 @@ ...@@ -17,14 +17,9 @@
/** /**
* @file * @file
* Async C client program for the MQTT v3 restart/recovery test suite. * Async C client program for the MQTT restart/recovery test suite
*/ */
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h" #include "MQTTAsync.h"
#include <string.h> #include <string.h>
...@@ -49,12 +44,12 @@ static char sub_topic[200]; ...@@ -49,12 +44,12 @@ static char sub_topic[200];
struct struct
{ {
char* connection; /**< connection to system under test. */ char* connection; /**< connection to system under test. */
char** connections; /**< HA connection list */ char** connections; /**< HA connection list */
int connection_count; int connection_count;
char* control_connection; /**< MQTT control connection, for test sync */ char* control_connection; /**< MQTT control connection, for test sync */
char* topic; char* topic; /**< test message topic */
char* control_topic; char* control_topic; /**< topic for control messages */
char* clientid; char* clientid;
int slot_no; int slot_no;
int qos; int qos;
...@@ -65,20 +60,20 @@ struct ...@@ -65,20 +60,20 @@ struct
int persistence; int persistence;
} opts = } opts =
{ {
"tcp://localhost:1885", "tcp://localhost:1884",
NULL, NULL,
0, 0,
"tcp://localhost:7777", "tcp://localhost:7777",
"XR9TT3", "Eclipse/Paho/restart_test",
"XR9TT3/control", "Eclipse/Paho/restart_test/control",
"C_broken_client", "C_broken_client",
1, 1, /* slot_no */
2, 2, /* QoS */
0, 0, /* retained */
NULL, NULL,
NULL, NULL,
0, 0,
0, 1,
}; };
void getopts(int argc, char** argv) void getopts(int argc, char** argv)
...@@ -211,7 +206,7 @@ void MyLog(int log_level, char* format, ...) ...@@ -211,7 +206,7 @@ void MyLog(int log_level, char* format, ...)
#if defined(WIN32) || defined(_WINDOWS) #if defined(WIN32) || defined(_WINDOWS)
#define mqsleep(A) Sleep(1000*A) #define mysleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD #define START_TIME_TYPE DWORD
static DWORD start_time = 0; static DWORD start_time = 0;
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
...@@ -219,7 +214,7 @@ START_TIME_TYPE start_clock(void) ...@@ -219,7 +214,7 @@ START_TIME_TYPE start_clock(void)
return GetTickCount(); return GetTickCount();
} }
#elif defined(AIX) #elif defined(AIX)
#define mqsleep sleep #define mysleep sleep
#define START_TIME_TYPE struct timespec #define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
{ {
...@@ -228,7 +223,7 @@ START_TIME_TYPE start_clock(void) ...@@ -228,7 +223,7 @@ START_TIME_TYPE start_clock(void)
return start; return start;
} }
#else #else
#define mqsleep sleep #define mysleep sleep
#define START_TIME_TYPE struct timeval #define START_TIME_TYPE struct timeval
static struct timeval start_time; static struct timeval start_time;
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
...@@ -302,8 +297,7 @@ void control_connectionLost(void* context, char* cause) ...@@ -302,8 +297,7 @@ void control_connectionLost(void* context, char* cause)
*/ */
int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{ {
MyLog(LOGA_DEBUG, "Control message arrived: %.*s %s", MyLog(LOGA_INFO, "Control message arrived: %.*s", m->payloadlen, m->payload);
m->payloadlen, m->payload, wait_message);
if (strcmp(m->payload, "stop") == 0) if (strcmp(m->payload, "stop") == 0)
stopping = 1; stopping = 1;
else if (wait_message != NULL && strncmp(wait_message, m->payload, else if (wait_message != NULL && strncmp(wait_message, m->payload,
...@@ -331,6 +325,7 @@ int control_send(char* message) ...@@ -331,6 +325,7 @@ int control_send(char* message)
int rc = 0; int rc = 0;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MyLog(LOGA_ALWAYS, "Sending control message: %s", message);
sprintf(buf, "%s: %s", opts.clientid, message); sprintf(buf, "%s: %s", opts.clientid, message);
rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf), rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf),
buf, 1, 0, &ropts); buf, 1, 0, &ropts);
...@@ -350,6 +345,7 @@ int control_wait(char* message) ...@@ -350,6 +345,7 @@ int control_wait(char* message)
wait_message = message; wait_message = message;
sprintf(buf, "waiting for: %s", message); sprintf(buf, "waiting for: %s", message);
MyLog(LOGA_ALWAYS, "%s", buf);
control_send(buf); control_send(buf);
while (control_found == 0 && stopping == 0) while (control_found == 0 && stopping == 0)
...@@ -360,8 +356,10 @@ int control_wait(char* message) ...@@ -360,8 +356,10 @@ int control_wait(char* message)
MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message); MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message);
return 0; /* time out and tell the caller the message was not found */ return 0; /* time out and tell the caller the message was not found */
} }
mqsleep(1); mysleep(1);
} }
MyLog(LOGA_ALWAYS, (control_found == 0) ?
"Waited... not found" : "Waited... found %d" , control_found);
return control_found; return control_found;
} }
...@@ -378,7 +376,7 @@ int control_which(char* message1, char* message2) ...@@ -378,7 +376,7 @@ int control_which(char* message1, char* message2)
{ {
if (++count == 300) if (++count == 300)
return 0; /* time out and tell the caller the message was not found */ return 0; /* time out and tell the caller the message was not found */
mqsleep(1); mysleep(1);
} }
return control_found; return control_found;
} }
...@@ -483,13 +481,13 @@ void connectionLost(void* context, char* cause) ...@@ -483,13 +481,13 @@ void connectionLost(void* context, char* cause)
{ {
conn_opts.serverURIcount = opts.connection_count; conn_opts.serverURIcount = opts.connection_count;
conn_opts.serverURIs = opts.connections; conn_opts.serverURIs = opts.connections;
printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
} }
else else
{ {
conn_opts.serverURIcount = 0; conn_opts.serverURIcount = 0;
conn_opts.serverURIs = NULL; conn_opts.serverURIs = NULL;
} }
printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
rc = MQTTAsync_connect(context, &conn_opts); rc = MQTTAsync_connect(context, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{ {
...@@ -578,7 +576,7 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -578,7 +576,7 @@ int waitForCompletion(START_TIME_TYPE start_time)
int wait_count = 0; int wait_count = 0;
int limit = 120; int limit = 120;
mqsleep(1); mysleep(1);
while (arrivedCount < expectedCount) while (arrivedCount < expectedCount)
{ {
if (arrivedCount > lastreport) if (arrivedCount > lastreport)
...@@ -587,7 +585,7 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -587,7 +585,7 @@ int waitForCompletion(START_TIME_TYPE start_time)
arrivedCount, expectedCount, elapsed(start_time) / 1000); arrivedCount, expectedCount, elapsed(start_time) / 1000);
lastreport = arrivedCount; lastreport = arrivedCount;
} }
mqsleep(1); mysleep(1);
if (opts.persistence && connection_lost) if (opts.persistence && connection_lost)
recreateReconnect(); recreateReconnect();
if (++wait_count > limit || stopping) if (++wait_count > limit || stopping)
...@@ -595,7 +593,7 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -595,7 +593,7 @@ int waitForCompletion(START_TIME_TYPE start_time)
} }
last_completion_time = elapsed(start_time) / 1000; last_completion_time = elapsed(start_time) / 1000;
MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive"); MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive");
mqsleep(10); /* check if any duplicate messages arrive */ mysleep(10); /* check if any duplicate messages arrive */
MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds", MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds",
arrivedCount, expectedCount, elapsed(start_time) / 1000); arrivedCount, expectedCount, elapsed(start_time) / 1000);
return success(expectedCount); return success(expectedCount);
...@@ -644,7 +642,7 @@ void one_iteration(void) ...@@ -644,7 +642,7 @@ void one_iteration(void)
recreateReconnect(); recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mqsleep(1); mysleep(1);
rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload, rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
opts.qos, opts.retained, NULL); opts.qos, opts.retained, NULL);
} }
...@@ -654,7 +652,7 @@ void one_iteration(void) ...@@ -654,7 +652,7 @@ void one_iteration(void)
{ {
if (stopping) if (stopping)
goto exit; goto exit;
mqsleep(1); mysleep(1);
printf("arrivedCount %d\n", arrivedCount); printf("arrivedCount %d\n", arrivedCount);
} }
measuring = 0; measuring = 0;
...@@ -696,13 +694,13 @@ void one_iteration(void) ...@@ -696,13 +694,13 @@ void one_iteration(void)
recreateReconnect(); recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mqsleep(1); mysleep(1);
rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload, rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
opts.qos, opts.retained, &ropts); opts.qos, opts.retained, &ropts);
} }
//MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload); //MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload);
while (seqno - messagesSent > 2000) while (seqno - messagesSent > 2000)
mqsleep(1); mysleep(1);
} }
MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000); MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000);
...@@ -833,7 +831,7 @@ int sendAndReceive(void) ...@@ -833,7 +831,7 @@ int sendAndReceive(void)
} }
while (client_cleaned == 0) while (client_cleaned == 0)
mqsleep(1); mysleep(1);
MyLog(LOGA_ALWAYS, "Client state cleaned up"); MyLog(LOGA_ALWAYS, "Client state cleaned up");
...@@ -851,7 +849,7 @@ int sendAndReceive(void) ...@@ -851,7 +849,7 @@ int sendAndReceive(void)
/* wait until subscribed */ /* wait until subscribed */
while (client_subscribed == 0) while (client_subscribed == 0)
mqsleep(1); mysleep(1);
if (client_subscribed != 1) if (client_subscribed != 1)
goto disconnect_exit; goto disconnect_exit;
...@@ -986,7 +984,7 @@ int main(int argc, char** argv) ...@@ -986,7 +984,7 @@ int main(int argc, char** argv)
} }
while (control_subscribed == 0) while (control_subscribed == 0)
mqsleep(1); mysleep(1);
if (control_subscribed != 1) if (control_subscribed != 1)
goto destroy_exit; goto destroy_exit;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment