Commit 55fa0087 authored by Ian Craggs's avatar Ian Craggs

Thread safe message id assignment

Bug: 445891
parent 5c2e1e20
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
* Rong Xiang, Ian Craggs - C++ compatibility * Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
* Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1 * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
* Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -76,6 +77,8 @@ enum MQTTAsync_threadStates ...@@ -76,6 +77,8 @@ enum MQTTAsync_threadStates
enum MQTTAsync_threadStates sendThread_state = STOPPED; enum MQTTAsync_threadStates sendThread_state = STOPPED;
enum MQTTAsync_threadStates receiveThread_state = STOPPED; enum MQTTAsync_threadStates receiveThread_state = STOPPED;
static thread_id_type sendThread_id = 0,
receiveThread_id = 0;
#if defined(WIN32) || defined(WIN64) #if defined(WIN32) || defined(WIN64)
static mutex_type mqttasync_mutex = NULL; static mutex_type mqttasync_mutex = NULL;
...@@ -326,7 +329,7 @@ void MQTTAsync_lock_mutex(mutex_type amutex) ...@@ -326,7 +329,7 @@ void MQTTAsync_lock_mutex(mutex_type amutex)
{ {
int rc = Thread_lock_mutex(amutex); int rc = Thread_lock_mutex(amutex);
if (rc != 0) if (rc != 0)
Log(LOG_ERROR, 0, "Error %d locking mutex", rc); Log(LOG_ERROR, 0, "Error %s locking mutex", strerror(rc));
} }
...@@ -334,7 +337,7 @@ void MQTTAsync_unlock_mutex(mutex_type amutex) ...@@ -334,7 +337,7 @@ void MQTTAsync_unlock_mutex(mutex_type amutex)
{ {
int rc = Thread_unlock_mutex(amutex); int rc = Thread_unlock_mutex(amutex);
if (rc != 0) if (rc != 0)
Log(LOG_ERROR, 0, "Error %d unlocking mutex", rc); Log(LOG_ERROR, 0, "Error %s unlocking mutex", strerror(rc));
} }
...@@ -1254,6 +1257,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1254,6 +1257,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = RUNNING; sendThread_state = RUNNING;
sendThread_id = Thread_getid();
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
while (!tostop) while (!tostop)
{ {
...@@ -1280,6 +1284,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1280,6 +1284,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
sendThread_state = STOPPING; sendThread_state = STOPPING;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = STOPPED; sendThread_state = STOPPED;
sendThread_id = 0;
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT; FUNC_EXIT;
return 0; return 0;
...@@ -1454,6 +1459,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1454,6 +1459,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = RUNNING; receiveThread_state = RUNNING;
receiveThread_id = Thread_getid();
while (!tostop) while (!tostop)
{ {
int rc = SOCKET_ERROR; int rc = SOCKET_ERROR;
...@@ -1670,6 +1676,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1670,6 +1676,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
} }
} }
receiveThread_state = STOPPED; receiveThread_state = STOPPED;
receiveThread_id = 0;
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
if (sendThread_state != STOPPED) if (sendThread_state != STOPPED)
...@@ -2157,6 +2164,56 @@ int MQTTAsync_isConnected(MQTTAsync handle) ...@@ -2157,6 +2164,56 @@ int MQTTAsync_isConnected(MQTTAsync handle)
} }
int cmdMessageIDCompare(void* a, void* b)
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)a;
return cmd->command.token == *(int*)b;
}
/**
* Assign a new message id for a client. Make sure it isn't already being used and does
* not exceed the maximum.
* @param m a client structure
* @return the next message id to use, or 0 if none available
*/
int MQTTAsync_assignMsgId(MQTTAsyncs* m)
{
int start_msgid = m->c->msgID;
int msgid = start_msgid;
thread_id_type thread_id = 0;
int locked = 0;
/* need to check: commands list and response list for a client */
FUNC_ENTRY;
/* We might be called in a callback. In which case, this mutex will be already locked. */
thread_id = Thread_getid();
if (thread_id != sendThread_id && thread_id != receiveThread_id)
{
MQTTAsync_lock_mutex(mqttasync_mutex);
locked = 1;
}
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
while (ListFindItem(commands, &msgid, cmdMessageIDCompare) ||
ListFindItem(m->responses, &msgid, cmdMessageIDCompare))
{
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
if (msgid == start_msgid)
{ /* we've tried them all - none free */
msgid = 0;
break;
}
}
if (msgid != 0)
m->c->msgID = msgid;
if (locked)
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(msgid);
return msgid;
}
int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, int* qos, MQTTAsync_responseOptions* response) int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, int* qos, MQTTAsync_responseOptions* response)
{ {
MQTTAsyncs* m = handle; MQTTAsyncs* m = handle;
...@@ -2189,7 +2246,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, ...@@ -2189,7 +2246,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic,
goto exit; goto exit;
} }
} }
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0) if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
{ {
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit; goto exit;
...@@ -2262,7 +2319,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic ...@@ -2262,7 +2319,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic
goto exit; goto exit;
} }
} }
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0) if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
{ {
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit; goto exit;
...@@ -2321,7 +2378,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload ...@@ -2321,7 +2378,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload
rc = MQTTASYNC_BAD_UTF8_STRING; rc = MQTTASYNC_BAD_UTF8_STRING;
else if (qos < 0 || qos > 2) else if (qos < 0 || qos > 2)
rc = MQTTASYNC_BAD_QOS; rc = MQTTASYNC_BAD_QOS;
else if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0) else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment