Commit 7f0e3a26 authored by Ian Craggs's avatar Ian Craggs

Fix for issue #218 auto reconnect timing

parent 19887ece
...@@ -3,11 +3,11 @@ ...@@ -3,11 +3,11 @@
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution. * and Eclipse Distribution License v1.0 which accompany this distribution.
* *
* The Eclipse Public License is available at * The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html * http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at * and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php. * http://www.eclipse.org/org/documents/edl-v10.php.
* *
* Contributors: * Contributors:
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
* Ian Craggs - fix for bug 472250 * Ian Craggs - fix for bug 472250
* Ian Craggs - fix for bug 486548 * Ian Craggs - fix for bug 486548
* Ian Craggs - SNI support * Ian Craggs - SNI support
* Ian Craggs - auto reconnect timing fix #218
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -71,7 +72,7 @@ const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION; ...@@ -71,7 +72,7 @@ const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
static ClientStates ClientState = static ClientStates ClientState =
{ {
CLIENT_VERSION, /* version */ CLIENT_VERSION, /* version */
NULL /* client list */ NULL /* client list */
}; };
ClientStates* bstate = &ClientState; ClientStates* bstate = &ClientState;
...@@ -278,7 +279,7 @@ typedef struct MQTTAsync_struct ...@@ -278,7 +279,7 @@ typedef struct MQTTAsync_struct
char* serverURI; char* serverURI;
int ssl; int ssl;
Clients* c; Clients* c;
/* "Global", to the client, callback definitions */ /* "Global", to the client, callback definitions */
MQTTAsync_connectionLost* cl; MQTTAsync_connectionLost* cl;
MQTTAsync_messageArrived* ma; MQTTAsync_messageArrived* ma;
...@@ -287,15 +288,15 @@ typedef struct MQTTAsync_struct ...@@ -287,15 +288,15 @@ typedef struct MQTTAsync_struct
MQTTAsync_connected* connected; MQTTAsync_connected* connected;
void* connected_context; /* the context to be associated with the connected callback*/ void* connected_context; /* the context to be associated with the connected callback*/
/* Each time connect is called, we store the options that were used. These are reused in /* Each time connect is called, we store the options that were used. These are reused in
any call to reconnect, or an automatic reconnect attempt */ any call to reconnect, or an automatic reconnect attempt */
MQTTAsync_command connect; /* Connect operation properties */ MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */ MQTTAsync_command disconnect; /* Disconnect operation properties */
MQTTAsync_command* pending_write; /* Is there a socket write pending? */ MQTTAsync_command* pending_write; /* Is there a socket write pending? */
List* responses; List* responses;
unsigned int command_seqno; unsigned int command_seqno;
MQTTPacket* pack; MQTTPacket* pack;
...@@ -563,7 +564,7 @@ static int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand* qcmd) ...@@ -563,7 +564,7 @@ static int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand* qcmd)
{ {
int rc = 0; int rc = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1]; char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY; FUNC_ENTRY;
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, qcmd->seqno); sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, qcmd->seqno);
if ((rc = qcmd->client->c->persistence->premove(qcmd->client->c->phandle, key)) != 0) if ((rc = qcmd->client->c->persistence->premove(qcmd->client->c->phandle, key)) != 0)
...@@ -582,87 +583,87 @@ static int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd) ...@@ -582,87 +583,87 @@ static int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd)
void** bufs = NULL; void** bufs = NULL;
int bufindex = 0, i, nbufs = 0; int bufindex = 0, i, nbufs = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1]; char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY; FUNC_ENTRY;
switch (command->type) switch (command->type)
{ {
case SUBSCRIBE: case SUBSCRIBE:
nbufs = 3 + (command->details.sub.count * 2); nbufs = 3 + (command->details.sub.count * 2);
lens = (int*)malloc(nbufs * sizeof(int)); lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *)); bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type; bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type); lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->token; bufs[bufindex] = &command->token;
lens[bufindex++] = sizeof(command->token); lens[bufindex++] = sizeof(command->token);
bufs[bufindex] = &command->details.sub.count; bufs[bufindex] = &command->details.sub.count;
lens[bufindex++] = sizeof(command->details.sub.count); lens[bufindex++] = sizeof(command->details.sub.count);
for (i = 0; i < command->details.sub.count; ++i) for (i = 0; i < command->details.sub.count; ++i)
{ {
bufs[bufindex] = command->details.sub.topics[i]; bufs[bufindex] = command->details.sub.topics[i];
lens[bufindex++] = (int)strlen(command->details.sub.topics[i]) + 1; lens[bufindex++] = (int)strlen(command->details.sub.topics[i]) + 1;
bufs[bufindex] = &command->details.sub.qoss[i]; bufs[bufindex] = &command->details.sub.qoss[i];
lens[bufindex++] = sizeof(command->details.sub.qoss[i]); lens[bufindex++] = sizeof(command->details.sub.qoss[i]);
} }
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno); sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
nbufs = 3 + command->details.unsub.count; nbufs = 3 + command->details.unsub.count;
lens = (int*)malloc(nbufs * sizeof(int)); lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *)); bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type; bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type); lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->token; bufs[bufindex] = &command->token;
lens[bufindex++] = sizeof(command->token); lens[bufindex++] = sizeof(command->token);
bufs[bufindex] = &command->details.unsub.count; bufs[bufindex] = &command->details.unsub.count;
lens[bufindex++] = sizeof(command->details.unsub.count); lens[bufindex++] = sizeof(command->details.unsub.count);
for (i = 0; i < command->details.unsub.count; ++i) for (i = 0; i < command->details.unsub.count; ++i)
{ {
bufs[bufindex] = command->details.unsub.topics[i]; bufs[bufindex] = command->details.unsub.topics[i];
lens[bufindex++] = (int)strlen(command->details.unsub.topics[i]) + 1; lens[bufindex++] = (int)strlen(command->details.unsub.topics[i]) + 1;
} }
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno); sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
break; break;
case PUBLISH: case PUBLISH:
nbufs = 7; nbufs = 7;
lens = (int*)malloc(nbufs * sizeof(int)); lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *)); bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type; bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type); lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->token; bufs[bufindex] = &command->token;
lens[bufindex++] = sizeof(command->token); lens[bufindex++] = sizeof(command->token);
bufs[bufindex] = command->details.pub.destinationName; bufs[bufindex] = command->details.pub.destinationName;
lens[bufindex++] = (int)strlen(command->details.pub.destinationName) + 1; lens[bufindex++] = (int)strlen(command->details.pub.destinationName) + 1;
bufs[bufindex] = &command->details.pub.payloadlen; bufs[bufindex] = &command->details.pub.payloadlen;
lens[bufindex++] = sizeof(command->details.pub.payloadlen); lens[bufindex++] = sizeof(command->details.pub.payloadlen);
bufs[bufindex] = command->details.pub.payload; bufs[bufindex] = command->details.pub.payload;
lens[bufindex++] = command->details.pub.payloadlen; lens[bufindex++] = command->details.pub.payloadlen;
bufs[bufindex] = &command->details.pub.qos; bufs[bufindex] = &command->details.pub.qos;
lens[bufindex++] = sizeof(command->details.pub.qos); lens[bufindex++] = sizeof(command->details.pub.qos);
bufs[bufindex] = &command->details.pub.retained; bufs[bufindex] = &command->details.pub.retained;
lens[bufindex++] = sizeof(command->details.pub.retained); lens[bufindex++] = sizeof(command->details.pub.retained);
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno); sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
break; break;
} }
if (nbufs > 0) if (nbufs > 0)
{ {
...@@ -686,15 +687,15 @@ static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int bufle ...@@ -686,15 +687,15 @@ static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int bufle
char* ptr = buffer; char* ptr = buffer;
int i; int i;
size_t data_size; size_t data_size;
FUNC_ENTRY; FUNC_ENTRY;
qcommand = malloc(sizeof(MQTTAsync_queuedCommand)); qcommand = malloc(sizeof(MQTTAsync_queuedCommand));
memset(qcommand, '\0', sizeof(MQTTAsync_queuedCommand)); memset(qcommand, '\0', sizeof(MQTTAsync_queuedCommand));
command = &qcommand->command; command = &qcommand->command;
command->type = *(int*)ptr; command->type = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
command->token = *(MQTTAsync_token*)ptr; command->token = *(MQTTAsync_token*)ptr;
ptr += sizeof(MQTTAsync_token); ptr += sizeof(MQTTAsync_token);
...@@ -703,61 +704,61 @@ static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int bufle ...@@ -703,61 +704,61 @@ static MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int bufle
case SUBSCRIBE: case SUBSCRIBE:
command->details.sub.count = *(int*)ptr; command->details.sub.count = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
for (i = 0; i < command->details.sub.count; ++i) for (i = 0; i < command->details.sub.count; ++i)
{ {
data_size = strlen(ptr) + 1; data_size = strlen(ptr) + 1;
command->details.sub.topics[i] = malloc(data_size); command->details.sub.topics[i] = malloc(data_size);
strcpy(command->details.sub.topics[i], ptr); strcpy(command->details.sub.topics[i], ptr);
ptr += data_size; ptr += data_size;
command->details.sub.qoss[i] = *(int*)ptr; command->details.sub.qoss[i] = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
} }
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
command->details.sub.count = *(int*)ptr; command->details.sub.count = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
for (i = 0; i < command->details.unsub.count; ++i) for (i = 0; i < command->details.unsub.count; ++i)
{ {
size_t data_size = strlen(ptr) + 1; size_t data_size = strlen(ptr) + 1;
command->details.unsub.topics[i] = malloc(data_size); command->details.unsub.topics[i] = malloc(data_size);
strcpy(command->details.unsub.topics[i], ptr); strcpy(command->details.unsub.topics[i], ptr);
ptr += data_size; ptr += data_size;
} }
break; break;
case PUBLISH: case PUBLISH:
data_size = strlen(ptr) + 1; data_size = strlen(ptr) + 1;
command->details.pub.destinationName = malloc(data_size); command->details.pub.destinationName = malloc(data_size);
strcpy(command->details.pub.destinationName, ptr); strcpy(command->details.pub.destinationName, ptr);
ptr += data_size; ptr += data_size;
command->details.pub.payloadlen = *(int*)ptr; command->details.pub.payloadlen = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
data_size = command->details.pub.payloadlen; data_size = command->details.pub.payloadlen;
command->details.pub.payload = malloc(data_size); command->details.pub.payload = malloc(data_size);
memcpy(command->details.pub.payload, ptr, data_size); memcpy(command->details.pub.payload, ptr, data_size);
ptr += data_size; ptr += data_size;
command->details.pub.qos = *(int*)ptr; command->details.pub.qos = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
command->details.pub.retained = *(int*)ptr; command->details.pub.retained = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
break; break;
default: default:
free(qcommand); free(qcommand);
qcommand = NULL; qcommand = NULL;
} }
FUNC_EXIT; FUNC_EXIT;
return qcommand; return qcommand;
} }
...@@ -796,7 +797,7 @@ static int MQTTAsync_restoreCommands(MQTTAsyncs* client) ...@@ -796,7 +797,7 @@ static int MQTTAsync_restoreCommands(MQTTAsyncs* client)
{ {
char *buffer = NULL; char *buffer = NULL;
int buflen; int buflen;
if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) != 0) if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) != 0)
{ {
; ;
...@@ -804,10 +805,10 @@ static int MQTTAsync_restoreCommands(MQTTAsyncs* client) ...@@ -804,10 +805,10 @@ static int MQTTAsync_restoreCommands(MQTTAsyncs* client)
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0) else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
{ {
MQTTAsync_queuedCommand* cmd = MQTTAsync_restoreCommand(buffer, buflen); MQTTAsync_queuedCommand* cmd = MQTTAsync_restoreCommand(buffer, buflen);
if (cmd) if (cmd)
{ {
cmd->client = client; cmd->client = client;
cmd->seqno = atoi(msgkeys[i]+2); cmd->seqno = atoi(msgkeys[i]+2);
MQTTPersistence_insertInOrder(commands, cmd, sizeof(MQTTAsync_queuedCommand)); MQTTPersistence_insertInOrder(commands, cmd, sizeof(MQTTAsync_queuedCommand));
free(buffer); free(buffer);
...@@ -832,18 +833,20 @@ static int MQTTAsync_restoreCommands(MQTTAsyncs* client) ...@@ -832,18 +833,20 @@ static int MQTTAsync_restoreCommands(MQTTAsyncs* client)
static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size) static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
{ {
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttcommand_mutex); MQTTAsync_lock_mutex(mqttcommand_mutex);
command->command.start_time = MQTTAsync_start_clock(); /* Don't set start time if the connect command is already in process #218 */
if (command->command.type == CONNECT || if ((command->command.type != CONNECT) || (command->client->c->connect_state == 0))
command->command.start_time = MQTTAsync_start_clock();
if (command->command.type == CONNECT ||
(command->command.type == DISCONNECT && command->command.details.dis.internal)) (command->command.type == DISCONNECT && command->command.details.dis.internal))
{ {
MQTTAsync_queuedCommand* head = NULL; MQTTAsync_queuedCommand* head = NULL;
if (commands->first) if (commands->first)
head = (MQTTAsync_queuedCommand*)(commands->first->content); head = (MQTTAsync_queuedCommand*)(commands->first->content);
if (head != NULL && head->client == command->client && head->command.type == command->command.type) if (head != NULL && head->client == command->client && head->command.type == command->command.type)
MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */ MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
else else
...@@ -895,7 +898,7 @@ int MQTTAsync_reconnect(MQTTAsync handle) ...@@ -895,7 +898,7 @@ int MQTTAsync_reconnect(MQTTAsync handle)
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
if (m->automaticReconnect) if (m->automaticReconnect)
{ {
if (m->shouldBeConnected) if (m->shouldBeConnected)
{ {
...@@ -916,7 +919,7 @@ int MQTTAsync_reconnect(MQTTAsync handle) ...@@ -916,7 +919,7 @@ int MQTTAsync_reconnect(MQTTAsync handle)
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
/* make sure that the version attempts are restarted */ /* make sure that the version attempts are restarted */
if (m->c->MQTTVersion == MQTTVERSION_DEFAULT) if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
conn->command.details.conn.MQTTVersion = 0; conn->command.details.conn.MQTTVersion = 0;
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
rc = MQTTASYNC_SUCCESS; rc = MQTTASYNC_SUCCESS;
...@@ -935,7 +938,7 @@ static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* comma ...@@ -935,7 +938,7 @@ static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* comma
FUNC_ENTRY; FUNC_ENTRY;
/* wait for all inflight message flows to finish, up to timeout */; /* wait for all inflight message flows to finish, up to timeout */;
if (m->c->outboundMsgs->count == 0 || MQTTAsync_elapsed(command->start_time) >= command->details.dis.timeout) if (m->c->outboundMsgs->count == 0 || MQTTAsync_elapsed(command->start_time) >= command->details.dis.timeout)
{ {
int was_connected = m->c->connected; int was_connected = m->c->connected;
MQTTAsync_closeSession(m->c); MQTTAsync_closeSession(m->c);
if (command->details.dis.internal) if (command->details.dis.internal)
...@@ -988,11 +991,11 @@ static void MQTTProtocol_checkPendingWrites(void) ...@@ -988,11 +991,11 @@ static void MQTTProtocol_checkPendingWrites(void)
static void MQTTAsync_freeServerURIs(MQTTAsyncs* m) static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
{ {
int i; int i;
for (i = 0; i < m->serverURIcount; ++i) for (i = 0; i < m->serverURIcount; ++i)
free(m->serverURIs[i]); free(m->serverURIs[i]);
if (m->serverURIs) if (m->serverURIs)
free(m->serverURIs); free(m->serverURIs);
} }
...@@ -1001,7 +1004,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command) ...@@ -1001,7 +1004,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
if (command->command.type == SUBSCRIBE) if (command->command.type == SUBSCRIBE)
{ {
int i; int i;
for (i = 0; i < command->command.details.sub.count; i++) for (i = 0; i < command->command.details.sub.count; i++)
free(command->command.details.sub.topics[i]); free(command->command.details.sub.topics[i]);
...@@ -1011,7 +1014,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command) ...@@ -1011,7 +1014,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
else if (command->command.type == UNSUBSCRIBE) else if (command->command.type == UNSUBSCRIBE)
{ {
int i; int i;
for (i = 0; i < command->command.details.unsub.count; i++) for (i = 0; i < command->command.details.unsub.count; i++)
free(command->command.details.unsub.topics[i]); free(command->command.details.unsub.topics[i]);
...@@ -1021,7 +1024,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command) ...@@ -1021,7 +1024,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
{ {
/* qos 1 and 2 topics are freed in the protocol code when the flows are completed */ /* qos 1 and 2 topics are freed in the protocol code when the flows are completed */
if (command->command.details.pub.destinationName) if (command->command.details.pub.destinationName)
free(command->command.details.pub.destinationName); free(command->command.details.pub.destinationName);
free(command->command.details.pub.payload); free(command->command.details.pub.payload);
} }
} }
...@@ -1036,37 +1039,37 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command) ...@@ -1036,37 +1039,37 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
static void MQTTAsync_writeComplete(int socket) static void MQTTAsync_writeComplete(int socket)
{ {
ListElement* found = NULL; ListElement* found = NULL;
FUNC_ENTRY; FUNC_ENTRY;
/* a partial write is now complete for a socket - this will be on a publish*/ /* a partial write is now complete for a socket - this will be on a publish*/
MQTTProtocol_checkPendingWrites(); MQTTProtocol_checkPendingWrites();
/* find the client using this socket */ /* find the client using this socket */
if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL) if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
{ {
MQTTAsyncs* m = (MQTTAsyncs*)(found->content); MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
time(&(m->c->net.lastSent)); time(&(m->c->net.lastSent));
/* see if there is a pending write flagged */ /* see if there is a pending write flagged */
if (m->pending_write) if (m->pending_write)
{ {
ListElement* cur_response = NULL; ListElement* cur_response = NULL;
MQTTAsync_command* command = m->pending_write; MQTTAsync_command* command = m->pending_write;
MQTTAsync_queuedCommand* com = NULL; MQTTAsync_queuedCommand* com = NULL;
while (ListNextElement(m->responses, &cur_response)) while (ListNextElement(m->responses, &cur_response))
{ {
com = (MQTTAsync_queuedCommand*)(cur_response->content); com = (MQTTAsync_queuedCommand*)(cur_response->content);
if (com->client->pending_write == m->pending_write) if (com->client->pending_write == m->pending_write)
break; break;
} }
if (cur_response && command->onSuccess) if (cur_response && command->onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
data.token = command->token; data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName; data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload; data.alt.pub.message.payload = command->details.pub.payload;
...@@ -1075,16 +1078,16 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1075,16 +1078,16 @@ static void MQTTAsync_writeComplete(int socket)
data.alt.pub.message.retained = command->details.pub.retained; data.alt.pub.message.retained = command->details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data); (*(command->onSuccess))(command->context, &data);
} }
m->pending_write = NULL; m->pending_write = NULL;
ListDetach(m->responses, com); ListDetach(m->responses, com);
MQTTAsync_freeCommand(com); MQTTAsync_freeCommand(com);
} }
} }
FUNC_EXIT; FUNC_EXIT;
} }
static int MQTTAsync_processCommand(void) static int MQTTAsync_processCommand(void)
{ {
...@@ -1092,26 +1095,26 @@ static int MQTTAsync_processCommand(void) ...@@ -1092,26 +1095,26 @@ static int MQTTAsync_processCommand(void)
MQTTAsync_queuedCommand* command = NULL; MQTTAsync_queuedCommand* command = NULL;
ListElement* cur_command = NULL; ListElement* cur_command = NULL;
List* ignored_clients = NULL; List* ignored_clients = NULL;
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttcommand_mutex); MQTTAsync_lock_mutex(mqttcommand_mutex);
/* only the first command in the list must be processed for any particular client, so if we skip /* only the first command in the list must be processed for any particular client, so if we skip
a command for a client, we must skip all following commands for that client. Use a list of a command for a client, we must skip all following commands for that client. Use a list of
ignored clients to keep track ignored clients to keep track
*/ */
ignored_clients = ListInitialize(); ignored_clients = ListInitialize();
/* don't try a command until there isn't a pending write for that client, and we are not connecting */ /* don't try a command until there isn't a pending write for that client, and we are not connecting */
while (ListNextElement(commands, &cur_command)) while (ListNextElement(commands, &cur_command))
{ {
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(cur_command->content); MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(cur_command->content);
if (ListFind(ignored_clients, cmd->client)) if (ListFind(ignored_clients, cmd->client))
continue; continue;
if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected && if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket))) cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket)))
{ {
if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) && if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
...@@ -1137,10 +1140,10 @@ static int MQTTAsync_processCommand(void) ...@@ -1137,10 +1140,10 @@ static int MQTTAsync_processCommand(void)
#endif #endif
} }
MQTTAsync_unlock_mutex(mqttcommand_mutex); MQTTAsync_unlock_mutex(mqttcommand_mutex);
if (!command) if (!command)
goto exit; /* nothing to do */ goto exit; /* nothing to do */
if (command->command.type == CONNECT) if (command->command.type == CONNECT)
{ {
if (command->client->c->connect_state != 0 || command->client->c->connected) if (command->client->c->connect_state != 0 || command->client->c->connected)
...@@ -1193,11 +1196,11 @@ static int MQTTAsync_processCommand(void) ...@@ -1193,11 +1196,11 @@ static int MQTTAsync_processCommand(void)
#endif #endif
if (command->client->c->connect_state == 0) if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
/* if the TCP connect is pending, then we must call select to determine when the connect has completed, /* if the TCP connect is pending, then we must call select to determine when the connect has completed,
which is indicated by the socket being ready *either* for reading *or* writing. The next couple of lines which is indicated by the socket being ready *either* for reading *or* writing. The next couple of lines
make sure we check for writeability as well as readability, otherwise we wait around longer than we need to make sure we check for writeability as well as readability, otherwise we wait around longer than we need to
in Socket_getReadySocket() */ in Socket_getReadySocket() */
if (rc == EINPROGRESS) if (rc == EINPROGRESS)
Socket_addPendingWrite(command->client->c->net.socket); Socket_addPendingWrite(command->client->c->net.socket);
} }
...@@ -1207,7 +1210,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1207,7 +1210,7 @@ static int MQTTAsync_processCommand(void)
List* topics = ListInitialize(); List* topics = ListInitialize();
List* qoss = ListInitialize(); List* qoss = ListInitialize();
int i; int i;
for (i = 0; i < command->command.details.sub.count; i++) for (i = 0; i < command->command.details.sub.count; i++)
{ {
ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i])); ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i]));
...@@ -1221,10 +1224,10 @@ static int MQTTAsync_processCommand(void) ...@@ -1221,10 +1224,10 @@ static int MQTTAsync_processCommand(void)
{ {
List* topics = ListInitialize(); List* topics = ListInitialize();
int i; int i;
for (i = 0; i < command->command.details.unsub.count; i++) for (i = 0; i < command->command.details.unsub.count; i++)
ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i])); ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i]));
rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token); rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token);
ListFreeNoContent(topics); ListFreeNoContent(topics);
} }
...@@ -1232,7 +1235,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1232,7 +1235,7 @@ static int MQTTAsync_processCommand(void)
{ {
Messages* msg = NULL; Messages* msg = NULL;
Publish* p = NULL; Publish* p = NULL;
p = malloc(sizeof(Publish)); p = malloc(sizeof(Publish));
p->payload = command->command.details.pub.payload; p->payload = command->command.details.pub.payload;
...@@ -1241,15 +1244,15 @@ static int MQTTAsync_processCommand(void) ...@@ -1241,15 +1244,15 @@ static int MQTTAsync_processCommand(void)
p->msgId = command->command.token; p->msgId = command->command.token;
rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg); rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
if (command->command.details.pub.qos == 0) if (command->command.details.pub.qos == 0)
{ {
if (rc == TCPSOCKET_COMPLETE) if (rc == TCPSOCKET_COMPLETE)
{ {
if (command->command.onSuccess) if (command->command.onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
data.token = command->command.token; data.token = command->command.token;
data.alt.pub.destinationName = command->command.details.pub.destinationName; data.alt.pub.destinationName = command->command.details.pub.destinationName;
data.alt.pub.message.payload = command->command.details.pub.payload; data.alt.pub.message.payload = command->command.details.pub.payload;
...@@ -1306,7 +1309,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1306,7 +1309,7 @@ static int MQTTAsync_processCommand(void)
} }
else else
MQTTAsync_disconnect_internal(command->client, 0); MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client)) if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client))
{ {
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
...@@ -1350,11 +1353,11 @@ static void MQTTAsync_checkTimeouts(void) ...@@ -1350,11 +1353,11 @@ static void MQTTAsync_checkTimeouts(void)
while (ListNextElement(handles, &current)) /* for each client */ while (ListNextElement(handles, &current)) /* for each client */
{ {
ListElement* cur_response = NULL; ListElement* cur_response = NULL;
int i = 0, int i = 0,
timed_out_count = 0; timed_out_count = 0;
MQTTAsyncs* m = (MQTTAsyncs*)(current->content); MQTTAsyncs* m = (MQTTAsyncs*)(current->content);
/* check disconnect timeout */ /* check disconnect timeout */
if (m->c->connect_state == -2) if (m->c->connect_state == -2)
MQTTAsync_checkDisconnect(m, &m->disconnect); MQTTAsync_checkDisconnect(m, &m->disconnect);
...@@ -1364,7 +1367,7 @@ static void MQTTAsync_checkTimeouts(void) ...@@ -1364,7 +1367,7 @@ static void MQTTAsync_checkTimeouts(void)
if (MQTTAsync_checkConn(&m->connect, m)) if (MQTTAsync_checkConn(&m->connect, m))
{ {
MQTTAsync_queuedCommand* conn; MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c); MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */ /* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
...@@ -1380,7 +1383,7 @@ static void MQTTAsync_checkTimeouts(void) ...@@ -1380,7 +1383,7 @@ static void MQTTAsync_checkTimeouts(void)
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
MQTTAsync_failureData data; MQTTAsync_failureData data;
data.token = 0; data.token = 0;
data.code = MQTTASYNC_FAILURE; data.code = MQTTASYNC_FAILURE;
data.message = "TCP connect timeout"; data.message = "TCP connect timeout";
...@@ -1391,20 +1394,20 @@ static void MQTTAsync_checkTimeouts(void) ...@@ -1391,20 +1394,20 @@ static void MQTTAsync_checkTimeouts(void)
} }
continue; continue;
} }
timed_out_count = 0; timed_out_count = 0;
/* check response timeouts */ /* check response timeouts */
while (ListNextElement(m->responses, &cur_response)) while (ListNextElement(m->responses, &cur_response))
{ {
MQTTAsync_queuedCommand* com = (MQTTAsync_queuedCommand*)(cur_response->content); MQTTAsync_queuedCommand* com = (MQTTAsync_queuedCommand*)(cur_response->content);
if (1 /*MQTTAsync_elapsed(com->command.start_time) < 120000*/) if (1 /*MQTTAsync_elapsed(com->command.start_time) < 120000*/)
break; /* command has not timed out */ break; /* command has not timed out */
else else
{ {
if (com->command.onFailure) if (com->command.onFailure)
{ {
Log(TRACE_MIN, -1, "Calling %s failure for client %s", Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(com->command.type), m->c->clientID); MQTTPacket_name(com->command.type), m->c->clientID);
(*(com->command.onFailure))(com->command.context, NULL); (*(com->command.onFailure))(com->command.context, NULL);
} }
...@@ -1424,7 +1427,7 @@ static void MQTTAsync_checkTimeouts(void) ...@@ -1424,7 +1427,7 @@ static void MQTTAsync_checkTimeouts(void)
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
/* make sure that the version attempts are restarted */ /* make sure that the version attempts are restarted */
if (m->c->MQTTVersion == MQTTVERSION_DEFAULT) if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
conn->command.details.conn.MQTTVersion = 0; conn->command.details.conn.MQTTVersion = 0;
Log(TRACE_MIN, -1, "Automatically attempting to reconnect"); Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
...@@ -1448,7 +1451,7 @@ static thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1448,7 +1451,7 @@ static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
while (!tostop) while (!tostop)
{ {
int rc; int rc;
while (commands->count > 0) while (commands->count > 0)
{ {
if (MQTTAsync_processCommand() == 0) if (MQTTAsync_processCommand() == 0)
...@@ -1461,7 +1464,7 @@ static thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1461,7 +1464,7 @@ static thread_return_type WINAPI MQTTAsync_sendThread(void* n)
if ((rc = Thread_wait_sem(send_sem, 1000)) != 0 && rc != ETIMEDOUT) if ((rc = Thread_wait_sem(send_sem, 1000)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for semaphore", rc); Log(LOG_ERROR, -1, "Error %d waiting for semaphore", rc);
#endif #endif
MQTTAsync_checkTimeouts(); MQTTAsync_checkTimeouts();
} }
sendThread_state = STOPPING; sendThread_state = STOPPING;
...@@ -1496,7 +1499,7 @@ static void MQTTAsync_emptyMessageQueue(Clients* client) ...@@ -1496,7 +1499,7 @@ static void MQTTAsync_emptyMessageQueue(Clients* client)
static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m) static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
{ {
int count = 0; int count = 0;
ListElement* current = NULL; ListElement* current = NULL;
ListElement *next = NULL; ListElement *next = NULL;
...@@ -1528,7 +1531,7 @@ static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m) ...@@ -1528,7 +1531,7 @@ static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
} }
ListEmpty(m->responses); ListEmpty(m->responses);
Log(TRACE_MINIMUM, -1, "%d responses removed for client %s", count, m->c->clientID); Log(TRACE_MINIMUM, -1, "%d responses removed for client %s", count, m->c->clientID);
/* remove commands in the command queue relating to this client */ /* remove commands in the command queue relating to this client */
count = 0; count = 0;
current = ListNextElement(commands, &next); current = ListNextElement(commands, &next);
...@@ -1536,7 +1539,7 @@ static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m) ...@@ -1536,7 +1539,7 @@ static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
while (current) while (current)
{ {
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content); MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->client == m) if (command->client == m)
{ {
ListDetach(commands, command); ListDetach(commands, command);
...@@ -1577,7 +1580,7 @@ void MQTTAsync_destroy(MQTTAsync* handle) ...@@ -1577,7 +1580,7 @@ void MQTTAsync_destroy(MQTTAsync* handle)
MQTTAsync_removeResponsesAndCommands(m); MQTTAsync_removeResponsesAndCommands(m);
ListFree(m->responses); ListFree(m->responses);
if (m->c) if (m->c)
{ {
int saved_socket = m->c->net.socket; int saved_socket = m->c->net.socket;
...@@ -1593,7 +1596,7 @@ void MQTTAsync_destroy(MQTTAsync* handle) ...@@ -1593,7 +1596,7 @@ void MQTTAsync_destroy(MQTTAsync* handle)
Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket); Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
free(saved_clientid); free(saved_clientid);
} }
if (m->serverURI) if (m->serverURI)
free(m->serverURI); free(m->serverURI);
if (m->createOptions) if (m->createOptions)
...@@ -1736,9 +1739,9 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1736,9 +1739,9 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (m->ma) if (m->ma)
rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg); rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg);
else else
rc = 1; rc = 1;
if (rc) if (rc)
{ {
ListRemove(m->c->messageQueue, qe); ListRemove(m->c->messageQueue, qe);
...@@ -1757,12 +1760,12 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1757,12 +1760,12 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{ {
int sessionPresent = ((Connack*)pack)->flags.bits.sessionPresent; int sessionPresent = ((Connack*)pack)->flags.bits.sessionPresent;
int rc = MQTTAsync_completeConnection(m, pack); int rc = MQTTAsync_completeConnection(m, pack);
if (rc == MQTTASYNC_SUCCESS) if (rc == MQTTASYNC_SUCCESS)
{ {
int onSuccess = 0; int onSuccess = 0;
if (m->serverURIcount > 0) if (m->serverURIcount > 0)
Log(TRACE_MIN, -1, "Connect succeeded to %s", Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->serverURIs[m->connect.details.conn.currentURI]); m->serverURIs[m->connect.details.conn.currentURI]);
onSuccess = (m->connect.onSuccess != NULL); /* save setting of onSuccess callback */ onSuccess = (m->connect.onSuccess != NULL); /* save setting of onSuccess callback */
if (m->connect.onSuccess) if (m->connect.onSuccess)
...@@ -1791,13 +1794,13 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1791,13 +1794,13 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (MQTTAsync_checkConn(&m->connect, m)) if (MQTTAsync_checkConn(&m->connect, m))
{ {
MQTTAsync_queuedCommand* conn; MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c); MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */ /* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand)); memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
} }
...@@ -1807,7 +1810,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1807,7 +1810,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
MQTTAsync_failureData data; MQTTAsync_failureData data;
data.token = 0; data.token = 0;
data.code = rc; data.code = rc;
data.message = "CONNACK return code"; data.message = "CONNACK return code";
...@@ -1821,13 +1824,13 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1821,13 +1824,13 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
else if (pack->header.bits.type == SUBACK) else if (pack->header.bits.type == SUBACK)
{ {
ListElement* current = NULL; ListElement* current = NULL;
/* use the msgid to find the callback to be called */ /* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, &current)) while (ListNextElement(m->responses, &current))
{ {
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content); MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == ((Suback*)pack)->msgId) if (command->command.token == ((Suback*)pack)->msgId)
{ {
Suback* sub = (Suback*)pack; Suback* sub = (Suback*)pack;
if (!ListDetach(m->responses, command)) /* remove the response from the list */ if (!ListDetach(m->responses, command)) /* remove the response from the list */
Log(LOG_ERROR, -1, "Subscribe command not removed from command list"); Log(LOG_ERROR, -1, "Subscribe command not removed from command list");
...@@ -1853,7 +1856,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1853,7 +1856,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
int* array = NULL; int* array = NULL;
if (sub->qoss->count == 1) if (sub->qoss->count == 1)
data.alt.qos = *(int*)(sub->qoss->first->content); data.alt.qos = *(int*)(sub->qoss->first->content);
else if (sub->qoss->count > 1) else if (sub->qoss->count > 1)
...@@ -1862,7 +1865,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1862,7 +1865,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int)); int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
while (ListNextElement(sub->qoss, &cur_qos)) while (ListNextElement(sub->qoss, &cur_qos))
*element++ = *(int*)(cur_qos->content); *element++ = *(int*)(cur_qos->content);
} }
data.token = command->command.token; data.token = command->command.token;
Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
(*(command->command.onSuccess))(command->command.context, &data); (*(command->command.onSuccess))(command->command.context, &data);
...@@ -1879,13 +1882,13 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1879,13 +1882,13 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{ {
ListElement* current = NULL; ListElement* current = NULL;
int handleCalled = 0; int handleCalled = 0;
/* use the msgid to find the callback to be called */ /* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, &current)) while (ListNextElement(m->responses, &current))
{ {
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content); MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == ((Unsuback*)pack)->msgId) if (command->command.token == ((Unsuback*)pack)->msgId)
{ {
if (!ListDetach(m->responses, command)) /* remove the response from the list */ if (!ListDetach(m->responses, command)) /* remove the response from the list */
Log(LOG_ERROR, -1, "Unsubscribe command not removed from command list"); Log(LOG_ERROR, -1, "Unsubscribe command not removed from command list");
if (command->command.onSuccess) if (command->command.onSuccess)
...@@ -2031,7 +2034,7 @@ static void MQTTAsync_closeOnly(Clients* client) ...@@ -2031,7 +2034,7 @@ static void MQTTAsync_closeOnly(Clients* client)
#endif #endif
} }
client->connected = 0; client->connected = 0;
client->connect_state = 0; client->connect_state = 0;
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -2043,7 +2046,7 @@ static void MQTTAsync_closeSession(Clients* client) ...@@ -2043,7 +2046,7 @@ static void MQTTAsync_closeSession(Clients* client)
if (client->cleansession) if (client->cleansession)
MQTTAsync_cleanSession(client); MQTTAsync_cleanSession(client);
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -2074,7 +2077,7 @@ static int MQTTAsync_cleanSession(Clients* client) ...@@ -2074,7 +2077,7 @@ static int MQTTAsync_cleanSession(Clients* client)
MQTTProtocol_emptyMessageList(client->outboundMsgs); MQTTProtocol_emptyMessageList(client->outboundMsgs);
MQTTAsync_emptyMessageQueue(client); MQTTAsync_emptyMessageQueue(client);
client->msgID = 0; client->msgID = 0;
if ((found = ListFindItem(handles, client, clientStructCompare)) != NULL) if ((found = ListFindItem(handles, client, clientStructCompare)) != NULL)
{ {
MQTTAsyncs* m = (MQTTAsyncs*)(found->content); MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
...@@ -2090,7 +2093,7 @@ static int MQTTAsync_cleanSession(Clients* client) ...@@ -2090,7 +2093,7 @@ static int MQTTAsync_cleanSession(Clients* client)
static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm) static int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm)
{ {
int rc; int rc;
Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d", Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
m->c->clientID, m->c->messageQueue->count); m->c->clientID, m->c->messageQueue->count);
rc = (*(m->ma))(m->context, topicName, (int)topicLen, mm); rc = (*(m->ma))(m->context, topicName, (int)topicLen, mm);
...@@ -2129,11 +2132,11 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -2129,11 +2132,11 @@ void Protocol_processPublication(Publish* publish, Clients* client)
else else
mm->dup = publish->header.bits.dup; mm->dup = publish->header.bits.dup;
mm->msgid = publish->msgId; mm->msgid = publish->msgId;
if (client->messageQueue->count == 0 && client->connected) if (client->messageQueue->count == 0 && client->connected)
{ {
ListElement* found = NULL; ListElement* found = NULL;
if ((found = ListFindItem(handles, client, clientStructCompare)) == NULL) if ((found = ListFindItem(handles, client, clientStructCompare)) == NULL)
Log(LOG_ERROR, -1, "processPublication: did not find client structure in handles list"); Log(LOG_ERROR, -1, "processPublication: did not find client structure in handles list");
else else
...@@ -2142,12 +2145,12 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -2142,12 +2145,12 @@ void Protocol_processPublication(Publish* publish, Clients* client)
if (m->ma) if (m->ma)
rc = MQTTAsync_deliverMessage(m, publish->topic, publish->topiclen, mm); rc = MQTTAsync_deliverMessage(m, publish->topic, publish->topiclen, mm);
} }
} }
if (rc == 0) /* if message was not delivered, queue it up */ if (rc == 0) /* if message was not delivered, queue it up */
{ {
qEntry* qe = malloc(sizeof(qEntry)); qEntry* qe = malloc(sizeof(qEntry));
qe->msg = mm; qe->msg = mm;
qe->topicName = publish->topic; qe->topicName = publish->topic;
qe->topicLen = publish->topiclen; qe->topicLen = publish->topiclen;
...@@ -2157,7 +2160,7 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -2157,7 +2160,7 @@ void Protocol_processPublication(Publish* publish, Clients* client)
MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe); MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
#endif #endif
} }
publish->topic = NULL; publish->topic = NULL;
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -2212,7 +2215,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2212,7 +2215,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->connect.onFailure = options->onFailure; m->connect.onFailure = options->onFailure;
m->connect.context = options->context; m->connect.context = options->context;
m->connectTimeout = options->connectTimeout; m->connectTimeout = options->connectTimeout;
tostop = 0; tostop = 0;
if (sendThread_state != STARTING && sendThread_state != RUNNING) if (sendThread_state != STARTING && sendThread_state != RUNNING)
{ {
...@@ -2250,11 +2253,11 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2250,11 +2253,11 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
free(m->c->will); free(m->c->will);
m->c->will = NULL; m->c->will = NULL;
} }
if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1)) if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
{ {
const void* source = NULL; const void* source = NULL;
m->c->will = malloc(sizeof(willMessages)); m->c->will = malloc(sizeof(willMessages));
if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data)) if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
{ {
...@@ -2271,7 +2274,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2271,7 +2274,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->will->payload = malloc(m->c->will->payloadlen); m->c->will->payload = malloc(m->c->will->payloadlen);
memcpy(m->c->will->payload, source, m->c->will->payloadlen); memcpy(m->c->will->payload, source, m->c->will->payloadlen);
} }
else else
{ {
m->c->will->payload = NULL; m->c->will->payload = NULL;
m->c->will->payloadlen = 0; m->c->will->payloadlen = 0;
...@@ -2280,7 +2283,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2280,7 +2283,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->will->retained = options->will->retained; m->c->will->retained = options->will->retained;
m->c->will->topic = MQTTStrdup(options->will->topicName); m->c->will->topic = MQTTStrdup(options->will->topicName);
} }
#if defined(OPENSSL) #if defined(OPENSSL)
if (m->c->sslopts) if (m->c->sslopts)
{ {
...@@ -2325,23 +2328,23 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2325,23 +2328,23 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->password = options->binarypwd.data; m->c->password = options->binarypwd.data;
m->c->passwordlen = options->binarypwd.len; m->c->passwordlen = options->binarypwd.len;
} }
m->c->retryInterval = options->retryInterval; m->c->retryInterval = options->retryInterval;
m->shouldBeConnected = 1; m->shouldBeConnected = 1;
m->connectTimeout = options->connectTimeout; m->connectTimeout = options->connectTimeout;
MQTTAsync_freeServerURIs(m); MQTTAsync_freeServerURIs(m);
if (options->struct_version >= 2 && options->serverURIcount > 0) if (options->struct_version >= 2 && options->serverURIcount > 0)
{ {
int i; int i;
m->serverURIcount = options->serverURIcount; m->serverURIcount = options->serverURIcount;
m->serverURIs = malloc(options->serverURIcount * sizeof(char*)); m->serverURIs = malloc(options->serverURIcount * sizeof(char*));
for (i = 0; i < options->serverURIcount; ++i) for (i = 0; i < options->serverURIcount; ++i)
m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]); m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
} }
/* Add connect request to operation queue */ /* Add connect request to operation queue */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand)); memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
...@@ -2381,7 +2384,7 @@ static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOpt ...@@ -2381,7 +2384,7 @@ static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOpt
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
goto exit; goto exit;
} }
/* Add disconnect request to operation queue */ /* Add disconnect request to operation queue */
dis = malloc(sizeof(MQTTAsync_queuedCommand)); dis = malloc(sizeof(MQTTAsync_queuedCommand));
memset(dis, '\0', sizeof(MQTTAsync_queuedCommand)); memset(dis, '\0', sizeof(MQTTAsync_queuedCommand));
...@@ -2396,7 +2399,7 @@ static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOpt ...@@ -2396,7 +2399,7 @@ static int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOpt
dis->command.type = DISCONNECT; dis->command.type = DISCONNECT;
dis->command.details.dis.internal = internal; dis->command.details.dis.internal = internal;
rc = MQTTAsync_addCommand(dis, sizeof(dis)); rc = MQTTAsync_addCommand(dis, sizeof(dis));
exit: exit:
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -2406,7 +2409,7 @@ exit: ...@@ -2406,7 +2409,7 @@ exit:
static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout) static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
{ {
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
options.timeout = timeout; options.timeout = timeout;
return MQTTAsync_disconnect1(handle, &options, 1); return MQTTAsync_disconnect1(handle, &options, 1);
} }
...@@ -2419,7 +2422,7 @@ void MQTTProtocol_closeSession(Clients* c, int sendwill) ...@@ -2419,7 +2422,7 @@ void MQTTProtocol_closeSession(Clients* c, int sendwill)
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options) int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options)
{ {
return MQTTAsync_disconnect1(handle, options, 0); return MQTTAsync_disconnect1(handle, options, 0);
} }
...@@ -2546,7 +2549,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int ...@@ -2546,7 +2549,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
{ {
sub->command.details.sub.topics[i] = MQTTStrdup(topic[i]); sub->command.details.sub.topics[i] = MQTTStrdup(topic[i]);
sub->command.details.sub.qoss[i] = qos[i]; sub->command.details.sub.qoss[i] = qos[i];
} }
rc = MQTTAsync_addCommand(sub, sizeof(sub)); rc = MQTTAsync_addCommand(sub, sizeof(sub));
...@@ -2599,7 +2602,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M ...@@ -2599,7 +2602,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit; goto exit;
} }
/* Add unsubscribe request to operation queue */ /* Add unsubscribe request to operation queue */
unsub = malloc(sizeof(MQTTAsync_queuedCommand)); unsub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
...@@ -2663,7 +2666,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen ...@@ -2663,7 +2666,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
FUNC_ENTRY; FUNC_ENTRY;
if (m == NULL || m->c == NULL) if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE; rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0 && (m->createOptions == NULL || else if (m->c->connected == 0 && (m->createOptions == NULL ||
m->createOptions->sendWhileDisconnected == 0 || m->shouldBeConnected == 0)) m->createOptions->sendWhileDisconnected == 0 || m->shouldBeConnected == 0))
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
else if (!UTF8_validateString(destinationName)) else if (!UTF8_validateString(destinationName))
...@@ -2677,7 +2680,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen ...@@ -2677,7 +2680,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
/* Add publish request to operation queue */ /* Add publish request to operation queue */
pub = malloc(sizeof(MQTTAsync_queuedCommand)); pub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(pub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
...@@ -2765,7 +2768,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2765,7 +2768,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
if (rc != 0) if (rc != 0)
goto exit; goto exit;
Socket_clearPendingWrite(m->c->net.socket); Socket_clearPendingWrite(m->c->net.socket);
#if defined(OPENSSL) #if defined(OPENSSL)
...@@ -2796,7 +2799,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2796,7 +2799,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
goto exit; goto exit;
} }
else if (rc == 1) else if (rc == 1)
{ {
rc = MQTTCLIENT_SUCCESS; rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3; m->c->connect_state = 3;
...@@ -2845,13 +2848,13 @@ exit: ...@@ -2845,13 +2848,13 @@ exit:
if (MQTTAsync_checkConn(&m->connect, m)) if (MQTTAsync_checkConn(&m->connect, m))
{ {
MQTTAsync_queuedCommand* conn; MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c); MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */ /* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand)); memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
} }
...@@ -2861,7 +2864,7 @@ exit: ...@@ -2861,7 +2864,7 @@ exit:
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
MQTTAsync_failureData data; MQTTAsync_failureData data;
data.token = 0; data.token = 0;
data.code = MQTTASYNC_FAILURE; data.code = MQTTASYNC_FAILURE;
data.message = "TCP/TLS connect failure"; data.message = "TCP/TLS connect failure";
...@@ -2936,7 +2939,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2936,7 +2939,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
MQTTAsync_failureData data; MQTTAsync_failureData data;
data.token = 0; data.token = 0;
data.code = MQTTASYNC_FAILURE; data.code = MQTTASYNC_FAILURE;
data.message = "TCP connect completion failure"; data.message = "TCP connect completion failure";
...@@ -2967,7 +2970,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2967,7 +2970,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m) if (m)
{ {
ListElement* current = NULL; ListElement* current = NULL;
if (m->dc) if (m->dc)
{ {
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid); Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
...@@ -2978,13 +2981,13 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2978,13 +2981,13 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
{ {
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content); MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == msgid) if (command->command.token == msgid)
{ {
if (!ListDetach(m->responses, command)) /* then remove the response from the list */ if (!ListDetach(m->responses, command)) /* then remove the response from the list */
Log(LOG_ERROR, -1, "Publish command not removed from command list"); Log(LOG_ERROR, -1, "Publish command not removed from command list");
if (command->command.onSuccess) if (command->command.onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
data.token = command->command.token; data.token = command->command.token;
data.alt.pub.destinationName = command->command.details.pub.destinationName; data.alt.pub.destinationName = command->command.details.pub.destinationName;
data.alt.pub.message.payload = command->command.details.pub.payload; data.alt.pub.message.payload = command->command.details.pub.payload;
...@@ -3194,29 +3197,29 @@ MQTTAsync_nameValue* MQTTAsync_getVersionInfo(void) ...@@ -3194,29 +3197,29 @@ MQTTAsync_nameValue* MQTTAsync_getVersionInfo(void)
{ {
#define MAX_INFO_STRINGS 8 #define MAX_INFO_STRINGS 8
static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1]; static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
int i = 0; int i = 0;
libinfo[i].name = "Product name"; libinfo[i].name = "Product name";
libinfo[i++].value = "Paho Asynchronous MQTT C Client Library"; libinfo[i++].value = "Paho Asynchronous MQTT C Client Library";
libinfo[i].name = "Version"; libinfo[i].name = "Version";
libinfo[i++].value = CLIENT_VERSION; libinfo[i++].value = CLIENT_VERSION;
libinfo[i].name = "Build level"; libinfo[i].name = "Build level";
libinfo[i++].value = BUILD_TIMESTAMP; libinfo[i++].value = BUILD_TIMESTAMP;
#if defined(OPENSSL) #if defined(OPENSSL)
libinfo[i].name = "OpenSSL version"; libinfo[i].name = "OpenSSL version";
libinfo[i++].value = SSLeay_version(SSLEAY_VERSION); libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
libinfo[i].name = "OpenSSL flags"; libinfo[i].name = "OpenSSL flags";
libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS); libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
libinfo[i].name = "OpenSSL build timestamp"; libinfo[i].name = "OpenSSL build timestamp";
libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON); libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
libinfo[i].name = "OpenSSL platform"; libinfo[i].name = "OpenSSL platform";
libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM); libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
libinfo[i].name = "OpenSSL directory"; libinfo[i].name = "OpenSSL directory";
libinfo[i++].value = SSLeay_version(SSLEAY_DIR); libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment