Commit ae855dd5 authored by Ian Craggs's avatar Ian Craggs

Variable latency when sending/receiving messages

Bug: 465369
parent b9946d43
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
* Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
* Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1 * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
* Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
* Ian Craggs - fix for bug 465369 - longer latency than expected
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -137,6 +138,11 @@ void MQTTAsync_init() ...@@ -137,6 +138,11 @@ void MQTTAsync_init()
printf("MQTTAsync: error %d initializing async_mutex\n", rc); printf("MQTTAsync: error %d initializing async_mutex\n", rc);
if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0) if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
printf("MQTTAsync: error %d initializing command_mutex\n", rc); printf("MQTTAsync: error %d initializing command_mutex\n", rc);
if ((rc = pthread_cond_init(&send_cond->cond, NULL)) != 0)
printf("MQTTAsync: error %d initializing send_cond cond\n", rc);
if ((rc = pthread_mutex_init(&send_cond->mutex, &attr)) != 0)
printf("MQTTAsync: error %d initializing send_cond mutex\n", rc);
} }
#define WINAPI #define WINAPI
...@@ -760,7 +766,9 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size) ...@@ -760,7 +766,9 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
} }
MQTTAsync_unlock_mutex(mqttcommand_mutex); MQTTAsync_unlock_mutex(mqttcommand_mutex);
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
Thread_signal_cond(send_cond); rc = Thread_signal_cond(send_cond);
if (rc != 0)
Log(LOG_ERROR, 0, "Error %d from signal cond", rc);
#else #else
if (!Thread_check_sem(send_sem)) if (!Thread_check_sem(send_sem))
Thread_post_sem(send_sem); Thread_post_sem(send_sem);
...@@ -927,7 +935,7 @@ void MQTTAsync_writeComplete(int socket) ...@@ -927,7 +935,7 @@ void MQTTAsync_writeComplete(int socket)
} }
void MQTTAsync_processCommand() int MQTTAsync_processCommand()
{ {
int rc = 0; int rc = 0;
MQTTAsync_queuedCommand* command = NULL; MQTTAsync_queuedCommand* command = NULL;
...@@ -1166,7 +1174,9 @@ void MQTTAsync_processCommand() ...@@ -1166,7 +1174,9 @@ void MQTTAsync_processCommand()
exit: exit:
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT; rc = (command != NULL);
FUNC_EXIT_RC(rc);
return rc;
} }
...@@ -1265,13 +1275,10 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1265,13 +1275,10 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
while (commands->count > 0) while (commands->count > 0)
{ {
int before = commands->count; if (MQTTAsync_processCommand() == 0)
MQTTAsync_processCommand();
if (before == commands->count)
break; /* no commands were processed, so go into a wait */ break; /* no commands were processed, so go into a wait */
} }
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
rc = Thread_wait_cond(send_cond, 1);
if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT) if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc); Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
#else #else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment