Commit e3e40d17 authored by Ian Craggs's avatar Ian Craggs

Test fixes for bug #442400

Bug:442400
parent fa781a1a
...@@ -49,8 +49,8 @@ ...@@ -49,8 +49,8 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##" #define BUILD_TIMESTAMP "201408221458"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##" #define CLIENT_VERSION "1.0.0"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
...@@ -1257,14 +1257,17 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1257,14 +1257,17 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
while (!tostop) while (!tostop)
{ {
/*int rc;*/ int rc;
while (commands->count > 0) while (commands->count > 0)
MQTTAsync_processCommand(); MQTTAsync_processCommand();
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
/*rc =*/ Thread_wait_cond(send_cond, 1); rc = Thread_wait_cond(send_cond, 1);
if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
#else #else
/*rc =*/ Thread_wait_sem(send_sem, 1000); if ((rc = Thread_wait_sem(send_sem, 1000)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for semaphore", rc);
#endif #endif
MQTTAsync_checkTimeouts(); MQTTAsync_checkTimeouts();
...@@ -1459,23 +1462,32 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1459,23 +1462,32 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
break; break;
timeout = 1000L; timeout = 1000L;
if (sock == 0)
continue;
/* find client corresponding to socket */ /* find client corresponding to socket */
if (ListFindItem(handles, &sock, clientSockCompare) == NULL) if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
{ {
/* assert: should not happen */ Log(LOG_ERROR, -1, "Could not find client corresponding to socket %d - removing socket", sock);
Socket_close(sock);
continue; continue;
} }
m = (MQTTAsyncs*)(handles->current->content); m = (MQTTAsyncs*)(handles->current->content);
if (m == NULL) if (m == NULL)
{ {
/* assert: should not happen */ Log(LOG_ERROR, -1, "Client structure was NULL for socket %d - removing socket", sock);
Socket_close(sock);
continue; continue;
} }
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
{ {
MQTTAsync_unlock_mutex(mqttasync_mutex); if (m->c->connected == 1)
MQTTAsync_disconnect_internal(m, 0); {
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
MQTTAsync_disconnect_internal(m, 0);
MQTTAsync_lock_mutex(mqttasync_mutex);
}
else /* calling disconnect_internal won't have any effect if we're already disconnected */
MQTTAsync_closeOnly(m->c);
} }
else else
{ {
...@@ -2535,19 +2547,19 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2535,19 +2547,19 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
*rc = MQTTAsync_connecting(m); *rc = MQTTAsync_connecting(m);
else else
pack = MQTTPacket_Factory(&m->c->net, rc); pack = MQTTPacket_Factory(&m->c->net, rc);
if ((m->c->connect_state == 3) && (*rc == SOCKET_ERROR)) if (m->c->connect_state == 3 && *rc == SOCKET_ERROR)
{ {
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR"); Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (MQTTAsync_checkConn(&m->connect, m)) if (MQTTAsync_checkConn(&m->connect, m))
{ {
MQTTAsync_queuedCommand* conn; MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c); MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */ /* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand)); memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
} }
......
...@@ -559,7 +559,7 @@ void Socket_close(int socket) ...@@ -559,7 +559,7 @@ void Socket_close(int socket)
if (ListRemoveItem(s.clientsds, &socket, intcompare)) if (ListRemoveItem(s.clientsds, &socket, intcompare))
Log(TRACE_MIN, -1, "Removed socket %d", socket); Log(TRACE_MIN, -1, "Removed socket %d", socket);
else else
Log(TRACE_MIN, -1, "Failed to remove socket %d", socket); Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
if (socket + 1 >= s.maxfdp1) if (socket + 1 >= s.maxfdp1)
{ {
/* now we have to reset s.maxfdp1 */ /* now we have to reset s.maxfdp1 */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment