Commit 8ee22668 authored by aohui.li's avatar aohui.li

优化升级的验证字段

parent 282350fa
''' '''
@Modified: 2024-5-30 @Modified: 2024-5-30
@Brief: Update Image Safely @Brief: Update Image Safely
''' '''
# !usr/bin/python3 # !usr/bin/python3
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import os import os
import threading import threading
import json import json
import time import time
import sys import re
import zlib #----------------------------------------------------------------------------------------#
#----------------------------------------------------------------------------------------#
# 测试平台
# 测试平台 mqttConfig = {
mqttConfig = { 'mqttServer': 'www.witium.com.cn',
'mqttServer': 'www.witium.com.cn', 'mqttPort': 12883,
'mqttPort': 12883, 'userName': 'witcd',
'userName': 'witcd', 'password': 'Witium37774020'
'password': 'Witium37774020' }
}
# 正式平台
# 正式平台 # mqttConfig = {
# mqttConfig = { # 'mqttServer': '116.62.127.242',
# 'mqttServer': '116.62.127.242', # 'mqttPort': 1883,
# 'mqttPort': 1883, # 'userName': 'witcd',
# 'userName': 'witcd', # 'password': 'Witium37774020'
# 'password': 'Witium37774020' # }
# }
# # 颖泰平台
# # 颖泰平台 # mqttConfig = {
# mqttConfig = { # 'mqttServer': '112.17.190.17',
# 'mqttServer': '112.17.190.17', # 'mqttPort': 1883,
# 'mqttPort': 1883, # 'userName': 'witcd',
# 'userName': 'witcd', # 'password': 'Witium37774020'
# 'password': 'Witium37774020' # }
# }
dev_type = 'WTG906F'
dev_type = 'WTG902F'
updateCheckInTopic = 'WT/%s/Update/CheckIn' % dev_type
updateCheckInTopic = 'WT/%s/Update/CheckIn' % dev_type updateResponseTopic = 'WT/%s/Update/Response' % dev_type
updateResponseTopic = 'WT/%s/Update/Response' % dev_type updateTopic = 'WT/%s/{0}/Update' % dev_type
updateTopic = 'WT/%s/{0}/Update' % dev_type
updateList = [{'sn': '21126112', 'package': 0, 'time': time.time(), 'send': True}]
updateList = [{'sn': '21126112', 'package': 0, 'time': time.time(), 'send': True}] NewImage = "BIN/v2.35.bin"
NewImage = "Gateway.bin" cmd7Topic = 'WT/{}/{}/BackDoor'.format(dev_type,updateList[0]['sn'])
cmd7Topic = 'WT/{}/{}/BackDoor'.format(dev_type,updateList[0]['sn']) cmd7ResTopic = 'WT/{}/{}/BackDoorResp'.format(dev_type,updateList[0]['sn'])
cmd7ResTopic = 'WT/{}/{}/BackDoorResp'.format(dev_type,updateList[0]['sn'])
CheckPassFlag = False
CheckPassFlag = False
#----------------------------------------------------------------------------------------#
StampingMark = "WTG"
#----------------------------------------------------------------------------------------# FIXED_LOCATION = 0x200
StampingMark = "WTG" FIXED_STAMPING_SIZE = 37
FIXED_LOCATION = 0x200 FIXED_HEAD = 3
FIXED_STAMPING_SIZE = 37 #----------------------------------------------------------------------------------------#
FIXED_HEAD = 3
#----------------------------------------------------------------------------------------# def extract_characters_around(pattern, text, before=1, after=2):
matches = re.finditer(pattern, text)
def get_Image_Size(file_path):
file_size = 0 for match in matches:
with open(file_path, 'rb') as file: start = match.start()
file_data = file.read() before_chars = text[start-before-1:start] if before > 0 else ''
file_size = len(file_data) target_char = text[start]
return file_size after_chars = text[start+1:start+after+1] if after > 0 else ''
return before_chars+"_"+after_chars
def read_stamping(fiel_path):
with open(file_path, 'rb') as file:
file.seek(FIXED_LOCATION) def get_substrings_around(main_str, sub_str, before=0, after=10):
content = file.read(FIXED_STAMPING_SIZE) start = main_str.find(sub_str)
print("Content--->{}".format(content)) if start == -1:
return content return None, None # 子字符串未找到
# 提取前一段和后一段字符串
def HAS_STAMPING(content): before_str = main_str[0:start]
try: after_str = main_str[start + len(sub_str):-1]
if StampingMark in content.decode()[:FIXED_HEAD]:
return True return before_str, after_str
else:
return False def get_Image_Size(file_path):
except: file_size = 0
print("NO STAMPING HERE, EXIT!") with open(file_path, 'rb') as file:
return False file_data = file.read()
def main_client_on_connect(client, userdata, flags, rc): file_size = len(file_data)
log('Main client connected with result code ' + str(rc)) return file_size
log('Main client subscribe ' + updateCheckInTopic + ' topic') def read_stamping(fiel_path):
client.subscribe(updateCheckInTopic) with open(file_path, 'rb') as file:
file.seek(FIXED_LOCATION)
log('Main client subscribe ' + updateResponseTopic + ' topic') content = file.read(FIXED_STAMPING_SIZE)
client.subscribe(updateResponseTopic) print("Content--->{}".format(content))
return content
log('Main client subscribe ' + cmd7ResTopic + ' topic')
client.subscribe(cmd7ResTopic)
def HAS_STAMPING(content):
try:
def main_client_on_message(client, userdata, msg): if StampingMark in content.decode()[:FIXED_HEAD]:
global updateList return True
global CheckPassFlag else:
if msg.topic == updateCheckInTopic and CheckPassFlag == True: return False
try: except:
jsonMsg = json.loads(msg.payload.decode()) print("NO STAMPING HERE, EXIT!")
return False
if jsonMsg['gId'] not in ['20010801']: def main_client_on_connect(client, userdata, flags, rc):
return log('Main client connected with result code ' + str(rc))
updateDict = {} log('Main client subscribe ' + updateCheckInTopic + ' topic')
if 'version' in jsonMsg: client.subscribe(updateCheckInTopic)
if checkVersion(jsonMsg['version']):
updateDict['sn'] = jsonMsg['gId'] log('Main client subscribe ' + updateResponseTopic + ' topic')
updateDict['package'] = 0 client.subscribe(updateResponseTopic)
updateDict['time'] = time.time()
updateDict['send'] = True log('Main client subscribe ' + cmd7ResTopic + ' topic')
updateList.append(updateDict) client.subscribe(cmd7ResTopic)
else:
updateDict['sn'] = jsonMsg['gId']
updateDict['package'] = 0 def main_client_on_message(client, userdata, msg):
updateDict['time'] = time.time() global updateList
updateDict['send'] = True global CheckPassFlag
updateList.append(updateDict) if msg.topic == updateCheckInTopic and CheckPassFlag == True:
except: try:
log('Not a json data') jsonMsg = json.loads(msg.payload.decode())
elif msg.topic == updateResponseTopic and CheckPassFlag == True: if jsonMsg['gId'] not in ['20010801']:
try: return
responseMsg = msg.payload.decode().split()
for each in updateList: updateDict = {}
if responseMsg[1] == each['sn']: if 'version' in jsonMsg:
if responseMsg[2] == '0': if checkVersion(jsonMsg['version']):
each['package'] = int(responseMsg[0]) + 1 updateDict['sn'] = jsonMsg['gId']
each['time'] = time.time() updateDict['package'] = 0
each['send'] = True updateDict['time'] = time.time()
else: updateDict['send'] = True
each['send'] = True updateList.append(updateDict)
except: else:
log('Error') updateDict['sn'] = jsonMsg['gId']
elif msg.topic == cmd7ResTopic: updateDict['package'] = 0
try: updateDict['time'] = time.time()
Cmd7Res = json.loads(msg.payload) updateDict['send'] = True
if 'stamping' in Cmd7Res: updateList.append(updateDict)
print("检查本地镜像") except:
UpdataImageStamping = read_stamping(file_path) log('Not a json data')
UpdataImageStamping = UpdataImageStamping.decode()
if UpdataImageStamping == Cmd7Res['stamping']: elif msg.topic == updateResponseTopic and CheckPassFlag == True:
print("Romote Stamping {}".format(Cmd7Res['stamping'])) try:
print("Local Stamping {}".format(UpdataImageStamping)) responseMsg = msg.payload.decode().split()
print("Equal") for each in updateList:
CheckPassFlag = True if responseMsg[1] == each['sn']:
else: if responseMsg[2] == '0':
print("Romote Stamping {}".format(Cmd7Res['stamping'])) each['package'] = int(responseMsg[0]) + 1
print("Local Stamping {}".format(UpdataImageStamping)) each['time'] = time.time()
print("exit") each['send'] = True
else:
except: each['send'] = True
log("Error Cmd 7 Res") except:
log('Error')
elif msg.topic == cmd7ResTopic:
def checkVersion(oldVersion): try:
if 'Beta' in oldVersion and 'Beta' in tdlVersion or 'Beta' not in oldVersion and 'Beta' not in tdlVersion: Cmd7Res = json.loads(msg.payload)
if oldVersion < tdlVersion: if 'stamping' in Cmd7Res:
return True LocalImage = read_stamping(file_path)
else: LocalImage = LocalImage.decode()
return False local_version = extract_characters_around("_", str(LocalImage))
elif 'Beta' in oldVersion and 'Beta' not in tdlVersion: print("\nLocal 镜像版本:{}".format(local_version))
if oldVersion[0:5] < tdlVersion: Local_before, Local_after = get_substrings_around(LocalImage, local_version)
return True
else:
return False RemoteImage = Cmd7Res['stamping']
elif 'Beta' not in oldVersion and 'Beta' in tdlVersion: RemoteImage = RemoteImage
if oldVersion < tdlVersion[0:5]: Remote_version = extract_characters_around("_", str(RemoteImage))
return True print("\nRemote 镜像版本:{}".format(Remote_version))
else: Remote_before, Remote_after = get_substrings_around(RemoteImage, Remote_version)
return False
return False
if Local_before == Remote_before and Local_after == Remote_after:
CheckPassFlag = True
def createCheckCode(checkData): print("Equal")
sumDat = 0 else:
codeBytes = b'' print(LocalImage)
for each in checkData: print(RemoteImage)
sumDat += each print("Before {}|{}".format(Local_before, Remote_before))
sumHex = hex(sumDat).lstrip('0x') print("After {}|{}".format(Local_after, Remote_after))
while len(sumHex) != 8: print("Fail")
sumHex = '0' + sumHex
except Exception as e:
codeBytes += bytes((int(sumHex[0:2], 16),)) log("Error Cmd 7 Res")
codeBytes += bytes((int(sumHex[2:4], 16),)) print("Exception-> {}".format(e))
codeBytes += bytes((int(sumHex[4:6], 16),))
codeBytes += bytes((int(sumHex[6:8], 16),))
def checkVersion(oldVersion):
return codeBytes if 'Beta' in oldVersion and 'Beta' in tdlVersion or 'Beta' not in oldVersion and 'Beta' not in tdlVersion:
if oldVersion < tdlVersion:
return True
def update(): else:
global fileData return False
global updateList elif 'Beta' in oldVersion and 'Beta' not in tdlVersion:
global mainClient if oldVersion[0:5] < tdlVersion:
global packageSize return True
global oldTime else:
return False
data = [{"cmd":7,"ResquestStamping":1}] elif 'Beta' not in oldVersion and 'Beta' in tdlVersion:
data = json.dumps(data, separators=(',', ':')) if oldVersion < tdlVersion[0:5]:
print(data) return True
while CheckPassFlag == False: else:
print("waiting for check Pass Flag...") return False
mainClient.publish(cmd7Topic, data) return False
time.sleep(5)
startPackage = bytes((0,)) + b'Witium_Update' + bytes((packageSize,)) def createCheckCode(checkData):
startPackage += createCheckCode(startPackage) sumDat = 0
while True: codeBytes = b''
if len(updateList) == 0: for each in checkData:
time.sleep(1) sumDat += each
else: sumHex = hex(sumDat).lstrip('0x')
curTime = time.time() while len(sumHex) != 8:
if curTime - oldTime > 30: sumHex = '0' + sumHex
oldTime = time.time()
log('Current List:' + str(updateList)) codeBytes += bytes((int(sumHex[0:2], 16),))
codeBytes += bytes((int(sumHex[2:4], 16),))
for eachModule in updateList: codeBytes += bytes((int(sumHex[4:6], 16),))
if eachModule['send'] == True: codeBytes += bytes((int(sumHex[6:8], 16),))
eachModule['send'] = False
if eachModule['package'] == 0: return codeBytes
mainClient.publish(updateTopic.format(eachModule['sn']), startPackage)
else:
if eachModule['package'] != packageSize: def update():
sendData = bytes((eachModule['package'],)) + fileData[ global fileData
(eachModule['package'] - 1) * 1024:eachModule[ global updateList
'package'] * 1024] global mainClient
else: global packageSize
sendData = bytes((eachModule['package'],)) + fileData[(eachModule['package'] - 1) * 1024:] global oldTime
sendData += createCheckCode(sendData)
mainClient.publish(updateTopic.format(eachModule['sn']), sendData) data = [{"cmd":7,"ResquestStamping":1}]
log('Send the ' + str(eachModule['package']) + ' package to ' + eachModule['sn']) data = json.dumps(data, separators=(',', ':'))
if eachModule['package'] == packageSize: print(data)
updateList.remove(eachModule) while CheckPassFlag == False:
log('Update success ' + eachModule['sn']) print("waiting for check Pass Flag...")
mainClient.publish(cmd7Topic, data)
nowTime = time.time() time.sleep(5)
if nowTime - eachModule['time'] > 30:
updateList.remove(eachModule) startPackage = bytes((0,)) + b'Witium_Update' + bytes((packageSize,))
log('Update time out ' + eachModule['sn']) startPackage += createCheckCode(startPackage)
while True:
if len(updateList) == 0:
def log(logData): time.sleep(1)
with open('log.txt', 'a+') as logFile: else:
logFile.write(logData + '\n') curTime = time.time()
print(logData) if curTime - oldTime > 30:
oldTime = time.time()
log('Current List:' + str(updateList))
#----------------------------------------------------------------------------------------# for eachModule in updateList:
if eachModule['send'] == True:
eachModule['send'] = False
if eachModule['package'] == 0:
mainClient.publish(updateTopic.format(eachModule['sn']), startPackage)
path = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) else:
resourcesPath = path + os.sep + 'scripts' + os.sep if eachModule['package'] != packageSize:
curFileList = os.listdir(os.getcwd()) sendData = bytes((eachModule['package'],)) + fileData[
oldTime = time.time() (eachModule['package'] - 1) * 1024:eachModule[
'package'] * 1024]
if not os.path.exists('log.txt'): else:
with open('log.txt', 'w'): sendData = bytes((eachModule['package'],)) + fileData[(eachModule['package'] - 1) * 1024:]
pass sendData += createCheckCode(sendData)
mainClient.publish(updateTopic.format(eachModule['sn']), sendData)
file_path = resourcesPath + NewImage log('Send the ' + str(eachModule['package']) + ' package to ' + eachModule['sn'])
print("Update Image Path [{}]".format(file_path)) if eachModule['package'] == packageSize:
updateList.remove(eachModule)
size = get_Image_Size(file_path) log('Update success ' + eachModule['sn'])
print(f"文件大小:{get_Image_Size(file_path)} bytes")
if HAS_STAMPING(read_stamping(file_path)) == True: nowTime = time.time()
print("STAMPING IS EXIST.") if nowTime - eachModule['time'] > 30:
print("\n\n\n\n\n") updateList.remove(eachModule)
else: log('Update time out ' + eachModule['sn'])
print("STAMPING IS NOT EXIST.")
exit(0)
def log(logData):
with open(file_path, 'rb') as file: with open('log.txt', 'a+') as logFile:
fileData = file.read() logFile.write(logData + '\n')
packageSize = (len(fileData) - 1) // 1024 + 1 print(logData)
log('packageSize: ' + str(packageSize))
if not file_path:
log('Update file has not found.') #----------------------------------------------------------------------------------------#
else:
mainClient = mqtt.Client()
mainClient.username_pw_set(mqttConfig['userName'], mqttConfig['password'])
mainClient.on_connect = main_client_on_connect
mainClient.on_message = main_client_on_message path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
mainClient.connect(mqttConfig['mqttServer'], mqttConfig['mqttPort'], 60) resourcesPath = path + os.sep + 'safe_Upgrade_Python_Scripts' + os.sep
curFileList = os.listdir(os.getcwd())
thread = threading.Thread(target=update) oldTime = time.time()
thread.setDaemon(True)
thread.start() if not os.path.exists('log.txt'):
with open('log.txt', 'w'):
mainClient.loop_forever() pass
file_path = resourcesPath + NewImage
print("Update Image Path [{}]".format(file_path))
size = get_Image_Size(file_path)
print(f"文件大小:{get_Image_Size(file_path)} bytes")
if HAS_STAMPING(read_stamping(file_path)) == True:
print("STAMPING IS EXIST.")
print("\n\n\n\n\n")
else:
print("STAMPING IS NOT EXIST.")
exit(0)
with open(file_path, 'rb') as file:
fileData = file.read()
packageSize = (len(fileData) - 1) // 1024 + 1
log('packageSize: ' + str(packageSize))
if not file_path:
log('Update file has not found.')
else:
mainClient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
mainClient.username_pw_set(mqttConfig['userName'], mqttConfig['password'])
mainClient.on_connect = main_client_on_connect
mainClient.on_message = main_client_on_message
mainClient.connect(mqttConfig['mqttServer'], mqttConfig['mqttPort'], 60)
thread = threading.Thread(target=update)
thread.setDaemon(True)
thread.start()
mainClient.loop_forever()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment