Commit 6d400792 authored by Ian Craggs's avatar Ian Craggs

Change build level to date and update test4 and test5

parent bd954ae7
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
<taskdef resource="net/sf/antcontrib/antlib.xml"/> <taskdef resource="net/sf/antcontrib/antlib.xml"/>
<property name="output.folder" value="build/output" /> <property name="output.folder" value="build/output" />
<property name="build.level" value="${today}" />
<property name="release.version" value="1.0.0.2" /> <property name="release.version" value="1.0.0.2" />
<property name="libname" value="mqttv3c" /> <property name="libname" value="mqttv3c" />
...@@ -29,7 +28,8 @@ ...@@ -29,7 +28,8 @@
<pathconvert refid="async.source.fileset" property="async.source.files" pathsep=" "/> <pathconvert refid="async.source.fileset" property="async.source.files" pathsep=" "/>
</target> </target>
<target name="version" depends="init" description="replace tags in the source with the right levels"> <target name="version" depends="init" description="replace tags in the source with the right levels">
<property name="build.level" value="${DSTAMP}${TSTAMP}" />
<replace file="src/MQTTClient.c" token="##MQTTCLIENT_BUILD_TAG##" value="${build.level}" /> <replace file="src/MQTTClient.c" token="##MQTTCLIENT_BUILD_TAG##" value="${build.level}" />
<replace file="src/MQTTClient.c" token="##MQTTCLIENT_VERSION_TAG##" value="${release.version}" /> <replace file="src/MQTTClient.c" token="##MQTTCLIENT_VERSION_TAG##" value="${release.version}" />
<replace file="src/MQTTAsync.c" token="##MQTTCLIENT_BUILD_TAG##" value="${build.level}" /> <replace file="src/MQTTAsync.c" token="##MQTTCLIENT_BUILD_TAG##" value="${build.level}" />
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
<then> <then>
<property name="ccflags.so" value="-fPIC -Os -Wall" /> <property name="ccflags.so" value="-fPIC -Os -Wall" />
<property name="ldflags.so" value="-fvisibility=hidden -shared -Wl,-soname,lib${libname}.so" /> <property name="ldflags.so" value="-fvisibility=hidden -shared -Wl,-soname,lib${libname}.so" />
<property name="ldflags.async.so" value="-fvisibility=hidden -shared -Wl,-soname,lib${libname.async}.so" />
<mkdir dir="${output.folder}"/> <mkdir dir="${output.folder}"/>
<!-- non-SSL, synchronous library --> <!-- non-SSL, synchronous library -->
...@@ -65,7 +66,7 @@ ...@@ -65,7 +66,7 @@
<!-- non-SSL, asynchronous library --> <!-- non-SSL, asynchronous library -->
<property name="output.async.filename" value="${output.folder}/lib${libname.async}.so" /> <property name="output.async.filename" value="${output.folder}/lib${libname.async}.so" />
<exec executable="gcc" failonerror="true"> <exec executable="gcc" failonerror="true">
<arg line="${ccflags.so} ${ldflags.so} -o ${output.async.filename} ${async.source.files}"/> <arg line="${ccflags.so} ${ldflags.async.so} -o ${output.async.filename} ${async.source.files}"/>
</exec> </exec>
<exec executable="strip" failonerror="true"> <exec executable="strip" failonerror="true">
<arg value="${output.async.filename}" /> <arg value="${output.async.filename}" />
...@@ -74,7 +75,7 @@ ...@@ -74,7 +75,7 @@
<!-- SSL, asynchronous library --> <!-- SSL, asynchronous library -->
<property name="output.async.ssl.filename" value="${output.folder}/lib${libname.async.ssl}.so" /> <property name="output.async.ssl.filename" value="${output.folder}/lib${libname.async.ssl}.so" />
<exec executable="gcc" failonerror="true"> <exec executable="gcc" failonerror="true">
<arg line="-DOPENSSL ${ccflags.so} ${ldflags.so} -o ${output.async.ssl.filename} ${sync.source.files}"/> <arg line="-DOPENSSL ${ccflags.so} ${ldflags.async.so} -o ${output.async.ssl.filename} ${sync.source.files}"/>
</exec> </exec>
<exec executable="strip" failonerror="true"> <exec executable="strip" failonerror="true">
<arg value="${output.async.ssl.filename}" /> <arg value="${output.async.ssl.filename}" />
...@@ -126,7 +127,7 @@ ...@@ -126,7 +127,7 @@
</target> </target>
<target name="test" > <target name="test" >
<foreach target="runAtest" param="aTest" list="test1,test3"/> <foreach target="runAtest" param="aTest" list="test1"/>
</target> </target>
<target name="runAtest"> <target name="runAtest">
......
#define MY_ID "@(#) stmqcom/pub1.c, stmqcom, cs 1.85 10/08/24 07:36:24"
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
/* [Platforms]UNIX NT[/Platforms] */ /* [Platforms]UNIX NT[/Platforms] */
/* [Title]MQ Telemetry MQTT C client tests */ /* [Title]MQ Telemetry MQTT C client tests */
...@@ -54,16 +55,12 @@ void usage() ...@@ -54,16 +55,12 @@ void usage()
struct Options struct Options
{ {
char* connection; /**< connection to system under test. */ char* connection; /**< connection to system under test. */
char** haconnections;
int hacount;
int verbose; int verbose;
int test_no; int test_no;
int size; /**< size of big message */ int size; /**< size of big message */
} options = } options =
{ {
"tcp://localhost:1883", "tcp://m2m.eclipse.org:1883",
NULL,
0,
0, 0,
-1, -1,
5000000, 5000000,
...@@ -96,24 +93,6 @@ void getopts(int argc, char** argv) ...@@ -96,24 +93,6 @@ void getopts(int argc, char** argv)
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--haconnections") == 0)
{
if (++count < argc)
{
char* tok = strtok(argv[count], " ");
options.hacount = 0;
options.haconnections = malloc(sizeof(char*) * 5);
while (tok)
{
options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
strcpy(options.haconnections[options.hacount], tok);
options.hacount++;
tok = strtok(NULL, " ");
}
}
else
usage();
}
else if (strcmp(argv[count], "--verbose") == 0) else if (strcmp(argv[count], "--verbose") == 0)
options.verbose = 1; options.verbose = 1;
count++; count++;
...@@ -371,11 +350,6 @@ int test1(struct Options options) ...@@ -371,11 +350,6 @@ int test1(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.will = &wopts; opts.will = &wopts;
opts.will->message = "will message"; opts.will->message = "will message";
...@@ -466,11 +440,6 @@ int test2(struct Options options) ...@@ -466,11 +440,6 @@ int test2(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.will = &wopts; opts.will = &wopts;
opts.will->message = "will message"; opts.will->message = "will message";
...@@ -634,6 +603,7 @@ void test3_onFailure(void* context, MQTTAsync_failureData* response) ...@@ -634,6 +603,7 @@ void test3_onFailure(void* context, MQTTAsync_failureData* response)
{ {
client_data* cd = (client_data*)context; client_data* cd = (client_data*)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
assert("Should have connected", 0, "failed to connect", NULL); assert("Should have connected", 0, "failed to connect", NULL);
MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response->code); MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response->code);
...@@ -651,14 +621,13 @@ Test3: More than one client object - simultaneous working. ...@@ -651,14 +621,13 @@ Test3: More than one client object - simultaneous working.
*********************************************************************/ *********************************************************************/
int test3(struct Options options) int test3(struct Options options)
{ {
#define TEST3_CLIENTS 10 const int num_clients = 10;
int num_clients = TEST3_CLIENTS;
int subsqos = 2; int subsqos = 2;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0; int rc = 0;
int i; int i;
client_data clientdata[TEST3_CLIENTS]; client_data clientdata[num_clients];
test_finished = 0; test_finished = 0;
MyLog(LOGA_INFO, "Starting test 3 - multiple connections"); MyLog(LOGA_INFO, "Starting test 3 - multiple connections");
...@@ -681,11 +650,6 @@ int test3(struct Options options) ...@@ -681,11 +650,6 @@ int test3(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.will = &wopts; opts.will = &wopts;
opts.will->message = "will message"; opts.will->message = "will message";
...@@ -871,11 +835,6 @@ int test4(struct Options options) ...@@ -871,11 +835,6 @@ int test4(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.will = &wopts; opts.will = &wopts;
opts.will->message = "will message"; opts.will->message = "will message";
...@@ -911,26 +870,200 @@ exit: ...@@ -911,26 +870,200 @@ exit:
} }
void test5_onConnectFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
MyLog(LOGA_INFO, "Connack rc is %d", response ? response->code : -999);
test_finished = 1;
}
void test5_onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test_finished = 1;
}
/********************************************************************
Test5: Connack return codes
*********************************************************************/
int test5(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test1";
test_finished = failures = 0;
MyLog(LOGA_INFO, "Starting test 5 - connack return codes");
rc = MQTTAsync_create(&c, options.connection, "a clientid that is too long to be accepted",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.onSuccess = test5_onConnect;
opts.onFailure = test5_onConnectFailure;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST5: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
return failures;
}
void test6_onConnectFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
MyLog(LOGA_INFO, "Connack rc is %d", response ? response->code : -999);
test_finished = 1;
}
void test6_onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test_finished = 1;
}
/********************************************************************
Test6: HA connections
*********************************************************************/
int test6(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test1";
char* uris[2] = {"tcp://localhost:1884", "tcp://localhost:1883"};
test_finished = failures = 0;
MyLog(LOGA_INFO, "Starting test 7 - HA connections");
rc = MQTTAsync_create(&c, options.connection, "a clientid that is too long to be accepted",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.onSuccess = test6_onConnect;
opts.onFailure = test6_onConnectFailure;
opts.context = c;
opts.serverURIs = uris;
opts.serverURIcount = 2;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST6: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); printf("Trace : %d, %s\n", level, message);
} }
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int rc = 0; int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4}; /* indexed starting from 1 */ int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6}; /* indexed starting from 1 */
char** info; MQTTAsync_nameValue* info;
getopts(argc, argv); getopts(argc, argv);
MQTTAsync_setTraceCallback(trace_callback); MQTTAsync_setTraceCallback(trace_callback);
info = MQTTAsync_getVersionInfo(); info = MQTTAsync_getVersionInfo();
while (info->name)
while (*info) {
printf("Version info \"%s\"\n", *info++); MyLog(LOGA_INFO, "%s: %s", info->name, info->value);
info++;
}
if (options.test_no == -1) if (options.test_no == -1)
{ /* run all the tests */ { /* run all the tests */
......
#define MY_ID "@(#) stmqcom/xrctest5.c, stmqcom, cs 1.85 12/11/29 07:36:24" #define MY_ID "@(#) stmqcom/pub1.c, stmqcom, cs 1.85 10/08/24 07:36:24"
/*--------------------------------------------------------------------*/ /*--------------------------------------------------------------------*/
/* [Platforms]UNIX NT[/Platforms] */ /* [Platforms]UNIX NT[/Platforms] */
/* [Title]MQ Telemetry MQTT Asynchronous C client SSL tests */ /* [Title]MQ Telemetry MQTT C client SSL tests */
/* [/Title] */ /* [/Title] */
/* [Testclasses]stcom1 stmqcom1[/Category] */ /* [Testclasses]stcom1 stmqcom1[/Category] */
/* [Category]MQ Telemetry[/Category] */ /* [Category]MQ Telemetry[/Category] */
...@@ -15,6 +15,17 @@ ...@@ -15,6 +15,17 @@
* SSL tests for the MQ Telemetry Asynchronous MQTT C client * SSL tests for the MQ Telemetry Asynchronous MQTT C client
*/ */
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"
#if !defined(_WINDOWS) #if !defined(_WINDOWS)
#include <sys/time.h> #include <sys/time.h>
#include <sys/socket.h> #include <sys/socket.h>
...@@ -33,50 +44,27 @@ ...@@ -33,50 +44,27 @@
#define snprintf _snprintf #define snprintf _snprintf
#endif #endif
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#if defined(IOS)
char skeytmp[1024];
char ckeytmp[1024];
char persistenceStore[1024];
#else
char* persistenceStore = NULL;
#endif
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
#else
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0])) #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
void usage() void usage()
{ {
printf("Options:\n"); printf("Options:\n");
printf("\t--test_no <test_no> - Run test number <test_no>\n"); printf("\t--test_no <test_no> - Run test number <test_no>\n");
printf("\t--server <mqtt URL> - Connect to <mqtt URL> for tests\n"); printf("\t--server <hostname> - Connect to <hostname> for tests\n");
printf("\t--haserver <mqtt URL> - Use <mqtt URL> as the secondary server for HA tests\n"); printf(
printf("\t--client_key <key_file> - Use <key_file> as the client certificate for SSL authentication\n"); "\t--client_key <key_file> - Use <key_file> as the client certificate for SSL authentication\n");
printf("\t--client_key_pass <password> - Use <password> to access the private key in the client certificate\n"); printf(
printf("\t--server_key <key_file> - Use <key_file> as the trusted certificate for server\n"); "\t--client_key_pass <password> - Use <password> to access the private key in the client certificate\n");
printf("\t--verbose - Enable verbose output \n"); printf(
printf("\tserver connection URLs should be in the form; (tcp|ssl)://hostname:port\n"); "\t--server_key <key_file> - Use <key_file> as the trusted certificate for server\n");
printf("\t--help - This help output\n"); printf("\t--verbose - Enable verbose output \n");
printf("\t--help - This help output\n");
exit(-1); exit(-1);
} }
struct Options struct Options
{ {
char* connection; /**< connection to system under test. */ char* connection; /**< connection to system under test. */
char** haconnections;
int hacount;
char* client_key_file; char* client_key_file;
char* client_key_pass; char* client_key_pass;
char* server_key_file; char* server_key_file;
...@@ -84,17 +72,7 @@ struct Options ...@@ -84,17 +72,7 @@ struct Options
int test_no; int test_no;
int size; int size;
} options = } options =
{ { "localhost", NULL, NULL, NULL, 0, 0, 5000000 };
"ssl://localhost:8883",
NULL,
0,
NULL,
NULL,
NULL,
0,
0,
5000000
};
typedef struct typedef struct
{ {
...@@ -127,43 +105,17 @@ void getopts(int argc, char** argv) ...@@ -127,43 +105,17 @@ void getopts(int argc, char** argv)
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--connection") == 0) else if (strcmp(argv[count], "--server") == 0)
{ {
if (++count < argc) if (++count < argc)
options.connection = argv[count]; options.connection = argv[count];
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--haconnections") == 0)
{
if (++count < argc)
{
char* tok = strtok(argv[count], " ");
options.hacount = 0;
options.haconnections = malloc(sizeof(char*) * 5);
while (tok)
{
options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
strcpy(options.haconnections[options.hacount], tok);
options.hacount++;
tok = strtok(NULL, " ");
}
}
else
usage();
}
else if (strcmp(argv[count], "--client_key") == 0) else if (strcmp(argv[count], "--client_key") == 0)
{ {
if (++count < argc) if (++count < argc)
{ options.client_key_file = argv[count];
#if defined(IOS)
strcat(ckeytmp, getenv("HOME"));
strcat(ckeytmp, argv[count]);
options.client_key_file = ckeytmp;
#else
options.client_key_file = argv[count];
#endif
}
else else
usage(); usage();
} }
...@@ -177,16 +129,8 @@ void getopts(int argc, char** argv) ...@@ -177,16 +129,8 @@ void getopts(int argc, char** argv)
else if (strcmp(argv[count], "--server_key") == 0) else if (strcmp(argv[count], "--server_key") == 0)
{ {
if (++count < argc) if (++count < argc)
{
#if defined(IOS)
strcat(skeytmp, getenv("HOME"));
strcat(skeytmp, argv[count]);
options.server_key_file = skeytmp;
#else
options.server_key_file = argv[count]; options.server_key_file = argv[count];
#endif else
}
else
usage(); usage();
} }
else if (strcmp(argv[count], "--verbose") == 0) else if (strcmp(argv[count], "--verbose") == 0)
...@@ -197,12 +141,17 @@ void getopts(int argc, char** argv) ...@@ -197,12 +141,17 @@ void getopts(int argc, char** argv)
} }
count++; count++;
} }
#if defined(IOS)
strcpy(persistenceStore, getenv("HOME"));
strcat(persistenceStore, "/Library/Caches");
#endif
} }
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
#else
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
void MyLog(int LOGA_level, char* format, ...) void MyLog(int LOGA_level, char* format, ...)
{ {
static char msg_buf[256]; static char msg_buf[256];
...@@ -294,7 +243,7 @@ START_TIME_TYPE global_start_time; ...@@ -294,7 +243,7 @@ START_TIME_TYPE global_start_time;
int tests = 0; int tests = 0;
int failures = 0; int failures = 0;
int myassert(char* filename, int lineno, char* description, int value, void myassert(char* filename, int lineno, char* description, int value,
char* format, ...) char* format, ...)
{ {
++tests; ++tests;
...@@ -303,7 +252,8 @@ int myassert(char* filename, int lineno, char* description, int value, ...@@ -303,7 +252,8 @@ int myassert(char* filename, int lineno, char* description, int value,
va_list args; va_list args;
++failures; ++failures;
printf("Assertion failed, file %s, line %d, description: %s", filename, lineno, description); printf("Assertion failed, file %s, line %d, description: %s", filename,
lineno, description);
va_start(args, format); va_start(args, format);
vprintf(format, args); vprintf(format, args);
...@@ -313,43 +263,145 @@ int myassert(char* filename, int lineno, char* description, int value, ...@@ -313,43 +263,145 @@ int myassert(char* filename, int lineno, char* description, int value,
MyLog(LOGA_DEBUG, MyLog(LOGA_DEBUG,
"Assertion succeeded, file %s, line %d, description: %s", "Assertion succeeded, file %s, line %d, description: %s",
filename, lineno, description); filename, lineno, description);
return value;
} }
/********************************************************************* /*********************************************************************
Async Callbacks - generic callbacks for send/receive tests Test: multi-threaded client using callbacks
*********************************************************************/ *********************************************************************/
volatile int multiThread_arrivedcount = 0;
int multiThread_deliveryCompleted = 0;
MQTTAsync_message multiThread_pubmsg = MQTTAsync_message_initializer;
void asyncTestOnDisconnect(void* context, MQTTAsync_successData* response) void multiThread_deliveryComplete(void* context, MQTTAsync_token dt)
{ {
AsyncTestClient* tc = (AsyncTestClient*) context; ++multiThread_deliveryCompleted;
MyLog(LOGA_DEBUG, "In asyncTestOnDisconnect callback, %s", tc->clientid);
tc->testFinished = 1;
} }
void asyncTestOnDisconnectFailure(void* context, MQTTAsync_failureData* response) int multiThread_messageArrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* m)
{ {
AsyncTestClient* tc = (AsyncTestClient*) context; ++multiThread_arrivedcount;
MyLog(LOGA_DEBUG, "In asyncTestOnDisconnectFailure callback, %s", tc->clientid); MyLog(LOGA_DEBUG, "Callback: %d message received on topic %s is %.*s.",
failures++; multiThread_arrivedcount, topicName, m->payloadlen,
tc->testFinished = 1; (char*) (m->payload));
if (multiThread_pubmsg.payloadlen != m->payloadlen || memcmp(m->payload,
multiThread_pubmsg.payload, m->payloadlen) != 0)
{
failures++;
MyLog(LOGA_INFO, "Error: wrong data received lengths %d %d\n",
multiThread_pubmsg.payloadlen, m->payloadlen);
}
MQTTAsync_free(topicName);
MQTTAsync_freeMessage(&m);
return 1;
} }
void asyncTestOnSend(void* context, MQTTAsync_successData* response) void sendAndReceive(MQTTAsync* c, int qos, char* test_topic)
{ {
MQTTAsync_responseOptions ropts;
int i = 0;
int iterations = 50;
int rc = 0;
int wait_seconds = 0;
multiThread_deliveryCompleted = 0;
multiThread_arrivedcount = 0;
MyLog(LOGA_DEBUG, "%d messages at QoS %d", iterations, qos);
multiThread_pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to";
multiThread_pubmsg.payloadlen = 27;
multiThread_pubmsg.qos = qos;
multiThread_pubmsg.retained = 0;
for (i = 1; i <= iterations; ++i)
{
if (i % 10 == 0)
rc = MQTTAsync_send(c, test_topic, multiThread_pubmsg.payloadlen,
multiThread_pubmsg.payload, multiThread_pubmsg.qos,
multiThread_pubmsg.retained, NULL);
else
rc = MQTTAsync_sendMessage(c, test_topic, &multiThread_pubmsg,
&ropts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
#if defined(WIN32)
Sleep(100);
#else
usleep(100000L);
#endif
wait_seconds = 10;
while ((multiThread_arrivedcount < i) && (wait_seconds-- > 0))
{
MyLog(LOGA_DEBUG, "Arrived %d count %d", multiThread_arrivedcount,
i);
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("Message Arrived", wait_seconds > 0,
"Time out waiting for message %d\n", i );
}
if (qos > 0)
{
/* MQ Telemetry can send a message to a subscriber before the server has
completed the QoS 2 handshake with the publisher. For QoS 1 and 2,
allow time for the final delivery complete callback before checking
that all expected callbacks have been made */
wait_seconds = 10;
while ((multiThread_deliveryCompleted < iterations) && (wait_seconds--
> 0))
{
MyLog(LOGA_DEBUG, "Delivery Completed %d count %d",
multiThread_deliveryCompleted, i);
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("All Deliveries Complete", wait_seconds > 0,
"Number of deliveryCompleted callbacks was %d\n",
multiThread_deliveryCompleted);
}
}
/*********************************************************************
Async Callbacks - generic callbacks for send/receive tests
*********************************************************************/
//static mutex_type client_mutex = NULL;
static pthread_mutex_t client_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type client_mutex = &client_mutex_store;
void asyncTestOnDisconnect(void* context, MQTTAsync_successData* response)
{
int rc;
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
//int qos = response->alt.pub.message.qos; MyLog(LOGA_DEBUG, "In asyncTestOnDisconnect callback, %s", tc->clientid);
MyLog(LOGA_DEBUG, "In asyncTestOnSend callback, %s", tc->clientid); printf("Successful disconnection\n");
//rc = Thread_lock_mutex(client_mutex);
tc->testFinished = 1;
//rc = Thread_unlock_mutex(client_mutex);
} }
void asyncTestOnSendFailure(void* context, MQTTAsync_failureData* response) void asyncTestOnSend(void* context, MQTTAsync_successData* response)
{ {
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
MyLog(LOGA_DEBUG, "In asyncTestOnSendFailure callback, %s", tc->clientid); int rc;
tc->testFinished = 1; int qos = response->alt.pub.message.qos;
failures++; MyLog(LOGA_DEBUG, "In asyncTestOnSend callback, %s", tc->clientid);
//rc = Thread_lock_mutex(client_mutex);
tc->sentmsgs[qos]++;
//rc = Thread_unlock_mutex(client_mutex);
} }
void asyncTestOnSubscribeFailure(void* context, MQTTAsync_failureData* response) void asyncTestOnSubscribeFailure(void* context, MQTTAsync_failureData* response)
...@@ -357,9 +409,8 @@ void asyncTestOnSubscribeFailure(void* context, MQTTAsync_failureData* response) ...@@ -357,9 +409,8 @@ void asyncTestOnSubscribeFailure(void* context, MQTTAsync_failureData* response)
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
MyLog(LOGA_DEBUG, "In asyncTestOnSubscribeFailure callback, %s", MyLog(LOGA_DEBUG, "In asyncTestOnSubscribeFailure callback, %s",
tc->clientid); tc->clientid);
assert("There should be no failures in this test. ", 0, "asyncTestOnSubscribeFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "asyncTestOnSubscribeFailure callback was called\n", 0);
tc->testFinished = 1;
failures++;
} }
void asyncTestOnUnsubscribe(void* context, MQTTAsync_successData* response) void asyncTestOnUnsubscribe(void* context, MQTTAsync_successData* response)
...@@ -370,33 +421,22 @@ void asyncTestOnUnsubscribe(void* context, MQTTAsync_successData* response) ...@@ -370,33 +421,22 @@ void asyncTestOnUnsubscribe(void* context, MQTTAsync_successData* response)
MyLog(LOGA_DEBUG, "In asyncTestOnUnsubscribe callback, %s", tc->clientid); MyLog(LOGA_DEBUG, "In asyncTestOnUnsubscribe callback, %s", tc->clientid);
opts.onSuccess = asyncTestOnDisconnect; opts.onSuccess = asyncTestOnDisconnect;
opts.onFailure = asyncTestOnDisconnectFailure;
opts.context = tc; opts.context = tc;
MQTTAsync_disconnect(tc->client, &opts); MQTTAsync_disconnect(tc->client, &opts);
} }
void asyncTestOnUnsubscribeFailure(void* context, MQTTAsync_failureData* response)
{
AsyncTestClient* tc = (AsyncTestClient*) context;
MyLog(LOGA_DEBUG, "In asyncTestOnUnsubscribeFailure callback, %s", tc->clientid);
assert("There should be no failures in this test. ", 0, "asyncTestOnUnsubscribeFailure callback was called\n", 0);
tc->testFinished = 1;
failures++;
}
void asyncTestOnSubscribe(void* context, MQTTAsync_successData* response) void asyncTestOnSubscribe(void* context, MQTTAsync_successData* response)
{ {
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
int rc, i; int rc, i;
MyLog(LOGA_DEBUG, "In asyncTestOnSubscribe callback, %s", tc->clientid); MyLog(LOGA_DEBUG, "In asyncTestOnSubscribe callback, %s", tc->clientid);
//rc = Thread_lock_mutex(client_mutex);
tc->subscribed = 1; tc->subscribed = 1;
//rc = Thread_unlock_mutex(client_mutex);
for (i = 0; i < 3; i++) for (i = 0; i < 3; i++)
{ {
MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pubmsg.payload pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to payload up to 11"; = "a much longer message that we can shorten to the extent that we need to payload up to 11";
...@@ -404,9 +444,9 @@ void asyncTestOnSubscribe(void* context, MQTTAsync_successData* response) ...@@ -404,9 +444,9 @@ void asyncTestOnSubscribe(void* context, MQTTAsync_successData* response)
pubmsg.qos = i; pubmsg.qos = i;
pubmsg.retained = 0; pubmsg.retained = 0;
opts.onSuccess = asyncTestOnSend; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onFailure = asyncTestOnSendFailure; //opts.onSuccess = asyncTestOnSend;
opts.context = tc; opts.context = &tc;
rc = MQTTAsync_send(tc->client, tc->topic, pubmsg.payloadlen, rc = MQTTAsync_send(tc->client, tc->topic, pubmsg.payloadlen,
pubmsg.payload, pubmsg.qos, pubmsg.retained, &opts); pubmsg.payload, pubmsg.qos, pubmsg.retained, &opts);
...@@ -416,12 +456,13 @@ void asyncTestOnSubscribe(void* context, MQTTAsync_successData* response) ...@@ -416,12 +456,13 @@ void asyncTestOnSubscribe(void* context, MQTTAsync_successData* response)
} }
} }
int asyncTestMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int asyncTestMessageArrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* m)
{ {
int rc;
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
tc->rcvdmsgs[m->qos]++; int rc;
//rc = Thread_lock_mutex(client_mutex);
tc->rcvdmsgs[m->qos]++;
//printf("Received messages: %d\n", tc->rcvdmsgs[m->qos]); //printf("Received messages: %d\n", tc->rcvdmsgs[m->qos]);
...@@ -434,12 +475,11 @@ int asyncTestMessageArrived(void* context, char* topicName, int topicLen, MQTTAs ...@@ -434,12 +475,11 @@ int asyncTestMessageArrived(void* context, char* topicName, int topicLen, MQTTAs
if (tc->sentmsgs[m->qos] < tc->maxmsgs) if (tc->sentmsgs[m->qos] < tc->maxmsgs)
{ {
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
opts.onSuccess = asyncTestOnSend; //opts.onSuccess = asyncTestOnSend;
opts.onFailure = asyncTestOnSendFailure;
opts.context = tc; opts.context = tc;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to payload up to 11"; = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11; pubmsg.payloadlen = 11;
...@@ -456,15 +496,15 @@ int asyncTestMessageArrived(void* context, char* topicName, int topicLen, MQTTAs ...@@ -456,15 +496,15 @@ int asyncTestMessageArrived(void* context, char* topicName, int topicLen, MQTTAs
if ((tc->rcvdmsgs[0] + tc->rcvdmsgs[1] + tc->rcvdmsgs[2]) == (tc->maxmsgs if ((tc->rcvdmsgs[0] + tc->rcvdmsgs[1] + tc->rcvdmsgs[2]) == (tc->maxmsgs
* 3)) * 3))
{ {
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MyLog(LOGA_DEBUG, "Ready to unsubscribe"); MyLog(LOGA_DEBUG, "Ready to unsubscribe");
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = asyncTestOnUnsubscribe; opts.onSuccess = asyncTestOnUnsubscribe;
opts.onFailure = asyncTestOnUnsubscribeFailure;
opts.context = tc; opts.context = tc;
rc = MQTTAsync_unsubscribe(tc->client, tc->topic, &opts); rc = MQTTAsync_unsubscribe(tc->client, tc->topic, &opts);
assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc); assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
} }
//rc = Thread_unlock_mutex(client_mutex);
MyLog(LOGA_DEBUG, "Leaving asyncTestMessageArrived callback"); MyLog(LOGA_DEBUG, "Leaving asyncTestMessageArrived callback");
MQTTAsync_freeMessage(&m); MQTTAsync_freeMessage(&m);
MQTTAsync_free(topicName); MQTTAsync_free(topicName);
...@@ -479,12 +519,11 @@ void asyncTestOnDeliveryComplete(void* context, MQTTAsync_token token) ...@@ -479,12 +519,11 @@ void asyncTestOnDeliveryComplete(void* context, MQTTAsync_token token)
void asyncTestOnConnect(void* context, MQTTAsync_successData* response) void asyncTestOnConnect(void* context, MQTTAsync_successData* response)
{ {
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int subsqos = 2; int subsqos = 2;
int rc; int rc;
MyLog(LOGA_DEBUG, "In asyncTestOnConnect callback, %s", tc->clientid); MyLog(LOGA_DEBUG, "In asyncTestOnConnect callback, %s", tc->clientid);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = asyncTestOnSubscribe; opts.onSuccess = asyncTestOnSubscribe;
opts.onFailure = asyncTestOnSubscribeFailure; opts.onFailure = asyncTestOnSubscribeFailure;
opts.context = tc; opts.context = tc;
...@@ -501,9 +540,13 @@ void asyncTestOnConnect(void* context, MQTTAsync_successData* response) ...@@ -501,9 +540,13 @@ void asyncTestOnConnect(void* context, MQTTAsync_successData* response)
int test1Finished = 0; int test1Finished = 0;
int test1OnFailureCalled = 0;
void test1OnFailure(void* context, MQTTAsync_failureData* response) void test1OnFailure(void* context, MQTTAsync_failureData* response)
{ {
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test1OnFailureCalled++;
test1Finished = 1; test1Finished = 1;
} }
...@@ -511,9 +554,10 @@ void test1OnConnect(void* context, MQTTAsync_successData* response) ...@@ -511,9 +554,10 @@ void test1OnConnect(void* context, MQTTAsync_successData* response)
{ {
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context); MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
assert("Connect should not succeed", 0, "connect success callback was called", 0); assert("Connect should not succeed", 0, "connect success callback was called", 0);
test1Finished = 1; test1Finished = 1;
failures++;
} }
int test1(struct Options options) int test1(struct Options options)
...@@ -526,34 +570,49 @@ int test1(struct Options options) ...@@ -526,34 +570,49 @@ int test1(struct Options options)
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
char* test_topic = "C client SSL test1"; char* test_topic = "C client SSL test1";
char url[100];
int count = 0; int count = 0;
test1Finished = 0; test1Finished = 0;
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting SSL test 1 - connection to nonSSL MQTT server"); MyLog(LOGA_INFO, "Starting SSL test 1 - connection to nonSSL MQTT server");
if (!(assert ("Good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test1", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) snprintf(url, 100, "ssl://%s:1883", options.connection);
rc = MQTTAsync_create(&c, url, "test1", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit; goto exit;
}
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test1OnConnect; opts.onSuccess = test1OnConnect;
opts.onFailure = test1OnFailure; opts.onFailure = test1OnFailure;
opts.context = c; opts.context = c;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
opts.ssl->enableServerCertAuth = 0; opts.ssl->enableServerCertAuth = 0;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit; goto exit;
}
/* wait for success or failure callback */ /* wait for success or failure callback */
while (!test1Finished && ++count < 10000) while (!test1Finished && ++count < 10000)
...@@ -563,8 +622,7 @@ int test1(struct Options options) ...@@ -563,8 +622,7 @@ int test1(struct Options options)
usleep(10000L); usleep(10000L);
#endif #endif
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -585,7 +643,6 @@ void test2aOnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -585,7 +643,6 @@ void test2aOnConnectFailure(void* context, MQTTAsync_failureData* response)
assert("There should be no failures in this test. ", 0, "test2aOnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test2aOnConnectFailure callback was called\n", 0);
client->testFinished = 1; client->testFinished = 1;
failures++;
} }
void test2aOnPublishFailure(void* context, MQTTAsync_failureData* response) void test2aOnPublishFailure(void* context, MQTTAsync_failureData* response)
...@@ -595,25 +652,29 @@ void test2aOnPublishFailure(void* context, MQTTAsync_failureData* response) ...@@ -595,25 +652,29 @@ void test2aOnPublishFailure(void* context, MQTTAsync_failureData* response)
client->clientid); client->clientid);
assert("There should be no failures in this test. ", 0, "test2aOnPublishFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test2aOnPublishFailure callback was called\n", 0);
client->testFinished = 1;
failures++;
} }
int test2a(struct Options options) int test2a(struct Options options)
{ {
char* testname = "test2a"; char* testname = "test2a";
AsyncTestClient tc = AsyncTestClient_initializer; AsyncTestClient tc =
AsyncTestClient_initializer;
MQTTAsync c; MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int i, rc = 0; int rc = 0;
char url[100];
int i;
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting test 2a - Mutual SSL authentication"); MyLog(LOGA_INFO, "Starting test 2a - Mutual SSL authentication");
if (!(assert("Good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test2a", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) snprintf(url, 100, "ssl://%s:8883", options.connection);
MQTTAsync_create(&c, url, "test2a", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
tc.client = c; tc.client = c;
...@@ -628,14 +689,16 @@ int test2a(struct Options options) ...@@ -628,14 +689,16 @@ int test2a(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = asyncTestOnConnect; opts.onSuccess = asyncTestOnConnect;
opts.onFailure = test2aOnConnectFailure; opts.onFailure = test2aOnConnectFailure;
opts.context = &tc; opts.context = &tc;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
if (options.server_key_file != NULL) if (options.server_key_file != NULL)
...@@ -646,12 +709,24 @@ int test2a(struct Options options) ...@@ -646,12 +709,24 @@ int test2a(struct Options options)
//opts.ssl->enabledCipherSuites = "DEFAULT"; //opts.ssl->enabledCipherSuites = "DEFAULT";
//opts.ssl->enabledServerCertAuth = 1; //opts.ssl->enabledServerCertAuth = 1;
if (!(assert("Good rc from setCallbacks", (rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived, asyncTestOnDeliveryComplete)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived,
goto exit; asyncTestOnDeliveryComplete);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!tc.subscribed && !tc.testFinished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) if (tc.testFinished)
goto exit; goto exit;
while (!tc.testFinished) while (!tc.testFinished)
...@@ -663,8 +738,7 @@ int test2a(struct Options options) ...@@ -663,8 +738,7 @@ int test2a(struct Options options)
MyLog(LOGA_DEBUG, "Stopping"); MyLog(LOGA_DEBUG, "Stopping");
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -692,7 +766,6 @@ void test2bOnConnect(void* context, MQTTAsync_successData* response) ...@@ -692,7 +766,6 @@ void test2bOnConnect(void* context, MQTTAsync_successData* response)
MyLog(LOGA_DEBUG, "In test2bOnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test2bOnConnectFailure callback, context %p", context);
assert("This connect should not succeed. ", 0, "test2bOnConnect callback was called\n", 0); assert("This connect should not succeed. ", 0, "test2bOnConnect callback was called\n", 0);
failures++;
test2bFinished = 1; test2bFinished = 1;
} }
...@@ -705,6 +778,7 @@ int test2b(struct Options options) ...@@ -705,6 +778,7 @@ int test2b(struct Options options)
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
char url[100];
int count = 0; int count = 0;
test2bFinished = 0; test2bFinished = 0;
...@@ -713,21 +787,30 @@ int test2b(struct Options options) ...@@ -713,21 +787,30 @@ int test2b(struct Options options)
LOGA_INFO, LOGA_INFO,
"Starting test 2b - connection to SSL MQTT server with clientauth=req but server does not have client cert"); "Starting test 2b - connection to SSL MQTT server with clientauth=req but server does not have client cert");
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test2b", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) snprintf(url, 100, "ssl://%s:8884", options.connection);
rc = MQTTAsync_create(&c, url, "test2b", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit; goto exit;
}
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test2bOnConnect; opts.onSuccess = test2bOnConnect;
opts.onFailure = test2bOnConnectFailure; opts.onFailure = test2bOnConnectFailure;
opts.context = c; opts.context = c;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
if (options.server_key_file != NULL) if (options.server_key_file != NULL)
...@@ -739,10 +822,11 @@ int test2b(struct Options options) ...@@ -739,10 +822,11 @@ int test2b(struct Options options)
//opts.ssl->enabledServerCertAuth = 0; //opts.ssl->enabledServerCertAuth = 0;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
while (!test2bFinished && ++count < 10000) while (!test2bFinished && ++count < 10000)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
...@@ -750,8 +834,7 @@ int test2b(struct Options options) ...@@ -750,8 +834,7 @@ int test2b(struct Options options)
usleep(10000L); usleep(10000L);
#endif #endif
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -779,42 +862,51 @@ void test2cOnConnect(void* context, MQTTAsync_successData* response) ...@@ -779,42 +862,51 @@ void test2cOnConnect(void* context, MQTTAsync_successData* response)
MyLog(LOGA_DEBUG, "In test2cOnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test2cOnConnectFailure callback, context %p", context);
assert("This connect should not succeed. ", 0, "test2cOnConnect callback was called\n", 0); assert("This connect should not succeed. ", 0, "test2cOnConnect callback was called\n", 0);
failures++;
test2cFinished = 1; test2cFinished = 1;
} }
int test2c(struct Options options) int test2c(struct Options options)
{ {
char* testname = "test2c"; char* testname = "test2c";
char* test_topic = "C client test2c";
int subsqos = 2; int subsqos = 2;
MQTTAsync c; MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc, count = 0; int rc = 0;
char* test_topic = "C client test2c";
char url[100];
int count = 0;
test2cFinished = 0; failures = 0;
failures = 0;
MyLog( MyLog(
LOGA_INFO, LOGA_INFO,
"Starting test 2c - connection to SSL MQTT server, server auth enabled but unknown cert"); "Starting test 2c - connection to SSL MQTT server, server auth enabled but unknown cert");
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test2c", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) snprintf(url, 100, "ssl://%s:8883", options.connection);
rc = MQTTAsync_create(&c, url, "test2c", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit; goto exit;
}
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test2cOnConnect; opts.onSuccess = test2cOnConnect;
opts.onFailure = test2cOnConnectFailure; opts.onFailure = test2cOnConnectFailure;
opts.context = c; opts.context = c;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
//if (options.server_key_file != NULL) opts.ssl->trustStore = options.server_key_file; /*file of certificates trusted by client*/ //if (options.server_key_file != NULL) opts.ssl->trustStore = options.server_key_file; /*file of certificates trusted by client*/
...@@ -825,9 +917,13 @@ int test2c(struct Options options) ...@@ -825,9 +917,13 @@ int test2c(struct Options options)
//opts.ssl->enabledServerCertAuth = 0; //opts.ssl->enabledServerCertAuth = 0;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit; goto exit;
}
while (!test2cFinished && ++count < 10000) while (!test2cFinished && ++count < 10000)
#if defined(WIN32) #if defined(WIN32)
...@@ -836,8 +932,7 @@ int test2c(struct Options options) ...@@ -836,8 +932,7 @@ int test2c(struct Options options)
usleep(10000L); usleep(10000L);
#endif #endif
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -856,7 +951,6 @@ void test3aOnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -856,7 +951,6 @@ void test3aOnConnectFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test3aOnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test3aOnConnectFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test3aOnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test3aOnConnectFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
...@@ -871,13 +965,15 @@ int test3a(struct Options options) ...@@ -871,13 +965,15 @@ int test3a(struct Options options)
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int i, rc = 0; int rc = 0;
char url[100];
int i;
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting test 3a - Server authentication"); MyLog(LOGA_INFO, "Starting test 3a - Server authentication");
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test3a", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) snprintf(url, 100, "ssl://%s:8885", options.connection);
goto exit; MQTTAsync_create(&c, url, "test3a", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
tc.client = c; tc.client = c;
sprintf(tc.clientid, "%s", testname); sprintf(tc.clientid, "%s", testname);
...@@ -891,14 +987,16 @@ int test3a(struct Options options) ...@@ -891,14 +987,16 @@ int test3a(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = asyncTestOnConnect; opts.onSuccess = asyncTestOnConnect;
opts.onFailure = test3aOnConnectFailure; opts.onFailure = test3aOnConnectFailure;
opts.context = &tc; opts.context = &tc;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
if (options.server_key_file != NULL) if (options.server_key_file != NULL)
...@@ -908,15 +1006,45 @@ int test3a(struct Options options) ...@@ -908,15 +1006,45 @@ int test3a(struct Options options)
//opts.ssl->enabledCipherSuites = "DEFAULT"; //opts.ssl->enabledCipherSuites = "DEFAULT";
//opts.ssl->enabledServerCertAuth = 1; //opts.ssl->enabledServerCertAuth = 1;
rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived,
asyncTestOnDeliveryComplete);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (!(assert("Good rc from setCallbacks", (rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived, asyncTestOnDeliveryComplete)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
MyLog(LOGA_DEBUG, "Connecting"); while (!tc.subscribed && !tc.testFinished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) if (tc.testFinished)
goto exit; goto exit;
for (i = 0; i < 3; i++)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = i;
pubmsg.retained = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = asyncTestOnSend;
opts.context = &tc;
rc = MQTTAsync_send(c, tc.topic, pubmsg.payloadlen, pubmsg.payload,
pubmsg.qos, pubmsg.retained, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
while (!tc.testFinished) while (!tc.testFinished)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
...@@ -926,9 +1054,9 @@ int test3a(struct Options options) ...@@ -926,9 +1054,9 @@ int test3a(struct Options options)
MyLog(LOGA_DEBUG, "Stopping"); MyLog(LOGA_DEBUG, "Stopping");
exit:
MQTTAsync_destroy(&c); MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", (failures
exit: MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", (failures
== 0) ? "passed" : "failed", testname, tests, failures); == 0) ? "passed" : "failed", testname, tests, failures);
return failures; return failures;
...@@ -955,7 +1083,6 @@ void test3bOnConnect(void* context, MQTTAsync_successData* response) ...@@ -955,7 +1083,6 @@ void test3bOnConnect(void* context, MQTTAsync_successData* response)
MyLog(LOGA_DEBUG, "In test3bOnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test3bOnConnectFailure callback, context %p", context);
assert("This connect should not succeed. ", 0, "test3bOnConnect callback was called\n", 0); assert("This connect should not succeed. ", 0, "test3bOnConnect callback was called\n", 0);
failures++;
test3bFinished = 1; test3bFinished = 1;
} }
...@@ -967,27 +1094,37 @@ int test3b(struct Options options) ...@@ -967,27 +1094,37 @@ int test3b(struct Options options)
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int count, rc = 0; int rc = 0;
char url[100];
int count = 0;
test3bFinished = 0; test3bFinished = 0;
failures = 0; failures = 0;
MyLog( LOGA_INFO, "Starting test 3b - connection to SSL MQTT server with clientauth=opt but client does not have server cert"); MyLog(
LOGA_INFO,
"Starting test 3b - connection to SSL MQTT server with clientauth=opt but client does not have server cert");
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test3b", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) snprintf(url, 100, "ssl://%s:8885", options.connection);
rc = MQTTAsync_create(&c, url, "test3b", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test3bOnConnect; opts.onSuccess = test3bOnConnect;
opts.onFailure = test3bOnConnectFailure; opts.onFailure = test3bOnConnectFailure;
opts.context = c; opts.context = c;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
//if (options.server_key_file != NULL) opts.ssl->trustStore = options.server_key_file; /*file of certificates trusted by client*/ //if (options.server_key_file != NULL) opts.ssl->trustStore = options.server_key_file; /*file of certificates trusted by client*/
...@@ -997,10 +1134,11 @@ int test3b(struct Options options) ...@@ -997,10 +1134,11 @@ int test3b(struct Options options)
//opts.ssl->enabledServerCertAuth = 0; //opts.ssl->enabledServerCertAuth = 0;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
while (!test3bFinished && ++count < 10000) while (!test3bFinished && ++count < 10000)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
...@@ -1008,8 +1146,7 @@ int test3b(struct Options options) ...@@ -1008,8 +1146,7 @@ int test3b(struct Options options)
usleep(10000L); usleep(10000L);
#endif #endif
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -1028,18 +1165,14 @@ void test4OnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -1028,18 +1165,14 @@ void test4OnConnectFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test4OnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test4OnConnectFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test4OnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test4OnConnectFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
void test4OnPublishFailure(void* context, MQTTAsync_failureData* response) void test4OnPublishFailure(void* context, MQTTAsync_failureData* response)
{ {
AsyncTestClient* client = (AsyncTestClient*) context; MyLog(LOGA_DEBUG, "In test4OnPublishFailure callback, context %p", context);
MyLog(LOGA_DEBUG, "In test4OnPublishFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test4OnPublishFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test4OnPublishFailure callback was called\n", 0);
failures++;
client->testFinished = 1;
} }
int test4(struct Options options) int test4(struct Options options)
...@@ -1047,19 +1180,22 @@ int test4(struct Options options) ...@@ -1047,19 +1180,22 @@ int test4(struct Options options)
char* testname = "test4"; char* testname = "test4";
int subsqos = 2; int subsqos = 2;
/* TODO - usused - remove ? MQTTAsync_deliveryToken* dt = NULL; */ /* TODO - usused - remove ? MQTTAsync_deliveryToken* dt = NULL; */
AsyncTestClient tc = AsyncTestClient_initializer; AsyncTestClient tc =
AsyncTestClient_initializer;
MQTTAsync c; MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int i, rc = 0; int rc = 0;
char url[100];
int i;
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting test 4 - accept invalid server certificates"); MyLog(LOGA_INFO, "Starting test 4 - accept invalid server certificates");
if (!(assert("Good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test4", MQTTCLIENT_PERSISTENCE_NONE, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) snprintf(url, 100, "ssl://%s:8885", options.connection);
goto exit; MQTTAsync_create(&c, url, "test4", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
tc.client = c; tc.client = c;
sprintf(tc.clientid, "%s", testname); sprintf(tc.clientid, "%s", testname);
sprintf(tc.topic, "C client SSL test4"); sprintf(tc.topic, "C client SSL test4");
...@@ -1072,14 +1208,16 @@ int test4(struct Options options) ...@@ -1072,14 +1208,16 @@ int test4(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = asyncTestOnConnect; opts.onSuccess = asyncTestOnConnect;
opts.onFailure = test4OnConnectFailure; opts.onFailure = test4OnConnectFailure;
opts.context = &tc; opts.context = &tc;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
//if (options.server_key_file != NULL) opts.ssl->trustStore = options.server_key_file; /*file of certificates trusted by client*/ //if (options.server_key_file != NULL) opts.ssl->trustStore = options.server_key_file; /*file of certificates trusted by client*/
...@@ -1088,27 +1226,55 @@ int test4(struct Options options) ...@@ -1088,27 +1226,55 @@ int test4(struct Options options)
//opts.ssl->enabledCipherSuites = "DEFAULT"; //opts.ssl->enabledCipherSuites = "DEFAULT";
opts.ssl->enableServerCertAuth = 0; opts.ssl->enableServerCertAuth = 0;
if (!(assert("Good rc from setCallbacks", (rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived, asyncTestOnDeliveryComplete)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived,
goto exit; asyncTestOnDeliveryComplete);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) while (!tc.subscribed && !tc.testFinished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (tc.testFinished)
goto exit; goto exit;
for (i = 0; i < 3; i++)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = i;
pubmsg.retained = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = asyncTestOnSend;
opts.context = &tc;
rc = MQTTAsync_send(c, tc.topic, pubmsg.payloadlen, pubmsg.payload,
pubmsg.qos, pubmsg.retained, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
while (!tc.testFinished) while (!tc.testFinished)
{
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
#else #else
usleep(10000L); usleep(10000L);
#endif #endif
}
MyLog(LOGA_DEBUG, "Stopping"); MyLog(LOGA_DEBUG, "Stopping");
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -1127,36 +1293,41 @@ void test5aOnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -1127,36 +1293,41 @@ void test5aOnConnectFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test5aOnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test5aOnConnectFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test5aOnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test5aOnConnectFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
void test5aOnPublishFailure(void* context, MQTTAsync_failureData* response) void test5aOnPublishFailure(void* context, MQTTAsync_failureData* response)
{ {
AsyncTestClient* client = (AsyncTestClient*) context; MyLog(LOGA_DEBUG, "In test5aOnPublishFailure callback, context %p", context);
MyLog(LOGA_DEBUG, "In test5aOnPublishFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test5aOnPublishFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test5aOnPublishFailure callback was called\n", 0);
failures++;
client->testFinished = 1;
} }
int test5a(struct Options options) int test5a(struct Options options)
{ {
char* testname = "test5a"; char* testname = "test5a";
AsyncTestClient tc = AsyncTestClient_initializer; AsyncTestClient tc =
AsyncTestClient_initializer;
MQTTAsync c; MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int i, rc = 0; int rc = 0;
char url[100];
int i;
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting SSL test 5a - Anonymous ciphers - server authentication disabled"); MyLog(LOGA_INFO,
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test5a", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) "Starting SSL test 5a - Anonymous ciphers - server authentication disabled");
snprintf(url, 100, "ssl://%s:8886", options.connection);
rc = MQTTAsync_create(&c, url, "test5a", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
tc.client = c; tc.client = c;
sprintf(tc.clientid, "%s", testname); sprintf(tc.clientid, "%s", testname);
sprintf(tc.topic, "C client SSL test5a"); sprintf(tc.topic, "C client SSL test5a");
...@@ -1169,14 +1340,16 @@ int test5a(struct Options options) ...@@ -1169,14 +1340,16 @@ int test5a(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = asyncTestOnConnect; opts.onSuccess = asyncTestOnConnect;
opts.onFailure = test5aOnConnectFailure; opts.onFailure = test5aOnConnectFailure;
opts.context = &tc; opts.context = &tc;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
//opts.ssl->trustStore = /*file of certificates trusted by client*/ //opts.ssl->trustStore = /*file of certificates trusted by client*/
...@@ -1185,14 +1358,45 @@ int test5a(struct Options options) ...@@ -1185,14 +1358,45 @@ int test5a(struct Options options)
opts.ssl->enabledCipherSuites = "aNULL"; opts.ssl->enabledCipherSuites = "aNULL";
opts.ssl->enableServerCertAuth = 0; opts.ssl->enableServerCertAuth = 0;
if (!(assert("Good rc from setCallbacks", (rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived, asyncTestOnDeliveryComplete)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived,
goto exit; asyncTestOnDeliveryComplete);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!tc.subscribed && !tc.testFinished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) if (tc.testFinished)
goto exit; goto exit;
for (i = 0; i < 3; i++)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = i;
pubmsg.retained = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = asyncTestOnSend;
opts.context = &tc;
rc = MQTTAsync_send(c, tc.topic, pubmsg.payloadlen, pubmsg.payload,
pubmsg.qos, pubmsg.retained, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
while (!tc.testFinished) while (!tc.testFinished)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
...@@ -1202,8 +1406,7 @@ int test5a(struct Options options) ...@@ -1202,8 +1406,7 @@ int test5a(struct Options options)
MyLog(LOGA_DEBUG, "Stopping"); MyLog(LOGA_DEBUG, "Stopping");
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -1222,18 +1425,14 @@ void test5bOnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -1222,18 +1425,14 @@ void test5bOnConnectFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test5bOnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test5bOnConnectFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test5bOnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test5bOnConnectFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
void test5bOnPublishFailure(void* context, MQTTAsync_failureData* response) void test5bOnPublishFailure(void* context, MQTTAsync_failureData* response)
{ {
AsyncTestClient* client = (AsyncTestClient*) context; MyLog(LOGA_DEBUG, "In test5bOnPublishFailure callback, context %p", context);
MyLog(LOGA_DEBUG, "In test5bOnPublishFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test5bOnPublishFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test5bOnPublishFailure callback was called\n", 0);
failures++;
client->testFinished = 1;
} }
int test5b(struct Options options) int test5b(struct Options options)
...@@ -1244,16 +1443,23 @@ int test5b(struct Options options) ...@@ -1244,16 +1443,23 @@ int test5b(struct Options options)
AsyncTestClient_initializer; AsyncTestClient_initializer;
MQTTAsync c; MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int i, rc = 0; int rc = 0;
char url[100];
int i;
failures = 0; failures = 0;
MyLog(LOGA_INFO, MyLog(LOGA_INFO,
"Starting SSL test 5b - Anonymous ciphers - server authentication enabled"); "Starting SSL test 5b - Anonymous ciphers - server authentication enabled");
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test5b", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) snprintf(url, 100, "ssl://%s:8886", options.connection);
rc = MQTTAsync_create(&c, url, "test5b", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
tc.client = c; tc.client = c;
sprintf(tc.clientid, "%s", testname); sprintf(tc.clientid, "%s", testname);
sprintf(tc.topic, "C client SSL test5b"); sprintf(tc.topic, "C client SSL test5b");
...@@ -1266,14 +1472,16 @@ int test5b(struct Options options) ...@@ -1266,14 +1472,16 @@ int test5b(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = asyncTestOnConnect; opts.onSuccess = asyncTestOnConnect;
opts.onFailure = test5bOnConnectFailure; opts.onFailure = test5bOnConnectFailure;
opts.context = &tc; opts.context = &tc;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
//opts.ssl->trustStore = /*file of certificates trusted by client*/ //opts.ssl->trustStore = /*file of certificates trusted by client*/
...@@ -1282,18 +1490,45 @@ int test5b(struct Options options) ...@@ -1282,18 +1490,45 @@ int test5b(struct Options options)
opts.ssl->enabledCipherSuites = "aNULL"; opts.ssl->enabledCipherSuites = "aNULL";
opts.ssl->enableServerCertAuth = 1; opts.ssl->enableServerCertAuth = 1;
rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived, asyncTestOnDeliveryComplete); rc = MQTTAsync_setCallbacks(c, &tc, NULL, asyncTestMessageArrived,
asyncTestOnDeliveryComplete);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc); assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts); rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc); assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit; goto exit;
}
while (!tc.subscribed && !tc.testFinished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (tc.testFinished)
goto exit;
for (i = 0; i < 3; i++)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload
= "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = i;
pubmsg.retained = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = asyncTestOnSend;
opts.context = &tc;
rc = MQTTAsync_send(c, tc.topic, pubmsg.payloadlen, pubmsg.payload,
pubmsg.qos, pubmsg.retained, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
while (!tc.testFinished) while (!tc.testFinished)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
...@@ -1303,8 +1538,7 @@ int test5b(struct Options options) ...@@ -1303,8 +1538,7 @@ int test5b(struct Options options)
MyLog(LOGA_DEBUG, "Stopping"); MyLog(LOGA_DEBUG, "Stopping");
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -1329,11 +1563,9 @@ void test5cOnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -1329,11 +1563,9 @@ void test5cOnConnectFailure(void* context, MQTTAsync_failureData* response)
void test5cOnConnect(void* context, MQTTAsync_successData* response) void test5cOnConnect(void* context, MQTTAsync_successData* response)
{ {
AsyncTestClient* client = (AsyncTestClient*) context; MyLog(LOGA_DEBUG, "In test5cOnConnectFailure callback, context %p", context);
MyLog(LOGA_DEBUG, "In test5cOnConnectFailure callback, context %p", context);
assert("This connect should not succeed. ", 0, "test5cOnConnect callback was called\n", 0); assert("This connect should not succeed. ", 0, "test5cOnConnect callback was called\n", 0);
failures++;
test5cFinished = 1; test5cFinished = 1;
} }
...@@ -1346,26 +1578,35 @@ int test5c(struct Options options) ...@@ -1346,26 +1578,35 @@ int test5c(struct Options options)
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
char url[100];
int count = 0; int count = 0;
test5cFinished = 0; test5cFinished = 0;
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting SSL test 5c - Anonymous ciphers - client not using anonymous cipher"); MyLog(LOGA_INFO,
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "test5c", MQTTCLIENT_PERSISTENCE_DEFAULT, persistenceStore)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) "Starting SSL test 5c - Anonymous ciphers - client not using anonymous cipher");
snprintf(url, 100, "ssl://%s:8886", options.connection);
rc = MQTTAsync_create(&c, url, "test5c", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test5cOnConnect; opts.onSuccess = test5cOnConnect;
opts.onFailure = test5cOnConnectFailure; opts.onFailure = test5cOnConnectFailure;
opts.context = c; opts.context = c;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
//opts.ssl->trustStore = /*file of certificates trusted by client*/ //opts.ssl->trustStore = /*file of certificates trusted by client*/
...@@ -1375,14 +1616,11 @@ int test5c(struct Options options) ...@@ -1375,14 +1616,11 @@ int test5c(struct Options options)
opts.ssl->enableServerCertAuth = 0; opts.ssl->enableServerCertAuth = 0;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc); assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit; goto exit;
}
while (!test5cFinished && ++count < 10000) while (!test5cFinished && ++count < 10000)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
...@@ -1390,8 +1628,7 @@ int test5c(struct Options options) ...@@ -1390,8 +1628,7 @@ int test5c(struct Options options)
usleep(10000L); usleep(10000L);
#endif #endif
exit: exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
...@@ -1410,36 +1647,33 @@ void test6OnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -1410,36 +1647,33 @@ void test6OnConnectFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test6OnConnectFailure callback, context %p", context); MyLog(LOGA_DEBUG, "In test6OnConnectFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test6OnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test6OnConnectFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
void test6OnPublishFailure(void* context, MQTTAsync_failureData* response) void test6OnPublishFailure(void* context, MQTTAsync_failureData* response)
{ {
AsyncTestClient* client = (AsyncTestClient*) context; MyLog(LOGA_DEBUG, "In test6OnPublishFailure callback, context %p", context);
MyLog(LOGA_DEBUG, "In test6OnPublishFailure callback, context %p", context);
assert("There should be no failures in this test. ", 0, "test6OnPublishFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test6OnPublishFailure callback was called\n", 0);
failures++;
client->testFinished = 1;
} }
int test6(struct Options options) int test6(struct Options options)
{ {
#define NUM_CLIENTS 10
char* testname = "test6"; char* testname = "test6";
const int num_clients = NUM_CLIENTS; const int num_clients = 10;
int subsqos = 2; int subsqos = 2;
AsyncTestClient tc[NUM_CLIENTS];
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
int i; int i;
char url[100];
AsyncTestClient tc[num_clients];
int test6finished = 0; int test6finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting test 6 - multiple connections"); MyLog(LOGA_INFO, "Starting test 6 - multiple connections");
snprintf(url, 100, "ssl://%s:8883", options.connection);
for (i = 0; i < num_clients; ++i) for (i = 0; i < num_clients; ++i)
{ {
tc[i].maxmsgs = MAXMSGS; tc[i].maxmsgs = MAXMSGS;
...@@ -1453,24 +1687,27 @@ int test6(struct Options options) ...@@ -1453,24 +1687,27 @@ int test6(struct Options options)
sprintf(tc[i].clientid, "sslasync_test6_num_%d", i); sprintf(tc[i].clientid, "sslasync_test6_num_%d", i);
sprintf(tc[i].topic, "sslasync test6 topic num %d", i); sprintf(tc[i].topic, "sslasync test6 topic num %d", i);
if (!(assert("good rc from create", (rc = MQTTAsync_create(&(tc[i].client), options.connection, tc[i].clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc))) rc = MQTTAsync_create(&(tc[i].client), url, tc[i].clientid,
goto exit; MQTTCLIENT_PERSISTENCE_NONE, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
rc = MQTTAsync_setCallbacks(tc[i].client, &tc[i], NULL,
asyncTestMessageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (!(assert("Good rc from setCallbacks", (rc = MQTTAsync_setCallbacks(tc[i].client, &tc[i], NULL, asyncTestMessageArrived, NULL)) == MQTTASYNC_SUCCESS, "rc was %d", rc)))
goto exit;
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.onSuccess = asyncTestOnConnect; opts.onSuccess = asyncTestOnConnect;
opts.onFailure = test6OnConnectFailure; opts.onFailure = test6OnConnectFailure;
opts.context = &tc[i]; opts.context = &tc[i];
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
if (options.server_key_file != NULL) if (options.server_key_file != NULL)
...@@ -1482,8 +1719,8 @@ int test6(struct Options options) ...@@ -1482,8 +1719,8 @@ int test6(struct Options options)
//opts.ssl->enabledServerCertAuth = 1; //opts.ssl->enabledServerCertAuth = 1;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(tc[i].client, &opts);
assert("Good rc from connect", (rc = MQTTAsync_connect(tc[i].client, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc); assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
} }
while (test6finished < num_clients) while (test6finished < num_clients)
...@@ -1506,13 +1743,12 @@ int test6(struct Options options) ...@@ -1506,13 +1743,12 @@ int test6(struct Options options)
} }
} }
exit:
MyLog(LOGA_DEBUG, "test6: destroying clients"); MyLog(LOGA_DEBUG, "test6: destroying clients");
for (i = 0; i < num_clients; ++i) for (i = 0; i < num_clients; ++i)
MQTTAsync_destroy(&tc[i].client); MQTTAsync_destroy(&tc[i].client);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", exit: MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
return failures; return failures;
...@@ -1533,7 +1769,6 @@ void test7OnConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -1533,7 +1769,6 @@ void test7OnConnectFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test7OnConnectFailure callback, %s", client->clientid); MyLog(LOGA_DEBUG, "In test7OnConnectFailure callback, %s", client->clientid);
assert("There should be no failures in this test. ", 0, "test7OnConnectFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test7OnConnectFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
...@@ -1543,11 +1778,11 @@ void test7OnPublishFailure(void* context, MQTTAsync_failureData* response) ...@@ -1543,11 +1778,11 @@ void test7OnPublishFailure(void* context, MQTTAsync_failureData* response)
MyLog(LOGA_DEBUG, "In test7OnPublishFailure callback, %s", client->clientid); MyLog(LOGA_DEBUG, "In test7OnPublishFailure callback, %s", client->clientid);
assert("There should be no failures in this test. ", 0, "test7OnPublishFailure callback was called\n", 0); assert("There should be no failures in this test. ", 0, "test7OnPublishFailure callback was called\n", 0);
failures++;
client->testFinished = 1; client->testFinished = 1;
} }
int test7MessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) int test7MessageArrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* message)
{ {
AsyncTestClient* tc = (AsyncTestClient*) context; AsyncTestClient* tc = (AsyncTestClient*) context;
static int message_count = 0; static int message_count = 0;
...@@ -1564,7 +1799,6 @@ int test7MessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_ ...@@ -1564,7 +1799,6 @@ int test7MessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_
{ {
assert("Message contents correct", ((char*)test7_payload)[i] != ((char*)message->payload)[i], assert("Message contents correct", ((char*)test7_payload)[i] != ((char*)message->payload)[i],
"message content was %c", ((char*)message->payload)[i]); "message content was %c", ((char*)message->payload)[i]);
failures++;
break; break;
} }
} }
...@@ -1606,7 +1840,6 @@ int test7MessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_ ...@@ -1606,7 +1840,6 @@ int test7MessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_
opts.context = tc; opts.context = tc;
rc = MQTTAsync_unsubscribe(tc->client, tc->topic, &opts); rc = MQTTAsync_unsubscribe(tc->client, tc->topic, &opts);
assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc); assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
message_count = 0;
} }
MQTTAsync_freeMessage(&message); MQTTAsync_freeMessage(&message);
...@@ -1647,8 +1880,9 @@ void test7OnConnect(void* context, MQTTAsync_successData* response) ...@@ -1647,8 +1880,9 @@ void test7OnConnect(void* context, MQTTAsync_successData* response)
opts.onSuccess = test7OnSubscribe; opts.onSuccess = test7OnSubscribe;
opts.context = tc; opts.context = tc;
rc = MQTTAsync_subscribe(tc->client, tc->topic, 2, &opts);
if (!(assert("Good rc from subscribe", (rc = MQTTAsync_subscribe(tc->client, tc->topic, 2, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
tc->testFinished = 1; tc->testFinished = 1;
} }
...@@ -1664,15 +1898,24 @@ int test7(struct Options options) ...@@ -1664,15 +1898,24 @@ int test7(struct Options options)
MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions sslopts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
char* test_topic = "C client test7"; char* test_topic = "C client test7";
char url[100];
int test_finished; int test_finished;
failures = 0; test_finished = failures = 0;
snprintf(url, 100, "ssl://%s:8883", options.connection);
MyLog(LOGA_INFO, "Starting test 7 - big messages"); MyLog(LOGA_INFO, "Starting test 7 - big messages");
if (!(assert("good rc from create", (rc = MQTTAsync_create(&c, options.connection, "async_test_7", MQTTCLIENT_PERSISTENCE_NONE, NULL)) == MQTTASYNC_SUCCESS, "rc was %d\n", rc)))
goto exit;
if (!(assert("Good rc from setCallbacks", (rc = MQTTAsync_setCallbacks(c, &tc, NULL, test7MessageArrived, NULL)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) rc = MQTTAsync_create(&c, url, "async_test_7", MQTTCLIENT_PERSISTENCE_NONE,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit; goto exit;
}
rc = MQTTAsync_setCallbacks(c, &tc, NULL, test7MessageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
tc.client = c; tc.client = c;
sprintf(tc.clientid, "%s", testname); sprintf(tc.clientid, "%s", testname);
...@@ -1686,14 +1929,16 @@ int test7(struct Options options) ...@@ -1686,14 +1929,16 @@ int test7(struct Options options)
opts.cleansession = 1; opts.cleansession = 1;
opts.username = "testuser"; opts.username = "testuser";
opts.password = "testpassword"; opts.password = "testpassword";
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test7OnConnect; opts.onSuccess = test7OnConnect;
opts.onFailure = test7OnConnectFailure; opts.onFailure = test7OnConnectFailure;
opts.context = &tc; opts.context = &tc;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
opts.ssl = &sslopts; opts.ssl = &sslopts;
if (options.server_key_file != NULL) if (options.server_key_file != NULL)
...@@ -1705,20 +1950,22 @@ int test7(struct Options options) ...@@ -1705,20 +1950,22 @@ int test7(struct Options options)
//opts.ssl->enabledServerCertAuth = 1; //opts.ssl->enabledServerCertAuth = 1;
MyLog(LOGA_DEBUG, "Connecting"); MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
if (!(assert("Good rc from connect", (rc = MQTTAsync_connect(c, &opts)) == MQTTASYNC_SUCCESS, "rc was %d", rc))) rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
while (!tc.testFinished) while (!tc.testFinished)
#if defined(WIN32) #if defined(WIN32)
Sleep(100); Sleep(100);
#else #else
usleep(10000L); usleep(1000L);
#endif #endif
exit: MQTTAsync_destroy(&c);
MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.", exit: MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures); (failures == 0) ? "passed" : "failed", testname, tests, failures);
return failures; return failures;
...@@ -1726,7 +1973,7 @@ exit: ...@@ -1726,7 +1973,7 @@ exit:
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message) void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
//printf("Trace: %d, %s\n", level, message); printf("%s\n", message);
} }
int main(int argc, char** argv) int main(int argc, char** argv)
...@@ -1737,16 +1984,16 @@ int main(int argc, char** argv) ...@@ -1737,16 +1984,16 @@ int main(int argc, char** argv)
int (*tests[])() = int (*tests[])() =
{ NULL, test1, test2a, test2b, test2c, test3a, test3b, test4, test5a, { NULL, test1, test2a, test2b, test2c, test3a, test3b, test4, test5a,
test5b, test5c, test6, test7 }; test5b, test5c, test6, test7 };
char** versionInfo = MQTTAsync_getVersionInfo(); //char** versionInfo = MQTTAsync_getVersionInfo();
for (i = 0; versionInfo[i] != NULL; i++) /*for (i = 0; versionInfo[i] != NULL; i++)
{ {
printf("%s\n", versionInfo[i]); printf("%s\n", versionInfo[i]);
} }*/
//client_mutex = CreateMutex(NULL, 0, NULL);
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_PROTOCOL); //MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_PROTOCOL);
MQTTAsync_setTraceCallback(handleTrace); //MQTTAsync_setTraceCallback(handleTrace);
getopts(argc, argv); getopts(argc, argv);
...@@ -1762,7 +2009,7 @@ int main(int argc, char** argv) ...@@ -1762,7 +2009,7 @@ int main(int argc, char** argv)
if (rc == 0) if (rc == 0)
MyLog(LOGA_INFO, "verdict pass"); MyLog(LOGA_INFO, "verdict pass");
else else
MyLog(LOGA_INFO, "verdict fail"); MyLog(LOGA_INFO, "verdict fail");
return rc; return rc;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment