Commit 5edc4db3 authored by Juergen Kosel's avatar Juergen Kosel

Merge branch '373' of https://github.com/eclipse/paho.mqtt.c.git into 373

* '373' of https://github.com/eclipse/paho.mqtt.c.git:
  More cleanup when a socket is closed #373

# Conflicts:
#	src/Socket.c
Signed-off-by: 's avatarJuergen Kosel <juergen.kosel@softing.com>
parents 673955c8 993ecc7c
...@@ -1663,6 +1663,8 @@ void MQTTAsync_destroy(MQTTAsync* handle) ...@@ -1663,6 +1663,8 @@ void MQTTAsync_destroy(MQTTAsync* handle)
if (m == NULL) if (m == NULL)
goto exit; goto exit;
MQTTAsync_closeSession(m->c);
MQTTAsync_removeResponsesAndCommands(m); MQTTAsync_removeResponsesAndCommands(m);
ListFree(m->responses); ListFree(m->responses);
......
...@@ -52,6 +52,7 @@ int Socket_close_only(int socket); ...@@ -52,6 +52,7 @@ int Socket_close_only(int socket);
int Socket_continueWrite(int socket); int Socket_continueWrite(int socket);
int Socket_continueWrites(fd_set* pwset); int Socket_continueWrites(fd_set* pwset);
char* Socket_getaddrname(struct sockaddr* sa, int sock); char* Socket_getaddrname(struct sockaddr* sa, int sock);
int Socket_abortWrite(int socket);
#if defined(WIN32) || defined(WIN64) #if defined(WIN32) || defined(WIN64)
#define iov_len len #define iov_len len
...@@ -494,6 +495,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu ...@@ -494,6 +495,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
if (!Socket_noPendingWrites(socket)) if (!Socket_noPendingWrites(socket))
{ {
Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket); Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket);
StackTrace_printStack(stdout);
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
goto exit; goto exit;
} }
...@@ -523,6 +525,8 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu ...@@ -523,6 +525,8 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
#if defined(OPENSSL) #if defined(OPENSSL)
SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes); SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes);
#else #else
//printf("Partial write for socket %d pending write created\n", socket);
StackTrace_printStack(stdout);
SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes); SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes);
#endif #endif
*sockmem = socket; *sockmem = socket;
...@@ -602,15 +606,17 @@ int Socket_close_only(int socket) ...@@ -602,15 +606,17 @@ int Socket_close_only(int socket)
void Socket_close(int socket) void Socket_close(int socket)
{ {
FUNC_ENTRY; FUNC_ENTRY;
//printf("Closing socket %d\n", socket);
Socket_close_only(socket); Socket_close_only(socket);
FD_CLR(socket, &(s.rset_saved)); FD_CLR(socket, &(s.rset_saved));
if (FD_ISSET(socket, &(s.pending_wset))) if (FD_ISSET(socket, &(s.pending_wset)))
FD_CLR(socket, &(s.pending_wset)); FD_CLR(socket, &(s.pending_wset));
if (s.cur_clientsds != NULL && *(int*)(s.cur_clientsds->content) == socket) if (s.cur_clientsds != NULL && *(int*)(s.cur_clientsds->content) == socket)
s.cur_clientsds = s.cur_clientsds->next; s.cur_clientsds = s.cur_clientsds->next;
Socket_abortWrite(socket);
SocketBuffer_cleanup(socket);
ListRemoveItem(s.connect_pending, &socket, intcompare); ListRemoveItem(s.connect_pending, &socket, intcompare);
ListRemoveItem(s.write_pending, &socket, intcompare); ListRemoveItem(s.write_pending, &socket, intcompare);
SocketBuffer_cleanup(socket);
if (ListRemoveItem(s.clientsds, &socket, intcompare)) if (ListRemoveItem(s.clientsds, &socket, intcompare))
Log(TRACE_MIN, -1, "Removed socket %d", socket); Log(TRACE_MIN, -1, "Removed socket %d", socket);
...@@ -713,14 +719,16 @@ int Socket_new(char* addr, int port, int* sock) ...@@ -713,14 +719,16 @@ int Socket_new(char* addr, int port, int* sock)
if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0) if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock); Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
#endif #endif
#if defined(TESTING) //#define TESTING1
{ #if defined(TESTING1)
int optsend = 2 * 1440; if (1)
if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (void*)&optsend, sizeof(optsend)) != 0) {
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock); int optsend = 100; //2 * 1440;
} printf("Setting optsend to %d\n", optsend);
if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (void*)&optsend, sizeof(optsend)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
}
#endif #endif
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port); Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
if (Socket_addSocket(*sock) == SOCKET_ERROR) if (Socket_addSocket(*sock) == SOCKET_ERROR)
rc = Socket_error("addSocket", *sock); rc = Socket_error("addSocket", *sock);
...@@ -766,6 +774,8 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete) ...@@ -766,6 +774,8 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete)
writecomplete = mywritecomplete; writecomplete = mywritecomplete;
} }
/** /**
* Continue an outstanding write for a particular socket * Continue an outstanding write for a particular socket
* @param socket that socket * @param socket that socket
...@@ -824,6 +834,7 @@ int Socket_continueWrite(int socket) ...@@ -824,6 +834,7 @@ int Socket_continueWrite(int socket)
} }
} }
rc = 1; /* signal complete */ rc = 1; /* signal complete */
//printf("Partial write complete for socket %d\n", socket);
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket); Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
} }
else else
...@@ -842,6 +853,7 @@ int Socket_continueWrite(int socket) ...@@ -842,6 +853,7 @@ int Socket_continueWrite(int socket)
pw->iovecs[i].iov_base = NULL; pw->iovecs[i].iov_base = NULL;
} }
} }
//printf("Partial write aborted for socket %d\n", socket);
} }
#if defined(OPENSSL) #if defined(OPENSSL)
exit: exit:
...@@ -851,6 +863,45 @@ exit: ...@@ -851,6 +863,45 @@ exit:
} }
/**
* Continue an outstanding write for a particular socket
* @param socket that socket
* @return completion code: 0=incomplete, 1=complete, -1=socket error
*/
int Socket_abortWrite(int socket)
{
int i = -1, rc = 0;
pending_writes* pw;
FUNC_ENTRY;
//printf("In abortWrite for socket %d\n", socket);
if ((pw = SocketBuffer_getWrite(socket)) == NULL)
goto exit;
#if defined(OPENSSL)
if (pw->ssl)
{
//rc = SSLSocket_continueWrite(pw);
goto exit;
}
#endif
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
{
printf("cleaning in abortwrite for socket %d\n", socket);
free(pw->iovecs[i].iov_base);
}
}
exit:
//printf("Exit abortWrite for socket %d\n", socket);
FUNC_EXIT_RC(rc);
return rc;
}
/** /**
* Continue any outstanding writes for a socket set * Continue any outstanding writes for a socket set
* @param pwset the set of sockets * @param pwset the set of sockets
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment