Commit 3b23859e authored by Ian Craggs's avatar Ian Craggs

Some threading tests

parent d0f87d58
...@@ -942,8 +942,7 @@ static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_si ...@@ -942,8 +942,7 @@ static int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_si
if (rc != 0) if (rc != 0)
Log(LOG_ERROR, 0, "Error %d from signal cond", rc); Log(LOG_ERROR, 0, "Error %d from signal cond", rc);
#else #else
if (!Thread_check_sem(send_sem)) rc = Thread_post_sem(send_sem);
Thread_post_sem(send_sem);
#endif #endif
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -1986,8 +1985,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, Connack* connack) ...@@ -1986,8 +1985,7 @@ static int MQTTAsync_completeConnection(MQTTAsyncs* m, Connack* connack)
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
Thread_signal_cond(send_cond); Thread_signal_cond(send_cond);
#else #else
if (!Thread_check_sem(send_sem)) Thread_post_sem(send_sem);
Thread_post_sem(send_sem);
#endif #endif
} }
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
...@@ -2303,7 +2301,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -2303,7 +2301,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
if (sendThread_state != STOPPED) if (sendThread_state != STOPPED)
Thread_signal_cond(send_cond); Thread_signal_cond(send_cond);
#else #else
if (sendThread_state != STOPPED && !Thread_check_sem(send_sem)) if (sendThread_state != STOPPED)
Thread_post_sem(send_sem); Thread_post_sem(send_sem);
#endif #endif
FUNC_EXIT; FUNC_EXIT;
......
...@@ -773,12 +773,12 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -773,12 +773,12 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
MQTTClient_disconnect_internal(m, 0); MQTTClient_disconnect_internal(m, 0);
else else
{ {
if (m->c->connect_state == SSL_IN_PROGRESS && !Thread_check_sem(m->connect_sem)) if (m->c->connect_state == SSL_IN_PROGRESS)
{ {
Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
Thread_post_sem(m->connect_sem); Thread_post_sem(m->connect_sem);
} }
if (m->c->connect_state == WAIT_FOR_CONNACK && !Thread_check_sem(m->connack_sem)) if (m->c->connect_state == WAIT_FOR_CONNACK)
{ {
Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
Thread_post_sem(m->connack_sem); Thread_post_sem(m->connack_sem);
...@@ -818,7 +818,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -818,7 +818,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
} }
if (pack) if (pack)
{ {
if (pack->header.bits.type == CONNACK && !Thread_check_sem(m->connack_sem)) if (pack->header.bits.type == CONNACK)
{ {
Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
m->pack = pack; m->pack = pack;
...@@ -869,7 +869,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -869,7 +869,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
#endif #endif
} }
} }
else if (m->c->connect_state == TCP_IN_PROGRESS && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == TCP_IN_PROGRESS)
{ {
int error; int error;
socklen_t len = sizeof(error); socklen_t len = sizeof(error);
...@@ -880,7 +880,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -880,7 +880,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
Thread_post_sem(m->connect_sem); Thread_post_sem(m->connect_sem);
} }
#if defined(OPENSSL) #if defined(OPENSSL)
else if (m->c->connect_state == SSL_IN_PROGRESS && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == SSL_IN_PROGRESS)
{ {
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket, rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket,
m->serverURI, m->c->sslopts->verify); m->serverURI, m->c->sslopts->verify);
...@@ -894,7 +894,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -894,7 +894,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
} }
} }
#endif #endif
else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
{ {
Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc); Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
m->c->connect_state = WAIT_FOR_CONNACK; m->c->connect_state = WAIT_FOR_CONNACK;
...@@ -1678,14 +1678,14 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne ...@@ -1678,14 +1678,14 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne
MQTTClient_closeSession(m->c, reason, props); MQTTClient_closeSession(m->c, reason, props);
while (Thread_check_sem(m->connect_sem)) /*while (Thread_wait_sem(m->connect_sem, 100))
Thread_wait_sem(m->connect_sem, 100); ;
while (Thread_check_sem(m->connack_sem)) while (Thread_wait_sem(m->connack_sem, 100))
Thread_wait_sem(m->connack_sem, 100); ;
while (Thread_check_sem(m->suback_sem)) while (Thread_wait_sem(m->suback_sem, 100))
Thread_wait_sem(m->suback_sem, 100); ;
while (Thread_check_sem(m->unsuback_sem)) while (Thread_wait_sem(m->unsuback_sem, 100))
Thread_wait_sem(m->unsuback_sem, 100); ;*/
exit: exit:
if (stop) if (stop)
MQTTClient_stop(); MQTTClient_stop();
......
...@@ -197,6 +197,13 @@ sem_type Thread_create_sem(void) ...@@ -197,6 +197,13 @@ sem_type Thread_create_sem(void)
FALSE, /* initial state is nonsignaled */ FALSE, /* initial state is nonsignaled */
NULL /* object name */ NULL /* object name */
); );
sem = CreateSemaphore(
NULL, /* default security attributes */
0, /* initial count - non signaled */
1, /* maximum count */
NULL /* unnamed semaphore */
);
#elif defined(OSX) #elif defined(OSX)
sem = dispatch_semaphore_create(0L); sem = dispatch_semaphore_create(0L);
rc = (sem == NULL) ? -1 : 0; rc = (sem == NULL) ? -1 : 0;
...@@ -234,9 +241,15 @@ int Thread_wait_sem(sem_type sem, int timeout) ...@@ -234,9 +241,15 @@ int Thread_wait_sem(sem_type sem, int timeout)
FUNC_ENTRY; FUNC_ENTRY;
#if defined(WIN32) || defined(WIN64) #if defined(WIN32) || defined(WIN64)
/* returns 0 (WAIT_OBJECT_0) on success, non-zero (WAIT_TIMEOUT) if timeout occurred */
rc = WaitForSingleObject(sem, timeout < 0 ? 0 : timeout); rc = WaitForSingleObject(sem, timeout < 0 ? 0 : timeout);
#elif defined(OSX) if (rc == WAIT_TIMEOUT)
rc = ETIMEDOUT;
#elif defined(OSX)
/* returns 0 on success, non-zero if timeout occurred */
rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, (int64_t)timeout*1000000L)); rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, (int64_t)timeout*1000000L));
if (rc != 0)
rc = ETIMEDOUT;
#elif defined(USE_TRYWAIT) #elif defined(USE_TRYWAIT)
while (++i < count && (rc = sem_trywait(sem)) != 0) while (++i < count && (rc = sem_trywait(sem)) != 0)
{ {
...@@ -264,20 +277,25 @@ int Thread_wait_sem(sem_type sem, int timeout) ...@@ -264,20 +277,25 @@ int Thread_wait_sem(sem_type sem, int timeout)
/** /**
* Check to see if a semaphore has been posted, without waiting. * Check to see if a semaphore has been posted, without waiting
* The semaphore will be unchanged, if the return value is false.
* The semaphore will have been decremented, if the return value is true.
* @param sem the semaphore * @param sem the semaphore
* @return 0 (false) or 1 (true) * @return 0 (false) or 1 (true)
*/ */
int Thread_check_sem(sem_type sem) int Thread_check_sem(sem_type sem)
{ {
#if defined(WIN32) || defined(WIN64) #if defined(WIN32) || defined(WIN64)
/* if the return value is not 0, the semaphore will not have been decremented */
return WaitForSingleObject(sem, 0) == WAIT_OBJECT_0; return WaitForSingleObject(sem, 0) == WAIT_OBJECT_0;
#elif defined(OSX) #elif defined(OSX)
return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0; /* if the return value is not 0, the semaphore will not have been decremented */
return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0;
#else #else
int semval = -1; /*int semval = -1;
sem_getvalue(sem, &semval); sem_getvalue(sem, &semval);
return semval > 0; return semval > 0;*/
return sem_trywait(sem) == 0;
#endif #endif
} }
...@@ -285,7 +303,7 @@ int Thread_check_sem(sem_type sem) ...@@ -285,7 +303,7 @@ int Thread_check_sem(sem_type sem)
/** /**
* Post a semaphore * Post a semaphore
* @param sem the semaphore * @param sem the semaphore
* @return completion code * @return 0 on success
*/ */
int Thread_post_sem(sem_type sem) int Thread_post_sem(sem_type sem)
{ {
...@@ -371,7 +389,7 @@ int Thread_signal_cond(cond_type condvar) ...@@ -371,7 +389,7 @@ int Thread_signal_cond(cond_type condvar)
/** /**
* Wait with a timeout (seconds) for condition variable * Wait with a timeout (seconds) for condition variable
* @return completion code * @return 0 for success, ETIMEDOUT otherwise
*/ */
int Thread_wait_cond(cond_type condvar, int timeout) int Thread_wait_cond(cond_type condvar, int timeout)
{ {
...@@ -413,61 +431,201 @@ int Thread_destroy_cond(cond_type condvar) ...@@ -413,61 +431,201 @@ int Thread_destroy_cond(cond_type condvar)
#if defined(THREAD_UNIT_TESTS) #if defined(THREAD_UNIT_TESTS)
#if defined(WIN32) || defined(_WINDOWS)
#define mqsleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
START_TIME_TYPE start_clock(void)
{
return GetTickCount();
}
#elif defined(AIX)
#define mqsleep sleep
#define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void)
{
static struct timespec start;
clock_gettime(CLOCK_REALTIME, &start);
return start;
}
#else
#define mqsleep sleep
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE start_clock(void)
{
struct timeval start_time;
gettimeofday(&start_time, NULL);
return start_time;
}
#endif
#if defined(WIN32)
long elapsed(START_TIME_TYPE start_time)
{
return GetTickCount() - start_time;
}
#elif defined(AIX)
#define assert(a)
long elapsed(struct timespec start)
{
struct timespec now, res;
clock_gettime(CLOCK_REALTIME, &now);
ntimersub(now, start, res);
return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long elapsed(START_TIME_TYPE start_time)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&now, &start_time, &res);
return (res.tv_sec)*1000 + (res.tv_usec)/1000;
}
#endif
int tests = 0, failures = 0;
void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
{
++tests;
if (!value)
{
va_list args;
++failures;
printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
va_start(args, format);
vprintf(format, args);
va_end(args);
//cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
// description, filename, lineno);
}
else
printf("Assertion succeeded, file %s, line %d, description: %s\n", filename, lineno, description);
}
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
#define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
#include <stdio.h> #include <stdio.h>
thread_return_type secondary(void* n) thread_return_type cond_secondary(void* n)
{ {
int rc = 0; int rc = 0;
/*
cond_type cond = n; cond_type cond = n;
printf("Secondary thread about to wait\n"); printf("This should return immediately as it was posted already\n");
rc = Thread_wait_cond(cond); rc = Thread_wait_cond(cond, 99999);
printf("Secondary thread returned from wait %d\n", rc);*/ assert("rc 1 from wait_cond", rc == 1, "rc was %d", rc);
printf("This should hang around a few seconds\n");
rc = Thread_wait_cond(cond, 99999);
assert("rc 1 from wait_cond", rc == 1, "rc was %d", rc);
printf("Secondary cond thread ending\n");
return 0;
}
int cond_test()
{
int rc = 0;
cond_type cond = Thread_create_cond();
thread_type thread;
printf("Post secondary so it should return immediately\n");
rc = Thread_signal_cond(cond);
assert("rc 0 from signal cond", rc == 0, "rc was %d", rc);
printf("Starting secondary thread\n");
thread = Thread_start(cond_secondary, (void*)cond);
sleep(3);
printf("post secondary\n");
rc = Thread_signal_cond(cond);
assert("rc 1 from signal cond", rc == 1, "rc was %d", rc);
sleep(3);
printf("Main thread ending\n");
return failures;
}
thread_return_type sem_secondary(void* n)
{
int rc = 0;
sem_type sem = n; sem_type sem = n;
printf("Secondary semaphore pointer %p\n", sem);
rc = Thread_check_sem(sem);
assert("rc 1 from check_sem", rc == 1, "rc was %d", rc);
printf("Secondary thread about to wait\n"); printf("Secondary thread about to wait\n");
rc = Thread_wait_sem(sem); rc = Thread_wait_sem(sem, 99999);
printf("Secondary thread returned from wait %d\n", rc); printf("Secondary thread returned from wait %d\n", rc);
printf("Secondary thread about to wait\n"); printf("Secondary thread about to wait\n");
rc = Thread_wait_sem(sem); rc = Thread_wait_sem(sem, 99999);
printf("Secondary thread returned from wait %d\n", rc); printf("Secondary thread returned from wait %d\n", rc);
printf("Secondary check sem %d\n", Thread_check_sem(sem)); printf("Secondary check sem %d\n", Thread_check_sem(sem));
printf("Secondary thread ending\n");
return 0; return 0;
} }
int main(int argc, char *argv[]) int sem_test()
{ {
int rc = 0; int rc = 0;
sem_type sem = Thread_create_sem(); sem_type sem = Thread_create_sem();
thread_type thread;
printf("check sem %d\n", Thread_check_sem(sem)); printf("Primary semaphore pointer %p\n", sem);
printf("post secondary\n"); rc = Thread_check_sem(sem);
assert("rc 0 from check_sem", rc == 0, "rc was %d\n", rc);
printf("post secondary so then check should be 1\n");
rc = Thread_post_sem(sem); rc = Thread_post_sem(sem);
printf("posted secondary %d\n", rc); assert("rc 0 from post_sem", rc == 0, "rc was %d\n", rc);
printf("check sem %d\n", Thread_check_sem(sem)); rc = Thread_check_sem(sem);
assert("rc 1 from check_sem", rc == 1, "rc was %d", rc);
printf("Starting secondary thread\n"); printf("Starting secondary thread\n");
Thread_start(secondary, (void*)sem); thread = Thread_start(sem_secondary, (void*)sem);
sleep(3); sleep(3);
printf("check sem %d\n", Thread_check_sem(sem)); rc = Thread_check_sem(sem);
assert("rc 1 from check_sem", rc == 1, "rc was %d", rc);
printf("post secondary\n"); printf("post secondary\n");
rc = Thread_post_sem(sem); rc = Thread_post_sem(sem);
printf("posted secondary %d\n", rc); assert("rc 1 from post_sem", rc == 1, "rc was %d", rc);
sleep(3); sleep(3);
printf("Main thread ending\n"); printf("Main thread ending\n");
return failures;
}
int main(int argc, char *argv[])
{
sem_test();
//cond_test();
} }
#endif #endif
...@@ -31,6 +31,16 @@ IF (PAHO_WITH_SSL) ...@@ -31,6 +31,16 @@ IF (PAHO_WITH_SSL)
) )
ENDIF () ENDIF ()
ADD_EXECUTABLE(
thread
thread.c ../src/Thread.c
)
SET_TARGET_PROPERTIES(
thread PROPERTIES
COMPILE_DEFINITIONS "NOSTACKTRACE"
)
ADD_EXECUTABLE( ADD_EXECUTABLE(
test1 test1
test1.c test1.c
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment