Commit f0ff597d authored by Ian Craggs's avatar Ian Craggs

bug #406025 - fix timing and cleanup in connection breaks under load

parent 6496a110
......@@ -855,6 +855,7 @@ void MQTTAsync_processCommand()
List* ignored_clients = NULL;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
Thread_lock_mutex(mqttcommand_mutex);
/* only the first command in the list must be processed for any particular client, so if we skip
......@@ -894,7 +895,6 @@ void MQTTAsync_processCommand()
if (!command)
goto exit; /* nothing to do */
Thread_lock_mutex(mqttasync_mutex);
if (command->command.type == CONNECT)
{
if (command->client->c->connect_state != 0 || command->client->c->connected)
......@@ -1022,7 +1022,7 @@ void MQTTAsync_processCommand()
MQTTAsync_disconnect(command->client, &opts); /* not "internal" because we don't want to call connection lost */
}
else
MQTTAsync_disconnect_internal(command->client->c, 0);
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.onFailure)
{
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
......@@ -1040,8 +1040,8 @@ void MQTTAsync_processCommand()
ListAppend(command->client->responses, command, sizeof(command));
}
Thread_unlock_mutex(mqttasync_mutex);
exit:
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
}
......@@ -1908,19 +1908,21 @@ int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
m->connect.onFailure = options->onFailure;
m->connect.context = options->context;
Thread_lock_mutex(mqttasync_mutex);
tostop = 0;
if (sendThread_state != STARTING && sendThread_state != RUNNING)
{
Thread_lock_mutex(mqttasync_mutex);
sendThread_state = STARTING;
Thread_start(MQTTAsync_sendThread, NULL);
Thread_unlock_mutex(mqttasync_mutex);
}
if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
{
Thread_lock_mutex(mqttasync_mutex);
receiveThread_state = STARTING;
Thread_start(MQTTAsync_receiveThread, handle);
}
Thread_unlock_mutex(mqttasync_mutex);
}
m->c->keepAliveInterval = options->keepAliveInterval;
m->c->cleansession = options->cleansession;
......
......@@ -1443,16 +1443,32 @@ MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long
else if (m->c->connect_state == 2)
{
*rc = SSLSocket_connect(m->c->net.ssl, sock);
if (*rc == 1 || *rc == SSL_FATAL)
if (*rc == SSL_FATAL)
break;
else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
{
if (*rc == 1 && !m->c->cleansession && m->c->session == NULL)
if (!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
break;
}
}
#endif
else if (m->c->connect_state == 3)
{
int error;
socklen_t len = sizeof(error);
if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
if (error)
{
*rc = error;
break;
}
}
}
}
else if (MQTTClient_elapsed(start) > timeout)
if (MQTTClient_elapsed(start) > timeout)
{
pack = NULL;
break;
......
......@@ -426,7 +426,7 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
publish.payloadlen = m->publish->payloadlen;
Protocol_processPublication(&publish, client);
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_RECEIVED, m->qos, pubrel->msgId);
rc += MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_RECEIVED, m->qos, pubrel->msgId);
#endif
ListRemove(&(state.publications), m->publish);
ListRemove(client->inboundMsgs, m);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment