Reworked config
This commit is contained in:
158
xxx.py
Normal file
158
xxx.py
Normal file
@@ -0,0 +1,158 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import json
|
||||
from bleak import BleakClient
|
||||
|
||||
CONFIG_FILE = "zt5b.json"
|
||||
|
||||
with open(CONFIG_FILE, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
DEVICE = "9C:0C:35:03:C2:B7" # Replace with your actual meter's MAC if needed
|
||||
CHAR_UUID = config["gatt"]["notify_char_uuid"]
|
||||
LOG_LEVEL = config.get("log_level", "info").lower()
|
||||
|
||||
HEADERS = {k.lower(): v for k, v in config["modes"].items()}
|
||||
XORKEY_STR = config["xorkey"]
|
||||
DIGIT_TABLE = config["digit_table"]
|
||||
ICON_REGIONS = config["icon_regions"]
|
||||
ICON_TABLES = config["icon_tables"]
|
||||
UNITS = {k.lower(): v for k, v in config.get("units", {}).items()}
|
||||
|
||||
current_mode = None
|
||||
mode_code = None
|
||||
last_display = None
|
||||
|
||||
def get_time():
|
||||
return datetime.datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
def str2hexarray(string):
|
||||
string = string.replace(" ", "").lower()
|
||||
return [int(string[i:i+2], 16) for i in range(0, len(string), 2)]
|
||||
|
||||
def bytewise_XOR(array, xorkey):
|
||||
return ['{0:0{1}X}'.format(array[x] ^ xorkey[x % len(xorkey)], 2) for x in range(len(array))]
|
||||
|
||||
def hex2bin(array):
|
||||
return [bin(int(x,16))[2:].zfill(8) for x in array]
|
||||
|
||||
def flip_bits(array):
|
||||
return [b[::-1] for b in array]
|
||||
|
||||
def array2str(array):
|
||||
return "".join(array)
|
||||
|
||||
def display_decoder(string):
|
||||
digits_orig = DIGIT_TABLE
|
||||
groups = [string[i:i+8] for i in range(0, len(string), 8)]
|
||||
number = ""
|
||||
for x in range(len(groups)):
|
||||
if x == 0 and groups[0][0] == "1":
|
||||
number += "-"
|
||||
if x > 0 and groups[x][0] == "1":
|
||||
number += "."
|
||||
val = digits_orig.get(groups[x][1:], " ")
|
||||
number += str(val)
|
||||
try:
|
||||
return float(number)
|
||||
except Exception:
|
||||
return number
|
||||
|
||||
def extract_icon_info(bin_str, mode=None, typeID=None):
|
||||
region = ICON_REGIONS.get(mode, ICON_REGIONS.get(typeID, ICON_REGIONS["default"]))
|
||||
bits = region["bits"]
|
||||
icon_bits = bin_str[bits[0]:bits[1]] + bin_str[bits[2]:bits[3]]
|
||||
table_name = region.get("icon_table", "11")
|
||||
icon_table = ICON_TABLES[table_name]
|
||||
icons = [icon_table[i] for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)]
|
||||
return [x for x in icons if x.strip() and x != " "]
|
||||
|
||||
def binary2info(bin_str, mode=None):
|
||||
typeID = bin_str[16:18]
|
||||
display_number = display_decoder(bin_str[28:60])
|
||||
icons = extract_icon_info(bin_str, mode=mode, typeID=typeID)
|
||||
value_type = type(display_number).__name__
|
||||
return {
|
||||
"typeID": typeID,
|
||||
"display": display_number,
|
||||
"value_type": value_type,
|
||||
"icons": icons
|
||||
}
|
||||
|
||||
def decode(hexstring, mode=None):
|
||||
xorkey = str2hexarray(XORKEY_STR)
|
||||
encoded_array = str2hexarray(hexstring)
|
||||
if len(encoded_array) > len(xorkey):
|
||||
xorkey = (xorkey * ((len(encoded_array) // len(xorkey)) + 1))[:len(encoded_array)]
|
||||
xordecoded = bytewise_XOR(encoded_array, xorkey)
|
||||
binary = hex2bin(xordecoded)
|
||||
flipped = flip_bits(binary)
|
||||
result = array2str(flipped)
|
||||
obj = binary2info(result, mode=mode)
|
||||
return obj
|
||||
|
||||
def log_info(msg):
|
||||
if LOG_LEVEL in ["info", "debug"]:
|
||||
print(msg)
|
||||
|
||||
def log_debug(msg):
|
||||
if LOG_LEVEL == "debug":
|
||||
print(msg)
|
||||
|
||||
def log_warn(msg):
|
||||
if LOG_LEVEL in ["warning", "info", "debug"]:
|
||||
print(f"WARNING: {msg}")
|
||||
|
||||
|
||||
# Trackers for mode and last value
|
||||
current_mode = None
|
||||
mode_code = None
|
||||
last_display_per_mode = {}
|
||||
last_printed_mode = None
|
||||
|
||||
def notif_handler(sender, data):
|
||||
global current_mode, mode_code, last_printed_mode
|
||||
hexstr = data.hex().lower()
|
||||
|
||||
# MODE CHANGE: only print on genuine mode switch, not on every packet!
|
||||
if hexstr in HEADERS:
|
||||
# Only print mode header if it's a new mode (not repeating)
|
||||
new_mode = HEADERS[hexstr]
|
||||
if new_mode != last_printed_mode:
|
||||
print(f"{new_mode} : {get_time()}")
|
||||
last_printed_mode = new_mode
|
||||
current_mode = new_mode
|
||||
mode_code = hexstr
|
||||
return
|
||||
|
||||
if current_mode and hexstr != mode_code:
|
||||
decoded = decode(hexstr, mode=current_mode.lower())
|
||||
display = decoded.get("display")
|
||||
icons = decoded.get("icons")
|
||||
icon_str = f" [{' '.join(icons)}]" if icons else ""
|
||||
unit = UNITS.get(current_mode.lower(), "")
|
||||
unit_str = f" {unit}" if unit else ""
|
||||
|
||||
# Only print if display value has changed for the current mode
|
||||
last_val = last_display_per_mode.get(current_mode)
|
||||
if display != last_val and display is not None and str(display).strip():
|
||||
print(f"{current_mode} : {display}{icon_str}{unit_str}")
|
||||
last_display_per_mode[current_mode] = display
|
||||
|
||||
|
||||
async def main():
|
||||
async with BleakClient(DEVICE) as client:
|
||||
print("Connected:", client.is_connected)
|
||||
await client.start_notify(CHAR_UUID, notif_handler)
|
||||
print("Listening... Ctrl+C to stop/cleanup.")
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted. Cleaning up...")
|
||||
finally:
|
||||
await client.stop_notify(CHAR_UUID)
|
||||
print("Notifications stopped. Disconnecting...")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user