Commit cb8d4b70 authored by Ian Craggs's avatar Ian Craggs

Make sure writeComplete deals with socket error #373

parent 82275178
...@@ -361,7 +361,7 @@ static void MQTTProtocol_checkPendingWrites(void); ...@@ -361,7 +361,7 @@ static void MQTTProtocol_checkPendingWrites(void);
static void MQTTAsync_freeServerURIs(MQTTAsyncs* m); static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command); static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command); static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
static void MQTTAsync_writeComplete(int socket); static void MQTTAsync_writeComplete(int socket, int rc);
static int MQTTAsync_processCommand(void); static int MQTTAsync_processCommand(void);
static void MQTTAsync_checkTimeouts(void); static void MQTTAsync_checkTimeouts(void);
static thread_return_type WINAPI MQTTAsync_sendThread(void* n); static thread_return_type WINAPI MQTTAsync_sendThread(void* n);
...@@ -1050,7 +1050,7 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command) ...@@ -1050,7 +1050,7 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
} }
static void MQTTAsync_writeComplete(int socket) static void MQTTAsync_writeComplete(int socket, int rc)
{ {
ListElement* found = NULL; ListElement* found = NULL;
...@@ -1082,7 +1082,9 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1082,7 +1082,9 @@ static void MQTTAsync_writeComplete(int socket)
if (cur_response) /* we found a response */ if (cur_response) /* we found a response */
{ {
if (command->type == PUBLISH && command->onSuccess) if (command->type == PUBLISH)
{
if (rc == 1 && command->onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
...@@ -1095,6 +1097,17 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1095,6 +1097,17 @@ static void MQTTAsync_writeComplete(int socket)
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data); (*(command->onSuccess))(command->context, &data);
} }
else if (rc == -1 && command->onFailure)
{
MQTTAsync_failureData data;
data.token = command->token;
data.code = rc;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->onFailure))(command->context, &data);
}
}
if (com) if (com)
{ {
ListDetach(m->responses, com); ListDetach(m->responses, com);
......
...@@ -295,7 +295,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc); ...@@ -295,7 +295,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout); static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
/*static int pubCompare(void* a, void* b); */ /*static int pubCompare(void* a, void* b); */
static void MQTTProtocol_checkPendingWrites(void); static void MQTTProtocol_checkPendingWrites(void);
static void MQTTClient_writeComplete(int socket); static void MQTTClient_writeComplete(int socket, int rc);
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId, int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
...@@ -2082,7 +2082,7 @@ static void MQTTProtocol_checkPendingWrites(void) ...@@ -2082,7 +2082,7 @@ static void MQTTProtocol_checkPendingWrites(void)
} }
static void MQTTClient_writeComplete(int socket) static void MQTTClient_writeComplete(int socket, int rc)
{ {
ListElement* found = NULL; ListElement* found = NULL;
......
...@@ -428,10 +428,10 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes) ...@@ -428,10 +428,10 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
rc = TCPSOCKET_INTERRUPTED; rc = TCPSOCKET_INTERRUPTED;
} }
#else #else
//#define TESTING #define TESTING
#if defined(TESTING) #if defined(TESTING)
static int i = 0; static int i = 0;
if (++i % 100 == 1) if (++i >= 10 && i < 21)
{ {
if (1) if (1)
{ {
...@@ -444,7 +444,7 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes) ...@@ -444,7 +444,7 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
} }
/* should *bytes always be 0? */ /* should *bytes always be 0? */
if (1) if (i == 20)
{ {
printf("Shutdown socket\n"); printf("Shutdown socket\n");
shutdown(socket, SHUT_WR); shutdown(socket, SHUT_WR);
...@@ -756,7 +756,7 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete) ...@@ -756,7 +756,7 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete)
/** /**
* Continue an outstanding write for a particular socket * Continue an outstanding write for a particular socket
* @param socket that socket * @param socket that socket
* @return completion code * @return completion code: 0=incomplete, 1=complete, -1=socket error
*/ */
int Socket_continueWrite(int socket) int Socket_continueWrite(int socket)
{ {
...@@ -848,7 +848,7 @@ int Socket_continueWrites(fd_set* pwset) ...@@ -848,7 +848,7 @@ int Socket_continueWrites(fd_set* pwset)
int socket = *(int*)(curpending->content); int socket = *(int*)(curpending->content);
int rc = 0; int rc = 0;
if (FD_ISSET(socket, pwset) && (rc = Socket_continueWrite(socket))) if (FD_ISSET(socket, pwset) && ((rc = Socket_continueWrite(socket)) != 0))
{ {
if (!SocketBuffer_writeComplete(socket)) if (!SocketBuffer_writeComplete(socket))
Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list"); Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list");
...@@ -860,8 +860,8 @@ int Socket_continueWrites(fd_set* pwset) ...@@ -860,8 +860,8 @@ int Socket_continueWrites(fd_set* pwset)
} }
curpending = s.write_pending->current; curpending = s.write_pending->current;
if (rc == 1 && writecomplete) if (writecomplete)
(*writecomplete)(socket); (*writecomplete)(socket, rc);
} }
else else
ListNextElement(s.write_pending, &curpending); ListNextElement(s.write_pending, &curpending);
......
...@@ -137,7 +137,7 @@ char* Socket_getpeer(int sock); ...@@ -137,7 +137,7 @@ char* Socket_getpeer(int sock);
void Socket_addPendingWrite(int socket); void Socket_addPendingWrite(int socket);
void Socket_clearPendingWrite(int socket); void Socket_clearPendingWrite(int socket);
typedef void Socket_writeComplete(int socket); typedef void Socket_writeComplete(int socket, int rc);
void Socket_setWriteCompleteCallback(Socket_writeComplete*); void Socket_setWriteCompleteCallback(Socket_writeComplete*);
#endif /* SOCKET_H */ #endif /* SOCKET_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment