Commit 149a9ea2 authored by Ian Craggs's avatar Ian Craggs

Change global init parameter to structure, issue #136

parent dd6f1b7c
...@@ -66,10 +66,10 @@ ...@@ -66,10 +66,10 @@
const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP; const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION; const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
void MQTTAsync_global_init(int handle_openssl_init) void MQTTAsync_global_init(MQTTAsync_init_options* inits)
{ {
#if defined(OPENSSL) #if defined(OPENSSL)
SSLSocket_handleOpensslInit(handle_openssl_init); SSLSocket_handleOpensslInit(inits->do_openssl_init);
#endif #endif
} }
...@@ -1374,7 +1374,7 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message) ...@@ -1374,7 +1374,7 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
(*(m->connect.onFailure))(m->connect.context, &data); (*(m->connect.onFailure))(m->connect.context, &data);
} }
MQTTAsync_startConnectRetry(m); MQTTAsync_startConnectRetry(m);
} }
} }
...@@ -2159,7 +2159,7 @@ static int retryLoopInterval = 5; ...@@ -2159,7 +2159,7 @@ static int retryLoopInterval = 5;
static void setRetryLoopInterval(int keepalive) static void setRetryLoopInterval(int keepalive)
{ {
int proposed = keepalive / 10; int proposed = keepalive / 10;
if (proposed < 1) if (proposed < 1)
proposed = 1; proposed = 1;
else if (proposed > 5) else if (proposed > 5)
......
This diff is collapsed.
...@@ -3,17 +3,17 @@ ...@@ -3,17 +3,17 @@
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution. * and Eclipse Distribution License v1.0 which accompany this distribution.
* *
* The Eclipse Public License is available at * The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html * http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at * and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php. * http://www.eclipse.org/org/documents/edl-v10.php.
* *
* Contributors: * Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - bug 384016 - segv setting will message * Ian Craggs - bug 384016 - segv setting will message
* Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
* Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
* Ian Craggs - multiple server connection support * Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called * Ian Craggs - fix for bug 413429 - connectionLost not called
...@@ -73,10 +73,10 @@ ...@@ -73,10 +73,10 @@
const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP; const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION; const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
void MQTTClient_global_init(int handle_openssl_init) void MQTTClient_global_init(MQTTClient_init_options* inits)
{ {
#if defined(OPENSSL) #if defined(OPENSSL)
SSLSocket_handleOpensslInit(handle_openssl_init); SSLSocket_handleOpensslInit(inits->do_openssl_init);
#endif #endif
} }
...@@ -567,7 +567,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -567,7 +567,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
{ {
if (m->c->connected) if (m->c->connected)
MQTTClient_disconnect_internal(m, 0); MQTTClient_disconnect_internal(m, 0);
else else
{ {
if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem)) if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem))
{ {
...@@ -645,7 +645,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -645,7 +645,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
} }
#if defined(OPENSSL) #if defined(OPENSSL)
else if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == 2 && !Thread_check_sem(m->connect_sem))
{ {
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket); rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
if (rc == 1 || rc == SSL_FATAL) if (rc == 1 || rc == SSL_FATAL)
{ {
...@@ -869,7 +869,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -869,7 +869,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
goto exit; goto exit;
} }
#if defined(OPENSSL) #if defined(OPENSSL)
if (m->ssl) if (m->ssl)
{ {
...@@ -895,7 +895,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -895,7 +895,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
goto exit; goto exit;
} }
else if (rc == 1) else if (rc == 1)
{ {
rc = MQTTCLIENT_SUCCESS; rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3; m->c->connect_state = 3;
...@@ -927,7 +927,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt ...@@ -927,7 +927,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
} }
#endif #endif
} }
#if defined(OPENSSL) #if defined(OPENSSL)
if (m->c->connect_state == 2) /* SSL connect sent - wait for completion */ if (m->c->connect_state == 2) /* SSL connect sent - wait for completion */
{ {
...@@ -994,9 +994,9 @@ exit: ...@@ -994,9 +994,9 @@ exit:
if (rc == MQTTCLIENT_SUCCESS) if (rc == MQTTCLIENT_SUCCESS)
{ {
if (options->struct_version == 4) /* means we have to fill out return values */ if (options->struct_version == 4) /* means we have to fill out return values */
{ {
options->returned.serverURI = serverURI; options->returned.serverURI = serverURI;
options->returned.MQTTVersion = MQTTVersion; options->returned.MQTTVersion = MQTTVersion;
options->returned.sessionPresent = sessionPresent; options->returned.sessionPresent = sessionPresent;
} }
} }
...@@ -1011,7 +1011,7 @@ static int retryLoopInterval = 5; ...@@ -1011,7 +1011,7 @@ static int retryLoopInterval = 5;
static void setRetryLoopInterval(int keepalive) static void setRetryLoopInterval(int keepalive)
{ {
int proposed = keepalive / 10; int proposed = keepalive / 10;
if (proposed < 1) if (proposed < 1)
proposed = 1; proposed = 1;
else if (proposed > 5) else if (proposed > 5)
...@@ -1049,7 +1049,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -1049,7 +1049,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o
if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1)) if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
{ {
const void* source = NULL; const void* source = NULL;
m->c->will = malloc(sizeof(willMessages)); m->c->will = malloc(sizeof(willMessages));
if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data)) if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
{ {
...@@ -1066,7 +1066,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -1066,7 +1066,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o
m->c->will->payload = malloc(m->c->will->payloadlen); m->c->will->payload = malloc(m->c->will->payloadlen);
memcpy(m->c->will->payload, source, m->c->will->payloadlen); memcpy(m->c->will->payload, source, m->c->will->payloadlen);
} }
else else
{ {
m->c->will->payload = NULL; m->c->will->payload = NULL;
m->c->will->payloadlen = 0; m->c->will->payloadlen = 0;
...@@ -1075,7 +1075,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -1075,7 +1075,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o
m->c->will->retained = options->will->retained; m->c->will->retained = options->will->retained;
m->c->will->topic = MQTTStrdup(options->will->topicName); m->c->will->topic = MQTTStrdup(options->will->topicName);
} }
#if defined(OPENSSL) #if defined(OPENSSL)
if (m->c->sslopts) if (m->c->sslopts)
{ {
...@@ -1169,7 +1169,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options) ...@@ -1169,7 +1169,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
goto exit; goto exit;
} }
} }
#if defined(OPENSSL) #if defined(OPENSSL)
if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */ if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
{ {
...@@ -1193,11 +1193,11 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options) ...@@ -1193,11 +1193,11 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
else else
{ {
int i; int i;
for (i = 0; i < options->serverURIcount; ++i) for (i = 0; i < options->serverURIcount; ++i)
{ {
char* serverURI = options->serverURIs[i]; char* serverURI = options->serverURIs[i];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0) if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP); serverURI += strlen(URI_TCP);
#if defined(OPENSSL) #if defined(OPENSSL)
...@@ -1209,7 +1209,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options) ...@@ -1209,7 +1209,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
#endif #endif
if ((rc = MQTTClient_connectURI(handle, options, serverURI)) == MQTTCLIENT_SUCCESS) if ((rc = MQTTClient_connectURI(handle, options, serverURI)) == MQTTCLIENT_SUCCESS)
break; break;
} }
} }
exit: exit:
...@@ -1362,7 +1362,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i ...@@ -1362,7 +1362,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
rc = MQTTCLIENT_BAD_UTF8_STRING; rc = MQTTCLIENT_BAD_UTF8_STRING;
goto exit; goto exit;
} }
if(qos[i] < 0 || qos[i] > 2) if(qos[i] < 0 || qos[i] > 2)
{ {
rc = MQTTCLIENT_BAD_QOS; rc = MQTTCLIENT_BAD_QOS;
...@@ -1396,14 +1396,14 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i ...@@ -1396,14 +1396,14 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
if (pack != NULL) if (pack != NULL)
{ {
Suback* sub = (Suback*)pack; Suback* sub = (Suback*)pack;
ListElement* current = NULL; ListElement* current = NULL;
i = 0; i = 0;
while (ListNextElement(sub->qoss, &current)) while (ListNextElement(sub->qoss, &current))
{ {
int* reqqos = (int*)(current->content); int* reqqos = (int*)(current->content);
qos[i++] = *reqqos; qos[i++] = *reqqos;
} }
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket); rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
m->pack = NULL; m->pack = NULL;
} }
...@@ -1541,7 +1541,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, ...@@ -1541,7 +1541,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
goto exit; goto exit;
/* If outbound queue is full, block until it is not */ /* If outbound queue is full, block until it is not */
while (m->c->outboundMsgs->count >= m->c->maxInflightMessages || while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */ Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
{ {
if (blocked == 0) if (blocked == 0)
...@@ -1865,7 +1865,7 @@ int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTC ...@@ -1865,7 +1865,7 @@ int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTC
{ {
int sock = 0; int sock = 0;
MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc); MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
{ {
if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */ if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */
...@@ -2075,17 +2075,17 @@ static void MQTTProtocol_checkPendingWrites(void) ...@@ -2075,17 +2075,17 @@ static void MQTTProtocol_checkPendingWrites(void)
static void MQTTClient_writeComplete(int socket) static void MQTTClient_writeComplete(int socket)
{ {
ListElement* found = NULL; ListElement* found = NULL;
FUNC_ENTRY; FUNC_ENTRY;
/* a partial write is now complete for a socket - this will be on a publish*/ /* a partial write is now complete for a socket - this will be on a publish*/
MQTTProtocol_checkPendingWrites(); MQTTProtocol_checkPendingWrites();
/* find the client using this socket */ /* find the client using this socket */
if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL) if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
{ {
MQTTClients* m = (MQTTClients*)(found->content); MQTTClients* m = (MQTTClients*)(found->content);
time(&(m->c->net.lastSent)); time(&(m->c->net.lastSent));
} }
FUNC_EXIT; FUNC_EXIT;
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment