159 lines
5.0 KiB
Python
159 lines
5.0 KiB
Python
import asyncio
|
|
import datetime
|
|
import json
|
|
from bleak import BleakClient
|
|
|
|
CONFIG_FILE = "zt5b.json"
|
|
|
|
with open(CONFIG_FILE, "r") as f:
|
|
config = json.load(f)
|
|
|
|
DEVICE = "9C:0C:35:03:C2:B7" # Replace with your actual meter's MAC if needed
|
|
CHAR_UUID = config["gatt"]["notify_char_uuid"]
|
|
LOG_LEVEL = config.get("log_level", "info").lower()
|
|
|
|
HEADERS = {k.lower(): v for k, v in config["modes"].items()}
|
|
XORKEY_STR = config["xorkey"]
|
|
DIGIT_TABLE = config["digit_table"]
|
|
ICON_REGIONS = config["icon_regions"]
|
|
ICON_TABLES = config["icon_tables"]
|
|
UNITS = {k.lower(): v for k, v in config.get("units", {}).items()}
|
|
|
|
current_mode = None
|
|
mode_code = None
|
|
last_display = None
|
|
|
|
def get_time():
|
|
return datetime.datetime.now().strftime("%H:%M:%S")
|
|
|
|
def str2hexarray(string):
|
|
string = string.replace(" ", "").lower()
|
|
return [int(string[i:i+2], 16) for i in range(0, len(string), 2)]
|
|
|
|
def bytewise_XOR(array, xorkey):
|
|
return ['{0:0{1}X}'.format(array[x] ^ xorkey[x % len(xorkey)], 2) for x in range(len(array))]
|
|
|
|
def hex2bin(array):
|
|
return [bin(int(x,16))[2:].zfill(8) for x in array]
|
|
|
|
def flip_bits(array):
|
|
return [b[::-1] for b in array]
|
|
|
|
def array2str(array):
|
|
return "".join(array)
|
|
|
|
def display_decoder(string):
|
|
digits_orig = DIGIT_TABLE
|
|
groups = [string[i:i+8] for i in range(0, len(string), 8)]
|
|
number = ""
|
|
for x in range(len(groups)):
|
|
if x == 0 and groups[0][0] == "1":
|
|
number += "-"
|
|
if x > 0 and groups[x][0] == "1":
|
|
number += "."
|
|
val = digits_orig.get(groups[x][1:], " ")
|
|
number += str(val)
|
|
try:
|
|
return float(number)
|
|
except Exception:
|
|
return number
|
|
|
|
def extract_icon_info(bin_str, mode=None, typeID=None):
|
|
region = ICON_REGIONS.get(mode, ICON_REGIONS.get(typeID, ICON_REGIONS["default"]))
|
|
bits = region["bits"]
|
|
icon_bits = bin_str[bits[0]:bits[1]] + bin_str[bits[2]:bits[3]]
|
|
table_name = region.get("icon_table", "11")
|
|
icon_table = ICON_TABLES[table_name]
|
|
icons = [icon_table[i] for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)]
|
|
return [x for x in icons if x.strip() and x != " "]
|
|
|
|
def binary2info(bin_str, mode=None):
|
|
typeID = bin_str[16:18]
|
|
display_number = display_decoder(bin_str[28:60])
|
|
icons = extract_icon_info(bin_str, mode=mode, typeID=typeID)
|
|
value_type = type(display_number).__name__
|
|
return {
|
|
"typeID": typeID,
|
|
"display": display_number,
|
|
"value_type": value_type,
|
|
"icons": icons
|
|
}
|
|
|
|
def decode(hexstring, mode=None):
|
|
xorkey = str2hexarray(XORKEY_STR)
|
|
encoded_array = str2hexarray(hexstring)
|
|
if len(encoded_array) > len(xorkey):
|
|
xorkey = (xorkey * ((len(encoded_array) // len(xorkey)) + 1))[:len(encoded_array)]
|
|
xordecoded = bytewise_XOR(encoded_array, xorkey)
|
|
binary = hex2bin(xordecoded)
|
|
flipped = flip_bits(binary)
|
|
result = array2str(flipped)
|
|
obj = binary2info(result, mode=mode)
|
|
return obj
|
|
|
|
def log_info(msg):
|
|
if LOG_LEVEL in ["info", "debug"]:
|
|
print(msg)
|
|
|
|
def log_debug(msg):
|
|
if LOG_LEVEL == "debug":
|
|
print(msg)
|
|
|
|
def log_warn(msg):
|
|
if LOG_LEVEL in ["warning", "info", "debug"]:
|
|
print(f"WARNING: {msg}")
|
|
|
|
|
|
# Trackers for mode and last value
|
|
current_mode = None
|
|
mode_code = None
|
|
last_display_per_mode = {}
|
|
last_printed_mode = None
|
|
|
|
def notif_handler(sender, data):
|
|
global current_mode, mode_code, last_printed_mode
|
|
hexstr = data.hex().lower()
|
|
|
|
# MODE CHANGE: only print on genuine mode switch, not on every packet!
|
|
if hexstr in HEADERS:
|
|
# Only print mode header if it's a new mode (not repeating)
|
|
new_mode = HEADERS[hexstr]
|
|
if new_mode != last_printed_mode:
|
|
print(f"{new_mode} : {get_time()}")
|
|
last_printed_mode = new_mode
|
|
current_mode = new_mode
|
|
mode_code = hexstr
|
|
return
|
|
|
|
if current_mode and hexstr != mode_code:
|
|
decoded = decode(hexstr, mode=current_mode.lower())
|
|
display = decoded.get("display")
|
|
icons = decoded.get("icons")
|
|
icon_str = f" [{' '.join(icons)}]" if icons else ""
|
|
unit = UNITS.get(current_mode.lower(), "")
|
|
unit_str = f" {unit}" if unit else ""
|
|
|
|
# Only print if display value has changed for the current mode
|
|
last_val = last_display_per_mode.get(current_mode)
|
|
if display != last_val and display is not None and str(display).strip():
|
|
print(f"{current_mode} : {display}{icon_str}{unit_str}")
|
|
last_display_per_mode[current_mode] = display
|
|
|
|
|
|
async def main():
|
|
async with BleakClient(DEVICE) as client:
|
|
print("Connected:", client.is_connected)
|
|
await client.start_notify(CHAR_UUID, notif_handler)
|
|
print("Listening... Ctrl+C to stop/cleanup.")
|
|
try:
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print("\nInterrupted. Cleaning up...")
|
|
finally:
|
|
await client.stop_notify(CHAR_UUID)
|
|
print("Notifications stopped. Disconnecting...")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|