Commit dcf457e0 authored by Ian Craggs's avatar Ian Craggs

Fix for issue #190

parent 9c6583fe
......@@ -32,6 +32,7 @@
* Ian Craggs - fix for bug 486548
* Ian Craggs - SNI support
* Ian Craggs - auto reconnect timing fix #218
* Ian Craggs - fix for issue #190
*******************************************************************************/
/**
......@@ -1337,6 +1338,39 @@ exit:
}
static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
{
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = rc;
data.message = message;
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
}
static void MQTTAsync_checkTimeouts(void)
{
ListElement* current = NULL;
......@@ -1364,35 +1398,7 @@ static void MQTTAsync_checkTimeouts(void)
/* check connect timeout */
else if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connectTimeout * 1000))
{
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed with timeout, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
Log(TRACE_MIN, -1, "Connect failed with timeout, no to try");
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = MQTTASYNC_FAILURE;
data.message = "TCP connect timeout";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect timeout");
continue;
}
......@@ -1677,6 +1683,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
return rc;
}
/* This is the thread function that handles the calling of callback functions if set */
static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{
......@@ -1725,6 +1732,8 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
MQTTAsync_disconnect_internal(m, 0);
MQTTAsync_lock_mutex(mqttasync_mutex);
}
else if (m->c->connect_state != 0)
nextOrClose(m, rc, "socket error");
else /* calling disconnect_internal won't have any effect if we're already disconnected */
MQTTAsync_closeOnly(m->c);
}
......@@ -1791,36 +1800,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
}
}
else
{
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = rc;
data.message = "CONNACK return code";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
}
nextOrClose(m, rc, "CONNACK return code");
}
else if (pack->header.bits.type == SUBACK)
{
......@@ -2845,36 +2825,8 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
exit:
if ((rc != 0 && rc != TCPSOCKET_INTERRUPTED && m->c->connect_state != 2) || (rc == SSL_FATAL))
{
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
nextOrClose(m, MQTTASYNC_FAILURE, "TCP/TLS connect failure");
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = MQTTASYNC_FAILURE;
data.message = "TCP/TLS connect failure";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
}
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -2922,34 +2874,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->c->connect_state == 3 && *rc == SOCKET_ERROR)
{
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (MQTTAsync_checkConn(&m->connect, m))
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = MQTTASYNC_FAILURE;
data.message = "TCP connect completion failure";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
nextOrClose(m, MQTTASYNC_FAILURE, "TCP connect completion failure");
}
else
{
......
......@@ -1008,7 +1008,7 @@ int test2d(struct Options options)
int rc = 0;
char* test_topic = "C client test2d";
int count = 0;
unsigned int iteration = 0;
unsigned int iteration = 0;
failures = 0;
MyLog(
......@@ -1023,6 +1023,7 @@ int test2d(struct Options options)
// Therefore we need to test this several times!
for (iteration = 0; !failures && (iteration < 20) ; iteration++)
{
count = 0;
rc = MQTTAsync_create(&c, options.mutual_auth_connection,
"test2d", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment