Commit d43949c4 authored by aohui.li's avatar aohui.li

Upload deploy scripts

parent 22ef3bc2
#!/bin/bash
current_dir=$(pwd)
resources="${current_dir}/resources1"
status_folder="${current_dir}/status1"
echo "resources->${resources}"
echo "status_folder->${status_folder}"
# Function
run_command() {
echo "执行命令: $@"
if ! sudo "$@"; then
echo "错误:命令执行失败 - $@"
exit 1
fi
}
#############################################
if [ ! -e ${resources} ]; then
echo "${resources} folder is empty, stop "
exit 1
fi
if [ ! -e ${status_folder} ]; then
echo "mkdir ..............."
mkdir -p ${status_folder}
fi
if [ ! -e ${status_folder}/success-1 ]; then
run_command cp ${resources}/sources.list /etc/apt/sources.list
run_command apt update
run_command apt install python3-dev python3-pip lrzsz git gcc pkg-config \
libcairo2-dev tig libgirepository1.0-dev minicom \
cron logrotate \
-y
run_command apt autoremove -y
run_command apt autoclean -y
touch ${status_folder}/success-1
fi
if [ ! -e ${status_folder}/success-2 ]; then
run_command pip3 install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple/
run_command pip3 install --upgrade setuptools -i https://mirrors.aliyun.com/pypi/simple/
touch ${status_folder}/success-2
fi
if [ -e ${status_folder}/success-1 ] && [ -e ${status_folder}/success-2 ]; then
touch ${status_folder}/success_ok
echo "所有命令执行成功。"
fi
\ No newline at end of file
#!/bin/bash
FIXED_FTP_SERVER="wtlink.witium.com"
FIXED_FTP_PATH="/607/wtdiagnose/1.16_Fit_2.62/wtdiagnose.zip"
FIXED_FTP_EXPECTED_MD5="c33ffc52c31eec6d2a37619de563a26b"
# git clone 函数
function git_clone_with_auth() {
# 函数参数: $1 - Git仓库URL, $2 - 用户名, $3 - 密码, $4 - 克隆后的目录名
local url="$1"
local username="$2"
local password="$3"
local repo_name="$4"
# 检查git命令是否存在
if ! command -v git &> /dev/null; then
echo "git 未安装, 请先安装"
exit 1
fi
# 清理旧目录,如果存在
if [ -d "$repo_name" ]; then
echo "清理旧的克隆目录: $repo_name"
rm -rf "$repo_name"
fi
mkdir "$repo_name"
echo "git clone 开始 ..... "
# 创建Base64编码的认证字符串
local auth_header=$(echo -n "$username:$password" | base64)
# 执行git clone命令,使用--config来设置HTTP头部
git clone --config http.extraheader="Authorization: Basic $auth_header" "$url" "$repo_name"
}
# ftp 文件下载和MD5校验
function download_and_extract_zip(){
local server=$1
local port=$2
local path=$3
local user=$4
local password=$5
local expected_md5=$6
local filename="${path##*/}"
local file_app=$7
# 检查curl 和 md5sum 是否被安装
if ! command -v curl &> /dev/null || ! command -v md5sum &> /dev/null; then
echo "Error: This script requires 'curl' and 'md5sum'. Please install them."
return 1
fi
# 根据传参合成ftp url
local ftp_url="ftp://${server}:${port}${path}"
# 检查文件是否存在
if [ -d "$file_app" ]; then
echo "清理旧的解压目录: $file_app"
rm -rf "$file_app"
fi
# 下载文件
echo "下载文件从 ${ftp_url}"
curl -u "${user}:${password}" -o "${filename}" "${ftp_url}"
# 检查文件是否存在
if [ ! -f "$filename" ]; then
echo "Error: 下载失败. 文件不被发现."
return 1
fi
# MD5检查
local actual_md5=$(md5sum "$filename" | awk '{print $1}')
if [ "$actual_md5" != "$expected_md5" ]; then
echo "Error: MD5 校验失败. 文件也许是错误的."
rm "$filename"
return 1
fi
mkdir "$file_app"
echo "下载后的文件位置 ${filename}..."
unzip "$filename" -d "$file_app"
chmod 777 "$file_app/${filename%.zip}"
# 检查解压状态
if [ $? -eq 0 ]; then
echo "文件解压成功."
else
echo "Error: 解压失败."
rm "$filename"
return 1
fi
# Cleanup ZIP file
rm "$filename"
echo "zip 文件删除."
return 0
}
setup_quectel() {
local env=$1
local zip_file="quectel-CM-binary-RK3568.zip"
local bin_file="quectel-CM"
local service_file="quectel-autodial.service"
local extract_dir="${zip_file%.zip}"
echo "4G驱动程序初始化.... "
if [ ! -d "$env" ]; then
echo "错误: 指定的环境目录 '$env' 不存在。"
return 1
fi
local zip_path="${env}/${zip_file}"
if [ ! -f "$zip_path" ]; then
echo "错误: ZIP文件 '$zip_path' 不存在。"
return 1
fi
local extract_path="${env}/${extract_dir}"
if [ ! -d "$extract_path" ]; then
echo "解压缩4G配置文件..."
unzip "$zip_path" -d "$env"
fi
local bin_path="${extract_path}/${bin_file}"
if [ ! -f "$bin_path" ]; then
echo "错误:二进制文件 '$bin_path' 不存在。"
return 1
fi
local service_path="${extract_path}/${service_file}"
if [ ! -f "$service_path" ]; then
echo "错误:service 文件 '$service_path' 不存在。"
return 1
fi
chmod +x "$bin_path"
sudo cp "$bin_path" /usr/sbin/
sudo cp "$service_path" /lib/systemd/system/
# sudo systemctl enable "$service_file"
# sudo systemctl start "$service_file"
rm -rf "$extract_path"
echo "设置完成."
}
setup_frpc() {
local env=$1
local zip_file="frpc_0.59.0_linux_arm64.zip"
local extract_dir="${zip_file%.zip}"
echo "frpc驱动程序初始化.... "
if [ ! -d "$env" ]; then
echo "错误: 指定的环境目录 '$env' 不存在。"
return 1
fi
local zip_path="${env}/${zip_file}"
if [ ! -f "$zip_path" ]; then
echo "错误: ZIP文件 '$zip_path' 不存在。"
return 1
fi
local extract_path="${env}/${extract_dir}"
if [ ! -d "$extract_path" ]; then
echo "解压缩frpc配置文件..."
unzip "$zip_path" -d "$env"
fi
chmod 777 "${extract_path}/frpc"
sudo cp "${extract_path}/frpc" /usr/bin/
chmod 777 "${extract_path}/mqtt_frpc"
sudo cp "${extract_path}/mqtt_frpc" /usr/bin/
sudo mkdir /etc/frp
sudo cp "${extract_path}/frpc.toml" /etc/frp/
sudo cp "${extract_path}/config.json" /etc/frp/
sudo cp "${extract_path}/mqtt_frpc.service" "/usr/lib/systemd/system/"
# sudo systemctl enable mqtt_frpc
rm -rf "$extract_path"
echo "设置完成."
}
setup_ssh() {
local env=$1
echo "ssh密钥设置.... "
if [ ! -d "$env" ]; then
echo "错误: 指定的环境目录 '$env' 不存在。"
return 1
fi
mkdir -p ~/.ssh
chmod 700 ~/.ssh
cp "${env}/authorized_keys" ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
echo "设置完成."
}
# Example usage of the function
if [ "${BASH_SOURCE[0]}" == "${0}" ]; then
# 显示改脚本功能
echo "ECGPro-WTG6XXE 应用程序安装"
# # 输入相关信息
# read -p "输入FTP服务地址: " server
# read -p "输入在文件上的服务路径: " path
# echo
# read -p "输入MD5校验码: " expected_md5
server=${FIXED_FTP_SERVER}
path=${FIXED_FTP_PATH}
expected_md5=${FIXED_FTP_EXPECTED_MD5}
app_path="wtdiagnose"
git_path="wtdiagnose_debug"
# Call the function with provided arguments
if download_and_extract_zip "$server" "21" "$path" "witium" "Huidu37774020" "$expected_md5" "$app_path"; then
echo "app 可执行文件下载成功"
git_clone_with_auth "http://gitlab.witium.com.cn/jie_embedded/wtdiagnose.git" "aohui.li@witium.com" "Ktliaohui0808" "$git_path"
# 检查函数执行结果
if [ $? -eq 0 ]; then
echo "克隆成功."
echo "app应用程序初始化正在操作....."
cp -r "${git_path}/file" "${app_path}"
cp -r "${git_path}/db" "${app_path}"
cp "${git_path}/ftp_update" "${app_path}"
chmod 777 "${app_path}/ftp_update"
sudo cp "${git_path}/service/ftp_update.service" "/usr/lib/systemd/system/"
sudo cp "${git_path}/service/frpc.service" "/usr/lib/systemd/system/"
sudo cp "${git_path}/service/wtdiagnose.service" "/usr/lib/systemd/system/"
# sudo systemctl enable wtdiagnose
echo "app应用程序初始化完成....."
setup_quectel "${git_path}/env"
setup_frpc "${git_path}/env"
setup_ssh "${git_path}/env"
else
echo "克隆失败."
fi
else
echo "存在错误操作."
fi
fi
\ No newline at end of file
#!/bin/bash
current_dir=$(pwd)
resources=${current_dir}/resources3
status_folder=${current_dir}/status3
run_command() {
echo "执行命令: $@"
if ! sudo "$@"; then
echo "错误:命令执行失败 - $@"
exit 1
fi
}
REQ="${resources}/r.txt"
#############################################
if [ ! -e ${status_folder} ]; then
echo "mkdir ..............."
mkdir -p ${status_folder}
fi
if [ ! -e ${resources} ]; then
echo "${resources} folder is empty, stop "
exit 1
fi
if [ ! -e ${status_folder}/success-1 ]; then
if [ -e "$REQ" ]; then
run_command pip install -r ${REQ} -i https://mirrors.aliyun.com/pypi/simple/
touch ${status_folder}/success-1
else
echo "$REQ Not Found"
exit 1
fi
fi
if [ -e ${status_folder}/success-1 ]; then
touch ${status_folder}/success_ok
echo "所有命令执行成功。"
fi
\ No newline at end of file
#!/bin/bash
current_dir=$(pwd)
resources="${current_dir}/resources4"
status_folder="${current_dir}/status4"
if [ -d ${resources} ];then
cd ${resources}
sudo pip install iic_gpio-1.0.0-cp37-cp37m-linux_aarch64.whl
else
echo "No "
exit 1
fi
cd ${resources}/i2c/
sudo chmod +x *.sh
sudo python3 main.py
import sys
import time
import serial
import datetime
import os
executable_path = sys.argv[0]
path = os.path.dirname(os.path.abspath(executable_path))
resourcesPath = path + os.sep
# 创建日志文件保存目录
current_time = datetime.datetime.now()
# 创建日志文件名
log_filename = os.path.join(resourcesPath, f"signal_log_{current_time.strftime('%Y%m%d_%H%M%S')}.bin")
# 检查日志文件是否已经存在
if os.path.exists(log_filename):
# 如果文件存在,则继续写入现有文件
with open(log_filename, 'ab') as log_file:
ser = serial.Serial('/dev/ttyUSB2', 2000000, timeout=2)
try:
while True:
ser.write(b'AT+CSQ\r\n')
response = ser.read_until(b'OK')
data = response.decode('utf-8')
signal = data.split(':')[1].split(',')[0].strip()
# 获取当前时间并将其转换为二进制形式
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S').encode('utf-8')
# 将信号值和时间写入日志文件
log_file.write(current_time + b'\n')
log_file.write(signal.encode('utf-8') + b'\n')
time.sleep(60)
finally:
ser.close() # 确保串口在程序结束时关闭
else:
# 如果文件不存在,则创建新文件
with open(log_filename, 'wb') as log_file:
ser = serial.Serial('/dev/ttyUSB2', 2000000, timeout=2)
try:
while True:
ser.write(b'AT+CSQ\r\n')
response = ser.read_until(b'OK')
data = response.decode('utf-8')
signal = data.split(':')[1].split(',')[0].strip()
# 获取当前时间并将其转换为二进制形式
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S').encode('utf-8')
# 将信号值和时间写入日志文件
log_file.write(current_time + b'\n')
log_file.write(signal.encode('utf-8') + b'\n')
time.sleep(60)
finally:
ser.close() # 确保串口在程序结束时关闭
\ No newline at end of file
import time
import iic_gpio
def my_callback(value):
try:
print('开始采集数据')
except Exception as e:
print(f"An error occurred in the callback: {e}")
# Simulate calling the callback function multiple times
if __name__ == "__main__":
iic_gpio.set_callback(my_callback)
iic_gpio.init_isr_gpio("467", "falling")
iic_gpio.wait_interrupt("467")
while True:
time.sleep(300)
.PHONY: all
all: debian_app pip_app led_app isr_app wtcurrent_app
@echo "all done."
.PHONY: debian_app
debian_app: resources1
cp ./resources1/sources.list /etc/apt/sources.list
sudo apt update
sudo apt install \
python3-dev python3-pip lrzsz git gcc pkg-config \
libcairo2-dev tig libgirepository1.0-dev minicom \
cron logrotate -y
sudo apt autoremove -y
sudo apt autoclean -y
touch debian_app
echo "debian_app done"
.PHONY: pip_app
pip_app: resources1
sudo pip3 install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple/
sudo pip3 install --upgrade setuptools -i https://mirrors.aliyun.com/pypi/simple/
sudo pip3 install -r ./resources3/r.txt -i https://mirrors.aliyun.com/pypi/simple/
touch pip_app
echo "pip_app done"
.PHONY: wt_app
repo_name ?= source_code
username=aohui.li
password=Ktliaohui0808
remote_url=http://$(username):$(password)@gitlab.witium.com.cn/jie_embedded/wtdiagnose.git
wt_app:
@echo "git clone 开始 ..... "
@echo
git clone $(remote_url) $(repo_name)
@if [ $clone_status -eq 0 ]; then \
echo "仓库克隆成功。" \
# 这里可以添加克隆成功后要执行的其他操作,比如进入仓库目录执行其他命令
cd "$repo_name" \
# 示例:查看克隆下来的仓库的分支信息
git branch \
else \
echo "仓库克隆失败,退出状态码: $clone_status" \
# 这里可以添加克隆失败后要执行的其他操作,比如记录错误日志等
fi
.PHONY: led_app
led_op ?= open
led_app: resources4
@echo "Wait..."
@if pip show iic-gpio > /dev/null 2>&1; then \
echo "iic-gpio 已安装"; \
else \
echo "iic-gpio 未安装"; \
sudo pip install ./resources4/iic_gpio-1.0.0-cp37-cp37m-linux_aarch64.whl; \
fi
@if [ "$(led_op)" = "open" ]; then \
echo "正在打开GPIO $(PIN)..."; \
for pin in $$(seq 447 462); do \
echo "正在设置GPIO $$pin"; \
sudo bash ./resources4/i2c/write_gpio.sh $$pin 1; \
done; \
echo "LED灯全部点亮"; \
elif [ "$(led_op)" = "close" ]; then \
echo "正在关闭GPIO $(PIN)..."; \
for pin in $$(seq 447 462); do \
echo "正在设置GPIO $$pin"; \
sudo bash ./resources4/i2c/write_gpio.sh $$pin 0; \
done; \
echo "LED灯全部关闭"; \
else \
echo "未知的操作: $(led_op). 请使用 'open' 或 'close'."; \
exit 1; \
fi
@echo "Check Led Status!"
.PHONY: isr_app
isr_app: resources6
@gcc resources6/isr.c -o isr
@echo "测试中断输入:sudo ./isr <interrupt_pin> \n举例:sudo ./isr 467"
.PHONY: rs485_app
rs485_app: RES
@sudo python3 RES/rs485_scanf.py
@echo "执行完毕,请检查!"
.PHONY: wtcurrent_app
wtcurrent_app: resources7
@sudo pip install resources7/wtina226-1.0.1-cp37-cp37m-linux_aarch64.whl
##############################################
###
### 特殊功能
###
##############################################
.PHONY: modify_mac
eth?='unknow'
mac?='unkonw'
modify_mac: RES
@chmod +x ./RES/rk3568-snmac
# 清理目标(可选)
.PHONY: clean
clean:
@echo "Cleaning up..."
@rm -rf dist build *.log
rm led_app
# 帮助信息(可选)
.PHONY: help
help:
@echo "Usage:"
@echo " make all - Build, test and deploy the application"
\ No newline at end of file
import serial
from modbus_tk import modbus_rtu
from modbus_tk.defines import READ_HOLDING_REGISTERS
from time import sleep, time ,strftime ,localtime
def poll_modbus_channels():
"""Poll each Modbus channel 100 times to evaluate performance."""
# Configurations for Modbus RTU
config = {
"baud": 9600,
"COM": "/dev/ttyS0"
}
print(config)
try:
# Open the serial port
serial_port = serial.Serial(
port=config["COM"],
baudrate=config["baud"],
bytesize=8,
parity='N',
stopbits=1,
timeout=0.5 # Timeout duration
)
# Initialize Modbus master
master = modbus_rtu.RtuMaster(serial_port)
master.set_timeout(0.5) # Set Modbus timeout
master.set_verbose(True) # Enable detailed logs
print("Starting Modbus channel polling...")
# Correctly adjusting the time by 8 hours
time_adjusted_correct = strftime("%Y-%m-%d %H:%M:%S", localtime(time() + 8*3600))
print("执行时间 :" , time_adjusted_correct)
# Iterate over Modbus ID range (1 to 247)
for modbus_id in range(1, 2):
if modbus_id in [] : continue
success_count = 0
failure_count = 0
continue_failure_num =0
ts1 = time()
for attempt in range(10):
try:
# Attempt to read holding register
response = master.execute(modbus_id, READ_HOLDING_REGISTERS, 6, 2)
success_count += 1
continue_failure_num =0
except Exception as e:
failure_count += 1
continue_failure_num +=1
if continue_failure_num >= 50 :
break
tsdiff = time() - ts1
# Print summary after 100 attempts
print(f"Modbus ID {modbus_id}: {success_count} 次成功, {failure_count} 次失败. 总耗时 {tsdiff}秒")
print("Completed all channel polling.")
except serial.SerialException as se:
print(f"Serial port error: {se}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
if serial_port and serial_port.is_open:
serial_port.close()
print("Serial port closed.")
if __name__ == "__main__":
poll_modbus_channels()
This diff is collapsed.
This diff is collapsed.
{
"mqtt":{
"broker":"116.62.127.242",
"port": 1883,
"user_name": "witcd",
"password": "Witium37774020"
},
"Topic":"WT",
"Type":"WTG6XXF",
"SN": "24010535",
"phy_4g_socket": false
}
\ No newline at end of file
module/__pycache__
\ No newline at end of file
# python 工况网关
## V1.00
描述:
1. ./file/Config.json 为网关类型和SN;
2. ./file/WtLinkConfig.json 为工况配置文件
## V1.01
描述:
1. CheckIn上发本地IP
## V1.02
描述:
1. 607和Window兼容
## V1.03
描述:
1. 增加opc ua 读取
{
"gateway":"WTG93RF", // mqtt 上发的 type
"sn": "20240822", // mqtt 上发的 sn
"baud":9600, // 串口波特率设置
"COM": "/dev/ttyS0", // 串口端口号
"code_status": 1, // 额外功能, 0:不开启modbus tcp salve功能, 1:开启 modbus tcp salve功能
"source_rm":1 // 通信方式,0: modbus rtu master; 1: opc ua ; 2: modbus rtu master 和 opc ua 同时获取; 3: modbus tcp
}
\ No newline at end of file
{
"gateway":"WTG93RF",
"sn": "24010538",
"baud":9600,
"COM": "/dev/ttyS0",
"code_status": 0,
"source_rm": 0
}
\ No newline at end of file
{"period":30,"ip":"116.62.127.242","port":"1883","devID":1,"topic":"","cfgTopic":"","resTopic":"","user":"witcd","pwd":"Witium37774020","type":"MQTT","config":[
{"id":1,"cmd":4099,"addr":0,"cnt":1,"map":1,"name":"ptrs","max":100,"min":0,"rate":10,"sensor":1,"type":"int"}
]}
\ No newline at end of file
[{
"ip": "172.16.0.111",
"port":502,
"slave_id": [1, 10]
}]
\ No newline at end of file
[{
"url": "opc.tcp://LAPTOP-BK6463EO:53530/OPCUA/SimulationServer",
"opcua": [{
"node_ns": "3",
"node_s":"i",
"linkrm":["1007", "1001"],
"feature":["etya", "ept1"]
}]
}]
\ No newline at end of file
import os
import platform
import socket
import threading
import time
from module.tcp_handler import tcp_handler
from module.file_proc import file_proc
from module.smqtt import smqtt
from module.wlmodbus import wlmodbus
from module.wtopcua import wtopcua
path = os.path.dirname(os.path.abspath(__file__))
dev_sys = platform.system()
resourcesPath = path + os.sep + 'file' + os.sep
def checkin_timer():
global g_bSendCheckIn
g_bSendCheckIn = True
def opData_timer():
global g_bSendOpData
g_bSendOpData = True
def error_timer():
global g_bErrorCon
g_bErrorCon = True
def main_run(source_rm):
global g_bSendCheckIn
global g_bSendOpData
global g_bErrorCon
global witConfigSep
while True:
sendData = {}
try:
if mqtt._start_flag is True:
if g_bSendCheckIn is True:
g_bSendCheckIn = False
sendPack = [{"addr":0,"version":"1.02","hardware":"607","local_ip":ip}]
mqtt.publish(checkInpub, sendPack)
threading.Timer(24 * 3600, checkin_timer).start()
if source_rm == 0 or source_rm == 2:
if wtlinkOp._is_ok is False and g_bErrorCon is True:
g_bErrorCon = False
mqtt.publish(ErrorPub, "Serial port is not connected")
threading.Timer(300, error_timer).start()
# 读取工况
if wtlinkOp._is_ok is True:
witConfigSep.acquire()
wtLinkJson = wtLinkData.getWtLinkConfigData()
operation = wtLinkJson['config']
wtlinkOp.set_rule_data(operation)
for index in operation:
cmd = index['cmd'] % 256
wtlinkOp.readWtLinkOpData(index['id'], cmd, index['name'], index['type'], index['rate'], index['addr'], index['cnt'], index['sensor'])
time.sleep(0.1)
witConfigSep.release()
sendOpdata = wtlinkOp.getWtLinkOpData()
for key, value in sendOpdata.items():
sendData[key] = value
if source_rm == 1 or source_rm == 2:
wt_opcua.get_opcua_data()
sendOpdata = wt_opcua.get_opcua_rm_data()
for key, value in sendOpdata.items():
sendData[key] = value
# 发送工况
sendData['ts'] = round(time.time())
sendData['addr'] = 1
sendList = []
sendList.append(sendData)
if g_bSendOpData is True:
g_bSendOpData = False
mqtt.publish(opdataPub, sendList)
if source_rm == 0 or source_rm == 2:
wtlinkOp.del_data()
threading.Timer(opDataPeriod_s, opData_timer).start()
else:
time.sleep(0.1)
except Exception as exp:
print(exp)
witConfigSep.release()
time.sleep(0.1)
def config_task():
global witConfigSep
while True:
try:
if mqtt._start_flag is True:
mqtt.wait_rcv_event()
witConfigSep.acquire()
config = mqtt.get_rcv_data()
wtLinkData.setOpDatafile(resourcesPath + 'WtLinkConfig.json', config)
witConfigSep.release()
mqtt.clear_rcv_event()
except Exception as exp:
print(exp)
witConfigSep.release()
mqtt.clear_rcv_event()
time.sleep(0.1)
if __name__ == '__main__':
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
configData = file_proc(resourcesPath + 'Config.json')
Config = configData.getConfigData()
wtLinkData = file_proc(resourcesPath + 'WtLinkConfig.json')
gateway_type = Config.get("gateway", "WTG93RF")
gateway_sn = Config.get("sn", "12345678")
g_com = Config.get("COM", "/dev/ttyS0")
g_baud = Config.get("baud", 9600)
code_status = Config.get("code_status", 0)
source_rm = Config.get("source_rm", 0)
opcuacfg = file_proc(resourcesPath + 'opcuaConfig.json')
if code_status == 1:
time.sleep(60)
tcp_config = file_proc(resourcesPath + 'modbustcp.json')
mb_tcp = tcp_handler(tcp_config._data)
mqtt_info = wtLinkData.getMqttInfo()
opDataPeriod_s = wtLinkData.getOpDataPeriod()
configsub = 'WT/%s/%s/ConfigRM' %(gateway_type, gateway_sn)
checkInpub = 'WT/%s/%s/CheckIn' %(gateway_type, gateway_sn)
opdataPub = 'WT/%s/%s/opData' %(gateway_type, gateway_sn)
ErrorPub = 'WT/%s/%s/Error' %(gateway_type, gateway_sn)
g_bSendCheckIn = True
g_bSendOpData = True
g_bErrorCon = True
witConfigSep = threading.Lock()
mqtt = smqtt(mqtt_info)
mqtt.set_add_topic(configsub)
mqtt.open_mqtt()
mqtt_thread = threading.Thread(target=mqtt.internet_task, args=(dev_sys,))
mqtt_thread.start()
if source_rm == 0 or source_rm == 2:
if code_status == 1:
wtlinkOp = wlmodbus(g_com, g_baud, mb_tcp.data_write_handler)
else:
wtlinkOp = wlmodbus(g_com, g_baud, None)
uart_thread = threading.Thread(target=wtlinkOp.com_connect_task)
uart_thread.start()
if source_rm == 1 or source_rm == 2:
cfg_opcua = opcuacfg._data[0]
opcua_url = cfg_opcua.get("url", "opc.tcp://localhost:4840")
wt_opcua = wtopcua(opcua_url)
wt_opcua.get_node_and_feature_from_opcuacfg(cfg_opcua["opcua"])
rcv_thread = threading.Thread(target=config_task)
rcv_thread.start()
main_thread = threading.Thread(target=main_run, args=(source_rm,))
main_thread.start()
# 607需要加上死循环,否则程序就会关闭
while True:
time.sleep(300)
\ No newline at end of file
import json
class file_proc:
def __init__(self, path) -> None:
self._data = None
try:
with open(path, 'r') as load_f:
context = load_f.read()
self._data = json.loads(context)
except Exception as exp:
print(exp)
def getMqttInfo(self):
mqtt_info = {
'broker': 'www.witium.com.cn',
'port': 12883,
'user': 'witcd',
'password': 'Witium37774020'
}
try:
if self._data is not None:
mqtt_info['broker'] = str(self._data['ip'])
mqtt_info['port'] = int(self._data['port'])
mqtt_info['user'] = str(self._data['user'])
mqtt_info['password'] = str(self._data['pwd'])
return mqtt_info
except Exception as exp:
print(exp)
return mqtt_info
def getOpDataPeriod(self):
period_s = 300
try:
if self._data is not None:
period_s = self._data['period']
return period_s
except Exception as exp:
print(exp)
return period_s
def setOpDatafile(self, path, data):
self._data = data
try:
with open(path ,"w") as f:
json.dump(data, f)
except Exception as exp:
print(exp)
def getConfigData(self):
data = {"gateway":"WTG93RF","sn": "99991234","COM": "COM7"}
if self._data is not None:
return self._data
else:
return data
def getWtLinkConfigData(self):
if self._data is not None:
return self._data
else:
return None
import logging
class slog:
def __init__(self) -> None:
logging.getLogger().setLevel(logging.INFO)
def info(self, log) ->None:
logging.info(log)
def warning(self, log) ->None:
logging.warning(log)
def error(self, log) ->None:
logging.error(log)
\ No newline at end of file
#!usr/bin/python3
import json
import threading
import time
import paho.mqtt.client as mqtt
import subprocess
import os
from module.slog import slog
class smqtt:
def __init__(self, mqtt_info):
self._log = slog()
self._topic_enable = False
self._topic = []
self._mqtt_enable = False
self._mqtt_info = mqtt_info
self._client = None
self._start_flag = False
self._rcv_event = threading.Event()
self._config_event = threading.Event()
self._data = None
self._updata_data = None
self._cnt = 0
self._totalPack = 0
def open_mqtt(self):
# open mqtt
try:
if 'user' in self._mqtt_info and 'password' in self._mqtt_info \
and 'broker' in self._mqtt_info and 'port' in self._mqtt_info:
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
self._client.username_pw_set(self._mqtt_info['user'], self._mqtt_info['password'])
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
self._client.connect(self._mqtt_info['broker'], self._mqtt_info['port'])
self._mqtt_enable = True
self._log.info("mqtt config success")
else:
self._mqtt_enable = False
self._log.error("mqtt config error")
except Exception as exp:
self._mqtt_enable = False
self._log.error(exp)
def set_add_topic(self, topic):
self._topic_enable = True
self._topic.append(topic)
def get_add_topic(self):
if self._topic_enable:
return self._topic
else:
return None
def clear_topic(self):
del self._topic
self._topic = []
self._topic_enable = False
def on_connect(self, client, userdata, flags, rc):
if self._topic_enable and self._mqtt_enable:
for index in self._topic:
self._client.subscribe(index)
print(index)
self._start_flag = True
def on_message(self, client, userdata, msg):
try:
if self._topic_enable and self._mqtt_enable:
if "ConfigRM" in msg.topic:
data = str(msg.payload, encoding="utf-8")
data = eval(data)
if 'pack_id' in data:
if data["pack_id"] == 0:
self._cnt = 0
self._totalPack = -1
del self._updata_data
self._updata_data = None
self._updata_data = data
bStatus = True
else:
if self._cnt == data["pack_id"]:
for index in data["config"]:
self._updata_data['config'].append(index)
bStatus = True
else:
bStatus = False
if 'totalPack' in data:
self._totalPack = data["totalPack"]
if self._cnt == self._totalPack:
self._totalPack = 0
bStatus = True
self.set_rcv_event()
self._cnt = self._cnt + 1
if bStatus is True:
ConfigResponse = {"ss":[{"result":{"code":0,"message":"success"}}]}
else:
ConfigResponse = {"ss":[{"result":{"code":0,"message":"error"}}]}
self.publish(msg.topic + 'Response', ConfigResponse)
except Exception as exp:
print(exp)
def publish(self, config_topic, data):
try:
if self._client._state != 2 and self._mqtt_enable:
self._client.publish(config_topic, json.dumps(data, separators=(',', ':')), qos=1)
except Exception as exp:
self._log.error(exp)
def set_rcv_event(self):
if self._rcv_event is not None:
self._rcv_event.set()
def wait_rcv_event(self):
if self._rcv_event is not None:
self._rcv_event.wait()
def clear_rcv_event(self):
if self._rcv_event is not None:
self._rcv_event.clear()
def set_config_event(self):
if self._config_event is not None:
self._config_event.set()
def wait_config_event(self):
if self._config_event is not None:
self._config_event.wait()
def clear_config_event(self):
if self._config_event is not None:
self._config_event.clear()
def get_rcv_data(self):
return self._updata_data
def loop_task(self):
try:
if self._mqtt_enable:
self._client.loop_start()
except Exception as exp:
self._log.error(exp)
def del_task(self):
try:
if self._mqtt_enable:
self._client.loop_stop()
except Exception as exp:
self._log.error(exp)
def internet_task(self, dev):
connec_status = False
while True:
try:
fnull = open(os.devnull, 'w')
if dev != "Windows":
r = subprocess.call('sudo ping -c 1 ' + self._mqtt_info['broker'], shell=True, stdout=fnull, stderr=fnull)
else:
r = subprocess.call('ping ' + self._mqtt_info['broker'], shell=True, stdout=fnull, stderr=fnull)
if r:
if connec_status is True:
self._client.disconnect()
self.del_task()
connec_status = False
self._log.info("通信失败")
else:
if connec_status is False:
self._client.username_pw_set(self._mqtt_info['user'], self._mqtt_info['password'])
self._client.connect(self._mqtt_info['broker'], self._mqtt_info['port'])
if self._topic_enable and self._mqtt_enable:
for index in self._topic:
self._client.subscribe(index)
print(index)
self.loop_task()
connec_status = True
self._log.info('正常联网')
except Exception as exp:
print(exp)
time.sleep(60)
\ No newline at end of file
from module.wtMBtcp import wtMBtcp
class tcp_handler:
def __init__(self, config):
self._mbtcp = []
try:
for cfg in config:
mbtc = wtMBtcp(cfg['ip'], cfg['port'], cfg['slave_id'])
self._mbtcp.append(mbtc)
del mbtc
except Exception as exp:
print(exp)
def data_write_handler(self, id, cmd, address, new_value, data_type):
wtmbtcp : wtMBtcp
try:
for wtmbtcp in self._mbtcp:
wtmbtcp.modify_register(id, cmd, address, new_value, data_type)
except Exception as exp:
print(exp)
import struct
import time
import serial
from modbus_tk import modbus_rtu, modbus_tcp
# import usb
class wlmodbus:
def __init__(self, port=None, baudrate=None, slave_ip=None, tcp_callback=None) -> None:
self._port = port
self._baudrate = baudrate
self._slave_ip = slave_ip
self._is_ok = False
self._work_data = {}
self._rule = None
self._tcp_callback = tcp_callback
try:
if baudrate and slave_ip:
print("tcp和uart 不能同时连接")
return
if baudrate:
self._ser = serial.Serial(self._port, self._baudrate, rtscts=False, dsrdtr=False)
time.sleep(0.5)
self._master = modbus_rtu.RtuMaster(self._ser)
if slave_ip:
self._master = modbus_tcp.TcpMaster(host=self._slave_ip, port=self._port, timeout_in_sec=5.0)
self._master.set_timeout(5, use_sw_timeout=True)
self._is_ok = True
except Exception as exp:
print(self._port + ': Serial port is not connected')
def getWtLinkOpData(self):
return self._work_data
def readWtLinkOpData(self, modbus_id, cmd, name, type, rate, addr, cnt, sensor):
opdata = None
data = None
try:
if type != 'avg' and type != 'max' and type != 'min':
data = self._master.execute(modbus_id, cmd, addr, cnt)
o_data = [0, 0]
# data = [0,10] test
if type == 'int':
h_data = data[0]
elif type == 'int1':
h_data = struct.unpack('>H', struct.pack('<H', data[0]))[0]
elif type == 'long':
h_data = int.from_bytes(struct.pack('>HH', data[0], data[1]), 'big')
elif type == 'long2':
h_data = int.from_bytes(struct.pack('<HH', data[0], data[1]), 'little')
elif type == 'long3':
o_data[0] = struct.unpack('>H', struct.pack('<H', data[0]))[0]
o_data[1] = struct.unpack('>H', struct.pack('<H', data[1]))[0]
h_data = int.from_bytes(struct.pack('>HH', o_data[0], o_data[1]), 'big')
elif type == 'long4':
o_data[0] = struct.unpack('>H', struct.pack('<H', data[0]))[0]
o_data[1] = struct.unpack('>H', struct.pack('<H', data[1]))[0]
h_data = int.from_bytes(struct.pack('<HH', o_data[0], o_data[1]), 'little')
elif type == 'float':
h_data = struct.unpack('>f', struct.pack('>I', data[0] << 16 | data[1]))[0]
elif type == 'float2':
h_data = struct.unpack('<f', struct.pack('<I', data[1] << 16 | data[0]))[0]
elif type == 'float3':
o_data[0] = struct.unpack('>H', struct.pack('<H', data[0]))[0]
o_data[1] = struct.unpack('>H', struct.pack('<H', data[1]))[0]
h_data = struct.unpack('>f', struct.pack('>I', o_data[0] << 16 | o_data[1]))[0] # BA DC
elif type == 'float4':
o_data[0] = struct.unpack('>H', struct.pack('<H', data[0]))[0]
o_data[1] = struct.unpack('>H', struct.pack('<H', data[1]))[0]
h_data = struct.unpack('<f', struct.pack('<I', o_data[1] << 16 | o_data[0]))[0] # DC AB
else:
h_data = data[0]
if self._tcp_callback is not None:
self._tcp_callback(modbus_id, cmd, addr, h_data, type)
else:
if self._rule is not None:
for index in self._rule:
if index['id'] == modbus_id and index['addr'] == addr and index['sensor'] == sensor:
if index['name'] in self._work_data:
h_data = self._work_data[index['name']]
break
else:
return None
if type == '418x':
# * rate
if h_data < 0x8000:
opdata = h_data * 0.00061037 * rate
else:
opdata = (h_data - 0x10000) * 0.00061037 * rate
elif type == 'int' or type == 'int1' \
or type == 'long' or type == 'long2' or type == 'long3' or type == 'long4' \
or type == 'float' or type == 'float2' or type == 'float3' or type == 'float4':
opdata = h_data * rate
elif type == 'avg':
if name in self._work_data:
opdata = (h_data + self._work_data[name]) / 2.0
else:
opdata = (h_data + h_data) / 2.0
elif type == 'max':
if name in self._work_data:
if self._work_data[name] < h_data:
opdata = h_data
else:
opdata = self._work_data[name]
else:
opdata = h_data
elif type == 'min':
if name in self._work_data:
if self._work_data[name] > h_data:
opdata = h_data
else:
opdata = self._work_data[name]
else:
opdata = h_data
self._work_data[name] = round(opdata, 3)
self._is_ok = True
except Exception as exp:
self._is_ok = False
print(exp)
def del_data(self):
del self._work_data
self._work_data = {}
def set_rule_data(self, data):
self._rule = data
def com_connect_task(self):
while True:
if self._is_ok is True:
time.sleep(0.1)
else:
try:
if self._baudrate:
self._ser.close()
self._master.close()
del self._ser
del self._master
self._ser = serial.Serial(self._port, self._baudrate, rtscts=False, dsrdtr=False)
self._master = modbus_rtu.RtuMaster(self._ser)
elif self._slave_ip:
self._master.close()
self._master = modbus_tcp.TcpMaster(host=self._slave_ip, port=self._port, timeout_in_sec=5.0)
self._master.set_timeout(2, use_sw_timeout=True)
self._is_ok = True
except Exception as exp:
print(exp)
time.sleep(10)
import struct
import threading
import time
from modbus_tk.modbus_tcp import TcpServer
import modbus_tk.defines as cst
class wtMBtcp:
def __init__(self, ip='127.0.0.1', port=502, salve_id = 1):
self.ip = ip
self.port = port
self.slave_id = salve_id
self.slave = []
self.server = None
self.network_status = False
threading.Thread(target=self.initialize_data_store).start()
def all_keys_exist(self, variable, required_keys):
return all(key in variable for key in required_keys)
def initialize_data_store(self):
try:
if self.network_status is False:
self.network_status = True
self.server = TcpServer(address=self.ip, port=self.port)
for index, id in enumerate(self.slave_id):
self.slave.append(self.server.add_slave(id))
self.slave[index].add_block("A", cst.HOLDING_REGISTERS, 0, 10) # 模拟100个寄存器
self.server.start()
except Exception as exp:
self.network_status = False
for index, id in enumerate(self.slave_id):
self.slave[index].remove_block("A")
self.server.stop()
self.slave = []
print(exp)
time.sleep(10)
def modify_register(self, id, cmd, address, new_value, data_type):
try:
for index, id_cn in enumerate(self.slave_id):
if id_cn == id:
if data_type == 'int':
# int 类型使用两个寄存器,大端模式 # AB
values = struct.unpack('>H', struct.pack('>H', new_value))
self.slave[index].set_values("A", address, list(values))
elif data_type == 'int1':
# int 类型使用两个寄存器,小端模式 # BA
values = struct.unpack('>H', struct.pack('<H', new_value))
self.slave[index].set_values("A", address, list(values))
elif data_type == 'float':
# float 类型使用四个寄存器,大端模式 # AB CD
values = struct.unpack('>HH', struct.pack('>f', new_value))
self.slave[index].set_values("A", address, list(values))
elif data_type == 'float2':
# float 类型使用四个寄存器,小端模式 # CD BA
values = struct.unpack('<HH', struct.pack('<f', new_value))
self.slave[index].set_values("A", address, list(values))
elif data_type == 'float3':
values = struct.unpack('<HH', struct.pack('>f', new_value)) # BA DC
self.slave[index].set_values("A", address, list(values))
elif data_type == 'float4':
values = struct.unpack('>HH', struct.pack('<f', new_value)) # DC AB
self.slave[index].set_values("A", address, list(values))
elif data_type == 'long':
# long 类型使用四个寄存器,大端模式 # AB CD
values = struct.unpack('>HH', struct.pack('>i', new_value))
self.slave[index].set_values("A", address, list(values))
elif data_type == 'long2':
# long 类型使用四个寄存器,小端模式 # CD BA
values = struct.unpack('<HH', struct.pack('<i', new_value))
self.slave[index].set_values("A", address, list(values))
elif data_type == 'long3':
values = struct.unpack('<HH', struct.pack('>i', new_value)) # BA DC
self.slave[index].set_values("A", address, list(values))
elif data_type == 'long4':
values = struct.unpack('>HH', struct.pack('<i', new_value)) # DC AB
self.slave[index].set_values("A", address, list(values))
except Exception as exp:
print(exp)
def reconnect_tcp(self):
self.server.stop()
self.server = TcpServer(address=self.ip, port=self.port)
self.server.start()
self.slave = self.server.add_slave(self.slave_id)
import threading
import time
from asyncua.sync import Client
class wtopcua:
def __init__(self, url):
self.url = url
self.client = None
self.node = []
self.featureName = None
self.feaDict = {}
self.connected = False
self.reconnect_thread = None
self.running = False
def connect(self):
try:
self.client = Client(self.url, 5)
self.client.connect()
self.connected = True
except Exception as e:
self.connected = False
print(f"Connection failed: {e}")
def disconnect(self):
if self.client:
self.client.disconnect()
self.connected = False
print("Disconnected from server")
self.client = None
def get_node_and_feature_from_opcuacfg(self, config):
for cfg in config:
ns = cfg["node_ns"]
s = cfg["node_s"]
for rm in cfg['linkrm']:
self.node.append(f'ns={ns};{s}={rm}')
self.featureName = cfg['feature']
self.start_reconnect_thread()
def read_node_value(self, node_id: str) -> any:
try:
node = self.client.get_node(node_id)
value = node.read_value()
return value
except Exception as e:
print(f"Error reading node {node_id}: {e}")
return None
def write_node_value(self, node_id: str, value: any):
try:
node = self.client.get_node(node_id)
node.write_value(value)
except Exception as e:
print(f"Error writing to node {node_id}: {e}")
def get_opcua_data(self):
if self.client:
for index, s_node in enumerate(self.node):
self.feaDict[self.featureName[index]] = self.read_node_value(s_node)
def get_opcua_rm_data(self):
if self.client:
return self.feaDict
return []
def start_reconnect_thread(self):
self.running = True
self.reconnect_thread = threading.Thread(target=self.reconnect_loop)
self.reconnect_thread.daemon = True
self.reconnect_thread.start()
def stop_reconnect_thread(self):
self.running = False
if self.reconnect_thread:
self.reconnect_thread.join()
self.reconnect_thread = None
def reconnect_loop(self):
while self.running:
if not self.connected:
self.connect()
time.sleep(60)
def __del__(self):
self.stop_reconnect_thread()
self.disconnect()
\ No newline at end of file
[Unit]
Description=witium pywtlink gateway
Requires=network-online.target
After=network.target network-online.target syslog.target
Wants=network.target
[Service]
Type=simple
TimeoutStartUSec=10000000
NotifyAccess=main
ExecStart=python3 /home/linaro/pywtlink/main.py
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
[Install]
WantedBy=multi-user.target
\ No newline at end of file
typing-extensions==4.7.1
modbus_tk==1.1.3
paho_mqtt==2.0.0
pyserial==3.5
deb https://mirrors.ustc.edu.cn/debian buster main contrib non-free
deb-src https://mirrors.ustc.edu.cn/debian buster main contrib non-free
deb https://mirrors.ustc.edu.cn/debian-security buster/updates main contrib non-free
deb-src https://mirrors.ustc.edu.cn/debian-security buster/updates main contrib non-free
deb https://mirrors.ustc.edu.cn/debian buster-updates main contrib non-free
deb-src https://mirrors.ustc.edu.cn/debian buster-updates main contrib non-free
\ No newline at end of file
altgraph
APScheduler
backports.zoneinfo
certifi
chardet
click
Cython
Flask
Flask-Cors
gevent
greenlet
httplib2
idna
importlib-metadata
iotop
itsdangerous
Jinja2
#lightdm-gtk-greeter-settings
MarkupSafe
modbus-tk
numpy
#onboard
packaging
paho-mqtt
pandas
pycairo
PyGObject
pyinstaller
pyinstaller-hooks-contrib
pyparsing
pyserial
python-dateutil
pytz
requests
scipy
six
ssh-import-id
typing_extensions
tzlocal
urllib3
Werkzeug
zipp
zope.event
zope.interface
typing-extensions
modbus_tk
paho_mqtt
pyserial
asyncua
import os
import subprocess
import sys
executable_path = sys.argv[0]
path = os.path.dirname(os.path.abspath(executable_path))
resourcesPath = path + os.sep
class i2c_gpio:
def __init__(self):
self._read_path = resourcesPath + "read_gpio.sh"
self._write_path = resourcesPath + "write_gpio.sh"
def read_io(self, pin):
try:
command = f"sudo {self._read_path} {pin}"
data = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT,timeout=5)
print(data)
return int(data[-2])
except subprocess.CalledProcessError as e:
print(e)
return None
def write_io(self, pin, level):
try:
command = f"sudo {self._write_path} {pin} {level}"
subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT,timeout=5)
except subprocess.CalledProcessError as e:
print(e)
return None
\ No newline at end of file
from i2c_gpio import i2c_gpio
if __name__ == '__main__':
i2c_io = i2c_gpio()
i2c_io.write_io(447, 1)
i2c_io.write_io(448, 1)
i2c_io.write_io(449, 1)
i2c_io.write_io(450, 1)
i2c_io.write_io(451, 1)
i2c_io.write_io(452, 1)
i2c_io.write_io(453, 1)
i2c_io.write_io(454, 1)
i2c_io.write_io(455, 1)
i2c_io.write_io(456, 1)
i2c_io.write_io(457, 1)
i2c_io.write_io(459, 1)
i2c_io.write_io(460, 1)
i2c_io.write_io(461, 1)
i2c_io.write_io(462, 1)
\ No newline at end of file
from i2c_gpio import i2c_gpio
if __name__ == '__main__':
i2c_io = i2c_gpio()
i2c_io.write_io(447, 0)
i2c_io.write_io(448, 0)
i2c_io.write_io(449, 0)
i2c_io.write_io(450, 0)
i2c_io.write_io(451, 0)
i2c_io.write_io(452, 0)
i2c_io.write_io(453, 0)
i2c_io.write_io(454, 0)
i2c_io.write_io(455, 0)
i2c_io.write_io(456, 0)
i2c_io.write_io(457, 0)
i2c_io.write_io(459, 0)
i2c_io.write_io(460, 0)
i2c_io.write_io(461, 0)
i2c_io.write_io(462, 0)
#!/bin/bash
PORT_NUM=$1
# export gpio if needed
if [ ! -e /sys/class/gpio/gpio${PORT_NUM} ]; then
echo ${PORT_NUM} > /sys/class/gpio/export
fi
# set gpio as in if needed
direction=$(cat /sys/class/gpio/gpio${PORT_NUM}/direction)
if [ "$direction" != "in" ]; then
echo "GPIO ${PORT_NUM} is not in mode, but $direction mode"
echo "Set GPIO ${PORT_NUM} as in mode..."
echo in > /sys/class/gpio/gpio${PORT_NUM}/direction
echo "GPIO ${PORT_NUM} has been set as in mode."
else
echo "GPIO ${PORT_NUM} is in mode now."
fi
cat /sys/class/gpio/gpio${PORT_NUM}/value
\ No newline at end of file
#!/bin/bash
PORT_NUM=$1
PORT_LEVEL=$2
# export gpio if needed
if [ ! -e /sys/class/gpio/gpio${PORT_NUM} ]; then
echo ${PORT_NUM} > /sys/class/gpio/export
fi
# set gpio as out if needed
direction=$(cat /sys/class/gpio/gpio${PORT_NUM}/direction)
if [ "$direction" != "out" ]; then
echo "GPIO ${PORT_NUM} is not out mode, but $direction mode"
echo "Set GPIO ${PORT_NUM} as out mode..."
echo out > /sys/class/gpio/gpio${PORT_NUM}/direction
echo "GPIO ${PORT_NUM} has been set as out mode."
else
echo "GPIO ${PORT_NUM} is out mode now."
fi
echo ${PORT_LEVEL} > /sys/class/gpio/gpio${PORT_NUM}/value
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/select.h>
#include <time.h>
#include <sys/time.h>
#define GPIO_PATH "/sys/class/gpio"
void export_gpio(int pin) {
int fd;
char buf[64];
fd = open(GPIO_PATH "/export", O_WRONLY);
if (fd < 0) {
printf("Failed to open export for writing");
}
snprintf(buf, sizeof(buf), "%d", pin);
if (write(fd, buf, strlen(buf)) < 0) {
printf("Failed to export gpio");
close(fd);
}
close(fd);
}
void unexport_gpio(int pin) {
int fd;
char buf[64];
fd = open(GPIO_PATH "/unexport", O_WRONLY);
if (fd < 0) {
printf("Failed to open unexport for writing");
}
snprintf(buf, sizeof(buf), "%d", pin);
if (write(fd, buf, strlen(buf)) < 0) {
printf("Failed to unexport gpio");
close(fd);
}
close(fd);
}
void set_gpio_direction(int pin, const char *direction) {
int fd;
char buf[64];
snprintf(buf, sizeof(buf), GPIO_PATH "/gpio%d/direction", pin);
fd = open(buf, O_WRONLY);
if (fd < 0) {
printf("Failed to open gpio direction for writing");
}
if (write(fd, direction, strlen(direction)) < 0) {
printf("Failed to set direction");
close(fd);
}
close(fd);
}
void set_gpio_edge(int pin, const char *edge) {
int fd;
char buf[64];
snprintf(buf, sizeof(buf), GPIO_PATH "/gpio%d/edge", pin);
fd = open(buf, O_WRONLY);
if (fd < 0) {
printf("Failed to open gpio edge for writing");
}
if (write(fd, edge, strlen(edge)) < 0) {
printf("Failed to set edge");
close(fd);
}
close(fd);
}
int main(int argc, char *argv[])
{
int fd;
char buf[64];
fd_set set;
struct timeval tv;
// 检查参数数量
if (argc < 2) {
printf("Usage: %s <argument1> ...\n", argv[0]);
return 1;
}
printf("Program name: %s\n", argv[0]);
for (int i = 1; i < argc; i++) {
printf("Argument %d: %s\n", i, argv[i]);
}
int gpio_pin = atoi(argv[1]);
printf("test interrupt gpio pin %d\n", gpio_pin);
unexport_gpio(gpio_pin);
// export GPIO
export_gpio(gpio_pin);
// Set GPIO as input mode
set_gpio_direction(gpio_pin, "in");
// Set GPIO interrupt as falling mode
set_gpio_edge(gpio_pin, "falling");
// Open GPIO value file
snprintf(buf, sizeof(buf), "/sys/class/gpio/gpio%d/value", gpio_pin);
fd = open(buf, O_RDONLY);
if (fd < 0) {
printf("Failed to open gpio value file");
return EXIT_FAILURE;
}
// First read, clear buf
read(fd, buf, sizeof(buf));
printf("GPIO[%d] start monitoring\n", gpio_pin);
while (1) {
FD_ZERO(&set);
FD_SET(fd, &set);
// Wait interrupt
int ret = select(fd + 1, NULL, NULL, &set, NULL);
if (ret < 0) {
printf("select failed");
close(fd);
unexport_gpio(gpio_pin);
return EXIT_FAILURE;
} else if (ret == 0) {
printf("Timeout occurred! No GPIO interrupt.\n");
} else {
if (FD_ISSET(fd, &set)) {
// Read GPIO value
lseek(fd, 0, SEEK_SET);
read(fd, buf, sizeof(buf));
// 获取当前时间
gettimeofday(&tv, NULL);
// 打印时间戳
printf("Current time: %ld seconds, %ld microseconds\n", tv.tv_sec, tv.tv_usec);
// printf("GPIO interrupt occurred, value: %c\n", buf[0]);
}
}
}
close(fd);
unexport_gpio(gpio_pin);
return EXIT_SUCCESS;
}
import struct
import sys
import time
import modbus_tk.defines as cst
from module.wtserial import wtserial
from i2c_io.i2c_gpio import i2c_gpio
from modbus.wtmodbus import wtmodbus
from module.wtserial import wtserial
_operate = sys.argv[1]
_addr = sys.argv[2]
updataAddr = int(_addr)
sensorDict = {
1: {'comNum': '/dev/ttyS2', 'wakeup': 495},
2: {'comNum': '/dev/ttyS3', 'wakeup': 496},
3: {'comNum': '/dev/ttyS4', 'wakeup': 497},
4: {'comNum': '/dev/ttyS9', 'wakeup': 498},
5: {'comNum': '/dev/ttyS7', 'wakeup': 499},
6: {'comNum': '/dev/ttyS5', 'wakeup': 500},
7: {'comNum': '/dev/ttyS5', 'wakeup': 501},
8: {'comNum': '/dev/ttyS5', 'wakeup': 502},
9: {'comNum': '/dev/ttyS5', 'wakeup': 503},
10: {'comNum': '/dev/ttyS5', 'wakeup': 504},
11: {'comNum': '/dev/ttyS5', 'wakeup': 505},
12: {'comNum': '/dev/ttyS5', 'wakeup': 506}
}
updating_com = sensorDict[updataAddr]["comNum"]
wakeup_gpio_num = sensorDict[updataAddr]["wakeup"] # 495~506
updating_img = 'WTS4V3C_v2.62.bin'
update_data = None
def update(self, bin_data, per_package_size=120, version="2.99"):
if self._wakeup_callback is not None:
self._wakeup_callback(str(self._wakeup), '1')
time.sleep(0.01)
total_bin_size = len(bin_data)
rest_size = total_bin_size
update_index = 0
update_addr = 0x5000
update_index_addr = 0x5078
update_pwd_addr = 0x5079
self.__stop_update = False
try:
if int(version[0]) >= 2 and int(version[2]) >= 4 and int(version[3]) >= 1:
check_sum = 0
for each in bin_data:
check_sum += each
check_sum = check_sum & 0xFF
data = check_sum << 8
self._master.execute(self._slave_id, cst.WRITE_MULTIPLE_REGISTERS,0x5080,
output_value=[data])
# write index to 0
self._master.execute(self._slave_id, cst.WRITE_MULTIPLE_REGISTERS, update_index_addr,
output_value=[0])
while rest_size:
if self.__stop_update:
return False
if rest_size >= per_package_size * 2:
update_data = struct.unpack('<' + 'H' * per_package_size,
bin_data[update_index * per_package_size * 2:
(update_index + 1) * per_package_size * 2])
rest_size -= per_package_size * 2
else:
# 补全
update_data = struct.unpack('<' + 'H' * per_package_size,
bin_data[update_index * per_package_size * 2:
(update_index + 1) * per_package_size * 2] +
b'\x00' * (per_package_size * 2 - rest_size))
rest_size = 0
self._master.execute(self._slave_id, cst.WRITE_MULTIPLE_REGISTERS, update_addr,
output_value=update_data)
update_index += 1
# read index
try_cnts = 10000
while try_cnts:
index = self._master.execute(self._slave_id, cst.READ_HOLDING_REGISTERS,
update_index_addr, 1)
if index[0] == update_index:
break
try_cnts -= 1
if try_cnts == 0:
if self._wakeup_callback is not None:
self._wakeup_callback(str(self._wakeup), '0')
time.sleep(0.01)
return False
password_data = struct.unpack('>HHH', b'WITIUM')
self._master.execute(self._slave_id, cst.WRITE_MULTIPLE_REGISTERS, update_pwd_addr,
output_value=password_data)
if self._wakeup_callback is not None:
self._wakeup_callback(str(self._wakeup), '0')
time.sleep(0.01)
return True
except Exception as exp:
print("update error:{}".format(exp))
if self._wakeup_callback is not None:
self._wakeup_callback(str(self._wakeup), '0')
time.sleep(0.1)
return False
def main():
if _operate == 'update':
print("Update Sensor Start.")
print("addr:" ,updataAddr)
iic_gpio = i2c_gpio()
#iic_gpio.write_gpio(str(wakeup_gpio_num), "1")
#time.sleep(15)
try:
with open(updating_img, 'rb') as files:
update_data = files.read()
except IOError as e:
print(f"Exception occurred: {e}")
print("Read File Ok.")
ser = wtserial(str(updating_com), 115200)
smodbus = wtmodbus(None, None, None, 1, ser, iic_gpio.write_gpio)
sensor_info = smodbus.get_sensor_base_parameter()
print(sensor_info)
print("Start Update.")
smodbus.update(update_data)
iic_gpio.write_gpio(str(wakeup_gpio_num), "0")
print("Update Finished.")
elif _operate == 'read':
print(f"Read addr {updataAddr} <START>")
iic_gpio = i2c_gpio()
iic_gpio.write_gpio(str(wakeup_gpio_num), "1")
ser = wtserial(str(updating_com), 115200)
smodbus = wtmodbus(None, None, None, 1, ser, iic_gpio.write_gpio)
time.sleep(1)
sensor_info = smodbus.get_sensor_base_parameter()
print(sensor_info)
print("Start Update.")
iic_gpio.write_gpio(str(wakeup_gpio_num), "0")
print(f"Read addr {updataAddr} <END>")
if __name__ == "__main__":
main()
exit(0)
\ No newline at end of file
[{
"mqtt":{
"broker":"116.62.127.242",
"port": 1883,
"user_name": "witcd",
"password": "Witium37774020"
},
"rawInterval": 300,
"featureInterval": 300,
"storeInterval": 300,
"Topic":"WT",
"Type":"WTG6XXF",
"SN": "24010534",
"reducerNo": "RN747624499620753408",
"reducerName":"自吸泵",
"version" :"1.00",
"grippers" : false,
"trigger" : "falling",
"grippers_io": "467",
"grippers_cnt": 3,
"timeout": 60,
"db_status": false,
"phy_socket_4g":true,
"phy_socket_eth":false,
"sensor": [
{
"comNum": "/dev/ttyS2",
"addr": 1,
"mode": 0,
"wakeup": 495,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS3",
"addr": 2,
"mode": 0,
"wakeup": 496,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS4",
"addr": 3,
"mode": 0,
"wakeup": 497,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS9",
"addr": 4,
"mode": 0,
"wakeup": 498,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS7",
"addr": 5,
"mode": 0,
"wakeup": 499,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 6,
"mode": 1,
"wakeup": 500,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 7,
"mode": 1,
"wakeup": 501,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 8,
"mode": 1,
"wakeup": 502,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 9,
"mode": 1,
"wakeup": 503,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 10,
"mode": 1,
"wakeup": 504,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 11,
"mode": 1,
"wakeup": 505,
"SN" : 22122023
},
{
"comNum": "/dev/ttyS5",
"addr": 12,
"mode": 1,
"wakeup": 506,
"SN" : 22122023
}
],
"workSendData": {
"enable": true,
"WorkTh": 1
},
"mqttEnable": true,
"remoteEnable": true,
"rawDataStoreFlag":false
}
]
import ctypes
import json
import os
import sys
import threading
import time
import platform
from module.timer import timer
from module.I2CDevice import I2CDevice
from module.smqtt import smqtt
executable_path = sys.argv[0]
path = os.path.dirname(os.path.abspath(executable_path))
dev_sys = platform.system()
resourcesPath = path + os.sep
IDLE = 0
COLLECT_NORNAL = 1
COMPLETE = 2
def main_run():
global high_enable, wtlink_enable
global sample_num, sleep_ms
global opdata_key
global scr_data
wttimer = timer(24 * 3600, 300)
status = IDLE
complete = False
timesample = None
opdata_buf = [{}]
while True:
if status == IDLE:
if wttimer.getSendCheckInFlag() is True:
mqtt.publish(checkinpub, [{"addr":0,"version":"1.02","hardware":"607"}])
mqtt.publish(opcheckinpub, [{"addr":0,"version":"1.02","hardware":"607"}])
wttimer.setSendCheckInFlag(False)
if high_enable:
# I2CDevice._collect_event.set()
dev:I2CDevice
for dev in high_device:
dev.start_sampling(sleep_ms * 1000, sample_num)
status = COLLECT_NORNAL
if status == COLLECT_NORNAL:
timesample = round(time.time())
# 读工况数据
if wtlink_enable:
dev:I2CDevice
for index, dev in enumerate(wtlink_device):
data = ctypes.c_short(dev.read_reg(0x01)).value # 读取寄存器
ma4to20 = round((data * 2.5) / 3900.0 , 3)
opdata_buf[0][opdata_key[index]] = ma4to20
print("读取工况数据完成")
complete = True
# 高频数据
if high_enable:
thread_complete = False
while thread_complete is not True:
dev:I2CDevice
complete_cnt = 0
for dev in high_device:
if dev.is_sampling_complete() is True:
complete_cnt = complete_cnt + 1
if complete_cnt == len(high_device):
thread_complete = True
time.sleep(1)
print("wave数据读取完成")
complete = True
if status == COMPLETE:
if complete is True:
complete = False
if wttimer.getSendWaveFlag is True:
if wtlink_enable:
opdata_buf[0]['addr'] = "1"
opdata_buf[0]['ts'] = timesample
mqtt.publish(opdatapub, opdata_buf)
if high_enable:
dev:I2CDevice
for dev in high_device:
send = {'x': [], 'y': [], 'z': []}
wave_data = dev.get_c_data()
for item in wave_data:
value, timestamps = item
current_data = ctypes.c_short(value).value # 读取寄存器
send['x'].append(round((current_data * 2.5) / 3900.0 , 3))
sample_data = [{
"addr": dev.addr,
"id": topic_id,
"ts": timesample,
"x": send['x'],
"y": send['y'],
"z": send['z']
}]
mqtt.publish(datapub, sample_data)
topic_id = topic_id + 1
wttimer.setSendWaveFlag()
scr_data['topic_id'] = topic_id
with open(resourcesPath + 'file/config.json' ,"w") as f:
json.dump(scr_data, f)
if __name__ == "__main__":
topic = 'WT'
scr_data = None
high_enable = False
wtlink_enable = False
with open(resourcesPath + 'file/config.json', 'rb') as load_f:
context = load_f.read()
scr_data = json.loads(context)
mqtt_info = scr_data.get("mqtt_info", {'broker': 'www.witium.com.cn', 'port': 12883, 'user': 'witcd','password': 'Witium37774020'})
gw_type = scr_data.get("gw_type", "WTG936F")
sn = scr_data.get("sn", "20240621")
period = scr_data.get("period", 300)
opdata_address = scr_data.get("iic_opdata_addr", [])
opdata_key = scr_data.get("opdata_key", [])
shift_address = scr_data.get("iic_shift_addr", [])
sleep_ms = scr_data.get("sleep_ms", 5)
sample_num = scr_data.get("sample_num", 16384)
topic_id = scr_data.get("topic_id", 0)
if len(shift_address) != 0:
high_enable = True
if len(opdata_address) != 0:
wtlink_enable = True
checkinpub = '%s/%s/%s/CheckIn' %(topic, gw_type, sn)
opcheckinpub = '%s/WTG93RF/%sW/CheckIn' %(topic, sn)
datapub = '%s/%s/%s/Data' %(topic, gw_type, sn)
opdatapub = '%s/WTG93RF/%sW/opData' %(topic, sn)
high_device = []
wtlink_device = []
if high_enable:
dev:I2CDevice
for index, addr in enumerate(shift_address):
dev = I2CDevice(index + 1, addr)
# threading.Thread(target=dev.collect_task, args=(int(sleep_ms * 1000), int(sample_num), )).start()
dev.write_reg(0x00, 0x4005)
dev.write_reg(0x05, 0x2000)
cfg_reg_value = dev.read_reg(0x00)
cal_reg_value = dev.read_reg(0x05)
print(f'addr {index + 1} 配置寄存器0x00: {cfg_reg_value}')
print(f'addr {index + 1} 校准寄存器0x05: {cal_reg_value}')
high_device.append(dev)
del dev
if wtlink_enable:
dev:I2CDevice
for index, addr in enumerate(opdata_address):
dev = I2CDevice(index + 1, addr)
threading.Thread(target=dev.collect_task, args=(int(sleep_ms * 1000), int(sample_num), )).start()
dev.write_reg(0x00, 0x4005)
dev.write_reg(0x05, 0x2000)
cfg_reg_value = dev.read_reg(0x00)
cal_reg_value = dev.read_reg(0x05)
print(f'addr {addr} 配置寄存器0x00: {cfg_reg_value}')
print(f'addr {addr} 校准寄存器0x05: {cal_reg_value}')
high_device.append(dev)
del dev
mqtt = smqtt(mqtt_info, None, None)
mqtt.open_mqtt()
threading.Thread(target=mqtt.internet_task).start()
time.sleep(2)
main_process = threading.Thread(target=main_run, ).start()
time.sleep(300)
607 whl 生成
cd python_c
python3 setup.py bdist_wheel
\ No newline at end of file
{"mqtt_info": {"broker": "116.62.127.242", "port": 1883, "user": "witcd", "password": "Witium37774020"}, "gw_type": "WTG936F", "sn": "24010536", "period": 30, "iic_opdata_addr": [79], "opdata_key": ["a"], "iic_shift_addr": [], "sleep_ms": 5, "sample_num": 512, "topic_id": 134}
\ No newline at end of file
import ctypes
import json
import os
import sys
import threading
import time
import platform
from module.timer import timer
from module.I2CDevice import I2CDevice
from module.smqtt import smqtt
executable_path = sys.argv[0]
path = os.path.dirname(os.path.abspath(executable_path))
dev_sys = platform.system()
resourcesPath = path + os.sep
IDLE = 0
COLLECT_NORNAL = 1
COMPLETE = 2
def main_run():
global high_enable, wtlink_enable
global sample_num, sleep_ms
global opdata_key
global scr_data
global topic_id
wttimer = timer(24 * 3600, period)
status = IDLE
complete = False
timesample = None
opdata_buf = [{}]
while True:
if status == IDLE:
if wttimer.getSendCheckInFlag() is True:
mqtt.publish(checkinpub, [{"addr":0,"version":"1.02","hardware":"607"}])
mqtt.publish(opcheckinpub, [{"addr":0,"version":"1.02","hardware":"607"}])
wttimer.setSendCheckInFlag(False)
if high_enable:
# I2CDevice._collect_event.set()
print("开始采集")
dev:I2CDevice
for dev in high_device:
dev.start_sampling(sleep_ms * 1000, sample_num)
status = COLLECT_NORNAL
if status == COLLECT_NORNAL:
timesample = round(time.time())
# 读工况数据
if wtlink_enable:
dev:I2CDevice
for index, dev in enumerate(wtlink_device):
data = ctypes.c_short(dev.read_reg(0x01)).value # 读取寄存器
ma4to20 = round((data * 2.5) / 3900.0 , 3)
opdata_buf[0][opdata_key[index]] = ma4to20
complete = True
# 高频数据
if high_enable:
thread_complete = False
while thread_complete is not True:
dev:I2CDevice
complete_cnt = 0
for dev in high_device:
if dev.is_sampling_complete() is True:
complete_cnt = complete_cnt + 1
if complete_cnt == len(high_device):
thread_complete = True
time.sleep(1)
complete = True
print("读取完成")
status = COMPLETE
if status == COMPLETE:
if complete is True:
complete = False
if wttimer.getSendWaveFlag() is True:
if wtlink_enable:
opdata_buf[0]['addr'] = "1"
opdata_buf[0]['ts'] = timesample
mqtt.publish(opdatapub, opdata_buf)
print(f"=1+++={opdata_buf}")
if high_enable:
dev:I2CDevice
for dev in high_device:
send = {'x': [], 'y': [], 'z': []}
wave_data = dev.get_c_data()
for item in wave_data:
value, timestamps = item
current_data = ctypes.c_short(value).value # 读取寄存器
send['x'].append(round((current_data * 2.5) / 3900.0 , 3))
sample_data = [{
"addr": dev.addr,
"id": topic_id,
"ts": timesample,
"x": send['x'],
"y": send['y'],
"z": send['z']
}]
print(f"=1+++={opdata_buf}")
mqtt.publish(datapub, sample_data)
topic_id = topic_id + 1
wttimer.setSendWaveFlag(False)
scr_data['topic_id'] = topic_id
with open(resourcesPath + 'file/config.json' ,"w") as f:
json.dump(scr_data, f)
print("上发完成")
status = IDLE
if __name__ == "__main__":
topic = 'WT'
scr_data = None
high_enable = False
wtlink_enable = False
with open(resourcesPath + 'file/config.json', 'rb') as load_f:
context = load_f.read()
scr_data = json.loads(context)
mqtt_info = scr_data.get("mqtt_info", {'broker': 'www.witium.com.cn', 'port': 12883, 'user': 'witcd','password': 'Witium37774020'})
gw_type = scr_data.get("gw_type", "WTG936F")
sn = scr_data.get("sn", "20240621")
period = scr_data.get("period", 300)
opdata_address = scr_data.get("iic_opdata_addr", [])
opdata_key = scr_data.get("opdata_key", [])
shift_address = scr_data.get("iic_shift_addr", [])
sleep_ms = scr_data.get("sleep_ms", 5)
sample_num = scr_data.get("sample_num", 16384)
topic_id = scr_data.get("topic_id", 0)
if len(shift_address) != 0:
high_enable = True
if len(opdata_address) != 0:
wtlink_enable = True
checkinpub = '%s/%s/%s/CheckIn' %(topic, gw_type, sn)
opcheckinpub = '%s/WTG93RF/%sW/CheckIn' %(topic, sn)
datapub = '%s/%s/%s/Data' %(topic, gw_type, sn)
opdatapub = '%s/WTG93RF/%sW/opData' %(topic, sn)
high_device = []
wtlink_device = []
if high_enable:
dev:I2CDevice
for index, addr in enumerate(shift_address):
dev = I2CDevice(index + 1, addr)
# threading.Thread(target=dev.collect_task, args=(int(sleep_ms * 1000), int(sample_num), )).start()
dev.write_reg(0x00, 0x4005)
dev.write_reg(0x05, 0x2000)
cfg_reg_value = dev.read_reg(0x00)
cal_reg_value = dev.read_reg(0x05)
print(f'addr {index + 1} 配置寄存器0x00: {cfg_reg_value}')
print(f'addr {index + 1} 校准寄存器0x05: {cal_reg_value}')
high_device.append(dev)
del dev
if wtlink_enable:
dev:I2CDevice
for index, addr in enumerate(opdata_address):
dev = I2CDevice(index + 1, addr)
threading.Thread(target=dev.collect_task, args=(int(sleep_ms * 1000), int(sample_num), )).start()
dev.write_reg(0x00, 0x4005)
dev.write_reg(0x05, 0x2000)
cfg_reg_value = dev.read_reg(0x00)
cal_reg_value = dev.read_reg(0x05)
print(f'addr {addr} 配置寄存器0x00: {cfg_reg_value}')
print(f'addr {addr} 校准寄存器0x05: {cal_reg_value}')
wtlink_device.append(dev)
del dev
mqtt = smqtt(mqtt_info, None, None)
mqtt.open_mqtt()
threading.Thread(target=mqtt.internet_task).start()
time.sleep(2)
main_process = threading.Thread(target=main_run, ).start()
time.sleep(300)
import ctypes
import threading
import time
import wtina226
class I2CDevice:
_collect_event = threading.Event()
def __init__(self, addr, address):
self.device = wtina226.I2CHandle(address)
self.data = {f'{addr}_time':[], f'{addr}_data':[]}
self.complete = None
self.addr = str(addr)
def __enter__(self):
return self.device
def __exit__(self, exc_type, exc_value, traceback):
self.device = None
def write_reg(self, reg, value):
return self.device.write_reg(reg, value)
def read_reg(self, reg):
return self.device.read_reg(reg)
def start_sampling(self, sample_interval_us, sample_count):
return self.device.start_sampling(sample_interval_us, sample_count)
def is_sampling_complete(self):
return self.device.is_sampling_complete()
def get_c_data(self):
return self.device.get_data()
def collect_task(self, sampling_interval, num_reads):
while True:
I2CDevice._collect_event.wait()
print(f'addr {self.addr}: 开始采集')
self.complete = False
self.start_sampling(sampling_interval, num_reads)
while not self.is_sampling_complete():
time.sleep(1)
data = self.get_c_data()
for item in data:
value, timestamp = item
current_data = ctypes.c_short(value).value # 读取寄存器
self.data[f'{self.addr}_data'].append(round((current_data * 2.5) / 3900.0 , 3))
self.data[f'{self.addr}_time'].append(timestamp)
self.complete = True
I2CDevice._collect_event.clear()
def get_data(self):
return self.data
def set_data(self):
del self.data
self.data = {f'{self.addr}_time':[], f'{self.addr}_data':[]}
self.complete = False
\ No newline at end of file
import platform
dev_sys = platform.system()
if dev_sys != "Windows":
import iic_gpio
class i2c_gpio:
def __init__(self):
if dev_sys != "Windows":
self._iic_gpio = iic_gpio
def init_isr_gpio(self, io, mode):
if dev_sys != "Windows":
iic_gpio.init_isr_gpio(str(io), str(mode))
def set_callback(self, callback=None):
if dev_sys != "Windows":
iic_gpio.set_callback(callback)
def wait_interrupt(self, io):
if dev_sys != "Windows":
iic_gpio.wait_interrupt(str(io))
def set_collect_flag(self):
if dev_sys != "Windows":
iic_gpio.set_collect_flag()
def get_timestamps(self):
if dev_sys != "Windows":
return iic_gpio.get_timestamps()
return 0, None
def write_gpio(self, io, level):
if dev_sys != "Windows":
iic_gpio.write_gpio(str(io), str(level))
\ No newline at end of file
#!usr/bin/python3
import json
import shlex
import socket
import threading
import time
import paho.mqtt.client as mqtt
import subprocess
import os
class smqtt:
def __init__(self, mqtt_info, callback = None, led_callback = None):
self._topic_enable = False
self._topic = []
self._mqtt_enable = False
self._mqtt_info = mqtt_info
self._client = None
self._start_flag = False
self._rcv_event = threading.Event()
self._data = None
self._updata_data = None
self._callback = callback
self._led_callback = led_callback
self._connect_status = False
def open_mqtt(self):
# open mqtt
try:
if 'user' in self._mqtt_info and 'password' in self._mqtt_info \
and 'broker' in self._mqtt_info and 'port' in self._mqtt_info:
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
# self._client.username_pw_set(self._mqtt_info['user'], self._mqtt_info['password'])
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
# self._client.connect(self._mqtt_info['broker'], self._mqtt_info['port'])
self._mqtt_enable = True
else:
self._mqtt_enable = False
except Exception as exp:
self._mqtt_enable = False
print(exp)
def set_add_topic(self, topic):
self._topic_enable = True
self._topic.append(topic)
def get_add_topic(self):
if self._topic_enable:
return self._topic
else:
return None
def clear_topic(self):
del self._topic
self._topic = []
self._topic_enable = False
def on_connect(self, client, userdata, flags, rc):
if self._topic_enable and self._mqtt_enable:
for index in self._topic:
self._client.subscribe(index)
self._start_flag = True
def on_message(self, client, userdata, msg):
try:
if self._topic_enable and self._mqtt_enable:
data = str(msg.payload, encoding="utf-8")
data = eval(data)
self._callback(msg.topic, data)
except Exception as exp:
print(exp)
def publish(self, topic, data):
try:
if self._client._state != 2 and self._mqtt_enable and self._connect_status is True:
self._client.publish(topic, json.dumps(data, separators=(',', ':')), qos=1)
except Exception as exp:
print(exp)
def set_rcv_event(self):
if self._rcv_event is not None:
self._rcv_event.set()
def wait_rcv_event(self):
if self._rcv_event is not None:
self._rcv_event.wait()
def clear_rcv_event(self):
if self._rcv_event is not None:
self._rcv_event.clear()
def get_rcv_data(self):
return self._updata_data
def loop_task(self):
try:
if self._mqtt_enable:
self._client.loop_start()
except Exception as exp:
print(exp)
def del_task(self):
try:
if self._mqtt_enable:
self._client.loop_stop()
except Exception as exp:
print(exp)
def internet_task(self):
while True:
try:
servers = [(self._mqtt_info['broker'], self._mqtt_info['port'])]
r = self.test_connections_to_servers(servers)
if r is False:
if self._connect_status is True:
self._client.disconnect()
self.del_task()
self._connect_status = False
else:
if self._connect_status is False:
self._client.username_pw_set(self._mqtt_info['user'], self._mqtt_info['password'])
self._client.connect(self._mqtt_info['broker'], self._mqtt_info['port'])
if self._topic_enable and self._mqtt_enable:
for index in self._topic:
self._client.subscribe(index)
self._connect_status = True
self.loop_task()
except Exception as exp:
# 超时会进入这里
if self._connect_status is True:
self._client.disconnect()
self.del_task()
self._connect_status = False
print(exp)
time.sleep(60)
def test_tcp_connection(self, host='8.8.8.8', port=53):
try:
cnt = socket.create_connection((host, port), timeout=5)
cnt.close()
return True
except Exception as exp:
print(exp)
return False
def test_connections_to_servers(self, servers=[('116.62.127.242', 1883), ('114.114.114.114', 53)]):
for server, port in servers:
if self.test_tcp_connection(server, port):
return True
return False
import threading
class timer:
def __init__(self, checkIn : int, period : int) -> None:
self._checkIn = checkIn
self._period = period
self._bSendCheckIn = True
self._bSendWave = True
def checkin_timer(self):
self._bSendCheckIn = True
def period_timer(self):
self._bSendWave = True
def getSendCheckInFlag(self):
return self._bSendCheckIn
def getSendWaveFlag(self):
return self._bSendWave
def setSendCheckInFlag(self, status):
self._bSendCheckIn = status
threading.Timer(self._checkIn, self.checkin_timer).start()
def setSendWaveFlag(self, status):
self._bSendWave = status
threading.Timer(self._period, self.period_timer).start()
\ No newline at end of file
from distutils.core import setup, Extension
from setuptools import setup, Extension
from wheel.bdist_wheel import bdist_wheel
import os
# 定义你的C扩展模块
extension = Extension(
'wtina226',
sources=['wtina226.c'],
extra_compile_args=['-g'],
extra_link_args=['-g']
)
# 定义setup函数
def setup_package():
setup(
name='wtina226',
version='1.0.1',
description='607 iic 3 control',
author='maplePan',
author_email='suwan.pan@witium.com',
license='MIT',
#packages=['wtina226'],
ext_modules=[extension],
python_requires='>=3.7',
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
],
)
# 确保在非sdist模式下运行bdist_wheel
if __name__ == '__main__':
os.system('rm -rf build')
os.system('rm -rf dist')
os.system('rm -rf *.egg-info')
setup_package()
This diff is collapsed.
Metadata-Version: 2.1
Name: wtina226
Version: 1.0.1
Summary: 607 iic 3 control
Author: maplePan
Author-email: suwan.pan@witium.com
License: MIT
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Requires-Python: >=3.7
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment