Commit 5829d3e8 authored by Ian Craggs's avatar Ian Craggs

Add test 2 for offline buffering

parent 08db9de5
......@@ -275,9 +275,6 @@ typedef struct
} dis;
struct
{
int timeout;
int serverURIcount;
char** serverURIs;
int currentURI;
int MQTTVersion; /**< current MQTT version being used to connect */
} conn;
......@@ -300,6 +297,8 @@ typedef struct MQTTAsync_struct
MQTTAsync_connected* connected;
void* connected_context; /* the context to be associated with the connected callback*/
/* Each time connect is called, we store the options that were used. These are reused in
any call to reconnect, or an automatic reconnect attempt */
MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */
MQTTAsync_command* pending_write; /* Is there a socket write pending? */
......@@ -317,6 +316,9 @@ typedef struct MQTTAsync_struct
int automaticReconnect;
int minRetryInterval;
int maxRetryInterval;
int serverURIcount;
char** serverURIs;
int connectTimeout;
int currentInterval;
START_TIME_TYPE lastConnectionFailedTime;
......@@ -381,12 +383,16 @@ void MQTTAsync_unlock_mutex(mutex_type amutex)
}
/*
Check whether there are any more connect options. If not then we are finished
with connect attempts.
*/
int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client)
{
int rc;
FUNC_ENTRY;
rc = command->details.conn.currentURI < command->details.conn.serverURIcount ||
rc = command->details.conn.currentURI < client->serverURIcount ||
(command->details.conn.MQTTVersion == 4 && client->c->MQTTVersion == MQTTVERSION_DEFAULT);
FUNC_EXIT_RC(rc);
return rc;
......@@ -948,17 +954,14 @@ void MQTTProtocol_checkPendingWrites()
}
void MQTTAsync_freeConnect(MQTTAsync_command command)
void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
{
if (command.type == CONNECT)
{
int i;
for (i = 0; i < command.details.conn.serverURIcount; ++i)
free(command.details.conn.serverURIs[i]);
if (command.details.conn.serverURIs)
free(command.details.conn.serverURIs);
}
for (i = 0; i < m->serverURIcount; ++i)
free(m->serverURIs[i]);
if (m->serverURIs)
free(m->serverURIs);
}
......@@ -1113,19 +1116,20 @@ int MQTTAsync_processCommand()
{
char* serverURI = command->client->serverURI;
if (command->command.details.conn.serverURIcount > 0)
if (command->client->serverURIcount > 0)
{
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == 3)
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = 4;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
command->command.details.conn.currentURI++;
serverURI = command->command.details.conn.serverURIs[command->command.details.conn.currentURI];
serverURI = command->client->serverURIs[command->command.details.conn.currentURI];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
......@@ -1265,6 +1269,7 @@ int MQTTAsync_processCommand()
{
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_disconnect(command->client, &opts); /* not "internal" because we don't want to call connection lost */
command->client->shouldBeConnected = 1; /* as above call is not "internal" we need to reset this */
}
else
MQTTAsync_disconnect_internal(command->client, 0);
......@@ -1282,7 +1287,6 @@ int MQTTAsync_processCommand()
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
(*(command->command.onFailure))(command->command.context, NULL);
}
MQTTAsync_freeConnect(command->command);
MQTTAsync_freeCommand(command); /* free up the command if necessary */
}
}
......@@ -1319,7 +1323,7 @@ void MQTTAsync_checkTimeouts()
MQTTAsyncs* m = (MQTTAsyncs*)(current->content);
/* check connect timeout */
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connect.details.conn.timeout * 1000))
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connectTimeout * 1000))
{
if (MQTTAsync_checkConn(&m->connect, m))
{
......@@ -1337,7 +1341,6 @@ void MQTTAsync_checkTimeouts()
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
......@@ -1563,6 +1566,7 @@ void MQTTAsync_destroy(MQTTAsync* handle)
free(m->serverURI);
if (m->createOptions)
free(m->createOptions);
MQTTAsync_freeServerURIs(m);
if (!ListRemove(handles, m))
Log(LOG_ERROR, -1, "free error");
*handle = NULL;
......@@ -1724,18 +1728,17 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (rc == MQTTASYNC_SUCCESS)
{
if (m->connect.details.conn.serverURIcount > 0)
if (m->serverURIcount > 0)
Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_freeConnect(m->connect);
m->serverURIs[m->connect.details.conn.currentURI]);
int onSuccess = (m->connect.onSuccess != NULL); /* save setting of onSuccess callback */
if (m->connect.onSuccess)
{
MQTTAsync_successData data;
memset(&data, '\0', sizeof(data));
Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
if (m->connect.details.conn.serverURIcount > 0)
data.alt.connect.serverURI = m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI];
if (m->serverURIcount > 0)
data.alt.connect.serverURI = m->serverURIs[m->connect.details.conn.currentURI];
else
data.alt.connect.serverURI = m->serverURI;
data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
......@@ -1768,7 +1771,6 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
......@@ -2176,6 +2178,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->connect.onSuccess = options->onSuccess;
m->connect.onFailure = options->onFailure;
m->connect.context = options->context;
m->connectTimeout = options->connectTimeout;
tostop = 0;
if (sendThread_state != STARTING && sendThread_state != RUNNING)
......@@ -2264,6 +2267,20 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->retryInterval = options->retryInterval;
m->shouldBeConnected = 1;
m->connectTimeout = options->connectTimeout;
MQTTAsync_freeServerURIs(m);
if (options->struct_version >= 2 && options->serverURIcount > 0)
{
int i;
m->serverURIcount = options->serverURIcount;
m->serverURIs = malloc(options->serverURIcount * sizeof(char*));
for (i = 0; i < options->serverURIcount; ++i)
m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
conn->command.details.conn.currentURI = 0;
}
/* Add connect request to operation queue */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
......@@ -2273,18 +2290,6 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
conn->command.onSuccess = options->onSuccess;
conn->command.onFailure = options->onFailure;
conn->command.context = options->context;
conn->command.details.conn.timeout = options->connectTimeout;
if (options->struct_version >= 2 && options->serverURIcount > 0)
{
int i;
conn->command.details.conn.serverURIcount = options->serverURIcount;
conn->command.details.conn.serverURIs = malloc(options->serverURIcount * sizeof(char*));
for (i = 0; i < options->serverURIcount; ++i)
conn->command.details.conn.serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
conn->command.details.conn.currentURI = 0;
}
}
conn->command.type = CONNECT;
rc = MQTTAsync_addCommand(conn, sizeof(conn));
......@@ -2764,7 +2769,6 @@ exit:
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
......@@ -2840,7 +2844,6 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment