Commit 46aec931 authored by Ian Craggs's avatar Ian Craggs

Changes to the synchronous client API to check mutex calls on Linux, for bug #419233

parent 848411d0
...@@ -58,6 +58,18 @@ SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}} ...@@ -58,6 +58,18 @@ SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = stdoutsuba MQTTAsync_subscribe MQTTAsync_publish SAMPLE_FILES_A = stdoutsuba MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}} ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
TEST_FILES_C = test1
SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3
SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}}
TEST_FILES_A = test4
ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}}
TEST_FILES_AS = test5
ASYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_AS}}
# The names of the four different libraries to be built # The names of the four different libraries to be built
MQTTLIB_C = paho-mqtt3c MQTTLIB_C = paho-mqtt3c
MQTTLIB_CS = paho-mqtt3cs MQTTLIB_CS = paho-mqtt3cs
...@@ -96,20 +108,33 @@ MQTTVERSION_TARGET = ${blddir}/MQTTVersion ...@@ -96,20 +108,33 @@ MQTTVERSION_TARGET = ${blddir}/MQTTVersion
CCFLAGS_SO = -g -fPIC -Os -Wall -fvisibility=hidden CCFLAGS_SO = -g -fPIC -Os -Wall -fvisibility=hidden
FLAGS_EXE = -I ${srcdir} -lpthread -L ${blddir} FLAGS_EXE = -I ${srcdir} -lpthread -L ${blddir}
LDFLAGS_C = -shared -Wl,-soname,lib$(MQTTLIB_C).so.${MAJOR_VERSION} LDFLAGS_C = -shared -Wl,-soname,lib$(MQTTLIB_C).so.${MAJOR_VERSION} -Wl,-init,MQTTClient_init
LDFLAGS_CS = -shared -Wl,-soname,lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -ldl -Wl,-whole-archive -lcrypto -lssl -Wl,-no-whole-archive LDFLAGS_CS = -shared -Wl,-soname,lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -ldl -lcrypto -lssl -Wl,-no-whole-archive -Wl,-init,MQTTClient_init
LDFLAGS_A = -shared -Wl,-soname,lib${MQTTLIB_A}.so.${MAJOR_VERSION} LDFLAGS_A = -shared -Wl,-soname,lib${MQTTLIB_A}.so.${MAJOR_VERSION} -Wl,-init,MQTTAsync_init
LDFLAGS_AS = -shared -Wl,-soname,lib${MQTTLIB_AS}.so.${MAJOR_VERSION} -ldl -Wl,-whole-archive -lcrypto -lssl -Wl,-no-whole-archive LDFLAGS_AS = -shared -Wl,-soname,lib${MQTTLIB_AS}.so.${MAJOR_VERSION} -ldl -lcrypto -lssl -Wl,-no-whole-archive -Wl,-init,MQTTAsync_init
all: build all: build
build: | mkdir ${MQTTLIB_C_TARGET} ${MQTTLIB_CS_TARGET} ${MQTTLIB_A_TARGET} ${MQTTLIB_AS_TARGET} ${MQTTVERSION_TARGET} ${SYNC_SAMPLES} ${ASYNC_SAMPLES} build: | mkdir ${MQTTLIB_C_TARGET} ${MQTTLIB_CS_TARGET} ${MQTTLIB_A_TARGET} ${MQTTLIB_AS_TARGET} ${MQTTVERSION_TARGET} ${SYNC_SAMPLES} ${ASYNC_SAMPLES} ${SYNC_TESTS} ${SYNC_SSL_TESTS} ${ASYNC_TESTS} ${ASYNC_SSL_TESTS}
clean: clean:
rm -rf ${blddir}/* rm -rf ${blddir}/*
mkdir: mkdir:
-mkdir -p ${blddir}/samples -mkdir -p ${blddir}/samples
-mkdir -p ${blddir}/test
${SYNC_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c
${CC} ${FLAGS_EXE} -g -o ${blddir}/test/${basename ${+F}} $< -l${MQTTLIB_C}
${SYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c
${CC} ${FLAGS_EXE} -g -o ${blddir}/test/${basename ${+F}} $< -l${MQTTLIB_CS}
${ASYNC_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c
${CC} ${FLAGS_EXE} -g -o ${blddir}/test/${basename ${+F}} $< -l${MQTTLIB_A}
${ASYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c
${CC} ${FLAGS_EXE} -g -o ${blddir}/test/${basename ${+F}} $< -l${MQTTLIB_AS}
${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c ${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c
${CC} ${FLAGS_EXE} -o ${blddir}/samples/${basename ${+F}} $< -l${MQTTLIB_C} ${CC} ${FLAGS_EXE} -o ${blddir}/samples/${basename ${+F}} $< -l${MQTTLIB_C}
...@@ -119,23 +144,23 @@ ${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c ...@@ -119,23 +144,23 @@ ${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c
${MQTTLIB_C_TARGET}: ${SOURCE_FILES_C} ${HEADERS_C} ${MQTTLIB_C_TARGET}: ${SOURCE_FILES_C} ${HEADERS_C}
${CC} ${CCFLAGS_SO} ${LDFLAGS_C} -o $@ ${SOURCE_FILES_C} ${CC} ${CCFLAGS_SO} ${LDFLAGS_C} -o $@ ${SOURCE_FILES_C}
ln -s lib$(MQTTLIB_C).so.${VERSION} ${blddir}/lib$(MQTTLIB_C).so.${MAJOR_VERSION} -ln -s lib$(MQTTLIB_C).so.${VERSION} ${blddir}/lib$(MQTTLIB_C).so.${MAJOR_VERSION}
ln -s lib$(MQTTLIB_C).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_C).so -ln -s lib$(MQTTLIB_C).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_C).so
${MQTTLIB_CS_TARGET}: ${SOURCE_FILES_CS} ${HEADERS_C} ${MQTTLIB_CS_TARGET}: ${SOURCE_FILES_CS} ${HEADERS_C}
${CC} ${CCFLAGS_SO} ${LDFLAGS_CS} -o $@ ${SOURCE_FILES_CS} -DOPENSSL ${CC} ${CCFLAGS_SO} ${LDFLAGS_CS} -o $@ ${SOURCE_FILES_CS} -DOPENSSL
ln -s lib$(MQTTLIB_CS).so.${VERSION} ${blddir}/lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -ln -s lib$(MQTTLIB_CS).so.${VERSION} ${blddir}/lib$(MQTTLIB_CS).so.${MAJOR_VERSION}
ln -s lib$(MQTTLIB_CS).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_CS).so -ln -s lib$(MQTTLIB_CS).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_CS).so
${MQTTLIB_A_TARGET}: ${SOURCE_FILES_A} ${HEADERS_A} ${MQTTLIB_A_TARGET}: ${SOURCE_FILES_A} ${HEADERS_A}
${CC} ${CCFLAGS_SO} ${LDFLAGS_A} -o $@ ${SOURCE_FILES_A} ${CC} ${CCFLAGS_SO} ${LDFLAGS_A} -o $@ ${SOURCE_FILES_A}
ln -s lib$(MQTTLIB_A).so.${VERSION} ${blddir}/lib$(MQTTLIB_A).so.${MAJOR_VERSION} -ln -s lib$(MQTTLIB_A).so.${VERSION} ${blddir}/lib$(MQTTLIB_A).so.${MAJOR_VERSION}
ln -s lib$(MQTTLIB_A).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_A).so -ln -s lib$(MQTTLIB_A).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_A).so
${MQTTLIB_AS_TARGET}: ${SOURCE_FILES_AS} ${HEADERS_A} ${MQTTLIB_AS_TARGET}: ${SOURCE_FILES_AS} ${HEADERS_A}
${CC} ${CCFLAGS_SO} ${LDFLAGS_AS} -o $@ ${SOURCE_FILES_AS} -DOPENSSL ${CC} ${CCFLAGS_SO} ${LDFLAGS_AS} -o $@ ${SOURCE_FILES_AS} -DOPENSSL
ln -s lib$(MQTTLIB_AS).so.${VERSION} ${blddir}/lib$(MQTTLIB_AS).so.${MAJOR_VERSION} -ln -s lib$(MQTTLIB_AS).so.${VERSION} ${blddir}/lib$(MQTTLIB_AS).so.${MAJOR_VERSION}
ln -s lib$(MQTTLIB_AS).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_AS).so -ln -s lib$(MQTTLIB_AS).so.${MAJOR_VERSION} ${blddir}/lib$(MQTTLIB_AS).so
${MQTTVERSION_TARGET}: $(srcdir)/MQTTVersion.c $(srcdir)/MQTTAsync.h ${MQTTVERSION_TARGET}: $(srcdir)/MQTTVersion.c $(srcdir)/MQTTAsync.h
${CC} ${FLAGS_EXE} -o $@ -l${MQTTLIB_A} $(srcdir)/MQTTVersion.c -ldl ${CC} ${FLAGS_EXE} -o $@ -l${MQTTLIB_A} $(srcdir)/MQTTVersion.c -ldl
......
...@@ -57,7 +57,7 @@ ...@@ -57,7 +57,7 @@
<!-- non-SSL, synchronous library --> <!-- non-SSL, synchronous library -->
<property name="output.filename" value="${output.folder}/lib${libname}.so" /> <property name="output.filename" value="${output.folder}/lib${libname}.so" />
<exec executable="gcc" failonerror="true"> <exec executable="gcc" failonerror="true">
<arg line="${ccflags.so} ${ldflags.so} -Wl,-soname,lib${libname}.so -o ${output.filename} ${sync.source.files}"/> <arg line="${ccflags.so} ${ldflags.so} -Wl,-init,MQTTClient_init -Wl,-soname,lib${libname}.so -o ${output.filename} ${sync.source.files}"/>
</exec> </exec>
<exec executable="strip" failonerror="true"> <exec executable="strip" failonerror="true">
<arg value="${output.filename}" /> <arg value="${output.filename}" />
...@@ -69,7 +69,7 @@ ...@@ -69,7 +69,7 @@
<!-- SSL, synchronous library --> <!-- SSL, synchronous library -->
<property name="output.ssl.filename" value="${output.folder}/lib${libname.ssl}.so" /> <property name="output.ssl.filename" value="${output.folder}/lib${libname.ssl}.so" />
<exec executable="gcc" failonerror="true"> <exec executable="gcc" failonerror="true">
<arg line="-DOPENSSL ${ccflags.so} ${ldflags.so} -Wl,-soname,lib${libname.ssl}.so -o ${output.ssl.filename} ${sync.source.files}"/> <arg line="-DOPENSSL ${ccflags.so} ${ldflags.so} -Wl,-init,MQTTClient_init -Wl,-soname,lib${libname.ssl}.so -o ${output.ssl.filename} ${sync.source.files}"/>
</exec> </exec>
<exec executable="strip" failonerror="true"> <exec executable="strip" failonerror="true">
<arg value="${output.ssl.filename}" /> <arg value="${output.ssl.filename}" />
...@@ -210,22 +210,22 @@ ...@@ -210,22 +210,22 @@
<available file="/usr/bin/doxygen"/> <available file="/usr/bin/doxygen"/>
<then> <then>
<mkdir dir="${output.folder}/doc"/> <mkdir dir="${output.folder}/doc"/>
<exec executable="doxygen" dir="src"> <exec executable="doxygen" dir="src">
<arg value="../doc/DoxyfileV3ClientAPI"/> <arg value="../doc/DoxyfileV3ClientAPI"/>
</exec> </exec>
<exec executable="doxygen" dir="src"> <exec executable="doxygen" dir="src">
<arg value="../doc/DoxyfileV3AsyncAPI"/> <arg value="../doc/DoxyfileV3AsyncAPI"/>
</exec> </exec>
<zip destfile="${output.folder}/MQTTClient_doc.zip"> <zip destfile="${output.folder}/MQTTClient_doc.zip">
<zipfileset dir="${output.folder}/doc/MQTTClient" /> <zipfileset dir="${output.folder}/doc/MQTTClient" />
</zip> </zip>
<zip destfile="${output.folder}/MQTTAsync_doc.zip"> <zip destfile="${output.folder}/MQTTAsync_doc.zip">
<zipfileset dir="${output.folder}/doc/MQTTAsync" prefix="MQTTAsync/"/> <zipfileset dir="${output.folder}/doc/MQTTAsync" prefix="MQTTAsync/"/>
</zip> </zip>
<delete dir="${output.folder}/doc" /> <delete dir="${output.folder}/doc" />
</then> </then>
<else> <else>
<echo message="doxygen is not available" /> <echo message="doxygen is not available" />
</else> </else>
</if> </if>
</target> </target>
...@@ -234,13 +234,13 @@ ...@@ -234,13 +234,13 @@
<if> <if>
<available file="/shared/technology"/> <available file="/shared/technology"/>
<then> <then>
<mkdir dir="/shared/technology/paho/C"/> <mkdir dir="/shared/technology/paho/C"/>
<echo message="Copying the build output to /shared" /> <echo message="Copying the build output to /shared" />
<copy overwrite="true" todir="/shared/technology/paho/C"> <copy overwrite="true" todir="/shared/technology/paho/C">
<fileset dir="${output.folder}"> <fileset dir="${output.folder}">
<include name="*.zip"/> <include name="*.zip"/>
</fileset> </fileset>
</copy> </copy>
</then> </then>
</if> </if>
</target> </target>
......
...@@ -91,10 +91,10 @@ BOOL APIENTRY DllMain(HANDLE hModule, ...@@ -91,10 +91,10 @@ BOOL APIENTRY DllMain(HANDLE hModule,
mqttasync_mutex = CreateMutex(NULL, 0, NULL); mqttasync_mutex = CreateMutex(NULL, 0, NULL);
mqttcommand_mutex = CreateMutex(NULL, 0, NULL); mqttcommand_mutex = CreateMutex(NULL, 0, NULL);
send_sem = CreateEvent( send_sem = CreateEvent(
NULL, // default security attributes NULL, /* default security attributes */
FALSE, // manual-reset event? FALSE, /* manual-reset event? */
FALSE, // initial state is nonsignaled FALSE, /* initial state is nonsignaled */
NULL // object name NULL /* object name */
); );
stack_mutex = CreateMutex(NULL, 0, NULL); stack_mutex = CreateMutex(NULL, 0, NULL);
heap_mutex = CreateMutex(NULL, 0, NULL); heap_mutex = CreateMutex(NULL, 0, NULL);
...@@ -753,16 +753,12 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command) ...@@ -753,16 +753,12 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
if (command->details.dis.internal && m->cl && was_connected) if (command->details.dis.internal && m->cl && was_connected)
{ {
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->cl))(m->context, NULL); (*(m->cl))(m->context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
else if (!command->details.dis.internal && command->onSuccess) else if (!command->details.dis.internal && command->onSuccess)
{ {
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->onSuccess))(command->context, NULL); (*(command->onSuccess))(command->context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
FUNC_EXIT; FUNC_EXIT;
...@@ -1051,9 +1047,7 @@ void MQTTAsync_processCommand() ...@@ -1051,9 +1047,7 @@ void MQTTAsync_processCommand()
data.alt.pub.message.qos = command->command.details.pub.qos; data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained; data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data); (*(command->command.onSuccess))(command->command.context, &data);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
else else
...@@ -1115,10 +1109,7 @@ void MQTTAsync_processCommand() ...@@ -1115,10 +1109,7 @@ void MQTTAsync_processCommand()
if (command->command.onFailure) if (command->command.onFailure)
{ {
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID); Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onFailure))(command->command.context, NULL); (*(command->command.onFailure))(command->command.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
MQTTAsync_freeConnect(command->command); MQTTAsync_freeConnect(command->command);
MQTTAsync_freeCommand(command); /* free up the command if necessary */ MQTTAsync_freeCommand(command); /* free up the command if necessary */
...@@ -1182,9 +1173,7 @@ void MQTTAsync_checkTimeouts() ...@@ -1182,9 +1173,7 @@ void MQTTAsync_checkTimeouts()
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL); (*(m->connect.onFailure))(m->connect.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
continue; continue;
...@@ -1208,9 +1197,7 @@ void MQTTAsync_checkTimeouts() ...@@ -1208,9 +1197,7 @@ void MQTTAsync_checkTimeouts()
{ {
Log(TRACE_MIN, -1, "Calling %s failure for client %s", Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(com->command.type), m->c->clientID); MQTTPacket_name(com->command.type), m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(com->command.onFailure))(com->command.context, NULL); (*(com->command.onFailure))(com->command.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
timed_out_count++; timed_out_count++;
} }
...@@ -1494,9 +1481,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1494,9 +1481,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (m->connect.onSuccess) if (m->connect.onSuccess)
{ {
Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onSuccess))(m->connect.context, NULL); (*(m->connect.onSuccess))(m->connect.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
else else
...@@ -1527,9 +1512,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1527,9 +1512,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
data.code = rc; data.code = rc;
data.message = "CONNACK return code"; data.message = "CONNACK return code";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, &data); (*(m->connect.onFailure))(m->connect.context, &data);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
} }
...@@ -1567,9 +1550,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1567,9 +1550,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket); rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
handleCalled = 1; handleCalled = 1;
Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data); (*(command->command.onSuccess))(command->command.context, &data);
//MQTTAsync_lock_mutex(mqttasync_mutex);
if (array) if (array)
free(array); free(array);
} }
...@@ -1598,9 +1579,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1598,9 +1579,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket); rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
handleCalled = 1; handleCalled = 1;
Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, NULL); (*(command->command.onSuccess))(command->command.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
MQTTAsync_freeCommand(command); MQTTAsync_freeCommand(command);
break; break;
...@@ -1932,9 +1911,7 @@ int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTA ...@@ -1932,9 +1911,7 @@ int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTA
Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d", Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
m->c->clientID, m->c->messageQueue->count); m->c->clientID, m->c->messageQueue->count);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
rc = (*(m->ma))(m->context, topicName, topicLen, mm); rc = (*(m->ma))(m->context, topicName, topicLen, mm);
//MQTTAsync_lock_mutex(mqttasync_mutex);
/* if 0 (false) is returned by the callback then it failed, so we don't remove the message from /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
* the queue, and it will be retried later. If 1 is returned then the message data may have been freed, * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
* so we must be careful how we use it. * so we must be careful how we use it.
...@@ -2594,9 +2571,7 @@ exit: ...@@ -2594,9 +2571,7 @@ exit:
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL); (*(m->connect.onFailure))(m->connect.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
} }
...@@ -2675,9 +2650,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2675,9 +2650,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL); (*(m->connect.onFailure))(m->connect.context, NULL);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
} }
} }
...@@ -2706,9 +2679,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2706,9 +2679,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (m->dc) if (m->dc)
{ {
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid); Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(m->dc))(m->context, msgid); (*(m->dc))(m->context, msgid);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
/* use the msgid to find the callback to be called */ /* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, &current)) while (ListNextElement(m->responses, &current))
...@@ -2729,9 +2700,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2729,9 +2700,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
data.alt.pub.message.qos = command->command.details.pub.qos; data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained; data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
//MQTTAsync_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data); (*(command->command.onSuccess))(command->command.context, &data);
//MQTTAsync_lock_mutex(mqttasync_mutex);
} }
MQTTAsync_freeCommand(command); MQTTAsync_freeCommand(command);
break; break;
......
...@@ -97,6 +97,18 @@ BOOL APIENTRY DllMain(HANDLE hModule, ...@@ -97,6 +97,18 @@ BOOL APIENTRY DllMain(HANDLE hModule,
#else #else
static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttclient_mutex = &mqttclient_mutex_store; static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
void MQTTClient_init()
{
pthread_mutexattr_t attr;
int rc;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
printf("MQTTAsync: error %d initializing client_mutex\n", rc);
}
#define WINAPI #define WINAPI
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment