# main.py import asyncio import json from dmm_decoder import ZT5BDecoder from bleak import BleakClient CONFIG_FILE = "config/zt5b_minimal.json" DEVICE = "9C:0C:35:03:C2:B7" def load_config(path): with open(path, "r") as f: return json.load(f) def select_decoder(config): if config.get("decoder") == "ZT5BDecoder": return ZT5BDecoder(config) raise ValueError(f"Unknown decoder specified: {config.get('decoder')}") def print_top_debug(config): summary = { "model": config.get("model"), "name": config.get("name"), "manufacturer": config.get("manufacturer"), "chipset": config.get("chipset"), "decoder": config.get("decoder"), "char_uuid": config.get("char_uuid"), "debug": config.get("debug"), "mode_map": config.get("mode_map", {}) } print(json.dumps(summary, indent=2, ensure_ascii=False)) def compose_description_and_unit(active_indices, icon_table, descriptions, units): """ Given active icon indices and config tables, return a display description and full unit string. """ icon_keys = [icon_table[i] for i in active_indices] description = ' '.join([descriptions.get(key, key) for key in icon_keys if descriptions.get(key, key)]) unit = ''.join([units.get(key, '') for key in icon_keys]) return description.strip(), unit.strip() def debug_trace(hexstring, decoder, config, result): icon_table = config["icon_table"] descriptions = config.get("descriptions", {}) units = config.get("units", {}) mode_map = config.get("mode_map", {}) # Decode internals encoded_array = decoder.str2hexarray(hexstring) xorkey = decoder.xorkey if len(encoded_array) > len(xorkey): xorkey = (xorkey * ((len(encoded_array) // len(xorkey)) + 1))[:len(encoded_array)] xordecoded = decoder.bytewise_XOR(encoded_array, xorkey) binary = decoder.hex2bin(xordecoded) flipped = decoder.flip_bits(binary) bitstring = decoder.array2str(flipped) icon_bits = decoder.extract_icon_bits(bitstring) active_indices = [i for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)] active_icons = [icon_table[i] for i in active_indices] index_icon_table = {str(i): icon_table[i] for i in active_indices} mode_key = str(sorted(active_indices)) mode_label = mode_map.get(mode_key, "unknown") description, unit = compose_description_and_unit(active_indices, icon_table, descriptions, units) debug_block = { "hexstring": hexstring, "decoded_bytes": xordecoded, "full_bitstring": bitstring, "icon_region_bitmask": icon_bits, "active_indices": active_indices, "active_icons": active_icons, "index_icon_table": index_icon_table, "mode_key": mode_key, "mode_label": mode_label, "description": description, "unit": unit, "final_result": result } print(json.dumps(debug_block, ensure_ascii=False, indent=2)) def notification_handler_factory(decoder, config): last_icons = {"icons": None} icon_table = config["icon_table"] descriptions = config.get("descriptions", {}) units = config.get("units", {}) def notif_handler(sender, data): hexstr = data.hex().lower() result = decoder.decode_packet(hexstr) # Build the active_indices again for the output icon_bits = decoder.extract_icon_bits( decoder.array2str( decoder.flip_bits( decoder.hex2bin( decoder.bytewise_XOR( decoder.str2hexarray(hexstr), decoder.xorkey ) ) ) ) ) active_indices = [i for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)] description, unit = compose_description_and_unit(active_indices, icon_table, descriptions, units) icons = result["icons"] if icons != last_icons["icons"]: if config.get("debug", False): debug_trace(hexstr, decoder, config, result) else: output = { **result, "description": description, "unit": unit } print(json.dumps(output, ensure_ascii=False, indent=2)) last_icons["icons"] = icons return notif_handler async def main(): config = load_config(CONFIG_FILE) decoder = select_decoder(config) char_uuid = config.get("char_uuid") if config.get("debug", True): print_top_debug(config) async with BleakClient(DEVICE) as client: await client.start_notify(char_uuid, notification_handler_factory(decoder, config)) print("Listening for BLE notifications. Ctrl+C to exit.") try: while True: await asyncio.sleep(1) except KeyboardInterrupt: pass finally: await client.stop_notify(char_uuid) print("Stopped.") if __name__ == "__main__": asyncio.run(main())