Commit 69d9be4d authored by Ian Craggs's avatar Ian Craggs

Merge branch 'develop'

Conflicts:
	CONTRIBUTING.md
parents 4e619837 c4a474b2
...@@ -8,7 +8,7 @@ Project description: ...@@ -8,7 +8,7 @@ Project description:
The Paho project has been created to provide scalable open-source implementations of open and standard messaging protocols aimed at new, existing, and emerging applications for Machine-to-Machine (M2M) and Internet of Things (IoT). The Paho project has been created to provide scalable open-source implementations of open and standard messaging protocols aimed at new, existing, and emerging applications for Machine-to-Machine (M2M) and Internet of Things (IoT).
Paho reflects the inherent physical and cost constraints of device connectivity. Its objectives include effective levels of decoupling between devices and applications, designed to keep markets open and encourage the rapid growth of scalable Web and Enterprise middleware and applications. Paho is being kicked off with MQTT publish/subscribe client implementations for use on embedded platforms, along with corresponding server support as determined by the community. Paho reflects the inherent physical and cost constraints of device connectivity. Its objectives include effective levels of decoupling between devices and applications, designed to keep markets open and encourage the rapid growth of scalable Web and Enterprise middleware and applications. Paho is being kicked off with MQTT publish/subscribe client implementations for use on embedded platforms, along with corresponding server support as determined by the community.
- [Project web site](https://www.eclipse.org/paho) - [Project web site](https://www.eclipse.org/paho)
- [Project information](https://projects.eclipse.org/projects/iot.paho) - [Project information](https://projects.eclipse.org/projects/iot.paho)
...@@ -52,9 +52,10 @@ change. ...@@ -52,9 +52,10 @@ change.
What happens next depends on the content of the patch. If it is 100% authored What happens next depends on the content of the patch. If it is 100% authored
by the contributor and is less than 1000 lines (and meets the needs of the by the contributor and is less than 1000 lines (and meets the needs of the
project), then it can be committed to the main repository. If not, more steps project), then it can be committed to the main repository. If not, more steps
are required. These are detailed in the are required. These are detailed in the
[legal process poster](http://www.eclipse.org/legal/EclipseLegalProcessPoster.pdf). [legal process poster](http://www.eclipse.org/legal/EclipseLegalProcessPoster.pdf).
Developer resources: Developer resources:
-------------------- --------------------
...@@ -70,7 +71,7 @@ Before your contribution can be accepted by the project, you need to create and ...@@ -70,7 +71,7 @@ Before your contribution can be accepted by the project, you need to create and
Contact: Contact:
-------- --------
Contact the project developers via the project's development Contact the project developers via the project's development
[mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev). [mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev).
Search for bugs: Search for bugs:
......
<!--******************************************************************************* <!--*******************************************************************************
Copyright (c) 2012, 2013 IBM Corp. Copyright (c) 2012, 2014 IBM Corp.
All rights reserved. This program and the accompanying materials All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0 are made available under the terms of the Eclipse Public License v1.0
...@@ -101,7 +101,7 @@ ...@@ -101,7 +101,7 @@
<else> <else>
<exec executable="./${aTest}" failonerror="true" dir="${output.folder}/test" > <exec executable="./${aTest}" failonerror="true" dir="${output.folder}/test" >
<arg value="--connection" /> <arg value="--connection" />
<arg value="tcp://${test.hostname}:${test.port}" /> <arg value="tcp://${test.hostname}:1883" />
<env key="LD_LIBRARY_PATH" path="${output.folder}" /> <env key="LD_LIBRARY_PATH" path="${output.folder}" />
<env key="DYLD_LIBRARY_PATH" path="${output.folder}" /> <env key="DYLD_LIBRARY_PATH" path="${output.folder}" />
</exec> </exec>
......
...@@ -149,7 +149,8 @@ BE*/ ...@@ -149,7 +149,8 @@ BE*/
typedef struct typedef struct
{ {
int socket; int socket;
time_t lastContact; time_t lastSent;
time_t lastReceived;
#if defined(OPENSSL) #if defined(OPENSSL)
SSL* ssl; SSL* ssl;
SSL_CTX* ctx; SSL_CTX* ctx;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
* Ian Craggs - fix for bug 432903 - queue persistence * Ian Craggs - fix for bug 432903 - queue persistence
* Ian Craggs - MQTT 3.1.1 support * Ian Craggs - MQTT 3.1.1 support
* Rong Xiang, Ian Craggs - C++ compatibility * Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -885,7 +886,7 @@ void MQTTAsync_writeComplete(int socket) ...@@ -885,7 +886,7 @@ void MQTTAsync_writeComplete(int socket)
{ {
MQTTAsyncs* m = (MQTTAsyncs*)(found->content); MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
time(&(m->c->net.lastContact)); time(&(m->c->net.lastSent));
/* see if there is a pending write flagged */ /* see if there is a pending write flagged */
if (m->pending_write) if (m->pending_write)
...@@ -1257,14 +1258,22 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1257,14 +1258,22 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
while (!tostop) while (!tostop)
{ {
/*int rc;*/ int rc;
while (commands->count > 0) while (commands->count > 0)
{
int before = commands->count;
MQTTAsync_processCommand(); MQTTAsync_processCommand();
if (before == commands->count)
break; /* no commands were processed, so go into a wait */
}
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
/*rc =*/ Thread_wait_cond(send_cond, 1); rc = Thread_wait_cond(send_cond, 1);
if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
#else #else
/*rc =*/ Thread_wait_sem(send_sem, 1000); if ((rc = Thread_wait_sem(send_sem, 1000)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for semaphore", rc);
#endif #endif
MQTTAsync_checkTimeouts(); MQTTAsync_checkTimeouts();
...@@ -1420,12 +1429,13 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack) ...@@ -1420,12 +1429,13 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
if (m->c->outboundMsgs->count > 0) if (m->c->outboundMsgs->count > 0)
{ {
ListElement* outcurrent = NULL; ListElement* outcurrent = NULL;
while (ListNextElement(m->c->outboundMsgs, &outcurrent)) while (ListNextElement(m->c->outboundMsgs, &outcurrent))
{ {
Messages* m = (Messages*)(outcurrent->content); Messages* m = (Messages*)(outcurrent->content);
m->lastTouch = 0; m->lastTouch = 0;
} }
MQTTProtocol_retry(m->c->net.lastContact, 1); MQTTProtocol_retry((time_t)0, 1, 1);
if (m->c->connected != 1) if (m->c->connected != 1)
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
} }
...@@ -1459,23 +1469,33 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1459,23 +1469,33 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
break; break;
timeout = 1000L; timeout = 1000L;
if (sock == 0)
continue;
/* find client corresponding to socket */ /* find client corresponding to socket */
if (ListFindItem(handles, &sock, clientSockCompare) == NULL) if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
{ {
/* assert: should not happen */ Log(TRACE_MINIMUM, -1, "Could not find client corresponding to socket %d", sock);
/* Socket_close(sock); - removing socket in this case is not necessary (Bug 442400) */
continue; continue;
} }
m = (MQTTAsyncs*)(handles->current->content); m = (MQTTAsyncs*)(handles->current->content);
if (m == NULL) if (m == NULL)
{ {
/* assert: should not happen */ Log(LOG_ERROR, -1, "Client structure was NULL for socket %d - removing socket", sock);
Socket_close(sock);
continue; continue;
} }
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
{ {
MQTTAsync_unlock_mutex(mqttasync_mutex); Log(TRACE_MINIMUM, -1, "Error from MQTTAsync_cycle() - removing socket %d", sock);
MQTTAsync_disconnect_internal(m, 0); if (m->c->connected == 1)
MQTTAsync_lock_mutex(mqttasync_mutex); {
MQTTAsync_unlock_mutex(mqttasync_mutex);
MQTTAsync_disconnect_internal(m, 0);
MQTTAsync_lock_mutex(mqttasync_mutex);
}
else /* calling disconnect_internal won't have any effect if we're already disconnected */
MQTTAsync_closeOnly(m->c);
} }
else else
{ {
...@@ -2357,10 +2377,10 @@ void MQTTAsync_retry(void) ...@@ -2357,10 +2377,10 @@ void MQTTAsync_retry(void)
{ {
time(&(last)); time(&(last));
MQTTProtocol_keepalive(now); MQTTProtocol_keepalive(now);
MQTTProtocol_retry(now, 1); MQTTProtocol_retry(now, 1, 0);
} }
else else
MQTTProtocol_retry(now, 0); MQTTProtocol_retry(now, 0, 0);
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -2392,8 +2412,11 @@ int MQTTAsync_connecting(MQTTAsyncs* m) ...@@ -2392,8 +2412,11 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1) if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical"); Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket); rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
if (rc == -1) if (rc == TCPSOCKET_INTERRUPTED)
{
rc = MQTTCLIENT_SUCCESS; /* the connect is still in progress */
m->c->connect_state = 2; m->c->connect_state = 2;
}
else if (rc == SSL_FATAL) else if (rc == SSL_FATAL)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -2497,11 +2520,13 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2497,11 +2520,13 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L)) if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
{ {
MQTTAsync_sleep(100L); MQTTAsync_sleep(100L);
#if 0
if (s.clientsds->count == 0) if (s.clientsds->count == 0)
{ {
if (++nosockets_count == 50) /* 5 seconds with no sockets */ if (++nosockets_count == 50) /* 5 seconds with no sockets */
tostop = 1; tostop = 1;
} }
#endif
} }
else else
nosockets_count = 0; nosockets_count = 0;
...@@ -2520,19 +2545,19 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2520,19 +2545,19 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
*rc = MQTTAsync_connecting(m); *rc = MQTTAsync_connecting(m);
else else
pack = MQTTPacket_Factory(&m->c->net, rc); pack = MQTTPacket_Factory(&m->c->net, rc);
if ((m->c->connect_state == 3) && (*rc == SOCKET_ERROR)) if (m->c->connect_state == 3 && *rc == SOCKET_ERROR)
{ {
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR"); Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (MQTTAsync_checkConn(&m->connect, m)) if (MQTTAsync_checkConn(&m->connect, m))
{ {
MQTTAsync_queuedCommand* conn; MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c); MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */ /* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand)); memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
} }
...@@ -2632,6 +2657,8 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens) ...@@ -2632,6 +2657,8 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
int rc = MQTTASYNC_SUCCESS; int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle; MQTTAsyncs* m = handle;
*tokens = NULL; *tokens = NULL;
ListElement* current = NULL;
int count = 0;
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex); MQTTAsync_lock_mutex(mqttasync_mutex);
...@@ -2642,19 +2669,87 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens) ...@@ -2642,19 +2669,87 @@ int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
goto exit; goto exit;
} }
if (m->c && m->c->outboundMsgs->count > 0) /* calculate the number of pending tokens - commands plus inflight */
while (ListNextElement(commands, &current))
{ {
ListElement* current = NULL; MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
int count = 0;
*tokens = malloc(sizeof(MQTTAsync_token) * (m->c->outboundMsgs->count + 1)); if (cmd->client == m)
count++;
}
if (m->c)
count += m->c->outboundMsgs->count;
if (count == 0)
goto exit; /* no tokens to return */
*tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
/* First add the unprocessed commands to the pending tokens */
current = NULL;
count = 0;
while (ListNextElement(commands, &current))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m)
(*tokens)[count++] = cmd->command.token;
}
/* Now add the inflight messages */
if (m->c && m->c->outboundMsgs->count > 0)
{
current = NULL;
while (ListNextElement(m->c->outboundMsgs, &current)) while (ListNextElement(m->c->outboundMsgs, &current))
{ {
Messages* m = (Messages*)(current->content); Messages* m = (Messages*)(current->content);
(*tokens)[count++] = m->msgid; (*tokens)[count++] = m->msgid;
} }
(*tokens)[count] = -1;
} }
(*tokens)[count] = -1; /* indicate end of list */
exit:
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
ListElement* current = NULL;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
/* First check unprocessed commands */
current = NULL;
while (ListNextElement(commands, &current))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m && cmd->command.token == dt)
goto exit;
}
/* Now check the inflight messages */
if (m->c && m->c->outboundMsgs->count > 0)
{
current = NULL;
while (ListNextElement(m->c->outboundMsgs, &current))
{
Messages* m = (Messages*)(current->content);
if (m->msgid == dt)
goto exit;
}
}
rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
exit: exit:
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
...@@ -2663,6 +2758,51 @@ exit: ...@@ -2663,6 +2758,51 @@ exit:
} }
int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
{
int rc = MQTTASYNC_FAILURE;
START_TIME_TYPE start = MQTTAsync_start_clock();
unsigned long elapsed = 0L;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
if (MQTTAsync_isComplete(handle, dt) == 1)
{
rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
goto exit;
}
elapsed = MQTTAsync_elapsed(start);
while (elapsed < timeout)
{
MQTTAsync_sleep(100);
if (MQTTAsync_isComplete(handle, dt) == 1)
{
rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
goto exit;
}
elapsed = MQTTAsync_elapsed(start);
}
exit:
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level) void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
{ {
......
...@@ -693,7 +693,7 @@ typedef struct ...@@ -693,7 +693,7 @@ typedef struct
} MQTTAsync_connectOptions; } MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 3, 60, 1, 10, NULL, NULL, NULL, 30, 20, NULL, NULL, 0, NULL, 0} #define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 3, 60, 1, 10, NULL, NULL, NULL, 30, 0, NULL, NULL, NULL, NULL, 0, NULL, 0}
/** /**
* This function attempts to connect a previously-created client (see * This function attempts to connect a previously-created client (see
...@@ -908,6 +908,11 @@ DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationNam ...@@ -908,6 +908,11 @@ DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationNam
*/ */
DLLExport int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens); DLLExport int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens);
#define MQTTASYNC_TRUE 1
DLLExport int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt);
DLLExport int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout);
/** /**
* This function frees memory allocated to an MQTT message, including the * This function frees memory allocated to an MQTT message, including the
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
* Ian Craggs - MQTT 3.1.1 support * Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - fix for bug 438176 - MQTT version selection * Ian Craggs - fix for bug 438176 - MQTT version selection
* Rong Xiang, Ian Craggs - C++ compatibility * Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - fix for bug 443724 - stack corruption
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -424,11 +425,11 @@ int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* top ...@@ -424,11 +425,11 @@ int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* top
*topicLen = qe->topicLen; *topicLen = qe->topicLen;
if (strlen(*topicName) != *topicLen) if (strlen(*topicName) != *topicLen)
rc = MQTTCLIENT_TOPICNAME_TRUNCATED; rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
if (m->c->persistence) if (m->c->persistence)
MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe); MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
#endif #endif
ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -810,8 +811,8 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -810,8 +811,8 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o
if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1) if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical"); Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket); rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
if (rc == -1) if (rc == TCPSOCKET_INTERRUPTED)
m->c->connect_state = 2; m->c->connect_state = 2; /* the connect is still in progress */
else if (rc == SSL_FATAL) else if (rc == SSL_FATAL)
{ {
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
...@@ -884,7 +885,7 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -884,7 +885,7 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o
else else
{ {
Connack* connack = (Connack*)pack; Connack* connack = (Connack*)pack;
Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc); Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
if ((rc = connack->rc) == MQTTCLIENT_SUCCESS) if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
{ {
m->c->connected = 1; m->c->connected = 1;
...@@ -903,7 +904,7 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -903,7 +904,7 @@ int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* o
Messages* m = (Messages*)(outcurrent->content); Messages* m = (Messages*)(outcurrent->content);
m->lastTouch = 0; m->lastTouch = 0;
} }
MQTTProtocol_retry(m->c->net.lastContact, 1); MQTTProtocol_retry((time_t)0, 1, 1);
if (m->c->connected != 1) if (m->c->connected != 1)
rc = MQTTCLIENT_DISCONNECTED; rc = MQTTCLIENT_DISCONNECTED;
} }
...@@ -1518,10 +1519,10 @@ void MQTTClient_retry(void) ...@@ -1518,10 +1519,10 @@ void MQTTClient_retry(void)
{ {
time(&(last)); time(&(last));
MQTTProtocol_keepalive(now); MQTTProtocol_keepalive(now);
MQTTProtocol_retry(now, 1); MQTTProtocol_retry(now, 1, 0);
} }
else else
MQTTProtocol_retry(now, 0); MQTTProtocol_retry(now, 0, 0);
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -1632,7 +1633,7 @@ MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long ...@@ -1632,7 +1633,7 @@ MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long
else if (packet_type == UNSUBACK) else if (packet_type == UNSUBACK)
*rc = Thread_wait_sem(m->unsuback_sem, timeout); *rc = Thread_wait_sem(m->unsuback_sem, timeout);
if (*rc == 0 && packet_type != CONNECT && m->pack == NULL) if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
Log(TRACE_MIN, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout); Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
pack = m->pack; pack = m->pack;
} }
else else
...@@ -1957,7 +1958,7 @@ void MQTTClient_writeComplete(int socket) ...@@ -1957,7 +1958,7 @@ void MQTTClient_writeComplete(int socket)
{ {
MQTTClients* m = (MQTTClients*)(found->content); MQTTClients* m = (MQTTClients*)(found->content);
time(&(m->c->net.lastContact)); time(&(m->c->net.lastSent));
} }
FUNC_EXIT; FUNC_EXIT;
} }
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
...@@ -156,6 +156,8 @@ void* MQTTPacket_Factory(networkHandles* net, int* error) ...@@ -156,6 +156,8 @@ void* MQTTPacket_Factory(networkHandles* net, int* error)
#endif #endif
} }
} }
if (pack)
time(&(net->lastReceived));
exit: exit:
FUNC_EXIT_RC(*error); FUNC_EXIT_RC(*error);
return pack; return pack;
...@@ -197,7 +199,7 @@ int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buf ...@@ -197,7 +199,7 @@ int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buf
rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, &buflen, &free); rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, &buflen, &free);
if (rc == TCPSOCKET_COMPLETE) if (rc == TCPSOCKET_COMPLETE)
time(&(net->lastContact)); time(&(net->lastSent));
if (rc != TCPSOCKET_INTERRUPTED) if (rc != TCPSOCKET_INTERRUPTED)
free(buf); free(buf);
...@@ -244,7 +246,7 @@ int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffe ...@@ -244,7 +246,7 @@ int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffe
rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees); rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens, frees);
if (rc == TCPSOCKET_COMPLETE) if (rc == TCPSOCKET_COMPLETE)
time(&(net->lastContact)); time(&(net->lastSent));
if (rc != TCPSOCKET_INTERRUPTED) if (rc != TCPSOCKET_INTERRUPTED)
free(buf); free(buf);
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
...@@ -514,7 +514,9 @@ void MQTTProtocol_keepalive(time_t now) ...@@ -514,7 +514,9 @@ void MQTTProtocol_keepalive(time_t now)
{ {
Clients* client = (Clients*)(current->content); Clients* client = (Clients*)(current->content);
ListNextElement(bstate->clients, &current); ListNextElement(bstate->clients, &current);
if (client->connected && client->keepAliveInterval > 0 && (difftime(now, client->net.lastContact) >= client->keepAliveInterval)) if (client->connected && client->keepAliveInterval > 0 &&
(difftime(now, client->net.lastSent) >= client->keepAliveInterval ||
difftime(now, client->net.lastReceived) >= client->keepAliveInterval))
{ {
if (client->ping_outstanding == 0) if (client->ping_outstanding == 0)
{ {
...@@ -522,19 +524,19 @@ void MQTTProtocol_keepalive(time_t now) ...@@ -522,19 +524,19 @@ void MQTTProtocol_keepalive(time_t now)
{ {
if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE) if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE)
{ {
Log(TRACE_MIN, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket); Log(TRACE_PROTOCOL, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1); MQTTProtocol_closeSession(client, 1);
} }
else else
{ {
client->net.lastContact = now; client->net.lastSent = now;
client->ping_outstanding = 1; client->ping_outstanding = 1;
} }
} }
} }
else else
{ {
Log(TRACE_MIN, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket); Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1); MQTTProtocol_closeSession(client, 1);
} }
} }
...@@ -547,14 +549,15 @@ void MQTTProtocol_keepalive(time_t now) ...@@ -547,14 +549,15 @@ void MQTTProtocol_keepalive(time_t now)
* MQTT retry processing per client * MQTT retry processing per client
* @param now current time * @param now current time
* @param client - the client to which to apply the retry processing * @param client - the client to which to apply the retry processing
* @param regardless boolean - retry packets regardless of retry interval (used on reconnect)
*/ */
void MQTTProtocol_retries(time_t now, Clients* client) void MQTTProtocol_retries(time_t now, Clients* client, int regardless)
{ {
ListElement* outcurrent = NULL; ListElement* outcurrent = NULL;
FUNC_ENTRY; FUNC_ENTRY;
if (client->retryInterval <= 0) /* 0 or -ive retryInterval turns off retry */ if (!regardless && client->retryInterval <= 0) /* 0 or -ive retryInterval turns off retry except on reconnect */
goto exit; goto exit;
while (client && ListNextElement(client->outboundMsgs, &outcurrent) && while (client && ListNextElement(client->outboundMsgs, &outcurrent) &&
...@@ -562,7 +565,7 @@ void MQTTProtocol_retries(time_t now, Clients* client) ...@@ -562,7 +565,7 @@ void MQTTProtocol_retries(time_t now, Clients* client)
Socket_noPendingWrites(client->net.socket)) /* there aren't any previous packets still stacked up on the socket */ Socket_noPendingWrites(client->net.socket)) /* there aren't any previous packets still stacked up on the socket */
{ {
Messages* m = (Messages*)(outcurrent->content); Messages* m = (Messages*)(outcurrent->content);
if (difftime(now, m->lastTouch) > max(client->retryInterval, 10)) if (regardless || difftime(now, m->lastTouch) > max(client->retryInterval, 10))
{ {
if (m->qos == 1 || (m->qos == 2 && m->nextMessageType == PUBREC)) if (m->qos == 1 || (m->qos == 2 && m->nextMessageType == PUBREC))
{ {
...@@ -578,7 +581,7 @@ void MQTTProtocol_retries(time_t now, Clients* client) ...@@ -578,7 +581,7 @@ void MQTTProtocol_retries(time_t now, Clients* client)
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
{ {
client->good = 0; client->good = 0;
Log(TRACE_MIN, 8, NULL, client->clientID, client->net.socket, Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
Socket_getpeer(client->net.socket)); Socket_getpeer(client->net.socket));
MQTTProtocol_closeSession(client, 1); MQTTProtocol_closeSession(client, 1);
client = NULL; client = NULL;
...@@ -596,7 +599,7 @@ void MQTTProtocol_retries(time_t now, Clients* client) ...@@ -596,7 +599,7 @@ void MQTTProtocol_retries(time_t now, Clients* client)
if (MQTTPacket_send_pubrel(m->msgid, 1, &client->net, client->clientID) != TCPSOCKET_COMPLETE) if (MQTTPacket_send_pubrel(m->msgid, 1, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
{ {
client->good = 0; client->good = 0;
Log(TRACE_MIN, 8, NULL, client->clientID, client->net.socket, Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,
Socket_getpeer(client->net.socket)); Socket_getpeer(client->net.socket));
MQTTProtocol_closeSession(client, 1); MQTTProtocol_closeSession(client, 1);
client = NULL; client = NULL;
...@@ -616,8 +619,9 @@ exit: ...@@ -616,8 +619,9 @@ exit:
* MQTT retry protocol and socket pending writes processing. * MQTT retry protocol and socket pending writes processing.
* @param now current time * @param now current time
* @param doRetry boolean - retries as well as pending writes? * @param doRetry boolean - retries as well as pending writes?
* @param regardless boolean - retry packets regardless of retry interval (used on reconnect)
*/ */
void MQTTProtocol_retry(time_t now, int doRetry) void MQTTProtocol_retry(time_t now, int doRetry, int regardless)
{ {
ListElement* current = NULL; ListElement* current = NULL;
...@@ -638,7 +642,7 @@ void MQTTProtocol_retry(time_t now, int doRetry) ...@@ -638,7 +642,7 @@ void MQTTProtocol_retry(time_t now, int doRetry)
if (Socket_noPendingWrites(client->net.socket) == 0) if (Socket_noPendingWrites(client->net.socket) == 0)
continue; continue;
if (doRetry) if (doRetry)
MQTTProtocol_retries(now, client); MQTTProtocol_retries(now, client, regardless);
} }
FUNC_EXIT; FUNC_EXIT;
} }
......
...@@ -43,7 +43,7 @@ int MQTTProtocol_handlePubrels(void* pack, int sock); ...@@ -43,7 +43,7 @@ int MQTTProtocol_handlePubrels(void* pack, int sock);
int MQTTProtocol_handlePubcomps(void* pack, int sock); int MQTTProtocol_handlePubcomps(void* pack, int sock);
void MQTTProtocol_keepalive(time_t); void MQTTProtocol_keepalive(time_t);
void MQTTProtocol_retry(time_t, int); void MQTTProtocol_retry(time_t, int, int);
void MQTTProtocol_freeClient(Clients* client); void MQTTProtocol_freeClient(Clients* client);
void MQTTProtocol_emptyMessageList(List* msgList); void MQTTProtocol_emptyMessageList(List* msgList);
void MQTTProtocol_freeMessageList(List* msgList); void MQTTProtocol_freeMessageList(List* msgList);
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
...@@ -66,6 +66,7 @@ static char* protocol_message_list[] = ...@@ -66,6 +66,7 @@ static char* protocol_message_list[] =
"%d %s <- CONNECT", /* 26 */ "%d %s <- CONNECT", /* 26 */
"%d %s -> PUBLISH qos: 0 retained: %d (%d)", /* 27 */ "%d %s -> PUBLISH qos: 0 retained: %d (%d)", /* 27 */
"%d %s -> DISCONNECT (%d)", /* 28 */ "%d %s -> DISCONNECT (%d)", /* 28 */
"Socket error for client identifier %s, socket %d, peer address %s; ending connection", /* 29 */
}; };
static char* trace_message_list[] = static char* trace_message_list[] =
...@@ -78,7 +79,7 @@ static char* trace_message_list[] = ...@@ -78,7 +79,7 @@ static char* trace_message_list[] =
"Packet %s received from client %s for message identifier %d, but message is in wrong state", /* 5 */ "Packet %s received from client %s for message identifier %d, but message is in wrong state", /* 5 */
"%s received from client %s for message id %d - removing publication", /* 6 */ "%s received from client %s for message id %d - removing publication", /* 6 */
"Trying %s again for client %s, socket %d, message identifier %d", /* 7 */ "Trying %s again for client %s, socket %d, message identifier %d", /* 7 */
"Socket error for client identifier %s, socket %d, peer address %s; ending connection", /* 8 */ "", /* 8 */
"(%lu) %*s(%d)> %s:%d", /* 9 */ "(%lu) %*s(%d)> %s:%d", /* 9 */
"(%lu) %*s(%d)< %s:%d", /* 10 */ "(%lu) %*s(%d)< %s:%d", /* 10 */
"(%lu) %*s(%d)< %s:%d (%d)", /* 11 */ "(%lu) %*s(%d)< %s:%d (%d)", /* 11 */
......
...@@ -573,6 +573,8 @@ int SSLSocket_connect(SSL* ssl, int sock) ...@@ -573,6 +573,8 @@ int SSLSocket_connect(SSL* ssl, int sock)
error = SSLSocket_error("SSL_connect", ssl, sock, rc); error = SSLSocket_error("SSL_connect", ssl, sock, rc);
if (error == SSL_FATAL) if (error == SSL_FATAL)
rc = error; rc = error;
if (error == SSL_ERROR_WANT_READ || error == SSL_ERROR_WANT_WRITE)
rc = TCPSOCKET_INTERRUPTED;
} }
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
......
File mode changed from 100755 to 100644
...@@ -98,7 +98,7 @@ int Socket_error(char* aString, int sock) ...@@ -98,7 +98,7 @@ int Socket_error(char* aString, int sock)
if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
{ {
if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
Log(TRACE_MIN, -1, "Socket error %s in %s for socket %d", strerror(errno), aString, sock); Log(LOG_ERROR, -1, "Socket error %s in %s for socket %d", strerror(errno), aString, sock);
} }
FUNC_EXIT_RC(errno); FUNC_EXIT_RC(errno);
return errno; return errno;
...@@ -170,7 +170,7 @@ int Socket_addSocket(int newSd) ...@@ -170,7 +170,7 @@ int Socket_addSocket(int newSd)
rc = Socket_setnonblocking(newSd); rc = Socket_setnonblocking(newSd);
} }
else else
Log(TRACE_MIN, -1, "addSocket: socket %d already in the list", newSd); Log(LOG_ERROR, -1, "addSocket: socket %d already in the list", newSd);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -559,7 +559,7 @@ void Socket_close(int socket) ...@@ -559,7 +559,7 @@ void Socket_close(int socket)
if (ListRemoveItem(s.clientsds, &socket, intcompare)) if (ListRemoveItem(s.clientsds, &socket, intcompare))
Log(TRACE_MIN, -1, "Removed socket %d", socket); Log(TRACE_MIN, -1, "Removed socket %d", socket);
else else
Log(TRACE_MIN, -1, "Failed to remove socket %d", socket); Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
if (socket + 1 >= s.maxfdp1) if (socket + 1 >= s.maxfdp1)
{ {
/* now we have to reset s.maxfdp1 */ /* now we have to reset s.maxfdp1 */
...@@ -640,10 +640,10 @@ int Socket_new(char* addr, int port, int* sock) ...@@ -640,10 +640,10 @@ int Socket_new(char* addr, int port, int* sock)
else else
rc = -1; rc = -1;
freeaddrinfo(result); freeaddrinfo(result);
} }
else else
Log(TRACE_MIN, -1, "getaddrinfo failed for addr %s with rc %d", addr, rc); Log(LOG_ERROR, -1, "getaddrinfo failed for addr %s with rc %d", addr, rc);
if (rc != 0) if (rc != 0)
Log(LOG_ERROR, -1, "%s is not a valid IP address", addr); Log(LOG_ERROR, -1, "%s is not a valid IP address", addr);
...@@ -658,7 +658,7 @@ int Socket_new(char* addr, int port, int* sock) ...@@ -658,7 +658,7 @@ int Socket_new(char* addr, int port, int* sock)
int opt = 1; int opt = 1;
if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0) if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
Log(TRACE_MIN, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock); Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
#endif #endif
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port); Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#define EWOULDBLOCK WSAEWOULDBLOCK #define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN #define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET #define ECONNRESET WSAECONNRESET
#define ETIMEDOUT WAIT_TIMEOUT
#endif #endif
#define ioctl ioctlsocket #define ioctl ioctlsocket
#define socklen_t int #define socklen_t int
......
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin_publisher
--maxdatalen 100
--userid none
--password none
*/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#if defined(WIN32)
#include <Windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int toStop = 0;
void usage()
{
printf("MQTT stdin publisher\n");
printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 0)\n");
printf(" --retained (default is off)\n");
printf(" --delimiter <delim> (default is \\n)");
printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n");
printf(" --username none\n");
printf(" --password none\n");
exit(-1);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{
printf("Connecting\n");
if (MQTTClient_connect(*client, opts) != 0)
{
printf("Failed to connect\n");
exit(-1);
}
printf("Connected\n");
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
struct
{
char* clientid;
char* delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
int verbose;
} opts =
{
"publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0
};
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
/* not expecting any messages */
return 1;
}
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
char* topic = NULL;
char* buffer = NULL;
int rc = 0;
char url[100];
if (argc < 2)
usage();
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts;
myconnect(&client, &conn_opts);
buffer = malloc(opts.maxdatalen);
while (!toStop)
{
int data_len = 0;
int delim_len = 0;
delim_len = strlen(opts.delimiter);
do
{
buffer[data_len++] = getchar();
if (data_len > delim_len)
{
//printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0)
{
myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
}
printf("Stopping\n");
free(buffer);
MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return 0;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
usage();
}
count++;
}
}
This diff is collapsed.
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
/** /**
* @file * @file
* Tests for the MQ Telemetry MQTT C client * Tests for the Paho Asynchronous MQTT C client
*/ */
...@@ -67,7 +67,7 @@ struct Options ...@@ -67,7 +67,7 @@ struct Options
int iterations; int iterations;
} options = } options =
{ {
"m2m.eclipse.org:1883", "iot.eclipse.org:1883",
0, 0,
-1, -1,
10000, 10000,
...@@ -1052,7 +1052,7 @@ int test6(struct Options options) ...@@ -1052,7 +1052,7 @@ int test6(struct Options options)
char* uris[2] = {options.connection, options.connection}; char* uris[2] = {options.connection, options.connection};
failures = 0; failures = 0;
MyLog(LOGA_INFO, "Starting test 7 - HA connections"); MyLog(LOGA_INFO, "Starting test 6 - HA connections");
fprintf(xml, "<testcase classname=\"test4\" name=\"HA connections\""); fprintf(xml, "<testcase classname=\"test4\" name=\"HA connections\"");
global_start_time = start_clock(); global_start_time = start_clock();
...@@ -1134,6 +1134,264 @@ exit: ...@@ -1134,6 +1134,264 @@ exit:
/********************************************************************
Test7: Persistence
*********************************************************************/
char* test7_topic = "C client test7";
int test7_messageCount = 0;
void test7_onDisconnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
test_finished = 1;
}
void test7_onUnsubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
opts.onSuccess = test7_onDisconnect;
opts.context = c;
rc = MQTTAsync_disconnect(c, &opts);
assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;
MyLog(LOGA_DEBUG, "Test7: received message id %d", message->msgid);
test7_messageCount++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
static int test7_subscribed = 0;
void test7_onSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
test7_subscribed = 1;
}
void test7_onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess = test7_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test7_topic, 2, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_finished = 1;
}
/*********************************************************************
Test7: Pending tokens
*********************************************************************/
int test7(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_token* tokens = NULL;
int msg_count = 6;
MyLog(LOGA_INFO, "Starting test 7 - pending tokens");
fprintf(xml, "<testcase classname=\"test4\" name=\"pending tokens\"");
global_start_time = start_clock();
test_finished = 0;
rc = MQTTAsync_create(&c, options.connection, "async_test7",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 0;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test7_onConnect;
opts.onFailure = NULL;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test7_subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 2;
pubmsg.retained = 0;
rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &ropts);
MyLog(LOGA_DEBUG, "Token was %d", ropts.token);
rc = MQTTAsync_isComplete(c, ropts.token);
assert("0 rc from isComplete", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
rc = MQTTAsync_waitForCompletion(c, ropts.token, 5000L);
assert("Good rc from waitForCompletion", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
rc = MQTTAsync_isComplete(c, ropts.token);
assert("1 rc from isComplete", rc == 1, "rc was %d", rc);
test7_messageCount = 0;
int i = 0;
pubmsg.qos = 2;
for (i = 0; i < msg_count; ++i)
{
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
//pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &ropts);
}
/* disconnect immediately without receiving the incoming messages */
dopts.timeout = 0;
dopts.onSuccess = test7_onDisconnect;
MQTTAsync_disconnect(c, &dopts); /* now there should be "orphaned" publications */
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
test_finished = 0;
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
MQTTAsync_free(tokens);
MQTTAsync_destroy(&c); /* force re-reading persistence on create */
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = MQTTAsync_create(&c, options.connection, "async_test7", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
if (tokens)
{
int i = 0;
while (tokens[i] != -1)
MyLog(LOGA_DEBUG, "Delivery token %d", tokens[i++]);
MQTTAsync_free(tokens);
//The following assertion should work, does with RSMB, but not Mosquitto
//assert1("no of tokens should be count", i == msg_count, "no of tokens %d count %d", i, msg_count);
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
MyLog(LOGA_DEBUG, "Reconnecting");
opts.context = c;
if (MQTTAsync_connect(c, &opts) != 0)
{
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
goto exit;
}
#if defined(WIN32)
Sleep(5000);
#else
usleep(5000000L);
#endif
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
assert1("no of messages should be count", test7_messageCount == msg_count, "no of tokens %d count %d",
test7_messageCount, msg_count);
assertions fail against Mosquitto - needs testing */
dopts.timeout = 1000;
MQTTAsync_disconnect(c, &dopts);
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST7: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); printf("Trace : %d, %s\n", level, message);
...@@ -1145,7 +1403,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) ...@@ -1145,7 +1403,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int rc = 0; int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6}; /* indexed starting from 1 */ int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7}; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
int i; int i;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment