Commit b62244d5 authored by Ian Craggs's avatar Ian Craggs

Add Python interface for easier testing

parent 6694ae38
...@@ -5,12 +5,12 @@ ...@@ -5,12 +5,12 @@
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643" moduleId="org.eclipse.cdt.core.settings" name="debug-native"> <storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643" moduleId="org.eclipse.cdt.core.settings" name="debug-native">
<externalSettings/> <externalSettings/>
<extensions> <extensions>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
</extensions> </extensions>
</storageModule> </storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0"> <storageModule moduleId="cdtBuildSystem" version="4.0.0">
...@@ -38,6 +38,17 @@ ...@@ -38,6 +38,17 @@
</tool> </tool>
</toolChain> </toolChain>
</folderInfo> </folderInfo>
<fileInfo id="cdt.managedbuild.toolchain.gnu.base.2116872643.660980254" name="mqttclient_module.c" rcbsApplicability="disable" resourcePath="test/python/mqttclient_module.c" toolsToInvoke="cdt.managedbuild.tool.gnu.c.compiler.base.919295913.892752676">
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.919295913.892752676" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.919295913">
<option id="gnu.c.compiler.option.include.paths.1043418401" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/usr/include/python2.6"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.1471037566" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
</fileInfo>
<sourceEntries>
<entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="src"/>
</sourceEntries>
</configuration> </configuration>
</storageModule> </storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/> <storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
...@@ -46,12 +57,12 @@ ...@@ -46,12 +57,12 @@
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804" moduleId="org.eclipse.cdt.core.settings" name="debug-x86-linux"> <storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804" moduleId="org.eclipse.cdt.core.settings" name="debug-x86-linux">
<externalSettings/> <externalSettings/>
<extensions> <extensions>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
</extensions> </extensions>
</storageModule> </storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0"> <storageModule moduleId="cdtBuildSystem" version="4.0.0">
...@@ -79,6 +90,14 @@ ...@@ -79,6 +90,14 @@
</tool> </tool>
</toolChain> </toolChain>
</folderInfo> </folderInfo>
<fileInfo id="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804.44392871" name="mqttclient_module.c" rcbsApplicability="disable" resourcePath="test/python/mqttclient_module.c" toolsToInvoke="cdt.managedbuild.tool.gnu.c.compiler.base.1542253543.1236301446">
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.1542253543.1236301446" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.1542253543">
<option id="gnu.c.compiler.option.include.paths.387085861" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/usr/include/python2.6/"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.912266645" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
</fileInfo>
</configuration> </configuration>
</storageModule> </storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/> <storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
...@@ -87,12 +106,12 @@ ...@@ -87,12 +106,12 @@
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084" moduleId="org.eclipse.cdt.core.settings" name="debug-arm-linux-gnueabihf"> <storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084" moduleId="org.eclipse.cdt.core.settings" name="debug-arm-linux-gnueabihf">
<externalSettings/> <externalSettings/>
<extensions> <extensions>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/> <extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
</extensions> </extensions>
</storageModule> </storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0"> <storageModule moduleId="cdtBuildSystem" version="4.0.0">
...@@ -120,6 +139,14 @@ ...@@ -120,6 +139,14 @@
</tool> </tool>
</toolChain> </toolChain>
</folderInfo> </folderInfo>
<fileInfo id="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084.1410045178" name="mqttclient_module.c" rcbsApplicability="disable" resourcePath="test/python/mqttclient_module.c" toolsToInvoke="cdt.managedbuild.tool.gnu.c.compiler.base.1805700752.404015131">
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.1805700752.404015131" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.1805700752">
<option id="gnu.c.compiler.option.include.paths.253579949" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/usr/include/python2.6/"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.987997893" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
</fileInfo>
</configuration> </configuration>
</storageModule> </storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/> <storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
...@@ -128,9 +155,6 @@ ...@@ -128,9 +155,6 @@
<storageModule moduleId="cdtBuildSystem" version="4.0.0"> <storageModule moduleId="cdtBuildSystem" version="4.0.0">
<project id="org.eclipse.paho.mqtt.c.null.1335713239" name="org.eclipse.paho.mqtt.c"/> <project id="org.eclipse.paho.mqtt.c.null.1335713239" name="org.eclipse.paho.mqtt.c"/>
</storageModule> </storageModule>
<storageModule moduleId="scannerConfiguration">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/> <storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/>
<storageModule moduleId="refreshScope" versionNumber="2"> <storageModule moduleId="refreshScope" versionNumber="2">
<configuration configurationName="Default"> <configuration configurationName="Default">
...@@ -144,4 +168,25 @@ ...@@ -144,4 +168,25 @@
</configuration> </configuration>
</storageModule> </storageModule>
<storageModule moduleId="org.eclipse.cdt.make.core.buildtargets"/> <storageModule moduleId="org.eclipse.cdt.make.core.buildtargets"/>
<storageModule moduleId="scannerConfiguration">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804;cdt.managedbuild.toolchain.gnu.base.2116872643.128299804.;cdt.managedbuild.tool.gnu.cpp.compiler.base.1478312340;cdt.managedbuild.tool.gnu.cpp.compiler.input.1918693642">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804;cdt.managedbuild.toolchain.gnu.base.2116872643.128299804.;cdt.managedbuild.tool.gnu.c.compiler.base.1542253543;cdt.managedbuild.tool.gnu.c.compiler.input.273102950">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084;cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084.;cdt.managedbuild.tool.gnu.c.compiler.base.1805700752;cdt.managedbuild.tool.gnu.c.compiler.input.892553453">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643;cdt.managedbuild.toolchain.gnu.base.2116872643.2105697674;cdt.managedbuild.tool.gnu.cpp.compiler.base.2093522624;cdt.managedbuild.tool.gnu.cpp.compiler.input.58852055">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084;cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084.;cdt.managedbuild.tool.gnu.cpp.compiler.base.1005171182;cdt.managedbuild.tool.gnu.cpp.compiler.input.1367844459">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643;cdt.managedbuild.toolchain.gnu.base.2116872643.2105697674;cdt.managedbuild.tool.gnu.c.compiler.base.919295913;cdt.managedbuild.tool.gnu.c.compiler.input.873628110">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
</storageModule>
</cproject> </cproject>
...@@ -5,6 +5,11 @@ ...@@ -5,6 +5,11 @@
<projects> <projects>
</projects> </projects>
<buildSpec> <buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand> <buildCommand>
<name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name> <name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name>
<triggers>clean,full,incremental,</triggers> <triggers>clean,full,incremental,</triggers>
...@@ -23,5 +28,6 @@ ...@@ -23,5 +28,6 @@
<nature>org.eclipse.cdt.core.ccnature</nature> <nature>org.eclipse.cdt.core.ccnature</nature>
<nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature> <nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature>
<nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature> <nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature>
<nature>org.python.pydev.pythonNature</nature>
</natures> </natures>
</projectDescription> </projectDescription>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
</pydev_project>
...@@ -72,10 +72,10 @@ HEADERS = $(srcdir)/*.h ...@@ -72,10 +72,10 @@ HEADERS = $(srcdir)/*.h
HEADERS_C = $(filter-out $(srcdir)/MQTTAsync.h, $(HEADERS)) HEADERS_C = $(filter-out $(srcdir)/MQTTAsync.h, $(HEADERS))
HEADERS_A = $(HEADERS) HEADERS_A = $(HEADERS)
SAMPLE_FILES_C = stdinpub stdoutsub pubsync pubasync subasync SAMPLE_FILES_C = paho_cs_pub paho_cs_sub MQTTClient_publish MQTTClient_publish_async MQTTClient_subscribe
SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}} SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = stdinpuba stdoutsuba MQTTAsync_subscribe MQTTAsync_publish SAMPLE_FILES_A = paho_c_pub paho_c_sub MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}} ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
TEST_FILES_C = test1 sync_client_test test_mqtt4sync TEST_FILES_C = test1 sync_client_test test_mqtt4sync
......
...@@ -53,8 +53,8 @@ ...@@ -53,8 +53,8 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "Sun Feb 14 19:24:53 GMT 2016" #define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "1.0.3" #define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
...@@ -278,6 +278,9 @@ typedef struct MQTTAsync_struct ...@@ -278,6 +278,9 @@ typedef struct MQTTAsync_struct
MQTTAsync_messageArrived* ma; MQTTAsync_messageArrived* ma;
MQTTAsync_deliveryComplete* dc; MQTTAsync_deliveryComplete* dc;
void* context; /* the context to be associated with the main callbacks*/ void* context; /* the context to be associated with the main callbacks*/
MQTTAsync_connected* connected;
void* connected_context; /* the context to be associated with the connected callback*/
MQTTAsync_command connect; /* Connect operation properties */ MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */ MQTTAsync_command disconnect; /* Disconnect operation properties */
...@@ -300,6 +303,7 @@ typedef struct MQTTAsync_struct ...@@ -300,6 +303,7 @@ typedef struct MQTTAsync_struct
int currentInterval; int currentInterval;
START_TIME_TYPE lastConnectionFailedTime; START_TIME_TYPE lastConnectionFailedTime;
int retrying; int retrying;
int reconnectNow;
} MQTTAsyncs; } MQTTAsyncs;
...@@ -442,7 +446,7 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const ...@@ -442,7 +446,7 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const
if (options) if (options)
{ {
m->createOptions = malloc(sizeof(MQTTAsync_createOptions)); m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
m->createOptions = options; memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
} }
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
...@@ -825,6 +829,31 @@ void MQTTAsync_startConnectRetry(MQTTAsyncs* m) ...@@ -825,6 +829,31 @@ void MQTTAsync_startConnectRetry(MQTTAsyncs* m)
} }
int MQTTAsync_reconnect(MQTTAsync handle)
{
int rc = MQTTASYNC_FAILURE;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m->automaticReconnect && m->shouldBeConnected)
{
m->reconnectNow = 1;
if (m->retrying == 0)
{
m->currentInterval = m->minRetryInterval;
m->retrying = 1;
}
rc = MQTTASYNC_SUCCESS;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command) void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
{ {
MQTTAsyncs* m = handle; MQTTAsyncs* m = handle;
...@@ -1308,7 +1337,7 @@ void MQTTAsync_checkTimeouts() ...@@ -1308,7 +1337,7 @@ void MQTTAsync_checkTimeouts()
if (m->automaticReconnect && m->retrying) if (m->automaticReconnect && m->retrying)
{ {
if (MQTTAsync_elapsed(m->lastConnectionFailedTime) > (m->currentInterval * 1000)) if (m->reconnectNow || MQTTAsync_elapsed(m->lastConnectionFailedTime) > (m->currentInterval * 1000))
{ {
/* put the connect command to the head of the command queue, using the next serverURI */ /* put the connect command to the head of the command queue, using the next serverURI */
MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand)); MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
...@@ -1317,6 +1346,7 @@ void MQTTAsync_checkTimeouts() ...@@ -1317,6 +1346,7 @@ void MQTTAsync_checkTimeouts()
conn->command = m->connect; conn->command = m->connect;
Log(TRACE_MIN, -1, "Automatically attempting to reconnect"); Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
m->reconnectNow = 0;
} }
} }
} }
...@@ -1624,6 +1654,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1624,6 +1654,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
Log(TRACE_MIN, -1, "Connect succeeded to %s", Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]); m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_freeConnect(m->connect); MQTTAsync_freeConnect(m->connect);
int onSuccess = (m->connect.onSuccess != NULL); /* save setting of onSuccess callback */
if (m->connect.onSuccess) if (m->connect.onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
...@@ -1636,6 +1667,13 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1636,6 +1667,13 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion; data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
data.alt.connect.sessionPresent = sessionPresent; data.alt.connect.sessionPresent = sessionPresent;
(*(m->connect.onSuccess))(m->connect.context, &data); (*(m->connect.onSuccess))(m->connect.context, &data);
m->connect.onSuccess = NULL; /* don't accidentally call it again */
}
if (m->connected)
{
Log(TRACE_MIN, -1, "Calling connected for client %s", m->c->clientID);
char* reason = (onSuccess) ? "connect onSuccess called" : "automatic reconnect";
(*(m->connected))(m->connected_context, reason);
} }
} }
else else
...@@ -1841,6 +1879,28 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, ...@@ -1841,6 +1879,28 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
} }
int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* connected)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c->connect_state != 0)
rc = MQTTASYNC_FAILURE;
else
{
m->connected_context = context;
m->connected = connected;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_closeOnly(Clients* client) void MQTTAsync_closeOnly(Clients* client)
{ {
FUNC_ENTRY; FUNC_ENTRY;
...@@ -1916,9 +1976,6 @@ int MQTTAsync_cleanSession(Clients* client) ...@@ -1916,9 +1976,6 @@ int MQTTAsync_cleanSession(Clients* client)
} }
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm) int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm)
{ {
int rc; int rc;
...@@ -2448,7 +2505,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen ...@@ -2448,7 +2505,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
if (m == NULL || m->c == NULL) if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE; rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0 && (m->createOptions == NULL || else if (m->c->connected == 0 && (m->createOptions == NULL ||
m->createOptions->send_while_disconnected == 0 || m->shouldBeConnected == 0)) m->createOptions->sendWhileDisconnected == 0 || m->shouldBeConnected == 0))
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
else if (!UTF8_validateString(destinationName)) else if (!UTF8_validateString(destinationName))
rc = MQTTASYNC_BAD_UTF8_STRING; rc = MQTTASYNC_BAD_UTF8_STRING;
......
...@@ -313,6 +313,23 @@ typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token); ...@@ -313,6 +313,23 @@ typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
*/ */
typedef void MQTTAsync_connectionLost(void* context, char* cause); typedef void MQTTAsync_connectionLost(void* context, char* cause);
/**
* This is a callback function, which will be called when the client
* library successfully connects. This is superfluous when the connection
* is made in response to a MQTTAsync_connect call, because the onSuccess
* callback can be used. It is intended for use when automatic reconnect
* is enabled, so that when a reconnection attempt succeeds in the background,
* the application is notified and can take any required actions.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setCallbacks(), which contains any application-specific context.
* @param cause The reason for the disconnection.
* Currently, <i>cause</i> is always set to NULL.
*/
typedef void MQTTAsync_connected(void* context, char* cause);
/** The data returned on completion of an unsuccessful API call in the response callback onFailure. */ /** The data returned on completion of an unsuccessful API call in the response callback onFailure. */
typedef struct typedef struct
{ {
...@@ -436,6 +453,14 @@ typedef struct ...@@ -436,6 +453,14 @@ typedef struct
*/ */
DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl, DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc); MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
DLLExport int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* co);
/** /**
...@@ -491,9 +516,9 @@ typedef struct ...@@ -491,9 +516,9 @@ typedef struct
/** The version number of this structure. Must be 0 */ /** The version number of this structure. Must be 0 */
int struct_version; int struct_version;
/** Whether to allow messages to be sent when the client library is not connected. */ /** Whether to allow messages to be sent when the client library is not connected. */
int send_while_disconnected; int sendWhileDisconnected;
/** the maximum number of messages allowed to be buffered while not connected. */ /** the maximum number of messages allowed to be buffered while not connected. */
int max_buffered_messages; int maxBufferedMessages;
} MQTTAsync_createOptions; } MQTTAsync_createOptions;
#define MQTTAsync_createOptions_initializer { {'M', 'Q', 'C', 'O'}, 0, 0, 100 } #define MQTTAsync_createOptions_initializer { {'M', 'Q', 'C', 'O'}, 0, 0, 100 }
......
...@@ -59,8 +59,8 @@ ...@@ -59,8 +59,8 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "Sun Feb 14 19:24:42 GMT 2016" #define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "1.0.3" #define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
......
...@@ -208,7 +208,7 @@ int main(int argc, char** argv) ...@@ -208,7 +208,7 @@ int main(int argc, char** argv)
topic = argv[1]; topic = argv[1];
printf("Using topic %s\n", topic); printf("Using topic %s\n", topic);
create_opts.send_while_disconnected = 1; create_opts.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
......
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin_publisher
--maxdatalen 100
--userid none
--password none
*/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#if defined(WIN32)
#include <Windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int toStop = 0;
void usage()
{
printf("MQTT stdin publisher\n");
printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 0)\n");
printf(" --retained (default is off)\n");
printf(" --delimiter <delim> (default is \\n)");
printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n");
printf(" --username none\n");
printf(" --password none\n");
exit(-1);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{
printf("Connecting\n");
if (MQTTClient_connect(*client, opts) != 0)
{
printf("Failed to connect\n");
exit(-1);
}
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
struct
{
char* clientid;
char* delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
int verbose;
} opts =
{
"publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0
};
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
/* not expecting any messages */
return 1;
}
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char* topic = NULL;
char* buffer = NULL;
int rc = 0;
char url[100];
if (argc < 2)
usage();
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
myconnect(&client, &conn_opts);
buffer = malloc(opts.maxdatalen);
while (!toStop)
{
int data_len = 0;
int delim_len = 0;
delim_len = strlen(opts.delimiter);
do
{
buffer[data_len++] = getchar();
if (data_len > delim_len)
{
//printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0)
{
myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
}
printf("Stopping\n");
free(buffer);
MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return 0;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
usage();
}
count++;
}
}
#include <Python.h>
#include "MQTTAsync.h"
#include "LinkedList.h"
static PyObject *MqttV3Error;
static PyObject* mqttv3_create(PyObject* self, PyObject *args)
{
MQTTAsync c;
char* serverURI;
char* clientId;
int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
PyObject *pyoptions = NULL, *temp;
MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId,
&persistence_option, &pyoptions))
return NULL;
if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
&& persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
{
PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
return NULL;
}
if (pyoptions)
{
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError,
"Create options parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected"))
!= NULL)
{
if (PyInt_Check(temp))
options.sendWhileDisconnected = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages"))
!= NULL)
{
if (PyInt_Check(temp))
options.maxBufferedMessages = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int");
return NULL;
}
}
rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options);
}
else
rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL);
return Py_BuildValue("ik", rc, c);
}
static List* callbacks = NULL;
static List* connected_callbacks = NULL;
enum msgTypes
{
CONNECT, PUBLISH, SUBSCRIBE, SUBSCRIBE_MANY, UNSUBSCRIBE
};
typedef struct
{
MQTTAsync c;
PyObject *context;
PyObject *cl, *ma, *dc;
} CallbackEntry;
typedef struct
{
MQTTAsync c;
PyObject *context;
PyObject *co;
} ConnectedEntry;
int clientCompare(void* a, void* b)
{
CallbackEntry* e = (CallbackEntry*) a;
return e->c == (MQTTAsync) b;
}
int connectedCompare(void* a, void* b)
{
ConnectedEntry* e = (ConnectedEntry*) a;
return e->c == (MQTTAsync) b;
}
void connected(void* context, char* cause)
{
/* call the right Python function, using the context */
PyObject *arglist;
PyObject *result;
ConnectedEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Os", e->context, cause);
result = PyEval_CallObject(e->co, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
void connectionLost(void* context, char* cause)
{
/* call the right Python function, using the context */
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Os", e->context, cause);
result = PyEval_CallObject(e->cl, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
int messageArrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* message)
{
PyObject *result = NULL;
CallbackEntry* e = context;
int rc = -99;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
if (topicLen == 0)
result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
topicName, "payload", message->payload, message->payloadlen,
"qos", message->qos, "retained", message->retained, "dup",
message->dup, "msgid", message->msgid);
else
result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
topicName, topicLen, "payload", message->payload,
message->payloadlen, "qos", message->qos, "retained",
message->retained, "dup", message->dup, "msgid",
message->msgid);
if (result)
{
if (PyInt_Check(result))
rc = (int) PyInt_AsLong(result);
Py_DECREF(result);
}
PyGILState_Release(gstate);
MQTTAsync_free(topicName);
MQTTAsync_freeMessage(&message);
return rc;
}
void deliveryComplete(void* context, MQTTAsync_token dt)
{
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Oi", e->context, dt);
result = PyEval_CallObject(e->dc, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
{
MQTTAsync c;
CallbackEntry* e = NULL;
int rc;
e = malloc(sizeof(CallbackEntry));
if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
&e->ma, &e->dc))
return NULL;
e->c = c;
if ((e->cl != Py_None && !PyCallable_Check(e->cl))
|| (e->ma != Py_None && !PyCallable_Check(e->ma))
|| (e->dc != Py_None && !PyCallable_Check(e->dc)))
{
PyErr_SetString(PyExc_TypeError,
"3rd, 4th and 5th parameters must be callable or None");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_setCallbacks(c, e, connectionLost, messageArrived, deliveryComplete);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS)
{
ListElement* temp = NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
ListAppend(callbacks, e, sizeof(e));
Py_XINCREF(e->cl);
Py_XINCREF(e->ma);
Py_XINCREF(e->dc);
Py_XINCREF(e->context);
}
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args)
{
MQTTAsync c;
ConnectedEntry* e = NULL;
int rc;
e = malloc(sizeof(ConnectedEntry));
if (!PyArg_ParseTuple(args, "kOO", &c, (PyObject**) &e->context, &e->co))
return NULL;
e->c = c;
if (e->co != Py_None && !PyCallable_Check(e->co))
{
PyErr_SetString(PyExc_TypeError,
"3rd parameter must be callable or None");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_setConnected(c, e, connected);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS)
{
ListElement* temp = NULL;
if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
{
ListDetach(connected_callbacks, temp->content);
free(temp->content);
}
ListAppend(connected_callbacks, e, sizeof(e));
Py_XINCREF(e->co);
Py_XINCREF(e->context);
}
return Py_BuildValue("i", rc);
}
typedef struct
{
MQTTAsync c;
PyObject *context;
PyObject *onSuccess, *onFailure;
enum msgTypes msgType;
} ResponseEntry;
void onSuccess(void* context, MQTTAsync_successData* response)
{
PyObject *result = NULL;
ResponseEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
switch (e->msgType)
{
case CONNECT:
result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", (e->context) ? e->context : Py_None,
"MQTTVersion", response->alt.connect.MQTTVersion,
"sessionPresent", response->alt.connect.sessionPresent,
"serverURI", response->alt.connect.serverURI);
break;
case PUBLISH:
result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", (e->context) ? e->context : Py_None,
"token", response->token,
"destinationName", response->alt.pub.destinationName,
"message",
"payload", response->alt.pub.message.payload,
response->alt.pub.message.payloadlen,
"qos", response->alt.pub.message.qos,
"retained", response->alt.pub.message.retained);
break;
case SUBSCRIBE:
result = PyObject_CallFunction(e->onSuccess, "O{sisi}", (e->context) ? e->context : Py_None,
"token", response->token,
"qos", response->alt.qos);
break;
case SUBSCRIBE_MANY:
result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", (e->context) ? e->context : Py_None,
"token", response->token,
"qosList", response->alt.qosList[0]);
break;
case UNSUBSCRIBE:
result = PyObject_CallFunction(e->onSuccess, "O{si}", (e->context) ? e->context : Py_None,
"token", response->token);
break;
}
if (result)
{
Py_DECREF(result);
printf("decrementing reference count for result\n");
}
PyGILState_Release(gstate);
free(e);
}
void onFailure(void* context, MQTTAsync_failureData* response)
{
PyObject *result = NULL;
PyObject *arglist = NULL;
ResponseEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
// TODO: convert response into Python structure
if (e->context)
arglist = Py_BuildValue("OO", e->context, response);
else
arglist = Py_BuildValue("OO", Py_None, response);
result = PyEval_CallObject(e->onFailure, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
free(e);
}
/* return true if ok, false otherwise */
int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions,
enum msgTypes msgType)
{
PyObject *temp = NULL;
if (!pyoptions)
return 1;
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary");
return 0;
}
if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
{
if (PyCallable_Check(temp)) /* temp points to Python function */
responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onSuccess value must be callable");
return 0;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
{
if (PyCallable_Check(temp))
responseOptions->onFailure = (MQTTAsync_onFailure*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onFailure value must be callable");
return 0;
}
}
responseOptions->context = PyDict_GetItemString(pyoptions, "context");
if (responseOptions->onFailure || responseOptions->onSuccess)
{
ResponseEntry* r = malloc(sizeof(ResponseEntry));
r->c = c;
r->context = responseOptions->context;
responseOptions->context = r;
r->onSuccess = (PyObject*)responseOptions->onSuccess;
responseOptions->onSuccess = onSuccess;
r->onFailure = (PyObject*)responseOptions->onFailure;
responseOptions->onFailure = onFailure;
r->msgType = msgType;
}
return 1; /* not an error, if we get here */
}
static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
{
MQTTAsync c;
PyObject *pyoptions = NULL, *temp;
MQTTAsync_connectOptions connectOptions = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
return NULL;
if (!pyoptions)
goto skip;
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
{
if (PyCallable_Check(temp)) /* temp points to Python function */
connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onSuccess value must be callable");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
{
if (PyCallable_Check(temp))
connectOptions.onFailure = (MQTTAsync_onFailure*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onFailure value must be callable");
return NULL;
}
}
connectOptions.context = PyDict_GetItemString(pyoptions, "context");
if (connectOptions.onFailure || connectOptions.onSuccess)
{
ResponseEntry* r = malloc(sizeof(ResponseEntry));
r->c = c;
r->context = connectOptions.context;
connectOptions.context = r;
r->onSuccess = (PyObject*)connectOptions.onSuccess;
connectOptions.onSuccess = onSuccess;
r->onFailure = (PyObject*)connectOptions.onFailure;
connectOptions.onFailure = onFailure;
r->msgType = CONNECT;
}
if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError,
"keepAliveLiveInterval value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.cleansession = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
{
if (PyDict_Check(temp))
{
PyObject *wtemp = NULL;
if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
willOptions.topicName = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will message value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
willOptions.message = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will message value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
{
if (PyInt_Check(wtemp))
willOptions.retained = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will retained value must be int");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
{
if (PyInt_Check(wtemp))
willOptions.qos = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will qos value must be int");
return NULL;
}
}
connectOptions.will = &willOptions;
}
else
{
PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
{
if (PyString_Check(temp))
connectOptions.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "username value must be string");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
{
if (PyString_Check(temp))
connectOptions.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "password value must be string");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.automaticReconnect = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.minRetryInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int");
return NULL;
}
}
skip:
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_connect(c, &connectOptions);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "k|i", &c, &options.timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_disconnect(c, &options);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
{
MQTTAsync c;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_isConnected(c);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
char* topic;
int qos = 2;
int rc;
if (!PyArg_ParseTuple(args, "ks|iO", &c, &topic, &qos, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
return NULL;
Py_BEGIN_ALLOW_THREADS;
rc = MQTTAsync_subscribe(c, topic, qos, &response);
Py_END_ALLOW_THREADS;
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject* topicList;
PyObject* qosList;
PyObject *pyoptions = NULL;
int count;
char** topics;
int* qoss;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
return NULL;
if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences");
return NULL;
}
if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences of the same length");
return NULL;
}
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
qoss = malloc(count * sizeof(int));
for (i = 0; i < count; ++i)
qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_subscribeMany(c, count, topics, qoss, &response);
Py_END_ALLOW_THREADS
for (i = 0; i < count; ++i)
PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
free(topics);
free(qoss);
if (rc == MQTTASYNC_SUCCESS)
return Py_BuildValue("iO", rc, qosList);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
char* topic;
int rc;
if (!PyArg_ParseTuple(args, "ks|O", &c, &topic, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_unsubscribe(c, topic, &response);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject* topicList;
PyObject *pyoptions = NULL;
int count;
char** topics;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
return NULL;
if (!PySequence_Check(topicList))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
return NULL;
}
count = PySequence_Length(topicList);
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_unsubscribeMany(c, count, topics, &response);
Py_END_ALLOW_THREADS
free(topics);
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_send(PyObject* self, PyObject *args)
{
MQTTAsync c;
char* destinationName;
int payloadlen;
void* payload;
int qos = 0;
int retained = 0;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
int rc;
if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload,
&payloadlen, &qos, &retained, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS && qos > 0)
return Py_BuildValue("ii", rc, response);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args)
{
MQTTAsync c;
char* destinationName;
PyObject *message, *temp;
MQTTAsync_message msg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
int rc;
if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
return NULL;
if (!PyDict_Check(message))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
{
PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
return NULL;
}
if (PyString_Check(temp))
PyString_AsStringAndSize(temp, (char**) &msg.payload,
(Py_ssize_t*) &msg.payloadlen);
else
{
PyErr_SetString(PyExc_TypeError, "payload value must be string");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
msg.qos = (int) PyInt_AsLong(temp);
if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
msg.retained = (int) PyInt_AsLong(temp);
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS && msg.qos > 0)
return Py_BuildValue("ii", rc, response);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
{
MQTTAsync c;
unsigned long timeout = 1000L;
MQTTAsync_token dt;
int rc;
if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_waitForCompletion(c, dt, timeout);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_token* tokens;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_getPendingTokens(c, &tokens);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS)
{
int i = 0;
PyObject* dts = PyList_New(0);
while (tokens[i] != -1)
PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
return Py_BuildValue("iO", rc, dts);
}
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
{
MQTTAsync c;
ListElement* temp = NULL;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
{
ListDetach(connected_callbacks, temp->content);
free(temp->content);
}
MQTTAsync_destroy(&c);
Py_INCREF(Py_None);
return Py_None;
}
static PyMethodDef MqttV3Methods[] =
{
{ "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
{ "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
"Sets the callback functions for a particular client." },
{ "setconnected", mqttv3_setconnected, METH_VARARGS,
"Sets the connected callback function for a particular client." },
{ "connect", mqttv3_connect, METH_VARARGS,
"Connects to a server using the specified options." },
{ "disconnect", mqttv3_disconnect, METH_VARARGS,
"Disconnects from a server." },
{ "isConnected", mqttv3_isConnected, METH_VARARGS,
"Determines if this client is currently connected to the server." },
{ "subscribe", mqttv3_subscribe, METH_VARARGS,
"Subscribe to the given topic." },
{ "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
"Subscribe to the given topics." },
{ "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
"Unsubscribe from the given topic." },
{ "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
"Unsubscribe from the given topics." },
{ "send", mqttv3_send, METH_VARARGS,
"Publish a message to the given topic." },
{ "sendMessage", mqttv3_sendMessage, METH_VARARGS,
"Publish a message to the given topic." },
{ "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
"Waits for the completion of the delivery of the message represented by a delivery token." },
{ "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS,
"Returns the tokens pending of completion." },
{ "destroy", mqttv3_destroy, METH_VARARGS,
"Free memory allocated to a MQTT client. It is the opposite to create." },
{ NULL, NULL, 0, NULL } /* Sentinel */
};
PyMODINIT_FUNC initpaho_mqtt3a(void)
{
PyObject *m;
PyEval_InitThreads();
callbacks = ListInitialize();
connected_callbacks = ListInitialize();
m = Py_InitModule("paho_mqtt3a", MqttV3Methods);
if (m == NULL)
return;
MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL);
Py_INCREF(MqttV3Error);
PyModule_AddObject(m, "error", MqttV3Error);
PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS);
PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE);
PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED);
PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT);
PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING);
PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER);
PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED);
PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
MQTTCLIENT_PERSISTENCE_ERROR);
}
#include <Python.h>
#include "MQTTClient.h"
#include "LinkedList.h"
static PyObject *MqttV3Error;
static PyObject* mqttv3_create(PyObject* self, PyObject *args)
{
MQTTClient c;
char* serverURI;
char* clientId;
int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
int rc;
if (!PyArg_ParseTuple(args, "ss|i", &serverURI, &clientId,
&persistence_option))
return NULL;
if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
&& persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
{
PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
return NULL;
}
rc = MQTTClient_create(&c, serverURI, clientId, persistence_option, NULL);
printf("create MQTTClient pointer %p\n", c);
return Py_BuildValue("ik", rc, c);
}
static List* callbacks = NULL;
typedef struct
{
MQTTClient c;
PyObject *context;
PyObject *cl, *ma, *dc;
} CallbackEntry;
int clientCompare(void* a, void* b)
{
CallbackEntry* e = (CallbackEntry*) a;
return e->c == (MQTTClient) b;
}
void connectionLost(void* context, char* cause)
{
/* call the right Python function, using the context */
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Os", e->context, cause);
result = PyEval_CallObject(e->cl, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
int messageArrived(void* context, char* topicName, int topicLen,
MQTTClient_message* message)
{
PyObject *result = NULL;
CallbackEntry* e = context;
int rc = -99;
PyGILState_STATE gstate;
//printf("messageArrived %s %s %d %.*s\n", PyString_AsString(e->context), topicName, topicLen,
// message->payloadlen, (char*)message->payload);
gstate = PyGILState_Ensure();
if (topicLen == 0)
result = PyEval_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
topicName, "payload", message->payload, message->payloadlen,
"qos", message->qos, "retained", message->retained, "dup",
message->dup, "msgid", message->msgid);
else
result = PyEval_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
topicName, topicLen, "payload", message->payload,
message->payloadlen, "qos", message->qos, "retained",
message->retained, "dup", message->dup, "msgid",
message->msgid);
if (result)
{
if (PyInt_Check(result))
rc = (int) PyInt_AsLong(result);
Py_DECREF(result);
}
PyGILState_Release(gstate);
MQTTClient_free(topicName);
MQTTClient_freeMessage(&message);
return rc;
}
void deliveryComplete(void* context, MQTTClient_deliveryToken dt)
{
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Oi", e->context, dt);
result = PyEval_CallObject(e->dc, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
{
MQTTClient c;
CallbackEntry* e = NULL;
int rc;
e = malloc(sizeof(CallbackEntry));
if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
&e->ma, &e->dc))
return NULL;
e->c = c;
printf("setCallbacks MQTTClient pointer %p\n", c);
if ((e->cl != Py_None && !PyCallable_Check(e->cl))
|| (e->ma != Py_None && !PyCallable_Check(e->ma))
|| (e->dc != Py_None && !PyCallable_Check(e->dc)))
{
PyErr_SetString(PyExc_TypeError,
"3rd, 4th and 5th parameters must be callable or None");
return NULL;
}
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_setCallbacks(c, e, connectionLost,
messageArrived, deliveryComplete);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS)
{
ListElement* temp = NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
ListAppend(callbacks, e, sizeof(e));
Py_XINCREF(e->cl);
Py_XINCREF(e->ma);
Py_XINCREF(e->dc);
Py_XINCREF(e->context);
}
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
{
MQTTClient c;
PyObject *pyoptions = NULL, *temp;
MQTTClient_connectOptions options = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions woptions = MQTTClient_willOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
return NULL;
printf("connect MQTTClient pointer %p\n", c);
if (!pyoptions)
goto skip;
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
{
if (PyInt_Check(temp))
options.keepAliveInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError,
"keepAliveLiveInterval value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
{
if (PyInt_Check(temp))
options.cleansession = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "reliable")) != NULL)
{
if (PyInt_Check(temp))
options.reliable = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "reliable value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
{
if (PyDict_Check(temp))
{
PyObject *wtemp = NULL;
if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
woptions.topicName = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will message value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
woptions.message = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will message value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
{
if (PyInt_Check(wtemp))
woptions.retained = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will retained value must be int");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
{
if (PyInt_Check(wtemp))
woptions.qos = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will qos value must be int");
return NULL;
}
}
options.will = &woptions;
}
else
{
PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
{
if (PyString_Check(temp))
options.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "username value must be string");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
{
if (PyString_Check(temp))
options.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "password value must be string");
return NULL;
}
}
skip: Py_BEGIN_ALLOW_THREADS rc = MQTTClient_connect(c, &options);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
{
MQTTClient c;
int timeout = 0;
int rc;
if (!PyArg_ParseTuple(args, "k|i", &c, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_disconnect(c, timeout);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
{
MQTTClient c;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_isConnected(c);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topic;
int qos = 2;
int rc;
if (!PyArg_ParseTuple(args, "ks|i", &c, &topic, &qos))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_subscribe(c, topic, qos);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
{
MQTTClient c;
PyObject* topicList;
PyObject* qosList;
int count;
char** topics;
int* qoss;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kOO", &c, &topicList, &qosList))
return NULL;
if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences");
return NULL;
}
if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences of the same length");
return NULL;
}
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
qoss = malloc(count * sizeof(int));
for (i = 0; i < count; ++i)
qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_subscribeMany(c, count, topics,
qoss);
Py_END_ALLOW_THREADS
for (i = 0; i < count; ++i)
PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
free(topics);
free(qoss);
if (rc == MQTTCLIENT_SUCCESS)
return Py_BuildValue("iO", rc, qosList);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topic;
int rc;
if (!PyArg_ParseTuple(args, "ks", &c, &topic))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_unsubscribe(c, topic);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
{
MQTTClient c;
PyObject* topicList;
int count;
char** topics;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kOO", &c, &topicList))
return NULL;
if (!PySequence_Check(topicList))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
return NULL;
}
count = PySequence_Length(topicList);
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_unsubscribeMany(c, count, topics);
Py_END_ALLOW_THREADS
free( topics);
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_publish(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topicName;
int payloadlen;
void* payload;
int qos = 0;
int retained = 0;
MQTTClient_deliveryToken dt;
int rc;
if (!PyArg_ParseTuple(args, "kss#|ii", &c, &topicName, &payload,
&payloadlen, &qos, &retained))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_publish(c, topicName, payloadlen,
payload, qos, retained, &dt);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS && qos > 0)
return Py_BuildValue("ii", rc, dt);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_publishMessage(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topicName;
PyObject *message, *temp;
MQTTClient_message msg = MQTTClient_message_initializer;
MQTTClient_deliveryToken dt;
int rc;
if (!PyArg_ParseTuple(args, "ksO", &c, &topicName, &message))
return NULL;
if (!PyDict_Check(message))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
{
PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
return NULL;
}
if (PyString_Check(temp))
PyString_AsStringAndSize(temp, (char**) &msg.payload,
(Py_ssize_t*) &msg.payloadlen);
else
{
PyErr_SetString(PyExc_TypeError, "payload value must be string");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
msg.qos = (int) PyInt_AsLong(temp);
if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
msg.retained = (int) PyInt_AsLong(temp);
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_publishMessage(c, topicName, &msg,
&dt);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS && msg.qos > 0)
return Py_BuildValue("ii", rc, dt);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
{
MQTTClient c;
unsigned long timeout = 1000L;
MQTTClient_deliveryToken dt;
int rc;
if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_waitForCompletion(c, dt, timeout);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_getPendingDeliveryTokens(PyObject* self, PyObject *args)
{
MQTTClient c;
MQTTClient_deliveryToken* tokens;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_getPendingDeliveryTokens(c, &tokens);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS)
{
int i = 0;
PyObject* dts = PyList_New(0);
while (tokens[i] != -1)
PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
return Py_BuildValue("iO", rc, dts);
}
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_yield(PyObject* self, PyObject *args)
{
if (!PyArg_ParseTuple(args, ""))
return NULL;
Py_BEGIN_ALLOW_THREADS
MQTTClient_yield();
Py_END_ALLOW_THREADS
Py_INCREF( Py_None);
return Py_None;
}
static PyObject* mqttv3_receive(PyObject* self, PyObject *args)
{
MQTTClient c;
unsigned long timeout = 1000L;
int rc;
PyObject* temp = NULL;
char* topicName;
int topicLen;
MQTTClient_message* message;
if (!PyArg_ParseTuple(args, "k|k", &c, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_receive(c, &topicName, &topicLen,
&message, timeout);
Py_END_ALLOW_THREADS
if (message)
{
temp = Py_BuildValue("is#{ss#sisisisi}", rc, topicName, topicLen,
"payload", message->payload, message->payloadlen, "qos",
message->qos, "retained", message->retained, "dup",
message->dup, "msgid", message->msgid);
free(topicName);
MQTTClient_freeMessage(&message);
}
else
temp = Py_BuildValue("iz", rc, NULL);
return temp;
}
static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
{
MQTTClient c;
ListElement* temp = NULL;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
MQTTClient_destroy(&c);
Py_INCREF(Py_None);
return Py_None;
}
static PyMethodDef MqttV3Methods[] =
{
{ "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
{ "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
"Sets the callback functions for a particular client." },
{ "connect", mqttv3_connect, METH_VARARGS,
"Connects to a server using the specified options." },
{ "disconnect", mqttv3_disconnect, METH_VARARGS,
"Disconnects from a server." },
{ "isConnected", mqttv3_isConnected, METH_VARARGS,
"Determines if this client is currently connected to the server." },
{ "subscribe", mqttv3_subscribe, METH_VARARGS,
"Subscribe to the given topic." },
{ "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
"Subscribe to the given topics." },
{ "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
"Unsubscribe from the given topic." },
{ "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
"Unsubscribe from the given topics." },
{ "publish", mqttv3_publish, METH_VARARGS,
"Publish a message to the given topic." },
{ "publishMessage", mqttv3_publishMessage, METH_VARARGS,
"Publish a message to the given topic." },
{ "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
"Waits for the completion of the delivery of the message represented by a delivery token." },
{ "getPendingDeliveryTokens", mqttv3_getPendingDeliveryTokens,
METH_VARARGS,
"Returns the delivery tokens pending of completion." },
{ "yield", mqttv3_yield, METH_VARARGS,
"Single-thread keep alive but don't receive message." },
{ "receive", mqttv3_receive, METH_VARARGS,
"Single-thread receive message if available." },
{ "destroy", mqttv3_destroy, METH_VARARGS,
"Free memory allocated to a MQTT client. It is the opposite to create." },
{ NULL, NULL, 0, NULL } /* Sentinel */
};
PyMODINIT_FUNC initpaho_mqtt3c(void)
{
PyObject *m;
PyEval_InitThreads();
callbacks = ListInitialize();
m = Py_InitModule("paho_mqtt3c", MqttV3Methods);
if (m == NULL)
return;
MqttV3Error = PyErr_NewException("paho_mqtt3c.error", NULL, NULL);
Py_INCREF(MqttV3Error);
PyModule_AddObject(m, "error", MqttV3Error);
PyModule_AddIntConstant(m, "SUCCESS", MQTTCLIENT_SUCCESS);
PyModule_AddIntConstant(m, "FAILURE", MQTTCLIENT_FAILURE);
PyModule_AddIntConstant(m, "DISCONNECTED", MQTTCLIENT_DISCONNECTED);
PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTCLIENT_MAX_MESSAGES_INFLIGHT);
PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTCLIENT_BAD_UTF8_STRING);
PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTCLIENT_NULL_PARAMETER);
PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTCLIENT_TOPICNAME_TRUNCATED);
PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
MQTTCLIENT_PERSISTENCE_ERROR);
}
from distutils.core import setup, Extension
paho_mqtt3c = Extension('paho_mqtt3c',
define_macros = [('NO_HEAP_TRACKING', '1')],
sources = ['mqttclient_module.c', '../../src/LinkedList.c'],
libraries = ['paho-mqtt3c'],
library_dirs = ['../../build/output'],
include_dirs = ['../../src'])
paho_mqtt3a = Extension('paho_mqtt3a',
define_macros = [('NO_HEAP_TRACKING', '1')],
sources = ['mqttasync_module.c', '../../src/LinkedList.c'],
libraries = ['paho-mqtt3a'],
library_dirs = ['../../build/output'],
include_dirs = ['../../src'])
setup (name = 'EclipsePahoMQTTClient',
version = '1.0',
description = 'Binding to the Eclipse Paho C clients',
ext_modules = [paho_mqtt3c, paho_mqtt3a])
import paho_mqtt3c as mqttv3, time, random
print dir(mqttv3)
host = "localhost"
clientid = "myclientid"
noclients = 4
def deliveryComplete(context, msgid):
print "deliveryComplete", msgid
def connectionLost(context, cause):
print "connectionLost"
print "rc from reconnect is", mqttv3.connect(self.client)
def messageArrived(context, topicName, message):
print "clientid", context
print "topicName", topicName
print "message", message
return 1
print messageArrived
myclientid = None
clients = []
for i in range(noclients):
myclientid = clientid+str(i)
rc, client = mqttv3.create("tcp://"+host+":1883", myclientid)
print "client is", hex(client)
print "rc from create is", rc
print "rc from setcallbacks is", mqttv3.setcallbacks(client, client, connectionLost, messageArrived, deliveryComplete)
print "client is", hex(client)
print "rc from connect is", mqttv3.connect(client, {})
clients.append(client)
for client in clients:
print "rc from subscribe is", mqttv3.subscribe(client, "$SYS/#")
for client in clients:
print "rc from publish is", mqttv3.publish(client, "a topic", "a message")
print "rc from publish is", mqttv3.publish(client, "a topic", "a message", 1)
print "rc from publish is", mqttv3.publish(client, "a topic", "a message", 2)
print "about to sleep"
time.sleep(10)
print "finished sleeping"
for client in clients:
print "rc from isConnected is", mqttv3.isConnected(client)
print "rc from disconnect is", mqttv3.disconnect(client)
mqttv3.destroy(client)
import paho_mqtt3a as mqttv3, time, random
import contextlib
print dir(mqttv3)
hostname = "localhost"
clientid = "myclientid"
topic = "test2_topic"
def deliveryComplete(context, msgid):
print "deliveryComplete", msgid
def connectionLost(context, cause):
print "connectionLost"
print "rc from reconnect is", mqttv3.connect(self.client)
def messageArrived(context, topicName, message):
print "messageArrived", message
#print "clientid", context
#print "topicName", topicName
return 1
def onSuccess(context, successData):
print "onSuccess for", context["clientid"], context["state"], successData
responseOptions = {"context": context, "onSuccess": onSuccess, "onFailure" : onFailure}
#responseOptions = {"context": context}
if context["state"] == "connecting":
context["state"] = "subscribing"
print "rc from subscribe is", mqttv3.subscribe(client, topic, 2, responseOptions)
elif context["state"] == "subscribing":
context["state"] = "publishing qos 0"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 0 message", 0, 0, responseOptions)
elif context["state"] == "publishing qos 0":
context["state"] = "publishing qos 1"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 1 message", 1, 0, responseOptions)
elif context["state"] == "publishing qos 1":
context["state"] = "publishing qos 2"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 2 message", 2, 0, responseOptions)
elif context["state"] == "publishing qos 2":
context["state"] = "finished"
print "leaving onSuccess"
def onFailure(context, failureData):
print "onFailure for", context["clientid"]
context["state"] = "finished"
noclients = 1
myclientid = None
clients = []
for i in range(noclients):
myclientid = clientid+str(i)
rc, client = mqttv3.create("tcp://"+hostname+":1883", myclientid)
#print "client is", hex(client)
print "rc from create is", rc
print "rc from setcallbacks is", mqttv3.setcallbacks(client, client, connectionLost, messageArrived, deliveryComplete)
context = {"client" : client, "clientid" : clientid, "state" : "connecting"}
print "rc from connect is", mqttv3.connect(client, {"context": context, "onSuccess": onSuccess, "onFailure": onFailure})
clients.append(context)
while [x for x in clients if x["state"] != "finished"]:
print [x for x in clients if x["state"] != "finished"]
time.sleep(1)
for client in clients:
if mqttv3.isConnected(client["client"]):
print "rc from disconnect is", mqttv3.disconnect(client["client"], 1000)
time.sleep(1)
mqttv3.destroy(client["client"])
print "after destroy"
import paho_mqtt3a as mqttv3, time, random
import contextlib
print dir(mqttv3)
hostname = "localhost"
clientid = "myclientid"
topic = "test2_topic"
def deliveryComplete(context, msgid):
print "deliveryComplete", msgid
def connectionLost(context, cause):
print "connectionLost", cause
client = context
responseOptions = {"context": context, "onSuccess": onSuccess, "onFailure" : onFailure}
print "rc from publish while disconnected is", mqttv3.send(client, topic, "message while disconnected", 2, 0, responseOptions)
def messageArrived(context, topicName, message):
print "messageArrived", message
#print "clientid", context
#print "topicName", topicName
return 1
def connected(context, cause):
print "connected", cause
def onSuccess(context, successData):
print "onSuccess for", context["clientid"], context["state"], successData
responseOptions = {"context": context, "onSuccess": onSuccess, "onFailure" : onFailure}
if context["state"] == "connecting":
context["state"] = "subscribing"
print "rc from subscribe is", mqttv3.subscribe(client, topic, 2, responseOptions)
elif context["state"] == "subscribing":
context["state"] = "publishing qos 0"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 0 message", 0, 0, responseOptions)
elif context["state"] == "publishing qos 0":
context["state"] = "publishing qos 1"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 1 message", 1, 0, responseOptions)
elif context["state"] == "publishing qos 1":
context["state"] = "publishing qos 2"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 2 message", 2, 0, responseOptions)
elif context["state"] == "publishing qos 2":
context["state"] = "finished"
print "leaving onSuccess"
def onFailure(context, failureData):
print "onFailure for", context["clientid"]
context["state"] = "finished"
noclients = 1
myclientid = None
clients = []
for i in range(noclients):
myclientid = clientid+str(i)
rc, client = mqttv3.create("tcp://"+hostname+":1883", myclientid, mqttv3.PERSISTENCE_DEFAULT, {"sendWhileDisconnected" : 1})
#print "client is", hex(client)
print "rc from create is", rc
print "rc from setcallbacks is", mqttv3.setcallbacks(client, client, connectionLost, messageArrived, deliveryComplete)
print "rc from setconnected is", mqttv3.setconnected(client, client, connected)
context = {"client" : client, "clientid" : clientid, "state" : "connecting"}
print "rc from connect is", mqttv3.connect(client, {"cleansession" : 0, "automaticReconnect": 1, "context": context, "onSuccess": onSuccess, "onFailure": onFailure})
clients.append(context)
while [x for x in clients if x["state"] != "finished"]:
print [x for x in clients if x["state"] != "finished"]
time.sleep(1)
print "waiting for 60 seconds"
time.sleep(60)
for client in clients:
if mqttv3.isConnected(client["client"]):
print "rc from disconnect is", mqttv3.disconnect(client["client"], 1000)
time.sleep(1)
mqttv3.destroy(client["client"])
print "after destroy"
/*******************************************************************************
* Copyright (c) 2012, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
/**
* @file
* Offline buffering and automatic reconnect tests for the MQ Telemetry Asynchronous MQTT C client
*
*/
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define snprintf _snprintf
#endif
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
void usage()
{
printf("Options:\n");
printf("\t--test_no <test_no> - Run test number <test_no>\n");
printf("\t--server <hostname> - Connect to <hostname> for tests\n");
printf("\t--client_key <key_file> - Use <key_file> as the client certificate for SSL authentication\n");
printf("\t--client_key_pass <password> - Use <password> to access the private key in the client certificate\n");
printf("\t--server_key <key_file> - Use <key_file> as the trusted certificate for server\n");
printf("\t--verbose - Enable verbose output \n");
printf("\t--help - This help output\n");
exit(-1);
}
struct Options
{
char connection[100]; /**< connection to system under test. */
char mutual_auth_connection[100]; /**< connection to system under test. */
char nocert_mutual_auth_connection[100];
char server_auth_connection[100];
char anon_connection[100];
char* client_key_file;
char* client_key_pass;
char* server_key_file;
char* client_private_key_file;
int verbose;
int test_no;
int size;
} options =
{
"ssl://m2m.eclipse.org:18883",
"ssl://m2m.eclipse.org:18884",
"ssl://m2m.eclipse.org:18887",
"ssl://m2m.eclipse.org:18885",
"ssl://m2m.eclipse.org:18886",
"../../../test/ssl/client.pem",
NULL,
"../../../test/ssl/test-root-ca.crt",
NULL,
0,
0,
5000000
};
typedef struct
{
MQTTAsync client;
char clientid[24];
char topic[100];
int maxmsgs;
int rcvdmsgs[3];
int sentmsgs[3];
int testFinished;
int subscribed;
} AsyncTestClient;
#define AsyncTestClient_initializer {NULL, "\0", "\0", 0, {0, 0, 0}, {0, 0, 0}, 0, 0}
void getopts(int argc, char** argv)
{
int count = 1;
while (count < argc)
{
if (strcmp(argv[count], "--help") == 0)
{
usage();
}
else if (strcmp(argv[count], "--test_no") == 0)
{
if (++count < argc)
options.test_no = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--client_key") == 0)
{
if (++count < argc)
options.client_key_file = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--client_key_pass") == 0)
{
if (++count < argc)
options.client_key_pass = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--server_key") == 0)
{
if (++count < argc)
options.server_key_file = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--verbose") == 0)
{
options.verbose = 1;
printf("\nSetting verbose on\n");
}
else if (strcmp(argv[count], "--hostname") == 0)
{
if (++count < argc)
{
sprintf(options.connection, "ssl://%s:18883", argv[count]);
printf("Setting connection to %s\n", options.connection);
sprintf(options.mutual_auth_connection, "ssl://%s:18884", argv[count]);
printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection);
sprintf(options.nocert_mutual_auth_connection, "ssl://%s:18887", argv[count]);
printf("Setting nocert_mutual_auth_connection to %s\n",
options.nocert_mutual_auth_connection);
sprintf(options.server_auth_connection, "ssl://%s:18885", argv[count]);
printf("Setting server_auth_connection to %s\n", options.server_auth_connection);
sprintf(options.anon_connection, "ssl://%s:18886", argv[count]);
printf("Setting anon_connection to %s\n", options.anon_connection);
}
else
usage();
}
count++;
}
}
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
#else
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
void MyLog(int LOGA_level, char* format, ...)
{
static char msg_buf[256];
va_list args;
struct timeb ts;
struct tm *timeinfo;
if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
return;
ftime(&ts);
timeinfo = localtime(&ts.time);
strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
va_start(args, format);
vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
format, args);
va_end(args);
printf("%s\n", msg_buf);
fflush(stdout);
}
#endif
void MySleep(long milliseconds)
{
#if defined(WIN32) || defined(WIN64)
Sleep(milliseconds);
#else
usleep(milliseconds*1000);
#endif
}
#if defined(WIN32) || defined(_WINDOWS)
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
START_TIME_TYPE start_clock(void)
{
return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void)
{
static struct timespec start;
clock_gettime(CLOCK_REALTIME, &start);
return start;
}
#else
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE start_clock(void)
{
struct timeval start_time;
gettimeofday(&start_time, NULL);
return start_time;
}
#endif
#if defined(WIN32)
long elapsed(START_TIME_TYPE start_time)
{
return GetTickCount() - start_time;
}
#elif defined(AIX)
#define assert(a)
long elapsed(struct timespec start)
{
struct timespec now, res;
clock_gettime(CLOCK_REALTIME, &now);
ntimersub(now, start, res);
return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long elapsed(START_TIME_TYPE start_time)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&now, &start_time, &res);
return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
}
#endif
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
#define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
#define MAXMSGS 30;
int tests = 0;
int failures = 0;
FILE* xml;
START_TIME_TYPE global_start_time;
char output[3000];
char* cur_output = output;
void write_test_result()
{
long duration = elapsed(global_start_time);
fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
if (cur_output != output)
{
fprintf(xml, "%s", output);
cur_output = output;
}
fprintf(xml, "</testcase>\n");
}
void myassert(char* filename, int lineno, char* description, int value,
char* format, ...)
{
++tests;
if (!value)
{
va_list args;
++failures;
printf("Assertion failed, file %s, line %d, description: %s", filename,
lineno, description);
va_start(args, format);
vprintf(format, args);
va_end(args);
cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
description, filename, lineno);
}
else
MyLog(LOGA_DEBUG,
"Assertion succeeded, file %s, line %d, description: %s",
filename, lineno, description);
}
/*********************************************************************
Tests: offline buffering - sending messages while disconnected
1. send some messages while disconnected, check that they are sent
2. check max-buffered
3. check auto-reconnect parms alter behaviour as expected
*********************************************************************/
/*********************************************************************
Test1: offline buffering - sending messages while disconnected
1. call connect
2. use proxy to disconnect the client
3. while the client is disconnected, send more messages
4. when the client reconnects, check that those messages are sent
*********************************************************************/
int test1Finished = 0;
int test1OnFailureCalled = 0;
void test1cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test1OnFailureCalled++;
test1Finished = 1;
}
void test1cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test1OnFailureCalled++;
test1Finished = 1;
}
void test1cOnConnect(void* context, MQTTAsync_successData* response)
{
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
}
int test1(struct Options options)
{
char* testname = "test1";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client offline buffering test";
int count = 0;
test1Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 1 - messages while disconnected");
fprintf(xml, "<testcase classname=\"test1\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "paho-test9-test1-c", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, "paho-test9-test1-d", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test1dOnConnect;
opts.onFailure = test1dOnFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test1dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
/* let client c go: connect, publish some messages, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.onSuccess = test1cOnConnect;
opts.onFailure = test1cOnFailure;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait for success or failure callback */
while (!test1Finished && ++count < 10000)
MySleep(100);
exit: MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
printf("%s\n", message);
}
int main(int argc, char** argv)
{
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test1, };
xml = fopen("TEST-test9.xml", "w");
fprintf(xml, "<testsuite name=\"test9\" tests=\"%lu\">\n", ARRAY_SIZE(tests) - 1);
MQTTAsync_setTraceCallback(handleTrace);
getopts(argc, argv);
if (options.test_no == 0)
{ /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{
failures = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
}
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
}
MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
if (rc == 0)
MyLog(LOGA_INFO, "verdict pass");
else
MyLog(LOGA_INFO, "verdict fail");
fprintf(xml, "</testsuite>\n");
fclose(xml);
return rc;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment