Commit 5ea74966 authored by Ian Craggs's avatar Ian Craggs

Allow applications to choose MQTT 3.1 or 3.1.1 explicitly

parent 5b48fbda
......@@ -182,6 +182,7 @@ typedef struct
void* phandle; /* the persistence handle */
MQTTClient_persistence* persistence; /* a persistence implementation */
void* context; /* calling context - used when calling disconnect_internal */
int MQTTVersion;
#if defined(OPENSSL)
MQTTClient_SSLOptions *sslopts;
SSL_SESSION* session; /***< SSL session pointer for fast handhake */
......
......@@ -249,7 +249,7 @@ typedef struct
int serverURIcount;
char** serverURIs;
int currentURI;
int MQTTVersion;
int MQTTVersion; /**< current MQTT version being used to connect */
} conn;
} details;
} MQTTAsync_command;
......@@ -336,13 +336,13 @@ void MQTTAsync_unlock_mutex(mutex_type amutex)
}
int MQTTAsync_checkConn(MQTTAsync_command* command)
int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client)
{
int rc;
FUNC_ENTRY;
rc = command->details.conn.currentURI < command->details.conn.serverURIcount ||
command->details.conn.MQTTVersion == 4;
(command->details.conn.MQTTVersion == 4 && client->c->MQTTVersion == MQTTVERSION_DEFAULT);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -977,11 +977,16 @@ void MQTTAsync_processCommand()
if (command->command.details.conn.serverURIcount > 0)
{
if (command->command.details.conn.MQTTVersion == 3)
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == 3)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = 4;
}
}
else
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = 4;
}
serverURI = command->command.details.conn.serverURIs[command->command.details.conn.currentURI];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
......@@ -995,10 +1000,15 @@ void MQTTAsync_processCommand()
#endif
}
if (command->command.details.conn.MQTTVersion == 0)
command->command.details.conn.MQTTVersion = 4;
else if (command->command.details.conn.MQTTVersion == 4)
command->command.details.conn.MQTTVersion = 3;
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == 0)
command->command.details.conn.MQTTVersion = MQTTVERSION_3_1_1;
else if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1_1)
command->command.details.conn.MQTTVersion = MQTTVERSION_3_1;
}
else
command->command.details.conn.MQTTVersion = command->client->c->MQTTVersion;
Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
#if defined(OPENSSL)
......@@ -1121,7 +1131,7 @@ void MQTTAsync_processCommand()
else
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command))
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client))
{
Log(TRACE_MIN, -1, "Connect failed, more to try");
/* put the connect command back to the head of the command queue, using the next serverURI */
......@@ -1175,7 +1185,7 @@ void MQTTAsync_checkTimeouts()
/* check connect timeout */
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connect.details.conn.timeout * 1000))
{
if (MQTTAsync_checkConn(&m->connect))
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
......@@ -1185,7 +1195,7 @@ void MQTTAsync_checkTimeouts()
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
Log(TRACE_MIN, -1, "Connect failed with timeout, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
......@@ -1508,7 +1518,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
}
else
{
if (MQTTAsync_checkConn(&m->connect))
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
......@@ -2015,7 +2025,8 @@ int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 ||
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2))
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2 &&
options->struct_version != 3))
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
......@@ -2071,6 +2082,10 @@ int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
m->c->keepAliveInterval = options->keepAliveInterval;
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = options->maxInflight;
if (options->struct_version == 3)
m->c->MQTTVersion = options->MQTTVersion;
else
m->c->MQTTVersion = 0;
if (m->c->will)
{
......@@ -2572,7 +2587,7 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
exit:
if ((rc != 0 && rc != TCPSOCKET_INTERRUPTED && m->c->connect_state != 2) || (rc == SSL_FATAL))
{
if (MQTTAsync_checkConn(&m->connect))
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
......@@ -2650,7 +2665,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if ((m->c->connect_state == 3) && (*rc == SOCKET_ERROR))
{
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (MQTTAsync_checkConn(&m->connect))
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
......
......@@ -14,6 +14,7 @@
* Ian Craggs - initial API and implementation
* Ian Craggs, Allan Stockdill-Mander - SSL connections
* Ian Craggs - multiple server connection support
* Ian Craggs - MQTT 3.1.1 support
*******************************************************************************/
/********************************************************************/
......@@ -145,6 +146,19 @@
*/
#define MQTTASYNC_NO_MORE_MSGIDS -10
/**
* Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1
*/
#define MQTTVERSION_DEFAULT 0
/**
* MQTT version to connect with: 3.1
*/
#define MQTTVERSION_3_1 3
/**
* MQTT version to connect with: 3.1.1
*/
#define MQTTVERSION_3_1_1 4
/**
* A handle representing an MQTT client. A valid client handle is available
* following a successful call to MQTTAsync_create().
......@@ -549,9 +563,10 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1 or 2.
/** The version number of this structure. Must be 0, 1, 2 or 3.
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
......@@ -653,10 +668,17 @@ typedef struct
* <i>tcp://localhost:1883</i>.
*/
char** serverURIs;
/**
* Sets the version of MQTT to be used on the connect.
* MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1
* MQTTVERSION_3_1 (3) = only try version 3.1
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1
*/
int MQTTVersion;
} MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 2, 60, 1, 10, NULL, NULL, NULL, 30, 20, NULL, NULL, 0, NULL}
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 3, 60, 1, 10, NULL, NULL, NULL, 30, 20, NULL, NULL, 0, NULL, 0}
/**
* This function attempts to connect a previously-created client (see
......
......@@ -748,7 +748,7 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
FUNC_ENTRY;
FUNC_ENTRY;
if (m->ma && !running)
{
Thread_start(MQTTClient_run, handle);
......@@ -914,6 +914,7 @@ int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options,
START_TIME_TYPE start;
long millisecsTimeout = 30000L;
int rc = SOCKET_ERROR;
int MQTTVersion = 0;
FUNC_ENTRY;
millisecsTimeout = options->connectTimeout * 1000;
......@@ -996,8 +997,18 @@ int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options,
m->c->password = options->password;
m->c->retryInterval = options->retryInterval;
if ((rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4 , start, millisecsTimeout)) != MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout);
if (options->struct_version == 3)
MQTTVersion = options->MQTTVersion;
else
MQTTVersion = MQTTVERSION_DEFAULT;
if (MQTTVersion == MQTTVERSION_DEFAULT)
{
if ((rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4 , start, millisecsTimeout)) != MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout);
}
else
rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout);
FUNC_EXIT_RC(rc);
return rc;
......
......@@ -14,6 +14,7 @@
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - multiple server connection support
* Ian Craggs - MQTT 3.1.1 support
*******************************************************************************/
/**
......@@ -160,6 +161,19 @@
*/
#define MQTTCLIENT_BAD_QOS -9
/**
* Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1
*/
#define MQTTVERSION_DEFAULT 0
/**
* MQTT version to connect with: 3.1
*/
#define MQTTVERSION_3_1 3
/**
* MQTT version to connect with: 3.1.1
*/
#define MQTTVERSION_3_1_1 4
/**
* A handle representing an MQTT client. A valid client handle is available
* following a successful call to MQTTClient_create().
......@@ -490,9 +504,10 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1 or 2.
/** The version number of this structure. Must be 0, 1, 2 or 3.
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
......@@ -583,9 +598,16 @@ typedef struct
* is used.
*/
char** serverURIs;
/**
* Sets the version of MQTT to be used on the connect.
* MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1
* MQTTVERSION_3_1 (3) = only try version 3.1
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1
*/
int MQTTVersion;
} MQTTClient_connectOptions;
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 2, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL }
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 2, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL, 0}
/**
* MQTTClient_libraryInfo is used to store details relating to the currently used
......
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -12,6 +12,7 @@
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - MQTT 3.1.1 updates
*******************************************************************************/
#if !defined(MQTTPROTOCOL_H)
......@@ -22,7 +23,7 @@
#include "Clients.h"
#define MAX_MSG_ID 65535
#define MAX_CLIENTID_LEN 23
#define MAX_CLIENTID_LEN 65535
typedef struct
{
......
......@@ -13,6 +13,7 @@
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 updates
*******************************************************************************/
#if !defined(MQTTPROTOCOLCLIENT_H)
......@@ -25,7 +26,7 @@
#include "Messages.h"
#define MAX_MSG_ID 65535
#define MAX_CLIENTID_LEN 23
#define MAX_CLIENTID_LEN 65535
int MQTTProtocol_assignMsgId(Clients* client);
int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int retained, Messages** m);
......
......@@ -61,14 +61,18 @@ struct Options
{
char* connection; /**< connection to system under test. */
int verbose;
int test_no;
int test_no;
int size; /**< size of big message */
int MQTTVersion;
int iterations;
} options =
{
"m2m.eclipse.org:1883",
0,
-1,
10000,
MQTTVERSION_DEFAULT,
1,
};
void getopts(int argc, char** argv)
......@@ -98,6 +102,23 @@ void getopts(int argc, char** argv)
else
usage();
}
else if (strcmp(argv[count], "--MQTTversion") == 0)
{
if (++count < argc)
{
options.MQTTVersion = atoi(argv[count]);
printf("setting MQTT version to %d\n", options.MQTTVersion);
}
else
usage();
}
else if (strcmp(argv[count], "--iterations") == 0)
{
if (++count < argc)
options.iterations = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--verbose") == 0)
options.verbose = 1;
count++;
......@@ -370,6 +391,7 @@ int test1(struct Options options)
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "will message";
......@@ -462,6 +484,7 @@ int test2(struct Options options)
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "will message";
......@@ -674,6 +697,7 @@ int test3(struct Options options)
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "will message";
......@@ -861,6 +885,7 @@ int test4(struct Options options)
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "will message";
......@@ -1051,6 +1076,7 @@ int test6(struct Options options)
opts.onSuccess = test6_onConnect;
opts.onFailure = test6_onConnectFailure;
opts.context = &cinfo;
opts.MQTTVersion = options.MQTTVersion;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(cinfo.c, &opts);
......
......@@ -438,7 +438,6 @@ void asyncTestOnDisconnect(void* context, MQTTAsync_successData* response)
AsyncTestClient* tc = (AsyncTestClient*) context;
MyLog(LOGA_DEBUG, "In asyncTestOnDisconnect callback, %s", tc->clientid);
printf("Successful disconnection\n");
//rc = Thread_lock_mutex(client_mutex);
tc->testFinished = 1;
//rc = Thread_unlock_mutex(client_mutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment