Commit 76684fcb authored by Ian Craggs's avatar Ian Craggs

Some threading/mutex changes

parent 4be97e2d
......@@ -45,7 +45,7 @@
#define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
......@@ -70,15 +70,10 @@ enum MQTTAsync_threadStates
enum MQTTAsync_threadStates sendThread_state = STOPPED;
enum MQTTAsync_threadStates receiveThread_state = STOPPED;
#if !defined(WIN32)
static cond_type send_cond;
#else
static sem_type send_sem;
#endif
#if defined(WIN32)
static mutex_type mqttasync_mutex = NULL;
static mutex_type mqttcommand_mutex = NULL;
static sem_type send_sem = NULL;
extern mutex_type stack_mutex;
extern mutex_type heap_mutex;
extern mutex_type log_mutex;
......@@ -94,6 +89,12 @@ BOOL APIENTRY DllMain(HANDLE hModule,
{
mqttasync_mutex = CreateMutex(NULL, 0, NULL);
mqttcommand_mutex = CreateMutex(NULL, 0, NULL);
send_sem = CreateEvent(
NULL, // default security attributes
FALSE, // manual-reset event?
FALSE, // initial state is nonsignaled
NULL // object name
);
stack_mutex = CreateMutex(NULL, 0, NULL);
heap_mutex = CreateMutex(NULL, 0, NULL);
log_mutex = CreateMutex(NULL, 0, NULL);
......@@ -112,6 +113,8 @@ static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttasync_mutex = &mqttasync_mutex_store;
static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
static cond_type send_cond = &send_cond_store;
#define WINAPI
#endif
......@@ -333,11 +336,6 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
commands = ListInitialize();
#if defined(OPENSSL)
SSLSocket_initialize();
#endif
#if !defined(WIN32)
send_cond = Thread_create_cond();
#else
send_sem = Thread_create_sem();
#endif
initialized = 1;
}
......@@ -396,9 +394,9 @@ void MQTTAsync_terminate(void)
{
ListElement* elem = NULL;
#if !defined(WIN32)
Thread_destroy_cond(send_cond);
//Thread_destroy_cond(send_cond);
#else
Thread_destroy_sem(send_sem);
//Thread_destroy_sem(send_sem);
#endif
ListFree(bstate->clients);
ListFree(handles);
......@@ -904,14 +902,19 @@ void MQTTAsync_processCommand()
if (ListFind(ignored_clients, cmd->client))
continue;
if (cmd->command.type == CONNECT || (cmd->client->c->connected &&
if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket)))
{
command = cmd;
break;
if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
; /* no more message ids available */
else
{
command = cmd;
break;
}
}
else
ListAppend(ignored_clients, cmd->client, sizeof(cmd->client));
ListAppend(ignored_clients, cmd->client, sizeof(cmd->client));
}
ListFreeNoContent(ignored_clients);
if (command)
......@@ -1171,7 +1174,7 @@ void MQTTAsync_checkTimeouts()
{
MQTTAsync_queuedCommand* com = (MQTTAsync_queuedCommand*)(cur_response->content);
if (MQTTAsync_elapsed(com->command.start_time) < 30000)
if (1 /*MQTTAsync_elapsed(com->command.start_time) < 120000*/)
break; /* command has not timed out */
else
{
......@@ -2206,6 +2209,11 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char** topic, int* qos,
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
for (i = 0; i < count; i++)
{
if (!UTF8_validateString(topic[i]))
......@@ -2277,7 +2285,11 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char** topic, MQTTAsy
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
for (i = 0; i < count; i++)
{
if (!UTF8_validateString(topic[i]))
......@@ -2340,6 +2352,9 @@ int MQTTAsync_send(MQTTAsync handle, char* destinationName, int payloadlen, void
rc = MQTTASYNC_BAD_UTF8_STRING;
else if (qos < 0 || qos > 2)
rc = MQTTASYNC_BAD_QOS;
else if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1)
rc = MQTTASYNC_NO_MORE_MSGIDS;
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -2617,6 +2632,8 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
msgid = ack.msgId;
*rc = (pack->header.bits.type == PUBCOMP) ?
MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
if (!m)
Log(LOG_ERROR, -1, "PUBCOMP or PUBACK received for no client, msgid %d", msgid);
if (m)
{
ListElement* current = NULL;
......@@ -2624,7 +2641,9 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->dc)
{
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
//Thread_unlock_mutex(mqttasync_mutex);
(*(m->dc))(m->context, msgid);
//Thread_lock_mutex(mqttasync_mutex);
}
/* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, &current))
......@@ -2645,9 +2664,9 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
//Thread_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
Thread_lock_mutex(mqttasync_mutex);
//Thread_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeCommand(command);
break;
......
......@@ -137,11 +137,13 @@ exit:
}
void StackTrace_printStack(char* dest)
void StackTrace_printStack(FILE* dest)
{
FILE* file = stdout;
int t = 0;
if (dest)
file = dest;
for (t = 0; t < thread_count; ++t)
{
threadEntry *cur_thread = &threads[t];
......
......@@ -17,6 +17,7 @@
#ifndef STACKTRACE_H_
#define STACKTRACE_H_
#include <stdio.h>
#include "Log.h"
#if defined(NOSTACKTRACE)
......@@ -63,7 +64,7 @@
void StackTrace_entry(const char* name, int line, int trace);
void StackTrace_exit(const char* name, int line, void* return_value, int trace);
void StackTrace_dumpStack(char* dest);
void StackTrace_printStack(FILE* dest);
char* StackTrace_get(unsigned long);
#endif /* STACKTRACE_H_ */
......@@ -110,6 +110,11 @@ int Thread_lock_mutex(mutex_type mutex)
#if defined(WIN32)
if (WaitForSingleObject(mutex, INFINITE) != WAIT_FAILED)
#else
if (mutex->__data.__owner != 0)
{
printf("mutex owner != 0\n");
StackTrace_printStack(stdout);
}
if ((rc = pthread_mutex_lock(mutex)) == 0)
#endif
rc = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment