Commit 6643392c authored by Ian Craggs's avatar Ian Craggs

Fix timing issues with connect failure to non 3.1.1 server

parent 7c30485c
...@@ -490,9 +490,25 @@ thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -490,9 +490,25 @@ thread_return_type WINAPI MQTTClient_run(void* n)
} }
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
{ {
Thread_unlock_mutex(mqttclient_mutex); if (m->c->connected)
MQTTClient_disconnect_internal(m, 0); {
Thread_lock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_disconnect_internal(m, 0);
Thread_lock_mutex(mqttclient_mutex);
}
else
{
if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem))
{
Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
Thread_post_sem(m->connect_sem);
}
if (m->c->connect_state == 3 && !Thread_check_sem(m->connack_sem))
{
Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
Thread_post_sem(m->connack_sem);
}
}
} }
else else
{ {
...@@ -521,7 +537,7 @@ thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -521,7 +537,7 @@ thread_return_type WINAPI MQTTClient_run(void* n)
} }
if (pack) if (pack)
{ {
if (pack->header.bits.type == CONNACK) if (pack->header.bits.type == CONNACK && !Thread_check_sem(m->connack_sem))
{ {
Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
m->pack = pack; m->pack = pack;
...@@ -1111,15 +1127,14 @@ int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int sto ...@@ -1111,15 +1127,14 @@ int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int sto
MQTTClient_closeSession(m->c); MQTTClient_closeSession(m->c);
if (Thread_check_sem(m->connect_sem)) while (Thread_check_sem(m->connect_sem))
Thread_post_sem(m->connect_sem); Thread_wait_sem(m->connect_sem, 100);
if (Thread_check_sem(m->connack_sem)) while (Thread_check_sem(m->connack_sem))
Thread_post_sem(m->connect_sem); Thread_wait_sem(m->connack_sem, 100);
if (Thread_check_sem(m->suback_sem)) while (Thread_check_sem(m->suback_sem))
Thread_post_sem(m->suback_sem); Thread_wait_sem(m->suback_sem, 100);
if (Thread_check_sem(m->unsuback_sem)) while (Thread_check_sem(m->unsuback_sem))
Thread_post_sem(m->unsuback_sem); Thread_wait_sem(m->unsuback_sem, 100);
exit: exit:
if (stop) if (stop)
MQTTClient_stop(); MQTTClient_stop();
...@@ -1582,6 +1597,8 @@ MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long ...@@ -1582,6 +1597,8 @@ MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long
pack = MQTTClient_cycle(&sock, 100L, rc); pack = MQTTClient_cycle(&sock, 100L, rc);
if (sock == m->c->net.socket) if (sock == m->c->net.socket)
{ {
if (*rc == SOCKET_ERROR)
break;
if (pack && (pack->header.bits.type == packet_type)) if (pack && (pack->header.bits.type == packet_type))
break; break;
if (m->c->connect_state == 1) if (m->c->connect_state == 1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment