Commit 82275178 authored by Ian Craggs's avatar Ian Craggs

Some corrections for failing socket writes #385

parent 6fa19a82
...@@ -1082,7 +1082,7 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1082,7 +1082,7 @@ static void MQTTAsync_writeComplete(int socket)
if (cur_response) /* we found a response */ if (cur_response) /* we found a response */
{ {
if (command->onSuccess) if (command->type == PUBLISH && command->onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
...@@ -1095,8 +1095,11 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1095,8 +1095,11 @@ static void MQTTAsync_writeComplete(int socket)
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data); (*(command->onSuccess))(command->context, &data);
} }
ListDetach(m->responses, com); if (com)
MQTTAsync_freeCommand(com); {
ListDetach(m->responses, com);
MQTTAsync_freeCommand(com);
}
} }
m->pending_write = NULL; m->pending_write = NULL;
} }
......
...@@ -431,9 +431,9 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes) ...@@ -431,9 +431,9 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
//#define TESTING //#define TESTING
#if defined(TESTING) #if defined(TESTING)
static int i = 0; static int i = 0;
if (++i == 100) if (++i % 100 == 1)
{ {
if (0) if (1)
{ {
printf("Deliberately simulating TCPSOCKET_INTERRUPTED\n"); printf("Deliberately simulating TCPSOCKET_INTERRUPTED\n");
rc = TCPSOCKET_INTERRUPTED; /* simulate a network wait */ rc = TCPSOCKET_INTERRUPTED; /* simulate a network wait */
...@@ -807,10 +807,14 @@ int Socket_continueWrite(int socket) ...@@ -807,10 +807,14 @@ int Socket_continueWrite(int socket)
if (pw->frees[i]) if (pw->frees[i])
free(pw->iovecs[i].iov_base); free(pw->iovecs[i].iov_base);
} }
rc = 1; /* signal complete */
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket); Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
} }
else else
{
rc = 0; /* signal not complete */
Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket); Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket);
}
} }
else /* if we got SOCKET_ERROR we need to clean up anyway - a partial write is no good anymore */ else /* if we got SOCKET_ERROR we need to clean up anyway - a partial write is no good anymore */
{ {
...@@ -842,7 +846,9 @@ int Socket_continueWrites(fd_set* pwset) ...@@ -842,7 +846,9 @@ int Socket_continueWrites(fd_set* pwset)
while (curpending) while (curpending)
{ {
int socket = *(int*)(curpending->content); int socket = *(int*)(curpending->content);
if (FD_ISSET(socket, pwset) && Socket_continueWrite(socket)) int rc = 0;
if (FD_ISSET(socket, pwset) && (rc = Socket_continueWrite(socket)))
{ {
if (!SocketBuffer_writeComplete(socket)) if (!SocketBuffer_writeComplete(socket))
Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list"); Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list");
...@@ -854,7 +860,7 @@ int Socket_continueWrites(fd_set* pwset) ...@@ -854,7 +860,7 @@ int Socket_continueWrites(fd_set* pwset)
} }
curpending = s.write_pending->current; curpending = s.write_pending->current;
if (writecomplete) if (rc == 1 && writecomplete)
(*writecomplete)(socket); (*writecomplete)(socket);
} }
else else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment