Commit af104622 authored by Keith Holman's avatar Keith Holman

clients: add defines for connect states

To help understand the various connecting states this patch introduces
defines with the various states.  This helps make the code a bit easier
to understand.
Signed-off-by: 's avatarKeith Holman <keith.holman@windriver.com>
parent 6f929f58
...@@ -160,6 +160,19 @@ typedef struct ...@@ -160,6 +160,19 @@ typedef struct
#endif #endif
} networkHandles; } networkHandles;
/* connection states */
/** no connection in progress, see connected value */
#define NOT_IN_PROGRESS 0x0
/** TCP connection in progress */
#define TCP_IN_PROGRESS 0x1
/** SSL connection in progress */
#define SSL_IN_PROGRESS 0x2
/** TCP completed, waiting for MQTT ACK */
#define WAIT_FOR_CONNACK 0x3
/** Disconnecting */
#define DISCONNECTING -2
/** /**
* Data related to one client * Data related to one client
*/ */
......
...@@ -878,7 +878,7 @@ static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_si ...@@ -878,7 +878,7 @@ static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_si
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttcommand_mutex); MQTTAsync_lock_mutex(mqttcommand_mutex);
/* Don't set start time if the connect command is already in process #218 */ /* Don't set start time if the connect command is already in process #218 */
if ((command->command.type != CONNECT) || (command->client->c->connect_state == 0)) if ((command->command.type != CONNECT) || (command->client->c->connect_state == NOT_IN_PROGRESS))
command->command.start_time = MQTTAsync_start_clock(); command->command.start_time = MQTTAsync_start_clock();
if (command->command.type == CONNECT || if (command->command.type == CONNECT ||
(command->command.type == DISCONNECT && command->command.details.dis.internal)) (command->command.type == DISCONNECT && command->command.details.dis.internal))
...@@ -1192,7 +1192,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1192,7 +1192,7 @@ static int MQTTAsync_processCommand(void)
continue; continue;
if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected && if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && MQTTAsync_Socket_noPendingWrites(cmd->client->c->net.socket))) cmd->client->c->connect_state == NOT_IN_PROGRESS && MQTTAsync_Socket_noPendingWrites(cmd->client->c->net.socket)))
{ {
if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) && if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1) cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
...@@ -1223,7 +1223,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1223,7 +1223,7 @@ static int MQTTAsync_processCommand(void)
if (command->command.type == CONNECT) if (command->command.type == CONNECT)
{ {
if (command->client->c->connect_state != 0 || command->client->c->connected) if (command->client->c->connect_state != NOT_IN_PROGRESS || command->client->c->connected)
rc = 0; rc = 0;
else else
{ {
...@@ -1260,7 +1260,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1260,7 +1260,7 @@ static int MQTTAsync_processCommand(void)
#else #else
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion); rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion);
#endif #endif
if (command->client->c->connect_state == 0) if (command->client->c->connect_state == NOT_IN_PROGRESS)
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
/* if the TCP connect is pending, then we must call select to determine when the connect has completed, /* if the TCP connect is pending, then we must call select to determine when the connect has completed,
...@@ -1342,11 +1342,11 @@ static int MQTTAsync_processCommand(void) ...@@ -1342,11 +1342,11 @@ static int MQTTAsync_processCommand(void)
} }
else if (command->command.type == DISCONNECT) else if (command->command.type == DISCONNECT)
{ {
if (command->client->c->connect_state != 0 || command->client->c->connected != 0) if (command->client->c->connect_state != NOT_IN_PROGRESS || command->client->c->connected != 0)
{ {
if (command->client->c->connect_state != 0) if (command->client->c->connect_state != NOT_IN_PROGRESS)
{ {
command->client->c->connect_state = -2; command->client->c->connect_state = DISCONNECTING;
if (command->client->connect.onFailure) if (command->client->connect.onFailure)
{ {
MQTTAsync_failureData data; MQTTAsync_failureData data;
...@@ -1502,11 +1502,11 @@ static void MQTTAsync_checkTimeouts(void) ...@@ -1502,11 +1502,11 @@ static void MQTTAsync_checkTimeouts(void)
MQTTAsyncs* m = (MQTTAsyncs*)(current->content); MQTTAsyncs* m = (MQTTAsyncs*)(current->content);
/* check disconnect timeout */ /* check disconnect timeout */
if (m->c->connect_state == -2) if (m->c->connect_state == DISCONNECTING)
MQTTAsync_checkDisconnect(m, &m->disconnect); MQTTAsync_checkDisconnect(m, &m->disconnect);
/* check connect timeout */ /* check connect timeout */
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connectTimeout * 1000)) if (m->c->connect_state != NOT_IN_PROGRESS && MQTTAsync_elapsed(m->connect.start_time) > (m->connectTimeout * 1000))
{ {
nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect timeout"); nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect timeout");
continue; continue;
...@@ -1756,7 +1756,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack) ...@@ -1756,7 +1756,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
int rc = MQTTASYNC_FAILURE; int rc = MQTTASYNC_FAILURE;
FUNC_ENTRY; FUNC_ENTRY;
if (m->c->connect_state == 3) /* MQTT connect sent - wait for CONNACK */ if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
{ {
Connack* connack = (Connack*)pack; Connack* connack = (Connack*)pack;
Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc); Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
...@@ -1765,7 +1765,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack) ...@@ -1765,7 +1765,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
m->retrying = 0; m->retrying = 0;
m->c->connected = 1; m->c->connected = 1;
m->c->good = 1; m->c->good = 1;
m->c->connect_state = 0; m->c->connect_state = NOT_IN_PROGRESS;
if (m->c->cleansession) if (m->c->cleansession)
rc = MQTTAsync_cleanSession(m->c); rc = MQTTAsync_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0) if (m->c->outboundMsgs->count > 0)
...@@ -1844,7 +1844,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1844,7 +1844,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
MQTTAsync_disconnect_internal(m, 0); MQTTAsync_disconnect_internal(m, 0);
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
} }
else if (m->c->connect_state != 0) else if (m->c->connect_state != NOT_IN_PROGRESS)
nextOrClose(m, rc, "socket error"); nextOrClose(m, rc, "socket error");
else /* calling disconnect_internal won't have any effect if we're already disconnected */ else /* calling disconnect_internal won't have any effect if we're already disconnected */
MQTTAsync_closeOnly(m->c); MQTTAsync_closeOnly(m->c);
...@@ -2032,7 +2032,7 @@ static void MQTTAsync_stop(void) ...@@ -2032,7 +2032,7 @@ static void MQTTAsync_stop(void)
/* find out how many handles are still connected */ /* find out how many handles are still connected */
while (ListNextElement(handles, &current)) while (ListNextElement(handles, &current))
{ {
if (((MQTTAsyncs*)(current->content))->c->connect_state > 0 || if (((MQTTAsyncs*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
((MQTTAsyncs*)(current->content))->c->connected) ((MQTTAsyncs*)(current->content))->c->connected)
++conn_count; ++conn_count;
} }
...@@ -2069,7 +2069,7 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, ...@@ -2069,7 +2069,7 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || ma == NULL || m->c->connect_state != 0) if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
rc = MQTTASYNC_FAILURE; rc = MQTTASYNC_FAILURE;
else else
{ {
...@@ -2093,7 +2093,7 @@ int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* ...@@ -2093,7 +2093,7 @@ int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected*
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c->connect_state != 0) if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
rc = MQTTASYNC_FAILURE; rc = MQTTASYNC_FAILURE;
else else
{ {
...@@ -2129,7 +2129,7 @@ static void MQTTAsync_closeOnly(Clients* client) ...@@ -2129,7 +2129,7 @@ static void MQTTAsync_closeOnly(Clients* client)
Thread_unlock_mutex(socket_mutex); Thread_unlock_mutex(socket_mutex);
} }
client->connected = 0; client->connected = 0;
client->connect_state = 0; client->connect_state = NOT_IN_PROGRESS;
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -2907,7 +2907,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2907,7 +2907,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
int rc = -1; int rc = -1;
FUNC_ENTRY; FUNC_ENTRY;
if (m->c->connect_state == 1) /* TCP connect started - check for completion */ if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - check for completion */
{ {
int error; int error;
socklen_t len = sizeof(error); socklen_t len = sizeof(error);
...@@ -2942,7 +2942,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2942,7 +2942,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
if (rc == TCPSOCKET_INTERRUPTED) if (rc == TCPSOCKET_INTERRUPTED)
{ {
rc = MQTTCLIENT_SUCCESS; /* the connect is still in progress */ rc = MQTTCLIENT_SUCCESS; /* the connect is still in progress */
m->c->connect_state = 2; m->c->connect_state = SSL_IN_PROGRESS;
} }
else if (rc == SSL_FATAL) else if (rc == SSL_FATAL)
{ {
...@@ -2952,7 +2952,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2952,7 +2952,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
else if (rc == 1) else if (rc == 1)
{ {
rc = MQTTCLIENT_SUCCESS; rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3; m->c->connect_state = WAIT_FOR_CONNACK;
if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion) == SOCKET_ERROR) if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion) == SOCKET_ERROR)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -2971,7 +2971,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2971,7 +2971,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
else else
{ {
#endif #endif
m->c->connect_state = 3; /* TCP/SSL connect completed, in which case send the MQTT connect packet */ m->c->connect_state = WAIT_FOR_CONNACK; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR) if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
goto exit; goto exit;
#if defined(OPENSSL) #if defined(OPENSSL)
...@@ -2979,7 +2979,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2979,7 +2979,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
#endif #endif
} }
#if defined(OPENSSL) #if defined(OPENSSL)
else if (m->c->connect_state == 2) /* SSL connect sent - wait for completion */ else if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
{ {
if ((rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket, if ((rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket,
m->serverURI, m->c->sslopts->verify)) != 1) m->serverURI, m->c->sslopts->verify)) != 1)
...@@ -2987,14 +2987,14 @@ static int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2987,14 +2987,14 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
if(!m->c->cleansession && m->c->session == NULL) if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl); m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* SSL connect completed, in which case send the MQTT connect packet */ m->c->connect_state = WAIT_FOR_CONNACK; /* SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR) if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
goto exit; goto exit;
} }
#endif #endif
exit: exit:
if ((rc != 0 && rc != TCPSOCKET_INTERRUPTED && m->c->connect_state != 2) || (rc == SSL_FATAL)) if ((rc != 0 && rc != TCPSOCKET_INTERRUPTED && m->c->connect_state != SSL_IN_PROGRESS) || (rc == SSL_FATAL))
nextOrClose(m, MQTTASYNC_FAILURE, "TCP/TLS connect failure"); nextOrClose(m, MQTTASYNC_FAILURE, "TCP/TLS connect failure");
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
...@@ -3035,11 +3035,11 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -3035,11 +3035,11 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m != NULL) if (m != NULL)
{ {
Log(TRACE_MINIMUM, -1, "m->c->connect_state = %d",m->c->connect_state); Log(TRACE_MINIMUM, -1, "m->c->connect_state = %d",m->c->connect_state);
if (m->c->connect_state == 1 || m->c->connect_state == 2) if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
*rc = MQTTAsync_connecting(m); *rc = MQTTAsync_connecting(m);
else else
pack = MQTTPacket_Factory(&m->c->net, rc); pack = MQTTPacket_Factory(&m->c->net, rc);
if (m->c->connect_state == 3 && *rc == SOCKET_ERROR) if (m->c->connect_state == WAIT_FOR_CONNACK && *rc == SOCKET_ERROR)
{ {
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR"); Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect completion failure"); nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect completion failure");
......
...@@ -592,12 +592,12 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -592,12 +592,12 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
MQTTClient_disconnect_internal(m, 0); MQTTClient_disconnect_internal(m, 0);
else else
{ {
if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem)) if (m->c->connect_state == SSL_IN_PROGRESS && !Thread_check_sem(m->connect_sem))
{ {
Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
Thread_post_sem(m->connect_sem); Thread_post_sem(m->connect_sem);
} }
if (m->c->connect_state == 3 && !Thread_check_sem(m->connack_sem)) if (m->c->connect_state == WAIT_FOR_CONNACK && !Thread_check_sem(m->connack_sem))
{ {
Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
Thread_post_sem(m->connack_sem); Thread_post_sem(m->connack_sem);
...@@ -656,7 +656,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -656,7 +656,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
Thread_post_sem(m->unsuback_sem); Thread_post_sem(m->unsuback_sem);
} }
} }
else if (m->c->connect_state == 1 && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == TCP_IN_PROGRESS && !Thread_check_sem(m->connect_sem))
{ {
int error; int error;
socklen_t len = sizeof(error); socklen_t len = sizeof(error);
...@@ -667,7 +667,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -667,7 +667,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
Thread_post_sem(m->connect_sem); Thread_post_sem(m->connect_sem);
} }
#if defined(OPENSSL) #if defined(OPENSSL)
else if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == SSL_IN_PROGRESS && !Thread_check_sem(m->connect_sem))
{ {
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket, rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket,
m->serverURI, m->c->sslopts->verify); m->serverURI, m->c->sslopts->verify);
...@@ -706,7 +706,7 @@ static void MQTTClient_stop(void) ...@@ -706,7 +706,7 @@ static void MQTTClient_stop(void)
/* find out how many handles are still connected */ /* find out how many handles are still connected */
while (ListNextElement(handles, &current)) while (ListNextElement(handles, &current))
{ {
if (((MQTTClients*)(current->content))->c->connect_state > 0 || if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
((MQTTClients*)(current->content))->c->connected) ((MQTTClients*)(current->content))->c->connected)
++conn_count; ++conn_count;
} }
...@@ -743,7 +743,7 @@ int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connect ...@@ -743,7 +743,7 @@ int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connect
FUNC_ENTRY; FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
if (m == NULL || ma == NULL || m->c->connect_state != 0) if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
rc = MQTTCLIENT_FAILURE; rc = MQTTCLIENT_FAILURE;
else else
{ {
...@@ -780,7 +780,7 @@ static void MQTTClient_closeSession(Clients* client) ...@@ -780,7 +780,7 @@ static void MQTTClient_closeSession(Clients* client)
#endif #endif
} }
client->connected = 0; client->connected = 0;
client->connect_state = 0; client->connect_state = NOT_IN_PROGRESS;
if (client->cleansession) if (client->cleansession)
MQTTClient_cleanSession(client); MQTTClient_cleanSession(client);
...@@ -877,13 +877,13 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -877,13 +877,13 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
goto exit; goto exit;
if (m->c->connect_state == 0) if (m->c->connect_state == NOT_IN_PROGRESS)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
goto exit; goto exit;
} }
if (m->c->connect_state == 1) /* TCP connect started - wait for completion */ if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
{ {
Thread_unlock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start)); MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
...@@ -914,7 +914,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -914,7 +914,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket, rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket,
m->serverURI, m->c->sslopts->verify); m->serverURI, m->c->sslopts->verify);
if (rc == TCPSOCKET_INTERRUPTED) if (rc == TCPSOCKET_INTERRUPTED)
m->c->connect_state = 2; /* the connect is still in progress */ m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
else if (rc == SSL_FATAL) else if (rc == SSL_FATAL)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -923,7 +923,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -923,7 +923,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
else if (rc == 1) else if (rc == 1)
{ {
rc = MQTTCLIENT_SUCCESS; rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3; m->c->connect_state = WAIT_FOR_CONNACK;
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR) if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -942,7 +942,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -942,7 +942,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
else else
{ {
#endif #endif
m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */ m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR) if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -954,7 +954,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -954,7 +954,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
} }
#if defined(OPENSSL) #if defined(OPENSSL)
if (m->c->connect_state == 2) /* SSL connect sent - wait for completion */ if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
{ {
Thread_unlock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start)); MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTClient_elapsed(start));
...@@ -966,7 +966,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -966,7 +966,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
} }
if(!m->c->cleansession && m->c->session == NULL) if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl); m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */ m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR) if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -975,7 +975,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -975,7 +975,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
} }
#endif #endif
if (m->c->connect_state == 3) /* MQTT connect sent - wait for CONNACK */ if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
{ {
MQTTPacket* pack = NULL; MQTTPacket* pack = NULL;
...@@ -992,7 +992,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -992,7 +992,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
{ {
m->c->connected = 1; m->c->connected = 1;
m->c->good = 1; m->c->good = 1;
m->c->connect_state = 0; m->c->connect_state = NOT_IN_PROGRESS;
if (MQTTVersion == 4) if (MQTTVersion == 4)
sessionPresent = connack->flags.bits.sessionPresent; sessionPresent = connack->flags.bits.sessionPresent;
if (m->c->cleansession) if (m->c->cleansession)
...@@ -1300,7 +1300,7 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne ...@@ -1300,7 +1300,7 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne
rc = MQTTCLIENT_FAILURE; rc = MQTTCLIENT_FAILURE;
goto exit; goto exit;
} }
if (m->c->connected == 0 && m->c->connect_state == 0) if (m->c->connected == 0 && m->c->connect_state == NOT_IN_PROGRESS)
{ {
rc = MQTTCLIENT_DISCONNECTED; rc = MQTTCLIENT_DISCONNECTED;
goto exit; goto exit;
...@@ -1309,7 +1309,7 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne ...@@ -1309,7 +1309,7 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne
if (m->c->connected != 0) if (m->c->connected != 0)
{ {
start = MQTTClient_start_clock(); start = MQTTClient_start_clock();
m->c->connect_state = -2; /* indicate disconnecting */ m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0) while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
{ /* wait for all inflight message flows to finish, up to timeout */ { /* wait for all inflight message flows to finish, up to timeout */
if (MQTTClient_elapsed(start) >= timeout) if (MQTTClient_elapsed(start) >= timeout)
...@@ -1741,7 +1741,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -1741,7 +1741,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
m = (MQTTClient)(handles->current->content); m = (MQTTClient)(handles->current->content);
if (m != NULL) if (m != NULL)
{ {
if (m->c->connect_state == 1 || m->c->connect_state == 2) if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
*rc = 0; /* waiting for connect state to clear */ *rc = 0; /* waiting for connect state to clear */
else else
{ {
...@@ -1833,7 +1833,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r ...@@ -1833,7 +1833,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
break; break;
if (pack && (pack->header.bits.type == packet_type)) if (pack && (pack->header.bits.type == packet_type))
break; break;
if (m->c->connect_state == 1) if (m->c->connect_state == TCP_IN_PROGRESS)
{ {
int error; int error;
socklen_t len = sizeof(error); socklen_t len = sizeof(error);
...@@ -1843,7 +1843,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r ...@@ -1843,7 +1843,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
break; break;
} }
#if defined(OPENSSL) #if defined(OPENSSL)
else if (m->c->connect_state == 2) else if (m->c->connect_state == SSL_IN_PROGRESS)
{ {
*rc = SSLSocket_connect(m->c->net.ssl, sock, *rc = SSLSocket_connect(m->c->net.ssl, sock,
m->serverURI, m->c->sslopts->verify); m->serverURI, m->c->sslopts->verify);
...@@ -1857,7 +1857,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r ...@@ -1857,7 +1857,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
} }
} }
#endif #endif
else if (m->c->connect_state == 3) else if (m->c->connect_state == WAIT_FOR_CONNACK)
{ {
int error; int error;
socklen_t len = sizeof(error); socklen_t len = sizeof(error);
...@@ -1965,7 +1965,7 @@ void MQTTClient_yield(void) ...@@ -1965,7 +1965,7 @@ void MQTTClient_yield(void)
if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare)) if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
{ {
MQTTClients* m = (MQTTClient)(handles->current->content); MQTTClients* m = (MQTTClient)(handles->current->content);
if (m->c->connect_state != -2) if (m->c->connect_state != DISCONNECTING)
MQTTClient_disconnect_internal(m, 0); MQTTClient_disconnect_internal(m, 0);
} }
Thread_unlock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
......
...@@ -108,7 +108,7 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersi ...@@ -108,7 +108,7 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersi
addr = MQTTProtocol_addressPort(ip_address, &port); addr = MQTTProtocol_addressPort(ip_address, &port);
rc = Socket_new(addr, port, &(aClient->net.socket)); rc = Socket_new(addr, port, &(aClient->net.socket));
if (rc == EINPROGRESS || rc == EWOULDBLOCK) if (rc == EINPROGRESS || rc == EWOULDBLOCK)
aClient->connect_state = 1; /* TCP connect called - wait for connect completion */ aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
else if (rc == 0) else if (rc == 0)
{ /* TCP connect completed. If SSL, send SSL connect */ { /* TCP connect completed. If SSL, send SSL connect */
#if defined(OPENSSL) #if defined(OPENSSL)
...@@ -119,20 +119,19 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersi ...@@ -119,20 +119,19 @@ int MQTTProtocol_connect(const char* ip_address, Clients* aClient, int MQTTVersi
rc = SSLSocket_connect(aClient->net.ssl, aClient->net.socket, rc = SSLSocket_connect(aClient->net.ssl, aClient->net.socket,
addr, aClient->sslopts->verify); addr, aClient->sslopts->verify);
if (rc == TCPSOCKET_INTERRUPTED) if (rc == TCPSOCKET_INTERRUPTED)
aClient->connect_state = 2; /* SSL connect called - wait for completion */ aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
} }
else else
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
} }
#endif #endif
if (rc == 0) if (rc == 0)
{ {
/* Now send the MQTT connect packet */ /* Now send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion)) == 0) if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion)) == 0)
aClient->connect_state = 3; /* MQTT Connect sent - wait for CONNACK */ aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
else else
aClient->connect_state = 0; aClient->connect_state = NOT_IN_PROGRESS;
} }
} }
if (addr != ip_address) if (addr != ip_address)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment