Commit 993ecc7c authored by Ian Craggs's avatar Ian Craggs

More cleanup when a socket is closed #373

parent 2cff9d01
......@@ -1645,6 +1645,8 @@ void MQTTAsync_destroy(MQTTAsync* handle)
if (m == NULL)
goto exit;
MQTTAsync_closeSession(m->c);
MQTTAsync_removeResponsesAndCommands(m);
ListFree(m->responses);
......
......@@ -52,6 +52,7 @@ int Socket_close_only(int socket);
int Socket_continueWrite(int socket);
int Socket_continueWrites(fd_set* pwset);
char* Socket_getaddrname(struct sockaddr* sa, int sock);
int Socket_abortWrite(int socket);
#if defined(WIN32) || defined(WIN64)
#define iov_len len
......@@ -494,6 +495,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
if (!Socket_noPendingWrites(socket))
{
Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket);
StackTrace_printStack(stdout);
rc = SOCKET_ERROR;
goto exit;
}
......@@ -523,6 +525,8 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
#if defined(OPENSSL)
SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes);
#else
//printf("Partial write for socket %d pending write created\n", socket);
StackTrace_printStack(stdout);
SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes);
#endif
*sockmem = socket;
......@@ -596,15 +600,17 @@ int Socket_close_only(int socket)
void Socket_close(int socket)
{
FUNC_ENTRY;
//printf("Closing socket %d\n", socket);
Socket_close_only(socket);
FD_CLR(socket, &(s.rset_saved));
if (FD_ISSET(socket, &(s.pending_wset)))
FD_CLR(socket, &(s.pending_wset));
if (s.cur_clientsds != NULL && *(int*)(s.cur_clientsds->content) == socket)
s.cur_clientsds = s.cur_clientsds->next;
Socket_abortWrite(socket);
SocketBuffer_cleanup(socket);
ListRemoveItem(s.connect_pending, &socket, intcompare);
ListRemoveItem(s.write_pending, &socket, intcompare);
SocketBuffer_cleanup(socket);
if (ListRemoveItem(s.clientsds, &socket, intcompare))
Log(TRACE_MIN, -1, "Removed socket %d", socket);
......@@ -707,7 +713,16 @@ int Socket_new(char* addr, int port, int* sock)
if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
#endif
//#define TESTING1
#if defined(TESTING1)
if (1)
{
int optsend = 100; //2 * 1440;
printf("Setting optsend to %d\n", optsend);
if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (void*)&optsend, sizeof(optsend)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
}
#endif
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
if (Socket_addSocket(*sock) == SOCKET_ERROR)
rc = Socket_error("addSocket", *sock);
......@@ -753,6 +768,8 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete)
writecomplete = mywritecomplete;
}
/**
* Continue an outstanding write for a particular socket
* @param socket that socket
......@@ -808,6 +825,7 @@ int Socket_continueWrite(int socket)
free(pw->iovecs[i].iov_base);
}
rc = 1; /* signal complete */
//printf("Partial write complete for socket %d\n", socket);
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
}
else
......@@ -823,6 +841,7 @@ int Socket_continueWrite(int socket)
if (pw->frees[i])
free(pw->iovecs[i].iov_base);
}
//printf("Partial write aborted for socket %d\n", socket);
}
#if defined(OPENSSL)
exit:
......@@ -832,6 +851,45 @@ exit:
}
/**
* Continue an outstanding write for a particular socket
* @param socket that socket
* @return completion code: 0=incomplete, 1=complete, -1=socket error
*/
int Socket_abortWrite(int socket)
{
int i = -1, rc = 0;
pending_writes* pw;
FUNC_ENTRY;
//printf("In abortWrite for socket %d\n", socket);
if ((pw = SocketBuffer_getWrite(socket)) == NULL)
goto exit;
#if defined(OPENSSL)
if (pw->ssl)
{
//rc = SSLSocket_continueWrite(pw);
goto exit;
}
#endif
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
{
printf("cleaning in abortwrite for socket %d\n", socket);
free(pw->iovecs[i].iov_base);
}
}
exit:
//printf("Exit abortWrite for socket %d\n", socket);
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Continue any outstanding writes for a socket set
* @param pwset the set of sockets
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment