Commit 36ca298e authored by Ian Craggs's avatar Ian Craggs

Merge branch 'develop' into mqttv5

parents bc8a5cb7 6f929f58
sudo: true sudo: required
language: c language: c
compiler: compiler:
...@@ -9,9 +9,18 @@ os: ...@@ -9,9 +9,18 @@ os:
- linux - linux
- osx - osx
sudo: required matrix:
exclude:
- compiler: gcc
os: osx
- compiler: clang
os: linux
before_install: before_install:
- openssl aes-256-cbc -K $encrypted_dcd2b299f7c9_key -iv $encrypted_dcd2b299f7c9_iv -in deploy_rsa.enc -out /tmp/deploy_rsa -d
- eval "$(ssh-agent -s)"
- chmod 600 /tmp/deploy_rsa
- ssh-add /tmp/deploy_rsa
- ./travis-install.sh - ./travis-install.sh
env: env:
...@@ -20,7 +29,6 @@ env: ...@@ -20,7 +29,6 @@ env:
# via the "travis encrypt" command using the project repo's public key # via the "travis encrypt" command using the project repo's public key
- secure: "Ro53zVdGCjCQx9U4wvD9GBwB346tIQ7y1MWOAe1QrFWlmoQLC8KUeddQkc+27pdrOG9Fm9QQcI82EDlh0bfRBy1ITfWSVVZVfbNLv9sBWesND1F9YlnFpn/fag2OE+ULPSEJVJMxZoqiR9yiYWO3pTWue4YjCSuFAjpQNO6VnV3HiQJRG1jeaylx0QVLQWKAL/qkRbuqG9o4xpS1Kebaj86+q9UTHcL1a+Aj53u+Ajqnc9ZbUB/yBrfHyufTKpAD8Ef/FEIWXg2svtiWVEwEsPtdTn5P7AefJ2FNEyT4uMKIEBzWIPeWvUZLFF6U7QA07+uYDE0Ir4voPptBUlIYqQz1CUz9XCOPmM/N+GgqpyNyUjpMb4CM1b+iwBwcsHc0Z1JFcPz65ZMSt1D/WeUfQlaB/KxQBpz4lD2mxEmAuBIoGNrAG+FRULoY+xQSAf7V0W8am6QbHNnXif30mdkF3lgAhaAOwWO03JD7ctEJXqzRbMK8HrBkrgWfHsRRLFT50m8CrNLFz+3lCYuPHge2gHUMDfIHMxd4N9f2dlfV9GJkHQOQYwFfP5L2Y5Xq9KTnZX+bsglDC2WcOJu2F8h1LxTMV5Kku8zl1RZlEAt8Qa9EtCMczA3sL4NfGxazO22WpyOvHdwb26mhdJTgquI5oZsl71zcIf+WLGfgUAEq4/k=" - secure: "Ro53zVdGCjCQx9U4wvD9GBwB346tIQ7y1MWOAe1QrFWlmoQLC8KUeddQkc+27pdrOG9Fm9QQcI82EDlh0bfRBy1ITfWSVVZVfbNLv9sBWesND1F9YlnFpn/fag2OE+ULPSEJVJMxZoqiR9yiYWO3pTWue4YjCSuFAjpQNO6VnV3HiQJRG1jeaylx0QVLQWKAL/qkRbuqG9o4xpS1Kebaj86+q9UTHcL1a+Aj53u+Ajqnc9ZbUB/yBrfHyufTKpAD8Ef/FEIWXg2svtiWVEwEsPtdTn5P7AefJ2FNEyT4uMKIEBzWIPeWvUZLFF6U7QA07+uYDE0Ir4voPptBUlIYqQz1CUz9XCOPmM/N+GgqpyNyUjpMb4CM1b+iwBwcsHc0Z1JFcPz65ZMSt1D/WeUfQlaB/KxQBpz4lD2mxEmAuBIoGNrAG+FRULoY+xQSAf7V0W8am6QbHNnXif30mdkF3lgAhaAOwWO03JD7ctEJXqzRbMK8HrBkrgWfHsRRLFT50m8CrNLFz+3lCYuPHge2gHUMDfIHMxd4N9f2dlfV9GJkHQOQYwFfP5L2Y5Xq9KTnZX+bsglDC2WcOJu2F8h1LxTMV5Kku8zl1RZlEAt8Qa9EtCMczA3sL4NfGxazO22WpyOvHdwb26mhdJTgquI5oZsl71zcIf+WLGfgUAEq4/k="
script: script:
- if [[ "$COVERITY_SCAN_BRANCH" != 1 ]]; then ./travis-build.sh; fi - if [[ "$COVERITY_SCAN_BRANCH" != 1 ]]; then ./travis-build.sh; fi
...@@ -33,7 +41,6 @@ addons: ...@@ -33,7 +41,6 @@ addons:
build_command_prepend: "make clean" build_command_prepend: "make clean"
build_command: "make -j 4" build_command: "make -j 4"
branch_pattern: coverity-.* branch_pattern: coverity-.*
addons:
apt: apt:
sources: sources:
- george-edison55-precise-backports # cmake 3.2.3 / doxygen 1.8.3 - george-edison55-precise-backports # cmake 3.2.3 / doxygen 1.8.3
...@@ -44,4 +51,6 @@ addons: ...@@ -44,4 +51,6 @@ addons:
- mosquitto - mosquitto
- doxygen - doxygen
after_success:
- ls -l build.paho/*.tar.gz
- scp -o StrictHostKeyChecking=no build.paho/*.tar.gz icraggs@build.eclipse.org:../../../../shared/technology/paho/C/
#******************************************************************************* #*******************************************************************************
# Copyright (c) 2015, 2017 logi.cals GmbH and others # Copyright (c) 2015, 2018 logi.cals GmbH and others
# #
# All rights reserved. This program and the accompanying materials # All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0 # are made available under the terms of the Eclipse Public License v1.0
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
# Note: on OS X you should install XCode and the associated command-line tools # Note: on OS X you should install XCode and the associated command-line tools
CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4) CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
PROJECT("paho" C) PROJECT("Eclipse Paho C" C)
MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION}) MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME}) MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
...@@ -28,7 +28,7 @@ SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") ...@@ -28,7 +28,7 @@ SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
## build settings ## build settings
SET(PAHO_VERSION_MAJOR 1) SET(PAHO_VERSION_MAJOR 1)
SET(PAHO_VERSION_MINOR 2) SET(PAHO_VERSION_MINOR 2)
SET(PAHO_VERSION_PATCH 0) SET(PAHO_VERSION_PATCH 1)
SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH}) SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
INCLUDE(GNUInstallDirs) INCLUDE(GNUInstallDirs)
...@@ -60,6 +60,11 @@ IF(PAHO_BUILD_DOCUMENTATION) ...@@ -60,6 +60,11 @@ IF(PAHO_BUILD_DOCUMENTATION)
ENDIF() ENDIF()
### packaging settings ### packaging settings
SET(CPACK_PACKAGE_VENDOR "Eclipse Paho")
SET(CPACK_PACKAGE_NAME "Eclipse-Paho-MQTT-C")
INSTALL(FILES CONTRIBUTING.md epl-v10 edl-v10 README.md notice.html DESTINATION .)
FILE(GLOB samples "src/samples/*.c")
INSTALL(FILES ${samples} DESTINATION samples)
IF (WIN32) IF (WIN32)
SET(CPACK_GENERATOR "ZIP") SET(CPACK_GENERATOR "ZIP")
ELSEIF(PAHO_BUILD_DEB_PACKAGE) ELSEIF(PAHO_BUILD_DEB_PACKAGE)
......
Please see the [Eclipse Community Code of Conduct](https://eclipse.org/org/documents/Community_Code_of_Conduct.php)
This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
For an explanation of what dual-licensing means to you, see:
https://www.eclipse.org/legal/eplfaq.php#DUALLIC
...@@ -24,7 +24,7 @@ SHELL = /bin/sh ...@@ -24,7 +24,7 @@ SHELL = /bin/sh
.PHONY: clean, mkdir, install, uninstall, html .PHONY: clean, mkdir, install, uninstall, html
ifndef release.version ifndef release.version
release.version = 1.2.0 release.version = 1.2.1
endif endif
# determine current platform # determine current platform
......
...@@ -9,7 +9,7 @@ install: ...@@ -9,7 +9,7 @@ install:
- cmd: netsh advfirewall firewall add rule name="Python 2.7" dir=in action=allow program="C:\Python27\python.exe" enable=yes - cmd: netsh advfirewall firewall add rule name="Python 2.7" dir=in action=allow program="C:\Python27\python.exe" enable=yes
- cmd: netsh advfirewall firewall add rule name="Open Port 1883" dir=in action=allow protocol=TCP localport=1883 - cmd: netsh advfirewall firewall add rule name="Open Port 1883" dir=in action=allow protocol=TCP localport=1883
- cmd: netsh advfirewall set allprofiles state off - cmd: netsh advfirewall set allprofiles state off
- ps: Start-Process python -ArgumentList 'test\mqttsas2.py' - ps: Start-Process python -ArgumentList 'test\mqttsas.py'
- cmd: C:\Python36\python --version - cmd: C:\Python36\python --version
- cmd: git clone https://github.com/eclipse/paho.mqtt.testing.git - cmd: git clone https://github.com/eclipse/paho.mqtt.testing.git
- cmd: cd paho.mqtt.testing\interoperability - cmd: cd paho.mqtt.testing\interoperability
......
<!--**************************************************************************** <!--****************************************************************************
Copyright (c) 2012, 2017 IBM Corp. Copyright (c) 2012, 2018 IBM Corp.
All rights reserved. This program and the accompanying materials All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0 are made available under the terms of the Eclipse Public License v1.0
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
</taskdef> </taskdef>
<property name="output.folder" value="build/output" /> <property name="output.folder" value="build/output" />
<property name="release.version" value="1.2.0" /> <property name="release.version" value="1.2.1" />
<property name="libname" value="mqttv3c" /> <property name="libname" value="mqttv3c" />
<property name="libname.ssl" value="mqttv3cs" /> <property name="libname.ssl" value="mqttv3cs" />
......
#============================================================================= #=============================================================================
# CMakeDebHelper, Copyright (C) 2013 Sebastian Kienzl # Removed content due to licensing/copyright issues
# http://knzl.de/cmake-debhelper/
# Licensed under the GPL v2, see LICENSE
#============================================================================= #=============================================================================
# configure() .in-files to the CURRENT_BINARY_DIR
foreach( _F ${DH_INPUT} )
# strip the .in part
string( REGEX REPLACE ".in$" "" _F_WE ${_F} )
configure_file( ${_F} ${_F_WE} @ONLY )
endforeach()
# compat and control is only needed for running the debhelpers,
# CMake is going to make up the one that ends up in the deb.
file( WRITE ${CMAKE_CURRENT_BINARY_DIR}/compat "9" )
if( NOT CPACK_DEBIAN_PACKAGE_NAME )
string( TOLOWER "${CPACK_PACKAGE_NAME}" CPACK_DEBIAN_PACKAGE_NAME )
endif()
file( WRITE ${CMAKE_CURRENT_BINARY_DIR}/control "Package: ${CPACK_DEBIAN_PACKAGE_NAME}\nArchitecture: any\n" )
# Some debhelpers need fakeroot, we use it for all of them
find_program( FAKEROOT fakeroot )
if( NOT FAKEROOT )
message( SEND_ERROR "fakeroot not found, please install" )
endif()
find_program( DEBHELPER dh_prep )
if( NOT DEBHELPER )
message( SEND_ERROR "debhelper not found, please install" )
endif()
# Compose a string with a semicolon-seperated list of debhelpers
foreach( _DH ${DH_RUN} )
set( _DH_RUN_SC_LIST "${_DH_RUN_SC_LIST} ${_DH} ;" )
endforeach()
# Making sure the debhelpers run each time we change one of ${DH_INPUT}
add_custom_command(
OUTPUT dhtimestamp
# dh_prep is needed to clean up, dh_* aren't idempotent
COMMAND ${FAKEROOT} dh_prep
# I haven't found another way to run a list of commands here
COMMAND ${FAKEROOT} -- sh -c "${_DH_RUN_SC_LIST}"
# needed to create the files we'll use
COMMAND ${FAKEROOT} dh_installdeb
COMMAND touch ${CMAKE_CURRENT_BINARY_DIR}/dhtimestamp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/..
DEPENDS ${DH_INPUT}
COMMENT "Running debhelpers"
VERBATIM
)
add_custom_target( dhtarget ALL
DEPENDS dhtimestamp
)
# these files are generated by debhelpers from our templates
foreach( _F ${DH_GENERATED_CONTROL_EXTRA} )
set( CPACK_DEBIAN_PACKAGE_CONTROL_EXTRA
${CPACK_DEBIAN_PACKAGE_CONTROL_EXTRA}
${CMAKE_CURRENT_BINARY_DIR}/${CPACK_DEBIAN_PACKAGE_NAME}/DEBIAN/${_F}
CACHE INTERNAL ""
)
endforeach()
# This will copy the generated dhhelper-files to our to-be-cpacked-directory.
# CPACK_INSTALL_SCRIPT must be set to the value of CPACK_DEBIAN_INSTALL_SCRIPT in the file
# pointed to by CPACK_PROJECT_CONFIG_FILE.
set( CPACK_DEBIAN_INSTALL_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/CMakeDebHelperInstall.cmake CACHE INTERNAL "" )
File added
VERSION=1.2.0 VERSION=1.2.1
check: check:
rpmlint -i dist/paho-c.spec rpmlint -i dist/paho-c.spec
......
Summary: MQTT C Client Summary: MQTT C Client
Name: paho-c Name: paho-c
Version: 1.2.0 Version: 1.2.1
Release: 3%{?dist} Release: 3%{?dist}
License: Eclipse Distribution License 1.0 and Eclipse Public License 1.0 License: Eclipse Distribution License 1.0 and Eclipse Public License 1.0
Group: Development/Tools Group: Development/Tools
......
...@@ -247,10 +247,17 @@ static int Internal_heap_unlink(char* file, int line, void* p) ...@@ -247,10 +247,17 @@ static int Internal_heap_unlink(char* file, int line, void* p)
*/ */
void myfree(char* file, int line, void* p) void myfree(char* file, int line, void* p)
{ {
if (p) /* it is legal und usual to call free(NULL) */
{
Thread_lock_mutex(heap_mutex); Thread_lock_mutex(heap_mutex);
if (Internal_heap_unlink(file, line, p)) if (Internal_heap_unlink(file, line, p))
free(((int*)p)-1); free(((int*)p)-1);
Thread_unlock_mutex(heap_mutex); Thread_unlock_mutex(heap_mutex);
}
else
{
Log(LOG_ERROR, -1, "Call of free(NULL) in %s,%d",file,line);
}
} }
...@@ -479,3 +486,8 @@ int main(int argc, char *argv[]) ...@@ -479,3 +486,8 @@ int main(int argc, char *argv[])
} }
#endif /* HEAP_UNIT_TESTS */ #endif /* HEAP_UNIT_TESTS */
/* Local Variables: */
/* indent-tabs-mode: t */
/* c-basic-offset: 8 */
/* End: */
...@@ -59,6 +59,9 @@ typedef struct ...@@ -59,6 +59,9 @@ typedef struct
size_t max_size; /**< max size the heap has reached in bytes */ size_t max_size; /**< max size the heap has reached in bytes */
} heap_info; } heap_info;
#if defined(__cplusplus)
extern "C" {
#endif
void* mymalloc(char*, int, size_t size); void* mymalloc(char*, int, size_t size);
void* myrealloc(char*, int, void* p, size_t size); void* myrealloc(char*, int, void* p, size_t size);
...@@ -72,5 +75,8 @@ int HeapDump(FILE* file); ...@@ -72,5 +75,8 @@ int HeapDump(FILE* file);
int HeapDumpString(FILE* file, char* str); int HeapDumpString(FILE* file, char* str);
void* Heap_findItem(void* p); void* Heap_findItem(void* p);
void Heap_unlink(char* file, int line, void* p); void Heap_unlink(char* file, int line, void* p);
#ifdef __cplusplus
}
#endif
#endif #endif
...@@ -218,7 +218,10 @@ static int ListUnlink(List* aList, void* content, int(*callback)(void*, void*), ...@@ -218,7 +218,10 @@ static int ListUnlink(List* aList, void* content, int(*callback)(void*, void*),
next = aList->current->next; next = aList->current->next;
if (freeContent) if (freeContent)
{
free(aList->current->content); free(aList->current->content);
aList->current->content = NULL;
}
if (saved == aList->current) if (saved == aList->current)
saveddeleted = 1; saveddeleted = 1;
free(aList->current); free(aList->current);
...@@ -357,7 +360,10 @@ void ListEmpty(List* aList) ...@@ -357,7 +360,10 @@ void ListEmpty(List* aList)
{ {
ListElement* first = aList->first; ListElement* first = aList->first;
if (first->content != NULL) if (first->content != NULL)
{
free(first->content); free(first->content);
first->content = NULL;
}
aList->first = first->next; aList->first = first->next;
free(first); free(first);
} }
......
...@@ -1002,6 +1002,17 @@ static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* comma ...@@ -1002,6 +1002,17 @@ static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* comma
FUNC_EXIT; FUNC_EXIT;
} }
/**
* Call Socket_noPendingWrites(int socket) with protection by socket_mutex, see https://github.com/eclipse/paho.mqtt.c/issues/385
*/
static int MQTTAsync_Socket_noPendingWrites(int socket)
{
int rc;
Thread_lock_mutex(socket_mutex);
rc = Socket_noPendingWrites(socket);
Thread_unlock_mutex(socket_mutex);
return rc;
}
/** /**
* See if any pending writes have been completed, and cleanup if so. * See if any pending writes have been completed, and cleanup if so.
...@@ -1037,8 +1048,10 @@ static void MQTTAsync_freeServerURIs(MQTTAsyncs* m) ...@@ -1037,8 +1048,10 @@ static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
for (i = 0; i < m->serverURIcount; ++i) for (i = 0; i < m->serverURIcount; ++i)
free(m->serverURIs[i]); free(m->serverURIs[i]);
m->serverURIcount = 0;
if (m->serverURIs) if (m->serverURIs)
free(m->serverURIs); free(m->serverURIs);
m->serverURIs = NULL;
} }
...@@ -1052,7 +1065,9 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command) ...@@ -1052,7 +1065,9 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
free(command->command.details.sub.topics[i]); free(command->command.details.sub.topics[i]);
free(command->command.details.sub.topics); free(command->command.details.sub.topics);
command->command.details.sub.topics = NULL;
free(command->command.details.sub.qoss); free(command->command.details.sub.qoss);
command->command.details.sub.qoss = NULL;
} }
else if (command->command.type == UNSUBSCRIBE) else if (command->command.type == UNSUBSCRIBE)
{ {
...@@ -1062,13 +1077,16 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command) ...@@ -1062,13 +1077,16 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
free(command->command.details.unsub.topics[i]); free(command->command.details.unsub.topics[i]);
free(command->command.details.unsub.topics); free(command->command.details.unsub.topics);
command->command.details.unsub.topics = NULL;
} }
else if (command->command.type == PUBLISH) else if (command->command.type == PUBLISH)
{ {
/* qos 1 and 2 topics are freed in the protocol code when the flows are completed */ /* qos 1 and 2 topics are freed in the protocol code when the flows are completed */
if (command->command.details.pub.destinationName) if (command->command.details.pub.destinationName)
free(command->command.details.pub.destinationName); free(command->command.details.pub.destinationName);
command->command.details.pub.destinationName = NULL;
free(command->command.details.pub.payload); free(command->command.details.pub.payload);
command->command.details.pub.payload = NULL;
} }
} }
...@@ -1176,7 +1194,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1176,7 +1194,7 @@ static int MQTTAsync_processCommand(void)
continue; continue;
if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected && if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket))) cmd->client->c->connect_state == 0 && MQTTAsync_Socket_noPendingWrites(cmd->client->c->net.socket)))
{ {
if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) && if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1) cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
...@@ -3006,10 +3024,8 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -3006,10 +3024,8 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if ((*sock = SSLSocket_getPendingRead()) == -1) if ((*sock = SSLSocket_getPendingRead()) == -1)
{ {
#endif #endif
Thread_lock_mutex(socket_mutex);
/* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */ /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
*sock = Socket_getReadySocket(0, &tp); *sock = Socket_getReadySocket(0, &tp,socket_mutex);
Thread_unlock_mutex(socket_mutex);
if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L)) if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
MQTTAsync_sleep(100L); MQTTAsync_sleep(100L);
#if defined(OPENSSL) #if defined(OPENSSL)
......
...@@ -1839,9 +1839,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -1839,9 +1839,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
{ {
/* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */ /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
#endif #endif
Thread_lock_mutex(socket_mutex); *sock = Socket_getReadySocket(0, &tp, socket_mutex);
*sock = Socket_getReadySocket(0, &tp);
Thread_unlock_mutex(socket_mutex);
#if defined(OPENSSL) #if defined(OPENSSL)
} }
#endif #endif
......
...@@ -942,3 +942,4 @@ int MQTTPacket_decodeBuf(char* buf, int* value) ...@@ -942,3 +942,4 @@ int MQTTPacket_decodeBuf(char* buf, int* value)
bufptr = buf; bufptr = buf;
return MQTTPacket_VBIdecode(bufchar, value); return MQTTPacket_VBIdecode(bufchar, value);
} }
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2009, 2016 IBM Corp. * Copyright (c) 2009, 2018 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - async client updates * Ian Craggs - async client updates
* Ian Craggs - fix for bug 484496 * Ian Craggs - fix for bug 484496
* Ian Craggs - fix for issue 285
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -86,15 +87,25 @@ int pstopen(void **handle, const char* clientID, const char* serverURI, void* co ...@@ -86,15 +87,25 @@ int pstopen(void **handle, const char* clientID, const char* serverURI, void* co
/* create clientDir directory */ /* create clientDir directory */
/* pCrtDirName - holds the directory name we are currently trying to create. */ /* pCrtDirName - holds the directory name we are currently trying to create. */
/* This gets built up level by level until the full path name is created.*/ /* This gets built up level by level untipwdl the full path name is created.*/
/* pTokDirName - holds the directory name that gets used by strtok. */ /* pTokDirName - holds the directory name that gets used by strtok. */
pCrtDirName = (char*)malloc( strlen(clientDir) + 1 ); pCrtDirName = (char*)malloc( strlen(clientDir) + 1 );
pTokDirName = (char*)malloc( strlen(clientDir) + 1 ); pTokDirName = (char*)malloc( strlen(clientDir) + 1 );
strcpy( pTokDirName, clientDir ); strcpy( pTokDirName, clientDir );
/* If first character is directory separator, make sure it's in the created directory name #285 */
if (*pTokDirName == '/' || *pTokDirName == '\\')
{
*pCrtDirName = *pTokDirName;
pToken = strtok_r( pTokDirName + 1, "\\/", &save_ptr );
strcpy( pCrtDirName + 1, pToken );
}
else
{
pToken = strtok_r( pTokDirName, "\\/", &save_ptr ); pToken = strtok_r( pTokDirName, "\\/", &save_ptr );
strcpy( pCrtDirName, pToken ); strcpy( pCrtDirName, pToken );
}
rc = pstmkdir( pCrtDirName ); rc = pstmkdir( pCrtDirName );
pToken = strtok_r( NULL, "\\/", &save_ptr ); pToken = strtok_r( NULL, "\\/", &save_ptr );
while ( (pToken != NULL) && (rc == 0) ) while ( (pToken != NULL) && (rc == 0) )
......
...@@ -690,11 +690,13 @@ void MQTTProtocol_freeClient(Clients* client) ...@@ -690,11 +690,13 @@ void MQTTProtocol_freeClient(Clients* client)
MQTTProtocol_freeMessageList(client->inboundMsgs); MQTTProtocol_freeMessageList(client->inboundMsgs);
ListFree(client->messageQueue); ListFree(client->messageQueue);
free(client->clientID); free(client->clientID);
client->clientID = NULL;
if (client->will) if (client->will)
{ {
free(client->will->payload); free(client->will->payload);
free(client->will->topic); free(client->will->topic);
free(client->will); free(client->will);
client->will = NULL;
} }
if (client->username) if (client->username)
free((void*)client->username); free((void*)client->username);
...@@ -719,6 +721,7 @@ void MQTTProtocol_freeClient(Clients* client) ...@@ -719,6 +721,7 @@ void MQTTProtocol_freeClient(Clients* client)
free((void*)client->sslopts->CApath); free((void*)client->sslopts->CApath);
} }
free(client->sslopts); free(client->sslopts);
client->sslopts = NULL;
} }
#endif #endif
/* don't free the client structure itself... this is done elsewhere */ /* don't free the client structure itself... this is done elsewhere */
......
...@@ -882,7 +882,10 @@ int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int cou ...@@ -882,7 +882,10 @@ int SSLSocket_putdatas(SSL* ssl, int socket, char* buf0, size_t buf0len, int cou
for (i = 0; i < count; ++i) for (i = 0; i < count; ++i)
{ {
if (frees[i]) if (frees[i])
{
free(buffers[i]); free(buffers[i]);
buffers[i] = NULL;
}
} }
} }
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
......
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2009, 2017 IBM Corp. * Copyright (c) 2009, 2018 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
* Juergen Kosel, Ian Craggs - fix for issue #135 * Juergen Kosel, Ian Craggs - fix for issue #135
* Ian Craggs - issue #217 * Ian Craggs - issue #217
* Ian Craggs - fix for issue #186 * Ian Craggs - fix for issue #186
* Ian Craggs - remove StackTrace print debugging calls
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -228,7 +229,7 @@ int isReady(int socket, fd_set* read_set, fd_set* write_set) ...@@ -228,7 +229,7 @@ int isReady(int socket, fd_set* read_set, fd_set* write_set)
* @param tp the timeout to be used for the select, unless overridden * @param tp the timeout to be used for the select, unless overridden
* @return the socket next ready, or 0 if none is ready * @return the socket next ready, or 0 if none is ready
*/ */
int Socket_getReadySocket(int more_work, struct timeval *tp) int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex)
{ {
int rc = 0; int rc = 0;
static struct timeval zero = {0L, 0L}; /* 0 seconds */ static struct timeval zero = {0L, 0L}; /* 0 seconds */
...@@ -236,6 +237,7 @@ int Socket_getReadySocket(int more_work, struct timeval *tp) ...@@ -236,6 +237,7 @@ int Socket_getReadySocket(int more_work, struct timeval *tp)
struct timeval timeout = one; struct timeval timeout = one;
FUNC_ENTRY; FUNC_ENTRY;
Thread_lock_mutex(mutex);
if (s.clientsds->count == 0) if (s.clientsds->count == 0)
goto exit; goto exit;
...@@ -258,7 +260,11 @@ int Socket_getReadySocket(int more_work, struct timeval *tp) ...@@ -258,7 +260,11 @@ int Socket_getReadySocket(int more_work, struct timeval *tp)
memcpy((void*)&(s.rset), (void*)&(s.rset_saved), sizeof(s.rset)); memcpy((void*)&(s.rset), (void*)&(s.rset_saved), sizeof(s.rset));
memcpy((void*)&(pwset), (void*)&(s.pending_wset), sizeof(pwset)); memcpy((void*)&(pwset), (void*)&(s.pending_wset), sizeof(pwset));
if ((rc = select(s.maxfdp1, &(s.rset), &pwset, NULL, &timeout)) == SOCKET_ERROR) /* Prevent performance issue by unlocking the socket_mutex while waiting for a ready socket. */
Thread_unlock_mutex(mutex);
rc = select(s.maxfdp1, &(s.rset), &pwset, NULL, &timeout);
Thread_lock_mutex(mutex);
if (rc == SOCKET_ERROR)
{ {
Socket_error("read select", 0); Socket_error("read select", 0);
goto exit; goto exit;
...@@ -301,6 +307,7 @@ int Socket_getReadySocket(int more_work, struct timeval *tp) ...@@ -301,6 +307,7 @@ int Socket_getReadySocket(int more_work, struct timeval *tp)
ListNextElement(s.clientsds, &s.cur_clientsds); ListNextElement(s.clientsds, &s.cur_clientsds);
} }
exit: exit:
Thread_unlock_mutex(mutex);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} /* end getReadySocket */ } /* end getReadySocket */
...@@ -498,7 +505,6 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu ...@@ -498,7 +505,6 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
if (!Socket_noPendingWrites(socket)) if (!Socket_noPendingWrites(socket))
{ {
Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket); Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket);
StackTrace_printStack(stdout);
rc = SOCKET_ERROR; rc = SOCKET_ERROR;
goto exit; goto exit;
} }
...@@ -528,7 +534,6 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu ...@@ -528,7 +534,6 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
#if defined(OPENSSL) #if defined(OPENSSL)
SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes); SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes);
#else #else
StackTrace_printStack(stdout);
SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes); SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes);
#endif #endif
*sockmem = socket; *sockmem = socket;
...@@ -538,6 +543,12 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu ...@@ -538,6 +543,12 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
} }
} }
exit: exit:
#if 0
if (rc == TCPSOCKET_INTERRUPTED)
{
Log(LOG_ERROR, -1, "Socket_putdatas: TCPSOCKET_INTERRUPTED");
}
#endif
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -826,7 +837,10 @@ int Socket_continueWrite(int socket) ...@@ -826,7 +837,10 @@ int Socket_continueWrite(int socket)
for (i = 0; i < pw->count; i++) for (i = 0; i < pw->count; i++)
{ {
if (pw->frees[i]) if (pw->frees[i])
{
free(pw->iovecs[i].iov_base); free(pw->iovecs[i].iov_base);
pw->iovecs[i].iov_base = NULL;
}
} }
rc = 1; /* signal complete */ rc = 1; /* signal complete */
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket); Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
...@@ -842,7 +856,10 @@ int Socket_continueWrite(int socket) ...@@ -842,7 +856,10 @@ int Socket_continueWrite(int socket)
for (i = 0; i < pw->count; i++) for (i = 0; i < pw->count; i++)
{ {
if (pw->frees[i]) if (pw->frees[i])
{
free(pw->iovecs[i].iov_base); free(pw->iovecs[i].iov_base);
pw->iovecs[i].iov_base = NULL;
}
} }
} }
#if defined(OPENSSL) #if defined(OPENSSL)
......
...@@ -65,6 +65,8 @@ ...@@ -65,6 +65,8 @@
#define ULONG size_t #define ULONG size_t
#endif #endif
#include "mutex_type.h" /* Needed for mutex_type */
/** socket operation completed successfully */ /** socket operation completed successfully */
#define TCPSOCKET_COMPLETE 0 #define TCPSOCKET_COMPLETE 0
#if !defined(SOCKET_ERROR) #if !defined(SOCKET_ERROR)
...@@ -124,7 +126,7 @@ typedef struct ...@@ -124,7 +126,7 @@ typedef struct
void Socket_outInitialize(void); void Socket_outInitialize(void);
void Socket_outTerminate(void); void Socket_outTerminate(void);
int Socket_getReadySocket(int more_work, struct timeval *tp); int Socket_getReadySocket(int more_work, struct timeval *tp, mutex_type mutex);
int Socket_getch(int socket, char* c); int Socket_getch(int socket, char* c);
char *Socket_getdata(int socket, size_t bytes, size_t* actual_len); char *Socket_getdata(int socket, size_t bytes, size_t* actual_len);
int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees); int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** buffers, size_t* buflens, int* frees);
......
...@@ -106,6 +106,7 @@ void SocketBuffer_freeDefQ(void) ...@@ -106,6 +106,7 @@ void SocketBuffer_freeDefQ(void)
{ {
free(def_queue->buf); free(def_queue->buf);
free(def_queue); free(def_queue);
def_queue = NULL;
} }
......
...@@ -21,13 +21,14 @@ ...@@ -21,13 +21,14 @@
#if !defined(THREAD_H) #if !defined(THREAD_H)
#define THREAD_H #define THREAD_H
#include "mutex_type.h" /* Needed for mutex_type */
#if defined(WIN32) || defined(WIN64) #if defined(WIN32) || defined(WIN64)
#include <windows.h> #include <windows.h>
#define thread_type HANDLE #define thread_type HANDLE
#define thread_id_type DWORD #define thread_id_type DWORD
#define thread_return_type DWORD #define thread_return_type DWORD
#define thread_fn LPTHREAD_START_ROUTINE #define thread_fn LPTHREAD_START_ROUTINE
#define mutex_type HANDLE
#define cond_type HANDLE #define cond_type HANDLE
#define sem_type HANDLE #define sem_type HANDLE
#else #else
...@@ -37,7 +38,6 @@ ...@@ -37,7 +38,6 @@
#define thread_id_type pthread_t #define thread_id_type pthread_t
#define thread_return_type void* #define thread_return_type void*
typedef thread_return_type (*thread_fn)(void*); typedef thread_return_type (*thread_fn)(void*);
#define mutex_type pthread_mutex_t*
typedef struct { pthread_cond_t cond; pthread_mutex_t mutex; } cond_type_struct; typedef struct { pthread_cond_t cond; pthread_mutex_t mutex; } cond_type_struct;
typedef cond_type_struct *cond_type; typedef cond_type_struct *cond_type;
#if defined(OSX) #if defined(OSX)
......
/*******************************************************************************
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
*******************************************************************************/
#if !defined(_MUTEX_TYPE_H_)
#define _MUTEX_TYPE_H_
#if defined(WIN32) || defined(WIN64)
#include <windows.h>
#define mutex_type HANDLE
#else
#include <pthread.h>
#define mutex_type pthread_mutex_t*
#endif
#endif /* _MUTEX_TYPE_H_ */
...@@ -465,3 +465,13 @@ SET_TESTS_PROPERTIES( ...@@ -465,3 +465,13 @@ SET_TESTS_PROPERTIES(
test9-6-offline-buffering-max-buffered-binary-will test9-6-offline-buffering-max-buffered-binary-will
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
ADD_EXECUTABLE(
test_issue373
test_issue373.c
)
TARGET_LINK_LIBRARIES(
test_issue373
paho-mqtt3a
)
"""
*******************************************************************
Copyright (c) 2013, 2017 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
from __future__ import print_function
# Trace MQTT traffic
import MQTTV3112 as MQTTV3
import socket, sys, select, traceback, datetime, os
import SocketServer as socketserver
logging = True
myWindow = None
def timestamp():
now = datetime.datetime.now()
return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:]
class MyHandler(socketserver.StreamRequestHandler):
def handle(self):
if not hasattr(self, "ids"):
self.ids = {}
if not hasattr(self, "versions"):
self.versions = {}
inbuf = True
i = o = e = None
try:
clients = self.request
brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
brokers.connect((brokerhost, brokerport))
terminated = False
while inbuf != None and not terminated:
(i, o, e) = select.select([clients, brokers], [], [])
for s in i:
if s == clients:
inbuf = MQTTV3.getPacket(clients) # get one packet
if inbuf == None:
break
try:
packet = MQTTV3.unpackPacket(inbuf)
if packet.fh.MessageType == MQTTV3.PUBLISH and \
packet.topicName == "MQTTSAS topic" and \
packet.data == b"TERMINATE":
print("Terminating client", self.ids[id(clients)])
brokers.close()
clients.close()
terminated = True
break
elif packet.fh.MessageType == MQTTV3.CONNECT:
self.ids[id(clients)] = packet.ClientIdentifier
self.versions[id(clients)] = 3
print(timestamp() , "C to S", self.ids[id(clients)], repr(packet))
#print([hex(b) for b in inbuf])
#print(inbuf)
except:
traceback.print_exc()
brokers.send(inbuf) # pass it on
elif s == brokers:
inbuf = MQTTV3.getPacket(brokers) # get one packet
if inbuf == None:
break
try:
print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf)))
except:
traceback.print_exc()
clients.send(inbuf)
print(timestamp()+" client "+self.ids[id(clients)]+" connection closing")
except:
print(repr((i, o, e)), repr(inbuf))
traceback.print_exc()
if id(clients) in self.ids.keys():
del self.ids[id(clients)]
elif id(clients) in self.versions.keys():
del self.versions[id(clients)]
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
def run():
global brokerhost, brokerport
myhost = '127.0.0.1'
if len(sys.argv) > 1:
brokerhost = sys.argv[1]
else:
brokerhost = '127.0.0.1'
if len(sys.argv) > 2:
brokerport = int(sys.argv[2])
else:
brokerport = 1883
if len(sys.argv) > 3:
myport = int(sys.argv[3])
else:
if brokerhost == myhost:
myport = brokerport + 1
else:
myport = 1883
print("Listening on port", str(myport)+", broker on port", brokerport)
s = ThreadingTCPServer(("127.0.0.1", myport), MyHandler)
s.serve_forever()
if __name__ == "__main__":
run()
#!/usr/bin/python
import os
import sys
import time
import subprocess
import random
bindir='/usr/bin'
sys.path.append(bindir)
def input_sel(prompt,max_,selectionoption):
# let the user choose the VM and verify selection
while True:
try:
print ('Please select from the list of running VMs\n\n'+'\n'.join(selectionoption))
userin = int(raw_input(prompt))
except ValueError:
print('\nThat was not a number\n\n')
continue
if userin > max_:
print('\nInput must be less than or equal to {0}.\n\n'.format(max_))
elif userin < 1:
print('\nInput must be greater than or equal to 1\n\n')
else:
return userin
def statustext(result):
if result == 0:
status = 'OK'
else:
status = 'Failed'
return status
def controlvmnetworkstate():
try:
offtime = 600
ontime = 14
vmdict={}
vmlist=[]
executable = os.path.join(bindir, 'VBoxManage')
#retrieve a list of all running VMs
runningvms= subprocess.check_output('%s list runningvms' %executable,shell=True).splitlines()
if len(runningvms) != 0:
for n in range(0, len(runningvms)):
vmlist.append('%s: %s' %(n+1,runningvms[n].rsplit(' ',1)[0].strip('"')))
vmdict[n+1]=runningvms[n].rsplit(' ',1)[-1]
usersel=input_sel('\nEnter the number of the VM: ',len(runningvms),vmlist)
else:
print('Can not retrieve list of running VMs')
sys.exit()
vmuuid=vmdict[usersel]
while True:
offtime = random.randint(60, 90)
ontime = random.randint(10, 90)
timenow = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
on = subprocess.call('%s controlvm %s setlinkstate1 on' %(executable,vmuuid),
shell=True)
status=statustext(on)
print ('%s: Plug Network cable into VM %s for %ds: %s' % (timenow, runningvms[usersel-1].rsplit(' ',1)[0].strip('"'),ontime, str(status)))
time.sleep(ontime)
timenow = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
off = subprocess.call('%s controlvm %s setlinkstate1 off' %(executable,vmuuid),
shell=True)
status = statustext(off)
print ('%s: Unplug Network cable from VM %s for %ds: %s' % (timenow, runningvms[usersel-1].rsplit(' ',1)[0].strip('"'),offtime, str(status)))
time.sleep(offtime)
except KeyboardInterrupt:
sys.exit('\nUser Interrupt')
except Exception as e:
print("Error in %s in function %s: %s" % (__name__, sys._getframe().f_code.co_name, e.message))
if __name__ == "__main__":
sys.exit(controlvmnetworkstate())
...@@ -510,13 +510,13 @@ int recreateReconnect(void) ...@@ -510,13 +510,13 @@ int recreateReconnect(void)
MyLog(LOGA_ALWAYS, "Recreating client"); MyLog(LOGA_ALWAYS, "Recreating client");
MQTTAsync_destroy(&client); /* destroy the client object so that we force persistence to be read on recreate */ MQTTAsync_destroy(&client); /* destroy the client object so that we force persistence to be read on recreate */
#if !defined(_WINDOWS)
heap_info* mqtt_mem = 0; heap_info* mqtt_mem = 0;
/*mqtt_mem = Heap_get_info(); /*mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size); MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (mqtt_mem->current_size > 20) if (mqtt_mem->current_size > 20)
HeapScan(5); */ HeapScan(5); */
#endif
rc = MQTTAsync_create(&client, opts.connection, opts.clientid, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL); rc = MQTTAsync_create(&client, opts.connection, opts.clientid, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{ {
...@@ -1028,12 +1028,5 @@ exit: ...@@ -1028,12 +1028,5 @@ exit:
destroy_exit: destroy_exit:
MQTTAsync_destroy(&control_client); MQTTAsync_destroy(&control_client);
/*#include "Heap.h"
heap_info* mqtt_mem = 0;
mqtt_mem = Heap_get_info();
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
if (mqtt_mem->current_size > 0)
failures++; consider any not freed memory as failure */
return 0; return 0;
} }
...@@ -54,12 +54,16 @@ struct Options ...@@ -54,12 +54,16 @@ struct Options
char* proxy_connection; /**< connection to proxy */ char* proxy_connection; /**< connection to proxy */
int verbose; int verbose;
int test_no; int test_no;
unsigned int QoS;
unsigned int iterrations;
} options = } options =
{ {
"localhost:1883", "localhost:1883",
"localhost:1884", "localhost:1884",
0, 0,
0, 0,
0,
5
}; };
void getopts(int argc, char** argv) void getopts(int argc, char** argv)
...@@ -227,7 +231,7 @@ int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsy ...@@ -227,7 +231,7 @@ int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsy
static char test373Payload[] = "No one is interested in this payload"; static char test373Payload[] = "No one is interested in this payload";
int test373SendPublishMessage(MQTTAsync handle,int id) int test373SendPublishMessage(MQTTAsync handle,int id, const unsigned int QoS)
{ {
int rc = 0; int rc = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
...@@ -240,13 +244,17 @@ int test373SendPublishMessage(MQTTAsync handle,int id) ...@@ -240,13 +244,17 @@ int test373SendPublishMessage(MQTTAsync handle,int id)
pubmsg.payload = test373Payload; pubmsg.payload = test373Payload;
pubmsg.payloadlen = sizeof(test373Payload); pubmsg.payloadlen = sizeof(test373Payload);
pubmsg.qos = 1; pubmsg.qos = QoS;
rc = MQTTAsync_sendMessage(handle, topic, &pubmsg, &opts); rc = MQTTAsync_sendMessage( handle, topic,&pubmsg,&opts);
if (rc == MQTTASYNC_SUCCESS) if (rc == MQTTASYNC_SUCCESS)
{ {
pendingMessageCnt++; pendingMessageCnt++;
if (pendingMessageCnt > pendingMessageCntMax) pendingMessageCntMax = pendingMessageCnt; if (pendingMessageCnt > pendingMessageCntMax) pendingMessageCntMax = pendingMessageCnt;
} }
else
{
MyLog(LOGA_INFO, "Failed to queue message for send with retvalue %d",rc);
}
return rc; return rc;
} }
...@@ -260,7 +268,9 @@ int test_373(struct Options options) ...@@ -260,7 +268,9 @@ int test_373(struct Options options)
char clientid[30 + sizeof(unique)]; char clientid[30 + sizeof(unique)];
heap_info* mqtt_mem = 0; heap_info* mqtt_mem = 0;
MyLog(LOGA_INFO, "Running test373 with QoS=%u, iterrations=%u\n",options.QoS,options.iterrations);
sprintf(clientid, "paho-test373-%s", unique); sprintf(clientid, "paho-test373-%s", unique);
connectCnt = 0;
rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid, rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid,
MQTTCLIENT_PERSISTENCE_NONE, MQTTCLIENT_PERSISTENCE_NONE,
NULL); NULL);
...@@ -286,7 +296,7 @@ int test_373(struct Options options) ...@@ -286,7 +296,7 @@ int test_373(struct Options options)
goto exit; goto exit;
} }
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR); MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
while (connectCnt < 5) while (connectCnt < options.iterrations)
{ {
if (!connected) if (!connected)
{ {
...@@ -315,11 +325,11 @@ int test_373(struct Options options) ...@@ -315,11 +325,11 @@ int test_373(struct Options options)
} }
else else
{ {
/* while connected send 1000 message per second */ /* while connected send 100 message per second */
int topicId; int topicId;
for(topicId=0; topicId < 1000; topicId++) for(topicId=0; topicId < 100; topicId++)
{ {
rc = test373SendPublishMessage(mqttasyncContext,topicId); rc = test373SendPublishMessage(mqttasyncContext,topicId,options.QoS);
if (rc != MQTTASYNC_SUCCESS) break; if (rc != MQTTASYNC_SUCCESS) break;
} }
MySleep(100); MySleep(100);
...@@ -333,6 +343,7 @@ int test_373(struct Options options) ...@@ -333,6 +343,7 @@ int test_373(struct Options options)
MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size); MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
#endif #endif
MQTTAsync_disconnect(mqttasyncContext, NULL); MQTTAsync_disconnect(mqttasyncContext, NULL);
connected = 0;
MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d", MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax); goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
#if !defined(_WINDOWS) #if !defined(_WINDOWS)
...@@ -359,6 +370,7 @@ int main(int argc, char** argv) ...@@ -359,6 +370,7 @@ int main(int argc, char** argv)
int* numtests = &tests; int* numtests = &tests;
int rc = 0; int rc = 0;
int (*tests[])() = { NULL, test_373}; int (*tests[])() = { NULL, test_373};
unsigned int QoS;
sprintf(unique, "%u", rand()); sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique); MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
...@@ -369,11 +381,26 @@ int main(int argc, char** argv) ...@@ -369,11 +381,26 @@ int main(int argc, char** argv)
if (options.test_no == 0) if (options.test_no == 0)
{ /* run all the tests */ { /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no) for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{
/* test with QoS 0, 1 and 2 and just 5 iterrations */
for (QoS = 0; QoS < 3; QoS++)
{ {
failures = 0; failures = 0;
options.QoS = QoS;
options.iterrations = 5;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR); MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */ rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
} }
if (rc == 0)
{
/* Test with much more iterrations for QoS = 0 */
failures = 0;
options.QoS = 0;
options.iterrations = 100;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
}
} }
else else
{ {
......
...@@ -6,10 +6,11 @@ rm -rf build.paho ...@@ -6,10 +6,11 @@ rm -rf build.paho
mkdir build.paho mkdir build.paho
cd build.paho cd build.paho
echo "travis build dir $TRAVIS_BUILD_DIR pwd $PWD" echo "travis build dir $TRAVIS_BUILD_DIR pwd $PWD"
cmake -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE .. cmake -DCMAKE_BUILD_TYPE=Debug -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE ..
make make
python ../test/mqttsas.py & python ../test/mqttsas.py &
ctest -VV --timeout 600 ctest -VV --timeout 600
cpack --verbose
kill %1 kill %1
#killall mosquitto #killall mosquitto
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment