Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
E
EmbeddedDevice_SafeUpgrade
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Python_Scripts_For_Embedded_Developer
EmbeddedDevice_SafeUpgrade
Commits
9802b59e
Commit
9802b59e
authored
Jul 10, 2024
by
aohui.li
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
增加sensor安全升级脚本
parent
8ee22668
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
281 additions
and
1 deletion
+281
-1
at32_gateway_safe_update.py
at32_gateway_safe_update.py
+3
-1
at32_sensor_safeupgrade.py
at32_sensor_safeupgrade.py
+278
-0
No files found.
at32_gateway_safe_update.py
View file @
9802b59e
...
...
@@ -72,7 +72,7 @@ def extract_characters_around(pattern, text, before=1, after=2):
return
before_chars
+
"_"
+
after_chars
def
get_substrings_around
(
main_str
,
sub_str
,
before
=
0
,
after
=
10
):
def
get_substrings_around
(
main_str
,
sub_str
):
start
=
main_str
.
find
(
sub_str
)
if
start
==
-
1
:
return
None
,
None
# 子字符串未找到
...
...
@@ -242,6 +242,8 @@ def update():
while
CheckPassFlag
==
False
:
print
(
"waiting for check Pass Flag..."
)
mainClient
.
publish
(
cmd7Topic
,
data
)
# print(cmd7ResTopic)
# print(data)
time
.
sleep
(
5
)
startPackage
=
bytes
((
0
,))
+
b
'Witium_Update'
+
bytes
((
packageSize
,))
...
...
at32_sensor_safeupgrade.py
0 → 100644
View file @
9802b59e
from
email.mime
import
image
import
paho.mqtt.client
as
mqtt
import
time
import
json
import
struct
import
threading
import
os
import
re
##################################################################################################################
dev_type
=
'WTG906F'
update_sn
=
'21126112'
## 13333/26667---- 3c
# image_file = 'WTS4V3C_v2.57.bin'
## 4000 ---- 3A
image_file
=
"Sensor.bin"
sensor_addr
=
'1'
formal_server
=
True
##################################################################################################################
StampingMark
=
{
'SRE'
,
'SRA'
,
'SRS'
,
'TEST'
}
cmd8Topic
=
'WT/{}/{}/BackDoor'
.
format
(
dev_type
,
update_sn
)
cmd8ResTopic
=
'WT/{}/{}/BackDoorResp'
.
format
(
dev_type
,
update_sn
)
CheckPassFlag
=
False
#----------------------------------------------------------------------------------------#
FIXED_LOCATION
=
0xA00
FIXED_STAMPING_SIZE
=
34
#----------------------------------------------------------------------------------------#
if
formal_server
==
True
:
mqtt_info
=
{
'broker'
:
'116.62.127.242'
,
'port'
:
1883
,
'user'
:
'witcd'
,
'password'
:
'Witium37774020'
}
else
:
mqtt_info
=
{
'broker'
:
'222.71.96.91'
,
'port'
:
12883
,
'user'
:
'witium'
,
'password'
:
'Witium37774020'
}
def
extract_characters_around
(
pattern
,
text
,
before
=
1
,
after
=
2
):
matches
=
re
.
finditer
(
pattern
,
text
)
for
match
in
matches
:
start
=
match
.
start
()
before_chars
=
text
[
start
-
before
-
1
:
start
]
if
before
>
0
else
''
target_char
=
text
[
start
]
after_chars
=
text
[
start
+
1
:
start
+
after
+
1
]
if
after
>
0
else
''
return
before_chars
+
"_"
+
after_chars
def
get_substrings_around
(
main_str
,
sub_str
):
start
=
main_str
.
find
(
sub_str
)
if
start
==
-
1
:
return
None
,
None
before_str
=
main_str
[
0
:
start
]
after_str
=
main_str
[
start
+
len
(
sub_str
):
-
1
]
return
before_str
,
after_str
def
get_Image_Size
(
file_path
):
file_size
=
0
with
open
(
file_path
,
'rb'
)
as
file
:
file_data
=
file
.
read
()
file_size
=
len
(
file_data
)
return
file_size
def
Stamping_To_Image
(
file_path
,
stamping
):
with
open
(
file_path
,
'ab'
)
as
file
:
file
.
write
(
stamping
.
encode
())
def
read_stamping
(
file_path
):
with
open
(
file_path
,
'rb'
)
as
file
:
file
.
seek
(
FIXED_LOCATION
)
content
=
file
.
read
(
FIXED_STAMPING_SIZE
)
# print("Content--->{}".format(content))
return
content
def
Judge_Exit
(
content
):
for
mark
in
StampingMark
:
if
mark
.
encode
(
'utf-8'
)
in
content
:
print
(
"该镜像已经写入钢印!"
)
return
False
print
(
"该镜像没有写入钢印!"
)
return
True
def
on_connect
(
client
,
userdata
,
flags
,
rc
):
global
start_update
print
(
'client connected with result code '
+
str
(
rc
))
client
.
subscribe
(
cmd8ResTopic
)
print
(
'Subscribe {} topic'
.
format
(
cmd8ResTopic
))
client
.
subscribe
(
RESPONSE_TOPIC
)
print
(
'Subscribe {} topic'
.
format
(
RESPONSE_TOPIC
))
start_update
=
True
def
on_message
(
client
,
userdata
,
msg
):
global
CheckPassFlag
try
:
if
msg
.
topic
==
cmd8ResTopic
:
Cmd8Res
=
json
.
loads
(
msg
.
payload
)
if
'stamping'
in
Cmd8Res
:
RemoteImage
=
Cmd8Res
[
'stamping'
]
print
(
"Remote Image Stamping: -> {}"
.
format
(
RemoteImage
))
LocalImage
=
read_stamping
(
"./Sensor.bin"
)
.
decode
()
print
(
"Local Image Stamping: -> {}"
.
format
(
LocalImage
))
Remote_version
=
extract_characters_around
(
"_"
,
str
(
RemoteImage
))
print
(
"Remote Image Version: -> {}"
.
format
(
Remote_version
))
Remote_before
,
Remote_after
=
get_substrings_around
(
RemoteImage
,
Remote_version
)
print
(
"Remote Image Before: -> {}"
.
format
(
Remote_before
))
print
(
"Remote Image After: -> {}"
.
format
(
Remote_after
))
Local_version
=
extract_characters_around
(
"_"
,
str
(
LocalImage
))
print
(
"Local Image Version: -> {}"
.
format
(
Local_version
))
Local_before
,
Local_after
=
get_substrings_around
(
LocalImage
,
Local_version
)
print
(
"Local Image Before: -> {}"
.
format
(
Local_before
))
print
(
"Local Image After: -> {}"
.
format
(
Local_after
))
if
Remote_before
==
Local_before
and
Remote_after
==
Local_after
:
CheckPassFlag
=
True
print
(
"Success"
)
else
:
print
(
"Fail"
)
elif
msg
.
topic
==
RESPONSE_TOPIC
:
global
received
,
error_break
print
(
'--------------------------------------------------'
)
print
(
"{}"
.
format
(
msg
.
topic
))
print
(
f
"Payload (binary): {msg.payload}"
)
if
msg
.
payload
==
b
'
\x08
'
:
received
=
True
elif
msg
.
payload
==
b
'
\x07
'
:
error_break
=
True
except
Exception
as
exp
:
print
(
"Exception Handle --> {}"
.
format
(
exp
))
def
create_check_sum
(
data
):
check_sum
=
0
for
each
in
data
:
check_sum
=
check_sum
+
each
check_sum
=
check_sum
&
0xFF
return
bytes
((
check_sum
,
))
total_flag
=
True
def
sensor_update
():
global
received
,
start_update
,
total_flag
,
update_index
,
error_break
data
=
[{
"cmd"
:
8
,
"addr"
:
int
(
sensor_addr
),
"RSS"
:
1
}]
data
=
json
.
dumps
(
data
,
separators
=
(
','
,
':'
))
print
(
data
)
client
.
publish
(
cmd8Topic
,
data
)
time
.
sleep
(
5
)
while
CheckPassFlag
==
False
:
print
(
"waiting for check Pass Flag..."
)
client
.
publish
(
cmd8Topic
,
data
)
time
.
sleep
(
5
)
while
True
:
if
start_update
and
CheckPassFlag
:
with
open
(
image_file
,
'rb'
)
as
file
:
update_data
=
file
.
read
()
total_cnt
=
(
len
(
update_data
)
-
1
)
//
240
+
1
# start
package_cnt
=
0
received
=
False
total_check
=
b
'
\x06\x50\x80\x00\x01\x00
'
total_check
+=
create_check_sum
(
update_data
)
total_check
+=
struct
.
pack
(
">H"
,
update_index
)
total_check
+=
create_check_sum
(
total_check
)
client
.
publish
(
CMD_TOPIC
,
payload
=
total_check
)
time
.
sleep
(
2
)
while
len
(
update_data
):
if
error_break
:
break
if
received
:
received
=
False
# if total_flag:
# update_stream = b'\x06\x50\x78\x00\x01\x00\x00'
# update_stream += struct.pack(">H", update_index)
# update_stream += create_check_sum(update_stream)
# client.publish(CMD_TOPIC, payload=update_stream)
# total_flag = False
if
len
(
update_data
)
>=
240
:
update_stream
=
b
'
\x10\x50\x00\x00\x78
'
+
update_data
[
0
:
240
]
update_stream
+=
struct
.
pack
(
">H"
,
update_index
)
update_index
=
update_index
+
1
update_stream
+=
create_check_sum
(
update_stream
)
client
.
publish
(
CMD_TOPIC
,
payload
=
update_stream
)
update_data
=
update_data
[
240
:]
else
:
update_stream
=
b
'
\x10\x50\x00\x00\x78
'
+
update_data
[
0
:]
+
b
'
\x00
'
*
(
240
-
len
(
update_data
))
update_stream
+=
struct
.
pack
(
">H"
,
update_index
)
update_index
=
update_index
+
1
update_stream
+=
create_check_sum
(
update_stream
)
client
.
publish
(
CMD_TOPIC
,
payload
=
update_stream
)
update_data
=
b
''
package_cnt
+=
1
print
(
'
%
d/
%
d'
%
(
package_cnt
,
total_cnt
))
time
.
sleep
(
0.1
)
if
error_break
:
break
update_stream
=
b
'
\x10\x50\x79\x00\x03
'
+
b
'IWITMU'
update_index
=
0
update_stream
+=
struct
.
pack
(
">H"
,
update_index
)
update_stream
+=
create_check_sum
(
update_stream
)
client
.
publish
(
CMD_TOPIC
,
payload
=
update_stream
)
print
(
'Finish'
)
start_update
=
False
time
.
sleep
(
8
)
cmd
=
b
'
\x03\x00\x02\x00\x01
'
cmd
+=
create_check_sum
(
cmd
)
client
.
publish
(
CMD_TOPIC
,
payload
=
cmd
)
time
.
sleep
(
5
)
break
time
.
sleep
(
0.1
)
print
(
"Update Complete, Closed."
)
client
.
disconnect
()
received
=
False
error_break
=
False
CMD_TOPIC
=
'WT/
%
s/
%
s/
%
s/Update'
%
(
dev_type
,
update_sn
,
sensor_addr
)
RESPONSE_TOPIC
=
'WT/
%
s/
%
s/SensorResp'
%
(
dev_type
,
update_sn
)
start_update
=
False
update_index
=
0
client
=
mqtt
.
Client
(
mqtt
.
CallbackAPIVersion
.
VERSION1
)
client
.
username_pw_set
(
mqtt_info
[
'user'
],
mqtt_info
[
'password'
])
client
.
on_connect
=
on_connect
client
.
on_message
=
on_message
client
.
connect
(
mqtt_info
[
'broker'
],
mqtt_info
[
'port'
])
update_thread
=
threading
.
Thread
(
target
=
sensor_update
)
update_thread
.
setDaemon
(
True
)
update_thread
.
start
()
client
.
loop_forever
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment