Commit e3912c09 authored by Ian Craggs's avatar Ian Craggs

Fix error reporting for mutex calls, bug #419233

parent baeaf2dc
......@@ -80,7 +80,7 @@
<!-- non-SSL, asynchronous library -->
<property name="output.async.filename" value="${output.folder}/lib${libname.async}.so" />
<exec executable="gcc" failonerror="true">
<arg line="${ccflags.so} ${ldflags.so} -Wl,-soname,lib${libname.async}.so -o ${output.async.filename} ${async.source.files}"/>
<arg line="${ccflags.so} ${ldflags.so} -Wl,-init,MQTTAsync_init -Wl,-soname,lib${libname.async}.so -o ${output.async.filename} ${async.source.files}"/>
</exec>
<exec executable="strip" failonerror="true">
<arg value="${output.async.filename}" />
......@@ -92,7 +92,7 @@
<!-- SSL, asynchronous library -->
<property name="output.async.ssl.filename" value="${output.folder}/lib${libname.async.ssl}.so" />
<exec executable="gcc" failonerror="true">
<arg line="-DOPENSSL ${ccflags.so} ${ldflags.so} -Wl,-soname,lib${libname.async.ssl}.so -o ${output.async.ssl.filename} ${async.source.files}"/>
<arg line="-DOPENSSL ${ccflags.so} ${ldflags.so} -Wl,-init,MQTTAsync_init -Wl,-soname,lib${libname.async.ssl}.so -o ${output.async.ssl.filename} ${async.source.files}"/>
</exec>
<exec executable="strip" failonerror="true">
<arg value="${output.async.ssl.filename}" />
......
......@@ -16,6 +16,7 @@
* Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug# 415042 - using already freed structure
* Ian Craggs - fix for bug 419233 - mutexes not reporting errors
*******************************************************************************/
/**
......@@ -115,6 +116,20 @@ static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
static cond_type send_cond = &send_cond_store;
void MQTTAsync_init()
{
pthread_mutexattr_t attr;
int rc;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if ((rc = pthread_mutex_init(mqttasync_mutex, &attr)) != 0)
printf("MQTTAsync: error %d initializing async_mutex\n", rc);
if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
printf("MQTTAsync: error %d initializing command_mutex\n", rc);
}
#define WINAPI
#endif
......@@ -302,6 +317,22 @@ int clientSockCompare(void* a, void* b)
}
void MQTTAsync_lock_mutex(mutex_type amutex)
{
int rc = Thread_lock_mutex(amutex);
if (rc != 0)
Log(LOG_ERROR, 0, "Error %d locking mutex", rc);
}
void MQTTAsync_unlock_mutex(mutex_type amutex)
{
int rc = Thread_unlock_mutex(amutex);
if (rc != 0)
Log(LOG_ERROR, 0, "Error %d unlocking mutex", rc);
}
int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
int persistence_type, void* persistence_context)
{
......@@ -309,7 +340,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
MQTTAsyncs *m = NULL;
FUNC_ENTRY;
rc = Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (serverURI == NULL || clientId == NULL)
{
......@@ -380,7 +411,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
exit:
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -671,10 +702,10 @@ int MQTTAsync_restoreCommands(MQTTAsyncs* client)
int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
{
int rc;
int rc = 0;
FUNC_ENTRY;
rc = Thread_lock_mutex(mqttcommand_mutex);
MQTTAsync_lock_mutex(mqttcommand_mutex);
command->command.start_time = MQTTAsync_start_clock();
if (command->command.type == CONNECT ||
(command->command.type == DISCONNECT && command->command.details.dis.internal))
......@@ -697,7 +728,7 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
MQTTAsync_persistCommand(command);
#endif
}
rc = Thread_unlock_mutex(mqttcommand_mutex);
MQTTAsync_unlock_mutex(mqttcommand_mutex);
#if !defined(WIN32)
Thread_signal_cond(send_cond);
#else
......@@ -722,16 +753,16 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
if (command->details.dis.internal && m->cl && was_connected)
{
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->cl))(m->context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
else if (!command->details.dis.internal && command->onSuccess)
{
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->onSuccess))(command->context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
FUNC_EXIT;
......@@ -880,8 +911,8 @@ void MQTTAsync_processCommand()
List* ignored_clients = NULL;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
Thread_lock_mutex(mqttcommand_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttcommand_mutex);
/* only the first command in the list must be processed for any particular client, so if we skip
a command for a client, we must skip all following commands for that client. Use a list of
......@@ -920,7 +951,7 @@ void MQTTAsync_processCommand()
MQTTAsync_unpersistCommand(command);
#endif
}
Thread_unlock_mutex(mqttcommand_mutex);
MQTTAsync_unlock_mutex(mqttcommand_mutex);
if (!command)
goto exit; /* nothing to do */
......@@ -1020,9 +1051,9 @@ void MQTTAsync_processCommand()
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
else
......@@ -1085,9 +1116,9 @@ void MQTTAsync_processCommand()
{
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onFailure))(command->command.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeConnect(command->command);
MQTTAsync_freeCommand(command); /* free up the command if necessary */
......@@ -1101,7 +1132,7 @@ void MQTTAsync_processCommand()
}
exit:
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
}
......@@ -1117,7 +1148,7 @@ void MQTTAsync_checkTimeouts()
if (difftime(now, last) < 3)
goto exit;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
last = now;
while (ListNextElement(handles, &current)) /* for each client */
{
......@@ -1151,9 +1182,9 @@ void MQTTAsync_checkTimeouts()
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
continue;
......@@ -1177,9 +1208,9 @@ void MQTTAsync_checkTimeouts()
{
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(com->command.type), m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(com->command.onFailure))(com->command.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
timed_out_count++;
}
......@@ -1187,7 +1218,7 @@ void MQTTAsync_checkTimeouts()
for (i = 0; i < timed_out_count; ++i)
ListRemoveHead(m->responses); /* remove the first response in the list */
}
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
exit:
FUNC_EXIT;
}
......@@ -1196,9 +1227,9 @@ exit:
thread_return_type WINAPI MQTTAsync_sendThread(void* n)
{
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = RUNNING;
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
while (!tostop)
{
/*int rc;*/
......@@ -1214,9 +1245,9 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
MQTTAsync_checkTimeouts();
}
sendThread_state = STOPPING;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = STOPPED;
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
return 0;
}
......@@ -1288,7 +1319,7 @@ void MQTTAsync_destroy(MQTTAsync* handle)
MQTTAsyncs* m = *handle;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL)
goto exit;
......@@ -1322,7 +1353,7 @@ void MQTTAsync_destroy(MQTTAsync* handle)
MQTTAsync_terminate();
exit:
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
}
......@@ -1387,7 +1418,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = RUNNING;
while (!tostop)
{
......@@ -1396,9 +1427,9 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
MQTTAsyncs* m = NULL;
MQTTPacket* pack = NULL;
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
pack = MQTTAsync_cycle(&sock, timeout, &rc);
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (tostop)
break;
timeout = 1000L;
......@@ -1417,9 +1448,9 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
}
if (rc == SOCKET_ERROR)
{
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
MQTTAsync_disconnect_internal(m, 0);
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
}
else
{
......@@ -1463,9 +1494,9 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (m->connect.onSuccess)
{
Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onSuccess))(m->connect.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
else
......@@ -1496,9 +1527,9 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
data.code = rc;
data.message = "CONNACK return code";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, &data);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
}
......@@ -1536,9 +1567,9 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
handleCalled = 1;
Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
if (array)
free(array);
}
......@@ -1567,9 +1598,9 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
handleCalled = 1;
Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeCommand(command);
break;
......@@ -1582,7 +1613,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
}
}
receiveThread_state = STOPPED;
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
#if !defined(WIN32)
if (sendThread_state != STOPPED)
Thread_signal_cond(send_cond);
......@@ -1623,10 +1654,10 @@ void MQTTAsync_stop()
tostop = 1;
while ((sendThread_state != STOPPED || receiveThread_state != STOPPED) && ++count < 100)
{
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
Log(TRACE_MIN, -1, "sleeping");
MQTTAsync_sleep(100L);
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
}
rc = 1;
tostop = 0;
......@@ -1645,7 +1676,7 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
MQTTAsyncs* m = handle;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || ma == NULL || m->c->connect_state != 0)
rc = MQTTASYNC_FAILURE;
......@@ -1657,7 +1688,7 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
m->dc = dc;
}
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -1901,9 +1932,9 @@ int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTA
Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
m->c->clientID, m->c->messageQueue->count);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
rc = (*(m->ma))(m->context, topicName, topicLen, mm);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
/* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
* the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
* so we must be careful how we use it.
......@@ -2026,17 +2057,17 @@ int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
tostop = 0;
if (sendThread_state != STARTING && sendThread_state != RUNNING)
{
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = STARTING;
Thread_start(MQTTAsync_sendThread, NULL);
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
}
if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
{
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = STARTING;
Thread_start(MQTTAsync_receiveThread, handle);
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
}
m->c->keepAliveInterval = options->keepAliveInterval;
......@@ -2177,10 +2208,10 @@ int MQTTAsync_isConnected(MQTTAsync handle)
int rc = 0;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m && m->c)
rc = m->c->connected;
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -2524,9 +2555,9 @@ exit:
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
}
......@@ -2569,7 +2600,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
#if defined(OPENSSL)
}
#endif
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (*sock > 0)
{
MQTTAsyncs* m = NULL;
......@@ -2605,9 +2636,9 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
}
}
......@@ -2636,9 +2667,9 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->dc)
{
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->dc))(m->context, msgid);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
/* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, &current))
......@@ -2659,9 +2690,9 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
//Thread_unlock_mutex(mqttasync_mutex);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
//Thread_lock_mutex(mqttasync_mutex);
//MQTTAsync_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeCommand(command);
break;
......@@ -2682,7 +2713,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
}
}
MQTTAsync_retry();
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(*rc);
return pack;
}
......@@ -2702,7 +2733,7 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
*tokens = NULL;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL)
{
......@@ -2725,7 +2756,7 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
}
exit:
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment