Commit ad500799 authored by Ian Craggs's avatar Ian Craggs

Some memory cleanup changes #373

parent e81b6755
......@@ -1268,7 +1268,7 @@ static int MQTTAsync_processCommand(void)
}
else
{
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
//command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
command->client->pending_write = &command->command;
}
}
......@@ -1295,7 +1295,8 @@ static int MQTTAsync_processCommand(void)
command->client->disconnect = command->command;
MQTTAsync_freeCommand(command);
}
else if (command->command.type == PUBLISH && command->command.details.pub.qos == 0)
else if (command->command.type == PUBLISH && command->command.details.pub.qos == 0 &&
rc != SOCKET_ERROR && rc != MQTTASYNC_PERSISTENCE_ERROR)
{
if (rc == TCPSOCKET_INTERRUPTED)
ListAppend(command->client->responses, command, sizeof(command));
......@@ -1313,25 +1314,25 @@ static int MQTTAsync_processCommand(void)
else
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client))
if (command->command.type == CONNECT
&& MQTTAsync_checkConn(&command->command, command->client))
{
Log(TRACE_MIN, -1, "Connect failed, more to try");
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
command->command.details.conn.currentURI++;
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
} else
command->command.details.conn.currentURI++;
/* put the connect command back to the head of the command queue, using the next serverURI */
rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn));
}
else
rc = MQTTAsync_addCommand(command,
sizeof(command->command.details.conn));
} else
{
if (command->command.onFailure)
{
......@@ -2029,7 +2030,8 @@ static void MQTTAsync_closeOnly(Clients* client)
client->ping_outstanding = 0;
if (client->net.socket > 0)
{
if (client->connected)
MQTTProtocol_checkPendingWrites();
if (client->connected && Socket_noPendingWrites(client->net.socket))
MQTTPacket_send_disconnect(&client->net, client->clientID);
Thread_lock_mutex(socket_mutex);
#if defined(OPENSSL)
......
......@@ -418,6 +418,7 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
int rc;
FUNC_ENTRY;
*bytes = 0L;
#if defined(WIN32) || defined(WIN64)
rc = WSASend(socket, iovecs, count, (LPDWORD)bytes, 0, NULL, NULL);
if (rc == SOCKET_ERROR)
......@@ -427,7 +428,6 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
rc = TCPSOCKET_INTERRUPTED;
}
#else
*bytes = 0L;
rc = writev(socket, iovecs, count);
if (rc == SOCKET_ERROR)
{
......@@ -475,7 +475,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
iovecs[0].iov_base = buf0;
iovecs[0].iov_len = (ULONG)buf0len;
frees1[0] = 1;
frees1[0] = 1; /* this buffer should be freed by SocketBuffer if the write is interrupted */
for (i = 0; i < count; i++)
{
iovecs[i+1].iov_base = buffers[i];
......@@ -735,7 +735,7 @@ int Socket_continueWrite(int socket)
int rc = 0;
pending_writes* pw;
unsigned long curbuflen = 0L, /* cumulative total of buffer lengths */
bytes;
bytes = 0L;
int curbuf = -1, i;
iobuf iovecs1[5];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment