import asyncio import datetime import json from bleak import BleakClient CONFIG_FILE = "zt5b.json" with open(CONFIG_FILE, "r") as f: config = json.load(f) DEVICE = "9C:0C:35:03:C2:B7" # Replace with your actual meter's MAC if needed CHAR_UUID = config["gatt"]["notify_char_uuid"] LOG_LEVEL = config.get("log_level", "info").lower() HEADERS = {k.lower(): v for k, v in config["modes"].items()} XORKEY_STR = config["xorkey"] DIGIT_TABLE = config["digit_table"] ICON_REGIONS = config["icon_regions"] ICON_TABLES = config["icon_tables"] UNITS = {k.lower(): v for k, v in config.get("units", {}).items()} current_mode = None mode_code = None last_display = None def get_time(): return datetime.datetime.now().strftime("%H:%M:%S") def str2hexarray(string): string = string.replace(" ", "").lower() return [int(string[i:i+2], 16) for i in range(0, len(string), 2)] def bytewise_XOR(array, xorkey): return ['{0:0{1}X}'.format(array[x] ^ xorkey[x % len(xorkey)], 2) for x in range(len(array))] def hex2bin(array): return [bin(int(x,16))[2:].zfill(8) for x in array] def flip_bits(array): return [b[::-1] for b in array] def array2str(array): return "".join(array) def display_decoder(string): digits_orig = DIGIT_TABLE groups = [string[i:i+8] for i in range(0, len(string), 8)] number = "" for x in range(len(groups)): if x == 0 and groups[0][0] == "1": number += "-" if x > 0 and groups[x][0] == "1": number += "." val = digits_orig.get(groups[x][1:], " ") number += str(val) try: return float(number) except Exception: return number def extract_icon_info(bin_str, mode=None, typeID=None): region = ICON_REGIONS.get(mode, ICON_REGIONS.get(typeID, ICON_REGIONS["default"])) bits = region["bits"] icon_bits = bin_str[bits[0]:bits[1]] + bin_str[bits[2]:bits[3]] table_name = region.get("icon_table", "11") icon_table = ICON_TABLES[table_name] icons = [icon_table[i] for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)] return [x for x in icons if x.strip() and x != " "] def binary2info(bin_str, mode=None): typeID = bin_str[16:18] display_number = display_decoder(bin_str[28:60]) icons = extract_icon_info(bin_str, mode=mode, typeID=typeID) value_type = type(display_number).__name__ return { "typeID": typeID, "display": display_number, "value_type": value_type, "icons": icons } def decode(hexstring, mode=None): xorkey = str2hexarray(XORKEY_STR) encoded_array = str2hexarray(hexstring) if len(encoded_array) > len(xorkey): xorkey = (xorkey * ((len(encoded_array) // len(xorkey)) + 1))[:len(encoded_array)] xordecoded = bytewise_XOR(encoded_array, xorkey) binary = hex2bin(xordecoded) flipped = flip_bits(binary) result = array2str(flipped) obj = binary2info(result, mode=mode) return obj def log_info(msg): if LOG_LEVEL in ["info", "debug"]: print(msg) def log_debug(msg): if LOG_LEVEL == "debug": print(msg) def log_warn(msg): if LOG_LEVEL in ["warning", "info", "debug"]: print(f"WARNING: {msg}") # Trackers for mode and last value current_mode = None mode_code = None last_display_per_mode = {} last_printed_mode = None def notif_handler(sender, data): global current_mode, mode_code, last_printed_mode hexstr = data.hex().lower() # MODE CHANGE: only print on genuine mode switch, not on every packet! if hexstr in HEADERS: # Only print mode header if it's a new mode (not repeating) new_mode = HEADERS[hexstr] if new_mode != last_printed_mode: print(f"{new_mode} : {get_time()}") last_printed_mode = new_mode current_mode = new_mode mode_code = hexstr return if current_mode and hexstr != mode_code: decoded = decode(hexstr, mode=current_mode.lower()) display = decoded.get("display") icons = decoded.get("icons") icon_str = f" [{' '.join(icons)}]" if icons else "" unit = UNITS.get(current_mode.lower(), "") unit_str = f" {unit}" if unit else "" # Only print if display value has changed for the current mode last_val = last_display_per_mode.get(current_mode) if display != last_val and display is not None and str(display).strip(): print(f"{current_mode} : {display}{icon_str}{unit_str}") last_display_per_mode[current_mode] = display async def main(): async with BleakClient(DEVICE) as client: print("Connected:", client.is_connected) await client.start_notify(CHAR_UUID, notif_handler) print("Listening... Ctrl+C to stop/cleanup.") try: while True: await asyncio.sleep(1) except KeyboardInterrupt: print("\nInterrupted. Cleaning up...") finally: await client.stop_notify(CHAR_UUID) print("Notifications stopped. Disconnecting...") if __name__ == "__main__": asyncio.run(main())