Commit 77cc4fb9 authored by Ian Craggs's avatar Ian Craggs

Add cleanstart and session expiry to async test/client

parent ced0d0ef
......@@ -94,7 +94,8 @@ typedef struct
const char* username; /**< MQTT v3.1 user name */
int passwordlen; /**< MQTT password length */
const void* password; /**< MQTT v3.1 binary password */
unsigned int cleansession : 1; /**< MQTT clean session flag */
unsigned int cleansession : 1; /**< MQTT V3 clean session flag */
unsigned int cleanstart : 1; /**< MQTT V5 clean start flag */
unsigned int connected : 1; /**< whether it is currently connected */
unsigned int good : 1; /**< if we have an error on the socket we turn this off */
unsigned int ping_outstanding : 1;
......@@ -113,6 +114,7 @@ typedef struct
MQTTClient_persistence* persistence; /* a persistence implementation */
void* context; /* calling context - used when calling disconnect_internal */
int MQTTVersion;
int sessionExpiry; /**< MQTT 5 session expiry */
#if defined(OPENSSL)
MQTTClient_SSLOptions *sslopts;
SSL_SESSION* session; /***< SSL session pointer for fast handhake */
......
......@@ -1913,7 +1913,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
m->c->connected = 1;
m->c->good = 1;
m->c->connect_state = 0;
if (m->c->cleansession)
if (m->c->cleansession || m->c->cleanstart)
rc = MQTTAsync_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0)
{
......@@ -2360,7 +2360,8 @@ static void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonC
FUNC_ENTRY;
MQTTAsync_closeOnly(client, reasonCode, props);
if (client->cleansession)
if (client->cleansession ||
(client->MQTTVersion >= MQTTVERSION_5 && client->sessionExpiry == 0))
MQTTAsync_cleanSession(client);
FUNC_EXIT;
......@@ -2555,6 +2556,25 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
}
if (options->MQTTVersion >= MQTTVERSION_5 && options->struct_version < 6)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
{
rc = MQTTASYNC_BAD_MQTTV5_OPTIONS;
goto exit;
}
if (options->MQTTVersion < MQTTVERSION_5)
{
if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
options->connectProperties || options->willProperties)
{
rc = MQTTASYNC_BAD_MQTTV5_OPTIONS;
goto exit;
}
}
m->connect.onSuccess = options->onSuccess;
m->connect.onFailure = options->onFailure;
......@@ -2728,22 +2748,28 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
free(m->connectProps);
m->connectProps = NULL;
}
if (options->struct_version >=6 && options->connectProperties)
if (m->willProps)
{
MQTTProperties_free(m->willProps);
free(m->willProps);
m->willProps = NULL;
}
if (options->struct_version >=6)
{
if (options->connectProperties)
{
MQTTProperties initialized = MQTTProperties_initializer;
m->connectProps = malloc(sizeof(MQTTProperties));
*m->connectProps = initialized;
*m->connectProps = MQTTProperties_copy(options->connectProperties);
}
if (m->willProps)
{
MQTTProperties_free(m->willProps);
free(m->willProps);
m->willProps = NULL;
if (MQTTProperties_hasProperty(options->connectProperties, SESSION_EXPIRY_INTERVAL))
m->c->sessionExpiry = MQTTProperties_getNumericValue(options->connectProperties,
SESSION_EXPIRY_INTERVAL);
}
if (options->struct_version >=6 && options->willProperties)
if (options->willProperties)
{
MQTTProperties initialized = MQTTProperties_initializer;
......@@ -2751,6 +2777,8 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
*m->willProps = initialized;
*m->willProps = MQTTProperties_copy(options->willProperties);
}
m->c->cleanstart = options->cleanstart;
}
/* Add connect request to operation queue */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
......
......@@ -180,6 +180,10 @@
* Return code: protocol prefix in serverURI should be tcp:// or ssl://
*/
#define MQTTASYNC_BAD_PROTOCOL -14
/**
* Return code: don't use MQTTV5 options if MQTT 3 is chosen
*/
#define MQTTASYNC_BAD_MQTTV5_OPTIONS -15
/**
......@@ -989,7 +993,11 @@ typedef struct
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 10, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, 0, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL}
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 6, 60, 0, 10, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL}
/**
* This function attempts to connect a previously-created client (see
......
......@@ -85,6 +85,9 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
goto exit;
packet.flags.all = 0;
if (MQTTVersion >= MQTTVERSION_5)
packet.flags.bits.cleanstart = client->cleanstart;
else
packet.flags.bits.cleanstart = client->cleansession;
packet.flags.bits.will = (client->will) ? 1 : 0;
if (packet.flags.bits.will)
......
......@@ -414,3 +414,53 @@ MQTTProperties MQTTProperties_copy(const MQTTProperties* props)
FUNC_EXIT;
return result;
}
int MQTTProperties_hasProperty(MQTTProperties *props, int propid)
{
int i = 0;
int found = 0;
for (i = 0; i < props->count; ++i)
{
if (propid == props->array[i].identifier)
{
found = 1;
break;
}
}
return found;
}
int MQTTProperties_getNumericValue(MQTTProperties *props, int propid)
{
int i = 0;
int rc = -9999999;
for (i = 0; i < props->count; ++i)
{
int id = props->array[i].identifier;
if (id == propid)
{
switch (MQTTProperty_getType(id))
{
case PROPERTY_TYPE_BYTE:
rc = props->array[i].value.byte;
break;
case TWO_BYTE_INTEGER:
rc = props->array[i].value.integer2;
break;
case FOUR_BYTE_INTEGER:
case VARIABLE_BYTE_INTEGER:
rc = props->array[i].value.integer4;
break;
default:
rc = -999999;
break;
}
}
}
return rc;
}
......@@ -119,4 +119,7 @@ DLLExport void MQTTProperties_free(MQTTProperties* properties);
MQTTProperties MQTTProperties_copy(const MQTTProperties* props);
DLLExport int MQTTProperties_hasProperty(MQTTProperties *props, int propid);
DLLExport int MQTTProperties_getNumericValue(MQTTProperties *props, int propid);
#endif /* MQTTPROPERTIES_H */
......@@ -289,43 +289,6 @@ void logProperties(MQTTProperties *props)
}
int getNumericPropertyValue(MQTTProperties *props, int propid)
{
int i = 0;
int rc = 0;
for (i = 0; i < props->count; ++i)
{
int id = props->array[i].identifier;
const char* name = MQTTPropertyName(id);
char* intformat = "Got property name %s value %d";
if (id == propid)
{
switch (MQTTProperty_getType(id))
{
case PROPERTY_TYPE_BYTE:
rc = props->array[i].value.byte;
MyLog(LOGA_INFO, intformat, name, props->array[i].value.byte);
break;
case TWO_BYTE_INTEGER:
rc = props->array[i].value.integer2;
MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer2);
break;
case FOUR_BYTE_INTEGER:
case VARIABLE_BYTE_INTEGER:
rc = props->array[i].value.integer4;
MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4);
break;
default:
rc = -999999;
break;
}
}
}
return rc;
}
volatile int test_finished = 0;
char* test_topic = "async test topic";
......@@ -506,7 +469,7 @@ int test1(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
......@@ -531,7 +494,6 @@ int test1(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
......@@ -614,7 +576,7 @@ int test2(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test2";
......@@ -639,7 +601,6 @@ int test2(struct Options options)
opts.connectTimeout = 5;
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.binarypwd.data = "testpassword";
opts.binarypwd.len = strlen(opts.binarypwd.data);
......@@ -657,7 +618,6 @@ int test2(struct Options options)
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -826,7 +786,7 @@ int test3(struct Options options)
{
#define num_clients 10
int subsqos = 2;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
int i;
......@@ -852,7 +812,6 @@ int test3(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
......@@ -1004,7 +963,7 @@ void test4_onConnect(void* context, MQTTAsync_successData5* response)
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
int rc;
test4_packet_size = getNumericPropertyValue(&response->props, MAXIMUM_PACKET_SIZE);
test4_packet_size = MQTTProperties_getNumericValue(&response->props, MAXIMUM_PACKET_SIZE);
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess5 = test4_onSubscribe;
......@@ -1026,7 +985,7 @@ int test4(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test4";
......@@ -1049,7 +1008,6 @@ int test4(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
......@@ -1120,14 +1078,14 @@ int test5(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test1";
test_finished = failures = 0;
MyLog(LOGA_INFO, "Starting test 5 - connack return codes");
fprintf(xml, "<testcase classname=\"test4\" name=\"connack return codes\"");
fprintf(xml, "<testcase classname=\"test45\" name=\"connack return codes\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "a clientid that is too long to be accepted",
......@@ -1148,7 +1106,6 @@ int test5(struct Options options)
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -1212,7 +1169,7 @@ int test6(struct Options options)
{
int subsqos = 2;
test6_client_info cinfo;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client test1";
......@@ -1278,7 +1235,6 @@ int test6(struct Options options)
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(cinfo.c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -1388,24 +1344,6 @@ void test7_onConnect(void* context, MQTTAsync_successData5* response)
}
void test7_onConnectOnly(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
dopts.context = context;
dopts.timeout = 1000;
dopts.onSuccess5 = test7_onDisconnect;
rc = MQTTAsync_disconnect(c, &dopts);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_finished = 1;
}
/*********************************************************************
Test7: Pending tokens
......@@ -1415,7 +1353,7 @@ int test7(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
......@@ -1458,34 +1396,16 @@ int test7(struct Options options)
opts.onFailure5 = NULL;
opts.context = c;
/* connect to clean up state only */
opts.cleansession = 1;
opts.onSuccess5 = test7_onConnectOnly;
MyLog(LOGA_DEBUG, "Connecting to clean up");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
/* now connect and leave messages lying around */
/* connect clean and then leave messages lying around */
test_finished = 0;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 0;
opts.cleanstart = 1;
opts.connectProperties = &props;
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 999999;
MQTTProperties_add(opts.connectProperties, &property);
opts.onSuccess5 = test7_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -1571,7 +1491,7 @@ int test7(struct Options options)
MyLog(LOGA_DEBUG, "Reconnecting");
opts.context = c;
opts.cleansession = 0;
opts.cleanstart = 0;
if (MQTTAsync_connect(c, &opts) != 0)
{
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1713,13 +1633,15 @@ int test8(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_token* tokens = NULL;
int msg_count = 6;
MQTTProperty property;
MQTTProperties props = MQTTProperties_initializer;
MyLog(LOGA_INFO, "Starting test 8 - incomplete commands");
fprintf(xml, "<testcase classname=\"test4\" name=\"incomplete commands\"");
......@@ -1747,10 +1669,9 @@ int test8(struct Options options)
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 1;
opts.cleanstart = 1;
opts.onSuccess5 = test8_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -1803,10 +1724,15 @@ int test8(struct Options options)
test8_subscribed = test8_publishFailures = 0;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 0;
opts.onSuccess5 = test8_onConnect;
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 30;
MQTTProperties_add(&props, &property);
opts.connectProperties = &props;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -1851,6 +1777,7 @@ int test8(struct Options options)
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
MQTTAsync_free(tokens);
MQTTProperties_free(&props);
assert("test8_publishFailures == 0", test8_publishFailures == 0,
"test8_publishFailures = %d", test8_publishFailures);
......@@ -1912,7 +1839,6 @@ int main(int argc, char** argv)
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
//MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_PROTOCOL);
rc = tests[options.test_no](options); /* run just the selected test */
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment