Commit 2cff9d01 authored by Ian Craggs's avatar Ian Craggs

Don't try ack writes for sockets with pending output, missing connect failure call #373

parent d889b818
......@@ -1299,7 +1299,20 @@ static int MQTTAsync_processCommand(void)
{
if (command->client->c->connect_state != 0 || command->client->c->connected != 0)
{
command->client->c->connect_state = -2;
if (command->client->c->connect_state != 0)
{
command->client->c->connect_state = -2;
if (command->client->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = -2;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
(*(command->client->connect.onFailure))(command->client->connect.context, &data);
}
}
MQTTAsync_checkDisconnect(command->client, &command->command);
}
}
......@@ -1386,16 +1399,16 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
conn->command.details.conn.currentURI++;
conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
conn->command.details.conn.currentURI++;
if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
conn->command.details.conn.currentURI++;
conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
conn->command.details.conn.currentURI++;
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
......
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -17,6 +17,7 @@
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
* Ian Craggs - ensure that acks are not sent if write is outstanding on socket
*******************************************************************************/
/**
......@@ -83,7 +84,7 @@ int MQTTProtocol_assignMsgId(Clients* client)
while (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
{
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
if (msgid == start_msgid)
if (msgid == start_msgid)
{ /* we've tried them all - none free */
msgid = 0;
break;
......@@ -275,12 +276,14 @@ int MQTTProtocol_handlePublishes(void* pack, int sock)
if (publish->header.bits.qos == 0)
Protocol_processPublication(publish, client);
else if (!Socket_noPendingWrites(sock))
rc = SOCKET_ERROR; /* queue acks? */
else if (publish->header.bits.qos == 1)
{
/* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
rc = MQTTPacket_send_puback(publish->msgId, &client->net, client->clientID);
/* if we get a socket error from sending the puback, should we ignore the publication? */
Protocol_processPublication(publish, client);
/* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
rc = MQTTPacket_send_puback(publish->msgId, &client->net, client->clientID);
/* if we get a socket error from sending the puback, should we ignore the publication? */
Protocol_processPublication(publish, client);
}
else if (publish->header.bits.qos == 2)
{
......@@ -420,6 +423,8 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
{
if (pubrel->header.bits.dup == 0)
Log(TRACE_MIN, 3, NULL, "PUBREL", client->clientID, pubrel->msgId);
else if (!Socket_noPendingWrites(sock))
rc = SOCKET_ERROR; /* queue acks? */
else
/* Apparently this is "normal" behaviour, so we don't need to issue a warning */
rc = MQTTPacket_send_pubcomp(pubrel->msgId, &client->net, client->clientID);
......@@ -431,6 +436,8 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
Log(TRACE_MIN, 4, NULL, "PUBREL", client->clientID, pubrel->msgId, m->qos);
else if (m->nextMessageType != PUBREL)
Log(TRACE_MIN, 5, NULL, "PUBREL", client->clientID, pubrel->msgId);
else if (!Socket_noPendingWrites(sock))
rc = SOCKET_ERROR; /* queue acks? */
else
{
Publish publish;
......@@ -521,7 +528,7 @@ void MQTTProtocol_keepalive(time_t now)
while (current)
{
Clients* client = (Clients*)(current->content);
ListNextElement(bstate->clients, &current);
ListNextElement(bstate->clients, &current);
if (client->connected && client->keepAliveInterval > 0 &&
(difftime(now, client->net.lastSent) >= client->keepAliveInterval ||
difftime(now, client->net.lastReceived) >= client->keepAliveInterval))
......@@ -740,7 +747,7 @@ char* MQTTStrncpy(char *dest, const char *src, size_t dest_size)
size_t count = dest_size;
char *temp = dest;
FUNC_ENTRY;
FUNC_ENTRY;
if (dest_size < strlen(src))
Log(TRACE_MIN, -1, "the src string is truncated");
......
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -15,6 +15,7 @@
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 updates
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - add debug definition of MQTTStrdup for when needed
*******************************************************************************/
#if !defined(MQTTPROTOCOLCLIENT_H)
......@@ -52,4 +53,7 @@ void MQTTProtocol_freeMessageList(List* msgList);
char* MQTTStrncpy(char *dest, const char* src, size_t num);
char* MQTTStrdup(const char* src);
//#define MQTTStrdup(src) MQTTStrncpy(malloc(strlen(src)+1), src, strlen(src)+1)
#endif
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -170,13 +170,11 @@ void StackTrace_printStack(FILE* dest)
}
char* StackTrace_get(thread_id_type threadid)
char* StackTrace_get(thread_id_type threadid, char* buf, int bufsize)
{
int bufsize = 256;
char* buf = NULL;
int t = 0;
if ((buf = malloc(bufsize)) == NULL)
if (bufsize < 100)
goto exit;
buf[0] = '\0';
for (t = 0; t < thread_count; ++t)
......@@ -204,4 +202,3 @@ char* StackTrace_get(thread_id_type threadid)
exit:
return buf;
}
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -66,6 +66,6 @@ void StackTrace_entry(const char* name, int line, enum LOG_LEVELS trace);
void StackTrace_exit(const char* name, int line, void* return_value, enum LOG_LEVELS trace);
void StackTrace_printStack(FILE* dest);
char* StackTrace_get(thread_id_type);
char* StackTrace_get(thread_id_type, char* buf, int bufsize);
#endif /* STACKTRACE_H_ */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment