Commit 85da5246 authored by Ian Craggs's avatar Ian Craggs

Bug #409267 - add multiple server list to the connectOptions

parent adaada7f
......@@ -13,6 +13,7 @@
* Contributors:
* Ian Craggs - initial implementation and documentation
* Ian Craggs, Allan Stockdill-Mander - SSL support
* Ian Craggs - multiple server connection support
*******************************************************************************/
#include <stdlib.h>
......@@ -114,6 +115,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc);
int MQTTAsync_cleanSession(Clients* client);
void MQTTAsync_stop();
int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout);
void MQTTAsync_closeOnly(Clients* client);
void MQTTProtocol_closeSession(Clients* client, int sendwill);
void MQTTAsync_writeComplete(int socket);
......@@ -214,6 +216,9 @@ typedef struct
struct
{
int timeout;
int serverURIcount;
char** serverURIs;
int currentURI;
} conn;
} details;
} MQTTAsync_command;
......@@ -758,6 +763,20 @@ void MQTTProtocol_checkPendingWrites()
}
void MQTTAsync_freeConnect(MQTTAsync_command command)
{
if (command.type == CONNECT)
{
int i;
for (i = 0; i < command.details.conn.serverURIcount; ++i)
free(command.details.conn.serverURIs[i]);
if (command.details.conn.serverURIs)
free(command.details.conn.serverURIs);
}
}
void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
{
if (command->command.type == SUBSCRIBE)
......@@ -904,11 +923,28 @@ void MQTTAsync_processCommand()
rc = 0;
else
{
Log(TRACE_MIN, -1, "Connecting to serverURI %s", command->client->serverURI);
char* serverURI = command->client->serverURI;
if (command->command.details.conn.serverURIcount > 0)
{
serverURI = command->command.details.conn.serverURIs[command->command.details.conn.currentURI++];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
#if defined(OPENSSL)
else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
{
serverURI += strlen(URI_SSL);
command->client->ssl = 1;
}
#endif
}
Log(TRACE_MIN, -1, "Connecting to serverURI %s", serverURI);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(command->client->serverURI, command->client->c, command->client->ssl);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl);
#else
rc = MQTTProtocol_connect(command->client->serverURI, command->client->c);
rc = MQTTProtocol_connect(serverURI, command->client->c);
#endif
if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR;
......@@ -1026,6 +1062,17 @@ void MQTTAsync_processCommand()
}
else
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT &&
command->command.details.conn.currentURI < command->command.details.conn.serverURIcount)
{
/* put the connect command back to the head of the command queue, using the next serverURI */
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
command->command.details.conn.serverURIs[command->command.details.conn.currentURI]);
rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn));
}
else
{
if (command->command.onFailure)
{
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
......@@ -1034,8 +1081,10 @@ void MQTTAsync_processCommand()
(*(command->command.onFailure))(command->command.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeConnect(command->command);
MQTTAsync_freeCommand(command); /* free up the command if necessary */
}
}
else
{
/* put the command into a waiting for response queue for each client, indexed by msgid */
......@@ -1073,6 +1122,24 @@ void MQTTAsync_checkTimeouts()
/* check connect timeout */
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connect.details.conn.timeout * 1000))
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTProtocol_closeSession(m->c, 0);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
......@@ -1080,7 +1147,7 @@ void MQTTAsync_checkTimeouts()
(*(m->connect.onFailure))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
MQTTProtocol_closeSession(m->c, 0);
}
continue;
}
......@@ -1299,8 +1366,6 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
rc = MQTTASYNC_DISCONNECTED;
}
}
else
MQTTProtocol_closeSession(m->c, 0);
free(connack);
m->pack = NULL;
}
......@@ -1381,14 +1446,41 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{
int rc = MQTTAsync_completeConnection(m, pack);
if (rc == MQTTASYNC_SUCCESS && m->connect.onSuccess)
if (rc == MQTTASYNC_SUCCESS)
{
if (m->connect.details.conn.serverURIcount > 0)
Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI - 1]);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onSuccess)
{
Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onSuccess))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
else if (rc != MQTTASYNC_SUCCESS && m->connect.onFailure)
}
else
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTProtocol_closeSession(m->c, 0);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
......@@ -1401,6 +1493,8 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
Thread_lock_mutex(mqttasync_mutex);
}
}
}
}
else if (pack->header.bits.type == SUBACK)
{
ListElement* current = NULL;
......@@ -1561,10 +1655,11 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
}
void MQTTProtocol_closeSession(Clients* client, int sendwill)
void MQTTAsync_closeOnly(Clients* client)
{
FUNC_ENTRY;
client->good = 0;
client->ping_outstanding = 0;
if (client->net.socket > 0)
{
if (client->connected || client->connect_state)
......@@ -1580,6 +1675,14 @@ void MQTTProtocol_closeSession(Clients* client, int sendwill)
}
client->connected = 0;
client->connect_state = 0;
FUNC_EXIT;
}
void MQTTProtocol_closeSession(Clients* client, int sendwill)
{
FUNC_ENTRY;
MQTTAsync_closeOnly(client);
if (client->cleansession)
MQTTAsync_cleanSession(client);
......@@ -1873,7 +1976,9 @@ int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
rc = MQTTASYNC_NULL_PARAMETER;
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 || (options->struct_version != 0 && options->struct_version != 1))
if (strncmp(options->struct_id, "MQTC", 4) != 0 ||
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2))
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
......@@ -1974,6 +2079,20 @@ int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
conn->command.onFailure = options->onFailure;
conn->command.context = options->context;
conn->command.details.conn.timeout = options->connectTimeout;
if (options->struct_version >= 2 && options->serverURIcount > 0)
{
int i;
conn->command.details.conn.serverURIcount = options->serverURIcount;
conn->command.details.conn.serverURIs = malloc(options->serverURIcount * sizeof(char*));
for (i = 0; i < options->serverURIcount; ++i)
{
conn->command.details.conn.serverURIs[i] = malloc(strlen(options->serverURIs[i]) + 1);
strcpy(conn->command.details.conn.serverURIs[i], options->serverURIs[i]);
}
conn->command.details.conn.currentURI = 0;
}
}
conn->command.type = CONNECT;
rc = MQTTAsync_addCommand(conn, sizeof(conn));
......@@ -2347,6 +2466,24 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
exit:
if ((rc != 0 && m->c->connect_state != 2) || (rc == SSL_FATAL))
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTProtocol_closeSession(m->c, 0);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
......@@ -2354,7 +2491,7 @@ exit:
(*(m->connect.onFailure))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
MQTTProtocol_closeSession(m->c, 0);
}
}
FUNC_EXIT_RC(rc);
return rc;
......@@ -2409,10 +2546,33 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
pack = MQTTPacket_Factory(&m->c->net, rc);
if ((m->c->connect_state == 3) && (*rc == SOCKET_ERROR))
{
Log( TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR, calling connect.onFailure");
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTProtocol_closeSession(m->c, 0);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
}
}
}
if (pack)
......
......@@ -13,6 +13,7 @@
* Contributors:
* Ian Craggs - initial API and implementation
* Ian Craggs, Allan Stockdill-Mander - SSL connections
* Ian Craggs - multiple server connection support
*******************************************************************************/
/********************************************************************/
......@@ -541,7 +542,10 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1. 0 signifies no SSL options */
/** The version number of this structure. Must be 0, 1 or 2.
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
* that should pass without communication between the client and the server
......@@ -629,10 +633,23 @@ typedef struct
* provide access to the context information in the callback.
*/
void* context;
/**
* The number of entries in the serverURIs array.
*/
int serverURIcount;
/**
* An array of null-terminated strings specifying the servers to
* which the client will connect. Each string takes the form <i>protocol://host:port</i>.
* <i>protocol</i> must be <i>tcp</i> or <i>ssl</i>. For <i>host</i>, you can
* specify either an IP address or a domain name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
*/
char** serverURIs;
} MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 1, 60, 1, 10, NULL, NULL, NULL, 30, 20, NULL, NULL }
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 2, 60, 1, 10, NULL, NULL, NULL, 30, 20, NULL, NULL, 0, NULL}
/**
* This function attempts to connect a previously-created client (see
......
......@@ -15,6 +15,7 @@
* Ian Craggs - bug 384016 - segv setting will message
* Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
* Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
* Ian Craggs - multiple server connection support
*******************************************************************************/
#include <stdlib.h>
......@@ -704,55 +705,14 @@ void Protocol_processPublication(Publish* publish, Clients* client)
}
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, char* serverURI)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
START_TIME_TYPE start;
long millisecsTimeout = 30000L;
int rc = SOCKET_ERROR;
FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex);
if (options == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 || (options->struct_version != 0 && options->struct_version != 1))
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
if (options->will) /* check validity of will options structure */
{
if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || options->will->struct_version != 0)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
#if defined(OPENSSL)
if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
{
if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version != 0)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
#endif
if ((options->username && !UTF8_validateString(options->username)) ||
(options->password && !UTF8_validateString(options->password)))
{
rc = MQTTCLIENT_BAD_UTF8_STRING;
goto exit;
}
millisecsTimeout = options->connectTimeout * 1000;
start = MQTTClient_start_clock();
if (m->ma && !running)
......@@ -788,11 +748,11 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
m->c->password = options->password;
m->c->retryInterval = options->retryInterval;
Log(TRACE_MIN, -1, "Connecting to serverURI %s", m->serverURI);
Log(TRACE_MIN, -1, "Connecting to serverURI %s", serverURI);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(m->serverURI, m->c, m->ssl);
rc = MQTTProtocol_connect(serverURI, m->c, m->ssl);
#else
rc = MQTTProtocol_connect(m->serverURI, m->c);
rc = MQTTProtocol_connect(serverURI, m->c);
#endif
if (rc == SOCKET_ERROR)
goto exit;
......@@ -919,7 +879,84 @@ exit:
MQTTClient_disconnect(handle, 0); /* not "internal" because we don't want to call connection lost */
Thread_lock_mutex(mqttclient_mutex);
}
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex);
if (options == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 ||
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2))
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
if (options->will) /* check validity of will options structure */
{
if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || options->will->struct_version != 0)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
#if defined(OPENSSL)
if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
{
if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version != 0)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
#endif
if ((options->username && !UTF8_validateString(options->username)) ||
(options->password && !UTF8_validateString(options->password)))
{
rc = MQTTCLIENT_BAD_UTF8_STRING;
goto exit;
}
if (options->struct_version < 2 || options->serverURIcount == 0)
rc = MQTTClient_connectURI(handle, options, m->serverURI);
else
{
int i;
for (i = 0; i < options->serverURIcount; ++i)
{
char* serverURI = options->serverURIs[i];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
#if defined(OPENSSL)
else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
{
serverURI += strlen(URI_SSL);
m->ssl = 1;
}
#endif
if ((rc = MQTTClient_connectURI(handle, options, serverURI)) == MQTTCLIENT_SUCCESS)
break;
}
}
exit:
if (m->c->will)
{
free(m->c->will);
......
......@@ -13,6 +13,7 @@
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - multiple server connection support
*******************************************************************************/
/**
......@@ -544,9 +545,22 @@ typedef struct
* application does not make use of SSL, set this pointer to NULL.
*/
MQTTClient_SSLOptions* ssl;
/**
* The number of entries in the serverURIs array.
*/
int serverURIcount;
/**
* An array of null-terminated strings specifying the servers to
* which the client will connect. Each string takes the form <i>protocol://host:port</i>.
* <i>protocol</i> must be <i>tcp</i> or <i>ssl</i>. For <i>host</i>, you can
* specify either an IP address or a domain name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
*/
char** serverURIs;
} MQTTClient_connectOptions;
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 1, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL }
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 2, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL }
/**
* MQTTClient_libraryInfo is used to store details relating to the currently used
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment