Commit 2aebba99 authored by Ian Craggs's avatar Ian Craggs

Change MQTTClient cleansession to cleanstart for V5 #468

parent 2bc9c403
......@@ -2652,7 +2652,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
}
if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
{
rc = MQTTASYNC_BAD_MQTTV5_OPTIONS;
rc = MQTTASYNC_BAD_MQTT_OPTIONS;
goto exit;
}
if (options->MQTTVersion < MQTTVERSION_5)
......@@ -2660,7 +2660,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
options->connectProperties || options->willProperties)
{
rc = MQTTASYNC_BAD_MQTTV5_OPTIONS;
rc = MQTTASYNC_BAD_MQTT_OPTIONS;
goto exit;
}
}
......
......@@ -181,9 +181,9 @@
*/
#define MQTTASYNC_BAD_PROTOCOL -14
/**
* Return code: don't use MQTTV5 options if MQTT 3 is chosen
* Return code: don't use options for another version of MQTT
*/
#define MQTTASYNC_BAD_MQTTV5_OPTIONS -15
#define MQTTASYNC_BAD_MQTT_OPTIONS -15
/**
......
......@@ -849,7 +849,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
m->serverURI, m->c->sslopts->verify);
if (rc == 1 || rc == SSL_FATAL)
{
if (rc == 1 && !m->c->cleansession && m->c->session == NULL)
if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->rc = rc;
Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
......@@ -1127,7 +1127,7 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
rc = SOCKET_ERROR;
goto exit;
}
if (!m->c->cleansession && m->c->session == NULL)
if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
}
}
......@@ -1170,7 +1170,7 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
rc = SOCKET_ERROR;
goto exit;
}
if(!m->c->cleansession && m->c->session == NULL)
if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
if ( m->websocket )
......@@ -1228,7 +1228,7 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
m->c->connect_state = NOT_IN_PROGRESS;
if (MQTTVersion == 4)
sessionPresent = connack->flags.bits.sessionPresent;
if (m->c->cleansession)
if (m->c->cleansession || m->c->cleanstart)
rc = MQTTClient_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0)
{
......@@ -1301,8 +1301,12 @@ static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectO
m->c->keepAliveInterval = options->keepAliveInterval;
setRetryLoopInterval(options->keepAliveInterval);
m->c->cleansession = options->cleansession;
m->c->MQTTVersion = options->MQTTVersion;
m->c->cleanstart = m->c->cleansession = 0;
if (m->c->MQTTVersion >= MQTTVERSION_5)
m->c->cleanstart = options->cleanstart;
else
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
if (options->struct_version >= 6)
{
......@@ -1463,7 +1467,7 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 6)
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 6 || options->struct_version > 6)
{
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
......@@ -1512,6 +1516,20 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o
goto exit;
}
if (options->MQTTVersion >= MQTTVERSION_5)
{
if (options->cleansession != 0)
{
rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
goto exit;
}
}
else if (options->cleanstart != 0)
{
rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
goto exit;
}
if (options->struct_version < 2 || options->serverURIcount == 0)
{
if ( !m )
......@@ -2269,7 +2287,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
break;
else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
{
if (!m->c->cleansession && m->c->session == NULL)
if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
break;
}
......
......@@ -186,6 +186,11 @@
* Return code: protocol prefix in serverURI should be tcp:// or ssl://
*/
#define MQTTCLIENT_BAD_PROTOCOL -14
/**
* Return code: option not applicable to the requested version of MQTT
*/
#define MQTTCLIENT_BAD_MQTT_OPTION -15
/**
* Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1
......@@ -678,7 +683,7 @@ typedef struct
* 2 signifies no MQTTVersion
* 3 signifies no returned values
* 4 signifies no binary password option
* 5 signifies no maxInflightMessages
* 5 signifies no maxInflightMessages and cleanstart
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
......@@ -798,9 +803,15 @@ typedef struct
* The maximum number of messages in flight
*/
int maxInflightMessages;
/*
* MQTT V5 clean start flag. Only clears state at the beginning of the session.
*/
int cleanstart;
} MQTTClient_connectOptions;
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL, 0, {NULL, 0, 0}, {0, NULL}, -1}
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0}
#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 6, 60, 0, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1}
/**
* MQTTClient_libraryInfo is used to store details relating to the currently used
......
......@@ -378,7 +378,7 @@ int test_client_topic_aliases(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties connect_props = MQTTProperties_initializer;
......@@ -413,7 +413,7 @@ int test_client_topic_aliases(struct Options options)
assert("Good rc from setDisconnected", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
......@@ -523,7 +523,7 @@ int test_client_topic_aliases(struct Options options)
rc = MQTTClient_disconnect5(c, 1000, SUCCESS, NULL);
/* Reconnect. Topic aliases should be deleted, but not subscription */
opts.cleansession = 0;
opts.cleanstart = 0;
response = MQTTClient_connect5(c, &opts, NULL, NULL);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
MQTTResponse_free(response);
......@@ -634,7 +634,7 @@ int test_server_topic_aliases(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperty property;
......@@ -666,7 +666,7 @@ int test_server_topic_aliases(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
......@@ -772,7 +772,7 @@ int test_subscription_ids(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperties subs_props = MQTTProperties_initializer;
MQTTProperty property;
......@@ -803,7 +803,7 @@ int test_subscription_ids(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
......@@ -909,7 +909,7 @@ int test_flow_control(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperty property;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
......@@ -936,7 +936,7 @@ int test_flow_control(struct Options options)
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
opts.reliable = 0;
opts.maxInflightMessages = 100;
......
......@@ -398,7 +398,7 @@ int test1(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
......@@ -423,7 +423,7 @@ int test1(struct Options options)
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
......@@ -639,7 +639,7 @@ int test2(struct Options options)
int subsqos = 2;
/* TODO - usused - remove ? MQTTClient_deliveryToken* dt = NULL; */
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
MQTTResponse response = {SUCCESS, NULL};
......@@ -655,7 +655,7 @@ int test2(struct Options options)
MQTTClient_create(&c, options.connection, "multi_threaded_sample", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
opts.username = "testuser";
opts.binarypwd.data = "testpassword";
......@@ -720,7 +720,7 @@ int test3(struct Options options)
char* testname = "test3";
int rc;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTResponse response;
......@@ -795,7 +795,7 @@ int test4_run(int qos)
char* topic = "Persistence test 1";
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_message* m = NULL;
char* topicName = NULL;
int topicLen;
......@@ -823,7 +823,7 @@ int test4_run(int qos)
}
MyLog(LOGA_DEBUG, "Cleanup by connecting clean start, add session expiry > 0\n");
opts.cleansession = 1;
opts.cleanstart = 1;
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 30; /* in seconds */
MQTTProperties_add(&props, &property);
......@@ -882,7 +882,7 @@ int test4_run(int qos)
}
MyLog(LOGA_DEBUG, "Reconnecting");
opts.cleansession = 0;
opts.cleanstart = 0;
response = MQTTClient_connect5(c, &opts, NULL, NULL);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
MQTTResponse_free(response);
......@@ -961,7 +961,7 @@ int test5(struct Options options)
char* topic = "Persistence test 2";
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_deliveryToken* tokens = NULL;
char buffer[100];
int count = 5;
......@@ -978,7 +978,7 @@ int test5(struct Options options)
MQTTClient_create(&c, options.connection, "xrctest15_test_5", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.reliable = 0;
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
......@@ -1069,9 +1069,9 @@ int test6_messageArrived(void* context, char* topicName, int topicLen, MQTTClien
int test6(struct Options options)
{
char* testname = "test6";
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer5;
MQTTResponse response = {SUCCESS, NULL};
int rc, count;
char* mqttsas_topic = "MQTTSAS topic";
......@@ -1082,7 +1082,7 @@ int test6(struct Options options)
global_start_time = start_clock();
opts.keepAliveInterval = 2;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = test6_will_message;
......@@ -1123,7 +1123,7 @@ int test6(struct Options options)
/* Connect to the broker */
opts2.keepAliveInterval = 20;
opts2.cleansession = 1;
opts2.cleanstart = 1;
MyLog(LOGA_INFO, "Connecting Client_2 ...");
response = MQTTClient_connect5(test6_c2, &opts2, NULL, NULL);
MQTTResponse_free(response);
......@@ -1179,9 +1179,9 @@ exit:
int test6a(struct Options options)
{
char* testname = "test6a";
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer;
MQTTClient_connectOptions opts2 = MQTTClient_connectOptions_initializer5;
int rc, count;
MQTTResponse response = {SUCCESS, NULL};
char* mqttsas_topic = "MQTTSAS topic";
......@@ -1192,7 +1192,7 @@ int test6a(struct Options options)
global_start_time = start_clock();
opts.keepAliveInterval = 2;
opts.cleansession = 1;
opts.cleanstart = 1;
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->payload.data = test6_will_message;
......@@ -1235,7 +1235,7 @@ int test6a(struct Options options)
/* Connect to the broker */
opts2.keepAliveInterval = 20;
opts2.cleansession = 1;
opts2.cleanstart = 1;
MyLog(LOGA_INFO, "Connecting Client_2 ...");
response = MQTTClient_connect5(test6_c2, &opts2, NULL, NULL);
MQTTResponse_free(response);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment