Commit dc6dc24a authored by Ian Craggs's avatar Ian Craggs

Merge branch '373' of https://github.com/JuergenKosel/paho.mqtt.c into JuergenKosel-373

parents b69d1ad2 231eda09
......@@ -247,10 +247,17 @@ static int Internal_heap_unlink(char* file, int line, void* p)
*/
void myfree(char* file, int line, void* p)
{
Thread_lock_mutex(heap_mutex);
if (Internal_heap_unlink(file, line, p))
free(((int*)p)-1);
Thread_unlock_mutex(heap_mutex);
if (p) /* it is legal und usual to call free(NULL) */
{
Thread_lock_mutex(heap_mutex);
if (Internal_heap_unlink(file, line, p))
free(((int*)p)-1);
Thread_unlock_mutex(heap_mutex);
}
else
{
Log(LOG_ERROR, -1, "Call of free(NULL) in %s,%d",file,line);
}
}
......@@ -479,3 +486,8 @@ int main(int argc, char *argv[])
}
#endif /* HEAP_UNIT_TESTS */
/* Local Variables: */
/* indent-tabs-mode: t */
/* c-basic-offset: 8 */
/* End: */
......@@ -59,6 +59,9 @@ typedef struct
size_t max_size; /**< max size the heap has reached in bytes */
} heap_info;
#if defined(__cplusplus)
extern "C" {
#endif
void* mymalloc(char*, int, size_t size);
void* myrealloc(char*, int, void* p, size_t size);
......@@ -72,5 +75,8 @@ int HeapDump(FILE* file);
int HeapDumpString(FILE* file, char* str);
void* Heap_findItem(void* p);
void Heap_unlink(char* file, int line, void* p);
#ifdef __cplusplus
}
#endif
#endif
......@@ -218,7 +218,10 @@ static int ListUnlink(List* aList, void* content, int(*callback)(void*, void*),
next = aList->current->next;
if (freeContent)
{
free(aList->current->content);
aList->current->content = NULL;
}
if (saved == aList->current)
saveddeleted = 1;
free(aList->current);
......@@ -357,7 +360,10 @@ void ListEmpty(List* aList)
{
ListElement* first = aList->first;
if (first->content != NULL)
{
free(first->content);
first->content = NULL;
}
aList->first = first->next;
free(first);
}
......
......@@ -1000,6 +1000,17 @@ static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* comma
FUNC_EXIT;
}
/**
* Call Socket_noPendingWrites(int socket) with protection by socket_mutex, see https://github.com/eclipse/paho.mqtt.c/issues/385
*/
static int MQTTAsync_Socket_noPendingWrites(int socket)
{
int rc;
Thread_lock_mutex(socket_mutex);
rc = Socket_noPendingWrites(socket);
Thread_unlock_mutex(socket_mutex);
return rc;
}
/**
* See if any pending writes have been completed, and cleanup if so.
......@@ -1035,8 +1046,10 @@ static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
for (i = 0; i < m->serverURIcount; ++i)
free(m->serverURIs[i]);
m->serverURIcount = 0;
if (m->serverURIs)
free(m->serverURIs);
m->serverURIs = NULL;
}
......@@ -1050,7 +1063,9 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
free(command->command.details.sub.topics[i]);
free(command->command.details.sub.topics);
command->command.details.sub.topics = NULL;
free(command->command.details.sub.qoss);
command->command.details.sub.qoss = NULL;
}
else if (command->command.type == UNSUBSCRIBE)
{
......@@ -1060,13 +1075,16 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
free(command->command.details.unsub.topics[i]);
free(command->command.details.unsub.topics);
command->command.details.unsub.topics = NULL;
}
else if (command->command.type == PUBLISH)
{
/* qos 1 and 2 topics are freed in the protocol code when the flows are completed */
if (command->command.details.pub.destinationName)
free(command->command.details.pub.destinationName);
command->command.details.pub.destinationName = NULL;
free(command->command.details.pub.payload);
command->command.details.pub.payload = NULL;
}
}
......@@ -1174,7 +1192,7 @@ static int MQTTAsync_processCommand(void)
continue;
if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket)))
cmd->client->c->connect_state == 0 && MQTTAsync_Socket_noPendingWrites(cmd->client->c->net.socket)))
{
if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
......@@ -3001,10 +3019,8 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if ((*sock = SSLSocket_getPendingRead()) == -1)
{
#endif
Thread_lock_mutex(socket_mutex);
/* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
*sock = Socket_getReadySocket(0, &tp);
Thread_unlock_mutex(socket_mutex);
*sock = Socket_getReadySocket(0, &tp,socket_mutex);
if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
MQTTAsync_sleep(100L);
#if defined(OPENSSL)
......
......@@ -1729,9 +1729,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
{
/* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
#endif
Thread_lock_mutex(socket_mutex);
*sock = Socket_getReadySocket(0, &tp);
Thread_unlock_mutex(socket_mutex);
*sock = Socket_getReadySocket(0, &tp, socket_mutex);
#if defined(OPENSSL)
}
#endif
......
......@@ -734,22 +734,3 @@ int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, netwo
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Free allocated storage for a various packet tyoes
* @param pack pointer to the suback packet structure
*/
void MQTTPacket_free_packet(MQTTPacket* pack)
{
FUNC_ENTRY;
if (pack->header.bits.type == PUBLISH)
MQTTPacket_freePublish((Publish*)pack);
/*else if (pack->header.type == SUBSCRIBE)
MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
else if (pack->header.type == UNSUBSCRIBE)
MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
else
free(pack);
FUNC_EXIT;
}
......@@ -253,8 +253,6 @@ int MQTTPacket_send_pubrec(int msgid, networkHandles* net, const char* clientID)
int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, const char* clientID);
int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, const char* clientID);
void MQTTPacket_free_packet(MQTTPacket* pack);
#if !defined(NO_BRIDGE)
#include "MQTTPacketOut.h"
#endif
......
......@@ -675,11 +675,13 @@ void MQTTProtocol_freeClient(Clients* client)
MQTTProtocol_freeMessageList(client->inboundMsgs);
ListFree(client->messageQueue);
free(client->clientID);
client->clientID = NULL;
if (client->will)
{
free(client->will->payload);
free(client->will->topic);
free(client->will);
client->will = NULL;
}
if (client->username)
free((void*)client->username);
......@@ -704,6 +706,7 @@ void MQTTProtocol_freeClient(Clients* client)
free((void*)client->sslopts->CApath);
}
free(client->sslopts);
client->sslopts = NULL;
}
#endif
/* don't free the client structure itself... this is done elsewhere */
......
......@@ -881,9 +881,12 @@ int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int cou
free(buf0);
for (i = 0; i < count; ++i)
{
if (frees[i])
free(buffers[i]);
}
if (frees[i])
{
free(buffers[i]);
buffers[i] = NULL;
}
}
}
FUNC_EXIT_RC(rc);
return rc;
......
......@@ -228,7 +228,7 @@ int isReady(int socket, fd_set* read_set, fd_set* write_set)
* @param tp the timeout to be used for the select, unless overridden
* @return the socket next ready, or 0 if none is ready
*/
int Socket_getReadySocket(int more_work, struct timeval *tp)
int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex)
{
int rc = 0;
static struct timeval zero = {0L, 0L}; /* 0 seconds */
......@@ -236,6 +236,7 @@ int Socket_getReadySocket(int more_work, struct timeval *tp)
struct timeval timeout = one;
FUNC_ENTRY;
Thread_lock_mutex(mutex);
if (s.clientsds->count == 0)
goto exit;
......@@ -258,7 +259,11 @@ int Socket_getReadySocket(int more_work, struct timeval *tp)
memcpy((void*)&(s.rset), (void*)&(s.rset_saved), sizeof(s.rset));
memcpy((void*)&(pwset), (void*)&(s.pending_wset), sizeof(pwset));
if ((rc = select(s.maxfdp1, &(s.rset), &pwset, NULL, &timeout)) == SOCKET_ERROR)
/* Prevent performance issue by unlocking the socket_mutex while waiting for a ready socket. */
Thread_unlock_mutex(mutex);
rc = select(s.maxfdp1, &(s.rset), &pwset, NULL, &timeout);
Thread_lock_mutex(mutex);
if (rc == SOCKET_ERROR)
{
Socket_error("read select", 0);
goto exit;
......@@ -301,6 +306,7 @@ int Socket_getReadySocket(int more_work, struct timeval *tp)
ListNextElement(s.clientsds, &s.cur_clientsds);
}
exit:
Thread_unlock_mutex(mutex);
FUNC_EXIT_RC(rc);
return rc;
} /* end getReadySocket */
......@@ -538,6 +544,12 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
}
}
exit:
#if 0
if (rc == TCPSOCKET_INTERRUPTED)
{
Log(LOG_ERROR, -1, "Socket_putdatas: TCPSOCKET_INTERRUPTED");
}
#endif
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -826,7 +838,10 @@ int Socket_continueWrite(int socket)
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
{
free(pw->iovecs[i].iov_base);
pw->iovecs[i].iov_base = NULL;
}
}
rc = 1; /* signal complete */
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
......@@ -842,7 +857,10 @@ int Socket_continueWrite(int socket)
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
{
free(pw->iovecs[i].iov_base);
pw->iovecs[i].iov_base = NULL;
}
}
}
#if defined(OPENSSL)
......
......@@ -65,6 +65,8 @@
#define ULONG size_t
#endif
#include "mutex_type.h" /* Needed for mutex_type */
/** socket operation completed successfully */
#define TCPSOCKET_COMPLETE 0
#if !defined(SOCKET_ERROR)
......@@ -124,7 +126,7 @@ typedef struct
void Socket_outInitialize(void);
void Socket_outTerminate(void);
int Socket_getReadySocket(int more_work, struct timeval *tp);
int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex);
int Socket_getch(int socket, char* c);
char *Socket_getdata(int socket, size_t bytes, size_t* actual_len);
int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees);
......
......@@ -106,6 +106,7 @@ void SocketBuffer_freeDefQ(void)
{
free(def_queue->buf);
free(def_queue);
def_queue = NULL;
}
......
......@@ -21,13 +21,14 @@
#if !defined(THREAD_H)
#define THREAD_H
#include "mutex_type.h" /* Needed for mutex_type */
#if defined(WIN32) || defined(WIN64)
#include <windows.h>
#define thread_type HANDLE
#define thread_id_type DWORD
#define thread_return_type DWORD
#define thread_fn LPTHREAD_START_ROUTINE
#define mutex_type HANDLE
#define cond_type HANDLE
#define sem_type HANDLE
#else
......@@ -37,7 +38,6 @@
#define thread_id_type pthread_t
#define thread_return_type void*
typedef thread_return_type (*thread_fn)(void*);
#define mutex_type pthread_mutex_t*
typedef struct { pthread_cond_t cond; pthread_mutex_t mutex; } cond_type_struct;
typedef cond_type_struct *cond_type;
#if defined(OSX)
......
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
*******************************************************************************/
#ifndef _MUTEX_TYPE_H_
#define _MUTEX_TYPE_H_
#if defined(WIN32) || defined(WIN64)
#include <windows.h>
#define mutex_type HANDLE
#else
#include <pthread.h>
#define mutex_type pthread_mutex_t*
#endif
#endif /* _MUTEX_TYPE_H_ */
......@@ -446,3 +446,13 @@ SET_TESTS_PROPERTIES(
test9-6-offline-buffering-max-buffered-binary-will
PROPERTIES TIMEOUT 540
)
ADD_EXECUTABLE(
test_issue373
test_issue373.c
)
TARGET_LINK_LIBRARIES(
test_issue373
paho-mqtt3a
)
#!/usr/bin/python
import os
import sys
import time
import subprocess
import random
bindir='/usr/bin'
sys.path.append(bindir)
def input_sel(prompt,max_,selectionoption):
# let the user choose the VM and verify selection
while True:
try:
print ('Please select from the list of running VMs\n\n'+'\n'.join(selectionoption))
userin = int(raw_input(prompt))
except ValueError:
print('\nThat was not a number\n\n')
continue
if userin > max_:
print('\nInput must be less than or equal to {0}.\n\n'.format(max_))
elif userin < 1:
print('\nInput must be greater than or equal to 1\n\n')
else:
return userin
def statustext(result):
if result == 0:
status = 'OK'
else:
status = 'Failed'
return status
def controlvmnetworkstate():
try:
offtime = 600
ontime = 14
vmdict={}
vmlist=[]
executable = os.path.join(bindir, 'VBoxManage')
#retrieve a list of all running VMs
runningvms= subprocess.check_output('%s list runningvms' %executable,shell=True).splitlines()
if len(runningvms) != 0:
for n in range(0, len(runningvms)):
vmlist.append('%s: %s' %(n+1,runningvms[n].rsplit(' ',1)[0].strip('"')))
vmdict[n+1]=runningvms[n].rsplit(' ',1)[-1]
usersel=input_sel('\nEnter the number of the VM: ',len(runningvms),vmlist)
else:
print('Can not retrieve list of running VMs')
sys.exit()
vmuuid=vmdict[usersel]
while True:
offtime = random.randint(60, 90)
ontime = random.randint(10, 90)
timenow = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
on = subprocess.call('%s controlvm %s setlinkstate1 on' %(executable,vmuuid),
shell=True)
status=statustext(on)
print ('%s: Plug Network cable into VM %s for %ds: %s' % (timenow, runningvms[usersel-1].rsplit(' ',1)[0].strip('"'),ontime, str(status)))
time.sleep(ontime)
timenow = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
off = subprocess.call('%s controlvm %s setlinkstate1 off' %(executable,vmuuid),
shell=True)
status = statustext(off)
print ('%s: Unplug Network cable from VM %s for %ds: %s' % (timenow, runningvms[usersel-1].rsplit(' ',1)[0].strip('"'),offtime, str(status)))
time.sleep(offtime)
except KeyboardInterrupt:
sys.exit('\nUser Interrupt')
except Exception as e:
print("Error in %s in function %s: %s" % (__name__, sys._getframe().f_code.co_name, e.message))
if __name__ == "__main__":
sys.exit(controlvmnetworkstate())
......@@ -510,13 +510,13 @@ int recreateReconnect(void)
MyLog(LOGA_ALWAYS, "Recreating client");
MQTTAsync_destroy(&client); /* destroy the client object so that we force persistence to be read on recreate */
#if !defined(_WINDOWS)
heap_info* mqtt_mem = 0;
/*mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (mqtt_mem->current_size > 20)
HeapScan(5); */
#endif
rc = MQTTAsync_create(&client, opts.connection, opts.clientid, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
if (rc != MQTTASYNC_SUCCESS)
{
......@@ -1028,12 +1028,5 @@ exit:
destroy_exit:
MQTTAsync_destroy(&control_client);
/*#include "Heap.h"
heap_info* mqtt_mem = 0;
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (mqtt_mem->current_size > 0)
failures++; consider any not freed memory as failure */
return 0;
}
......@@ -54,12 +54,16 @@ struct Options
char* proxy_connection; /**< connection to proxy */
int verbose;
int test_no;
unsigned int QoS;
unsigned int iterrations;
} options =
{
"localhost:1883",
"localhost:1884",
0,
0,
0,
5
};
void getopts(int argc, char** argv)
......@@ -227,7 +231,7 @@ int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsy
static char test373Payload[] = "No one is interested in this payload";
int test373SendPublishMessage(MQTTAsync handle,int id)
int test373SendPublishMessage(MQTTAsync handle,int id, const unsigned int QoS)
{
int rc = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
......@@ -240,13 +244,17 @@ int test373SendPublishMessage(MQTTAsync handle,int id)
pubmsg.payload = test373Payload;
pubmsg.payloadlen = sizeof(test373Payload);
pubmsg.qos = 1;
rc = MQTTAsync_sendMessage(handle, topic, &pubmsg, &opts);
pubmsg.qos = QoS;
rc = MQTTAsync_sendMessage( handle, topic,&pubmsg,&opts);
if (rc == MQTTASYNC_SUCCESS)
{
pendingMessageCnt++;
if (pendingMessageCnt > pendingMessageCntMax) pendingMessageCntMax = pendingMessageCnt;
}
else
{
MyLog(LOGA_INFO, "Failed to queue message for send with retvalue %d",rc);
}
return rc;
}
......@@ -260,7 +268,9 @@ int test_373(struct Options options)
char clientid[30 + sizeof(unique)];
heap_info* mqtt_mem = 0;
MyLog(LOGA_INFO, "Running test373 with QoS=%u, iterrations=%u\n",options.QoS,options.iterrations);
sprintf(clientid, "paho-test373-%s", unique);
connectCnt = 0;
rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid,
MQTTCLIENT_PERSISTENCE_NONE,
NULL);
......@@ -286,7 +296,7 @@ int test_373(struct Options options)
goto exit;
}
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
while (connectCnt < 5)
while (connectCnt < options.iterrations)
{
if (!connected)
{
......@@ -315,11 +325,11 @@ int test_373(struct Options options)
}
else
{
/* while connected send 1000 message per second */
/* while connected send 100 message per second */
int topicId;
for(topicId=0; topicId < 1000; topicId++)
for(topicId=0; topicId < 100; topicId++)
{
rc = test373SendPublishMessage(mqttasyncContext,topicId);
rc = test373SendPublishMessage(mqttasyncContext,topicId,options.QoS);
if (rc != MQTTASYNC_SUCCESS) break;
}
MySleep(100);
......@@ -333,6 +343,7 @@ int test_373(struct Options options)
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
#endif
MQTTAsync_disconnect(mqttasyncContext, NULL);
connected = 0;
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
#if !defined(_WINDOWS)
......@@ -359,6 +370,7 @@ int main(int argc, char** argv)
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test_373};
unsigned int QoS;
sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
......@@ -370,9 +382,24 @@ int main(int argc, char** argv)
{ /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{
failures = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
/* test with QoS 0, 1 and 2 and just 5 iterrations */
for (QoS = 0; QoS < 3; QoS++)
{
failures = 0;
options.QoS = QoS;
options.iterrations = 5;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
if (rc == 0)
{
/* Test with much more iterrations for QoS = 0 */
failures = 0;
options.QoS = 0;
options.iterrations = 100;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
}
}
else
......
......@@ -6,7 +6,7 @@ rm -rf build.paho
mkdir build.paho
cd build.paho
echo "travis build dir $TRAVIS_BUILD_DIR pwd $PWD"
cmake -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE ..
cmake -DCMAKE_BUILD_TYPE=Debug -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE ..
make
python ../test/mqttsas2.py &
ctest -VV --timeout 600
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment