Commit 8a93fed2 authored by Ian Craggs's avatar Ian Craggs

Fix multi-thread access for MQTTClient

Bug: 474905
parent 06e8f582
......@@ -78,13 +78,13 @@ SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = stdoutsuba MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
TEST_FILES_C = test1 sync_client_test test_mqtt4sync
TEST_FILES_C = test1 test2 sync_client_test test_mqtt4sync
SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3
SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}}
TEST_FILES_A = test4 test_mqtt4async
TEST_FILES_A = test4 test_mqtt4async MQTTAsync_multi_pub
ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}}
TEST_FILES_AS = test5
......@@ -172,7 +172,7 @@ mkdir:
echo OSTYPE is $(OSTYPE)
${SYNC_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_C_TARGET)
${CC} -g -o $@ $< -l${MQTTLIB_C} ${FLAGS_EXE}
${CC} -DNOSTACKTRACE $(srcdir)/Thread.c -g -o $@ $< -l${MQTTLIB_C} ${FLAGS_EXE}
${SYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_CS_TARGET)
${CC} -g -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES}
......
......@@ -82,7 +82,7 @@
</target>
<target name="test" >
<foreach target="runAtest" param="aTest" list="test1,test4"/>
<foreach target="runAtest" param="aTest" list="test1,test2,test4"/>
<foreach target="runSSLtest" param="aTest" list="test3,test5"/>
</target>
......
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -27,6 +27,7 @@
* Ian Craggs - fix for bug 443724 - stack corruption
* Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
* Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
* Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
*******************************************************************************/
/**
......@@ -79,6 +80,9 @@ MQTTProtocol state;
#if defined(WIN32) || defined(WIN64)
static mutex_type mqttclient_mutex = NULL;
static mutex_type socket_mutex = NULL;
static mutex_type subscribe_mutex = NULL;
static mutex_type unsubscribe_mutex = NULL;
static mutex_type connect_mutex = NULL;
extern mutex_type stack_mutex;
extern mutex_type heap_mutex;
extern mutex_type log_mutex;
......@@ -93,6 +97,9 @@ BOOL APIENTRY DllMain(HANDLE hModule,
if (mqttclient_mutex == NULL)
{
mqttclient_mutex = CreateMutex(NULL, 0, NULL);
subscribe_mutex = CreateMutex(NULL, 0, NULL);
unsubscribe_mutex = CreateMutex(NULL, 0, NULL);
connect_mutex = CreateMutex(NULL, 0, NULL);
stack_mutex = CreateMutex(NULL, 0, NULL);
heap_mutex = CreateMutex(NULL, 0, NULL);
log_mutex = CreateMutex(NULL, 0, NULL);
......@@ -110,9 +117,19 @@ BOOL APIENTRY DllMain(HANDLE hModule,
#else
static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type socket_mutex = &socket_mutex_store;
static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type subscribe_mutex = &subscribe_mutex_store;
static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type connect_mutex = &connect_mutex_store;
void MQTTClient_init()
{
pthread_mutexattr_t attr;
......@@ -124,6 +141,12 @@ void MQTTClient_init()
printf("MQTTClient: error %d initializing client_mutex\n", rc);
if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
printf("MQTTClient: error %d initializing socket_mutex\n", rc);
if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
printf("MQTTClient: error %d initializing connect_mutex\n", rc);
}
#define WINAPI
......@@ -1032,6 +1055,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
int rc = SOCKET_ERROR;
FUNC_ENTRY;
Thread_lock_mutex(connect_mutex);
Thread_lock_mutex(mqttclient_mutex);
if (options == NULL)
......@@ -1106,6 +1130,7 @@ exit:
m->c->will = NULL;
}
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(connect_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -1217,13 +1242,14 @@ int MQTTClient_isConnected(MQTTClient handle)
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
{
MQTTClients* m = handle;
List* topics = ListInitialize();
List* qoss = ListInitialize();
List* topics = NULL;
List* qoss = NULL;
int i = 0;
int rc = MQTTCLIENT_FAILURE;
int msgid = 0;
FUNC_ENTRY;
Thread_lock_mutex(subscribe_mutex);
Thread_lock_mutex(mqttclient_mutex);
if (m == NULL || m->c == NULL)
......@@ -1256,6 +1282,8 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
goto exit;
}
topics = ListInitialize();
qoss = ListInitialize();
for (i = 0; i < count; i++)
{
ListAppend(topics, topic[i], strlen(topic[i]));
......@@ -1297,6 +1325,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
exit:
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(subscribe_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -1319,12 +1348,13 @@ int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
{
MQTTClients* m = handle;
List* topics = ListInitialize();
List* topics = NULL;
int i = 0;
int rc = SOCKET_ERROR;
int msgid = 0;
FUNC_ENTRY;
Thread_lock_mutex(unsubscribe_mutex);
Thread_lock_mutex(mqttclient_mutex);
if (m == NULL || m->c == NULL)
......@@ -1351,6 +1381,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
goto exit;
}
topics = ListInitialize();
for (i = 0; i < count; i++)
ListAppend(topics, topic[i], strlen(topic[i]));
rc = MQTTProtocol_unsubscribe(m->c, topics, msgid);
......@@ -1377,6 +1408,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
exit:
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(unsubscribe_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment