Commit dd2a4063 authored by Ian Craggs's avatar Ian Craggs

Threading updates for bug #419233

parent df53add1
...@@ -352,17 +352,9 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock) ...@@ -352,17 +352,9 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
Pubrec* pubrec = (Pubrec*)pack; Pubrec* pubrec = (Pubrec*)pack;
Clients* client = NULL; Clients* client = NULL;
int rc = TCPSOCKET_COMPLETE; int rc = TCPSOCKET_COMPLETE;
ListElement* elem = NULL;
FUNC_ENTRY; FUNC_ENTRY;
elem = ListFindItem(bstate->clients, &sock, clientSocketCompare); client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
if (!elem)
{
printf("pubrec: couldn't find client for socket %d\n", sock);
rc = SOCKET_ERROR;
goto exit;
}
client = (Clients*)(elem->content);
Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId); Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId);
/* look for the message by message id in the records of outbound messages for this client */ /* look for the message by message id in the records of outbound messages for this client */
...@@ -392,7 +384,6 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock) ...@@ -392,7 +384,6 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
time(&(m->lastTouch)); time(&(m->lastTouch));
} }
} }
exit:
free(pack); free(pack);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -470,18 +461,9 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock) ...@@ -470,18 +461,9 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
Pubcomp* pubcomp = (Pubcomp*)pack; Pubcomp* pubcomp = (Pubcomp*)pack;
Clients* client = NULL; Clients* client = NULL;
int rc = TCPSOCKET_COMPLETE; int rc = TCPSOCKET_COMPLETE;
ListElement* elem = NULL;
FUNC_ENTRY; FUNC_ENTRY;
elem = ListFindItem(bstate->clients, &sock, clientSocketCompare); client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
if (!elem)
{
printf("pubrec: couldn't find client for socket %d\n", sock);
rc = SOCKET_ERROR;
goto exit;
}
client = (Clients*)(elem->content);
//client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 19, NULL, sock, client->clientID, pubcomp->msgId); Log(LOG_PROTOCOL, 19, NULL, sock, client->clientID, pubcomp->msgId);
/* look for the message by message id in the records of outbound messages for this client */ /* look for the message by message id in the records of outbound messages for this client */
...@@ -511,7 +493,6 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock) ...@@ -511,7 +493,6 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
} }
} }
} }
exit:
free(pack); free(pack);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
......
...@@ -97,12 +97,10 @@ mutex_type Thread_create_mutex() ...@@ -97,12 +97,10 @@ mutex_type Thread_create_mutex()
} }
extern mutex_type mqttasync_mutex;
extern mutex_type mqttcommand_mutex;
/** /**
* Lock a mutex which has already been created, block until ready * Lock a mutex which has already been created, block until ready
* @param mutex the mutex * @param mutex the mutex
* @return completion code * @return completion code, 0 is success
*/ */
int Thread_lock_mutex(mutex_type mutex) int Thread_lock_mutex(mutex_type mutex)
{ {
...@@ -110,14 +108,11 @@ int Thread_lock_mutex(mutex_type mutex) ...@@ -110,14 +108,11 @@ int Thread_lock_mutex(mutex_type mutex)
/* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */ /* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */
#if defined(WIN32) #if defined(WIN32)
if (WaitForSingleObject(mutex, INFINITE) != WAIT_FAILED) /* WaitForSingleObject returns WAIT_OBJECT_0 (0), on success */
rc = WaitForSingleObject(mutex, INFINITE);
#else #else
rc = pthread_mutex_lock(mutex); rc = pthread_mutex_lock(mutex);
if (rc != 0)
printf("rc from mutex_lock was %d\n", rc);
//if ((rc = pthread_mutex_lock(mutex)) == 0)
#endif #endif
rc = 0;
return rc; return rc;
} }
...@@ -126,7 +121,7 @@ int Thread_lock_mutex(mutex_type mutex) ...@@ -126,7 +121,7 @@ int Thread_lock_mutex(mutex_type mutex)
/** /**
* Unlock a mutex which has already been locked * Unlock a mutex which has already been locked
* @param mutex the mutex * @param mutex the mutex
* @return completion code * @return completion code, 0 is success
*/ */
int Thread_unlock_mutex(mutex_type mutex) int Thread_unlock_mutex(mutex_type mutex)
{ {
...@@ -134,14 +129,14 @@ int Thread_unlock_mutex(mutex_type mutex) ...@@ -134,14 +129,14 @@ int Thread_unlock_mutex(mutex_type mutex)
/* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */ /* don't add entry/exit trace points as the stack log uses mutexes - recursion beckons */
#if defined(WIN32) #if defined(WIN32)
if (ReleaseMutex(mutex) != 0) /* if ReleaseMutex fails, the return value is 0 */
if (ReleaseMutex(mutex) == 0)
rc = GetLastError();
else
rc = 0;
#else #else
rc = pthread_mutex_unlock(mutex); rc = pthread_mutex_unlock(mutex);
if (rc != 0)
printf("rc from mutex_unlock was %d\n", rc);
//if ((rc = pthread_mutex_unlock(mutex)) == 0)
#endif #endif
rc = 0;
return rc; return rc;
} }
......
...@@ -61,7 +61,7 @@ struct Options ...@@ -61,7 +61,7 @@ struct Options
} options = } options =
{ {
"m2m.eclipse.org:1883", "m2m.eclipse.org:1883",
0, 1,
-1, -1,
10000, 10000,
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment