From 195d874b3182705c505f031c5f2106e09ece6cee Mon Sep 17 00:00:00 2001 From: Wynand Meijer Date: Thu, 18 Sep 2025 00:21:10 +0200 Subject: [PATCH] Reworked config --- .gitignore | 1 + HANDOVER.md | 113 ++++++++++++++++++++++++++++ config/zt5b.json | 65 ++++++++++++++++ config/zt5b_minimal.json | 69 +++++++++++++++++ dmm_decoder.py | 78 +++++++++++++++++++ xxx.py | 158 +++++++++++++++++++++++++++++++++++++++ xxx.txt | 0 yyy.py | 138 ++++++++++++++++++++++++++++++++++ 8 files changed, 622 insertions(+) create mode 100644 .gitignore create mode 100644 HANDOVER.md create mode 100644 config/zt5b.json create mode 100644 config/zt5b_minimal.json create mode 100644 dmm_decoder.py create mode 100644 xxx.py create mode 100644 xxx.txt create mode 100644 yyy.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f2d4b96 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__* \ No newline at end of file diff --git a/HANDOVER.md b/HANDOVER.md new file mode 100644 index 0000000..08f2858 --- /dev/null +++ b/HANDOVER.md @@ -0,0 +1,113 @@ +# Handover Document: Web-Based Multimeter (BLE) + +## Project Overview + +- **Purpose:** + Provide a web-based platform that connects to digital multimeters via Bluetooth Low Energy (BLE), decodes readings in real-time, and displays measurements and graphs via a modern JavaScript (and intended browser-based) frontend. +- **Features Implemented:** + - BLE connection and notifications from various multimeter models using JSON configuration. + - Parsing/decoding of obfuscated BLE packets (with per-model config for digits, icons, and units). + - Smart, quiet output (value/mode changes only) and verbose debug option. + +*** + +## Technical Architecture + +### **Backend (Current: Python Prototype)** +- **BLE Connection:** + Uses `bleak` (Python) for managing BLE device connections, subscribing to measurement notifications. +- **Decoder Logic:** + - Hex packets are XOR-unobfuscated and bits are mapped to 7-segment digits and icons. + - All protocol/mapping details are externalized into a model-specific JSON file. +- **Output Control:** + Console logging behavior governed by config (`log_level`: "info", "lite", "debug", "warning"). + +### **Frontend (Planned/Target)** +- **JavaScript-based Web App** + - Intended to connect to BLE multimeters from within the browser using Web Bluetooth API. + - Real-time value/graph display, modes, and protocol mapping as pioneered in Python version. + +*** + +## Configuration: JSON Model File + +- **Purpose:** + Allows support for many DMMs by abstracting BLE UUIDs, XOR keys, digit/icon mapping, and output units. +- **Key Sections:** + - `modes`: Hex notification → Mode name mapping. + - `digit_table`: Segment-to-digit/character mapping for 7-segment display values. + - `icon_regions`/`icon_tables`: Define where in each packet icons are encoded, and map them by bit to human-readable units/symbols for each mode. + - `units`: Which measurement unit to append for each mode. + - `log_level`: Controls verbosity and debug output behavior. +- **How To Extend:** + New models = add or clone config files, adjust keys/tables as needed. + +*** + +## How the Decoding Works (Essential Info) + +1. **Receive hex packet via BLE.** +2. **Unobfuscate using per-model XOR key.** +3. **Convert to bit string, optionally reverse bits.** +4. **Extract display digit region and decode using digit table.** +5. **Extract icon region for mode, map with icon table (mode-specific if needed).** +6. **Print/log/output only when value changes (or in full debug mode).** + +*** + +## Debugging & Output + +- **Lite/Info:** + - Prints only on mode change or display value change. +- **Debug:** + - Prints all decoded notification info and raw packet, useful for troubleshooting or mapping new modes/packets. +- **Icons/units**: Mode-specific filtering via JSON prevents irrelevant or noisy output. + +*** + +## How to Add Support for New Multimeter Models + +1. **Create new JSON file**, copying existing structure. +2. **Update `modes` section:** Map each button/mode-change code to correct label. +3. **Set the per-model `xorkey`.** +4. **Tune `digit_table` and `icon_tables` as observed from the meter’s display.** +5. **For weird/new icons, update/add mode-specific icon tables.** +6. **Test/adjust output by flipping log_level between "debug" and "info" as needed.** + +*** + +## Current Project State + +- **Python backend is functional** with smart value change tracking. +- **Config system is robust** (per-model, per-mode handling, adjustable log level). +- **Output filtering matches DMM UI/UX norms.** +- **Ready for:** + - Port to JavaScript/Web (Web Bluetooth API offers similar notification pattern). + - UI/UX work for live digital/graph display. + +*** + +## Known Issues / Next Steps + +- Need more real-world per-model packet captures for corner cases (e.g., rare icons/units). +- Front-end UI/JS integration. +- (Optional) Add project documentation for users on mapping their own meters and sharing JSON configs. +- Possible protocol edge-cases (multi-byte icons, decimal quirks) to handle with future test cases. + +*** + +## Contacts & Info + +- **Lead (handover from):** [Your Name/Username] +- **GitHub/Repo:** [insert when live] +- **Suggest further reading:** + - [Web Bluetooth API docs (MDN)](https://developer.mozilla.org/en-US/docs/Web/API/Web_Bluetooth_API) + - [bleak Python BLE library](https://github.com/hbldh/bleak) + +*** + +**To continue:** +1. Study JSON config and its reference file. +2. Run Python script, flip log levels as needed, test/observe. +3. For web: port BLE scan/notify and decoding tables to JS (all logic is model/config-driven). +4. Add, update, or share meter profiles by modifying JSON. diff --git a/config/zt5b.json b/config/zt5b.json new file mode 100644 index 0000000..1a27a4f --- /dev/null +++ b/config/zt5b.json @@ -0,0 +1,65 @@ +{ + "name": "ZOYI ZT-5B", + "model": "ZT-5B", + "manufacturer": "ZOYI", + "chipset": "BEKEN", + "mac_prefix": "9c:0c:35", + "log_level": "debug", + "gatt": { + "notify_char_uuid": "0000fff4-0000-1000-8000-00805f9b34fb", + "measure_service_uuid": "0000fff0-0000-1000-8000-00805f9b34fb", + "write_service_uuid": "f000ffc0-0451-4000-b000-000000000000", + "write_char_1_uuid": "f000ffc1-0451-4000-b000-000000000000", + "write_char_2_uuid": "f000ffc2-0451-4000-b000-000000000000" + }, + "xorkey": "41217355a2c1327166aa3bd0e2a833142021aabb", + "modes": { + "1b8471b58ca217f666aa": "Auto", + "1b84715d52aa33f144aa": "Diode", + "1b8471b5592ad9fa77aa": "Farads", + "1b8471b5592ad9fa668a": "Frequency", + "1b847155a2619ffc662a": "Temp (C)", + "1b847155a24138fb66ea": "Temp (F)", + "1b847155422436f166aa": "EF", + "1b8471b78ca217f666aa": "Hold" + }, + "digit_table": { + "1111101": 0, "0000101": 1, "1011011": 2, "0011111": 3, + "0100111": 4, "0111110": 5, "1111110": 6, "0010101": 7, + "1111111": 8, "0111111": 9, "1110111": "A", "1001100": "u", + "1101010": "t", "1001110": "o", "1101000": "L", "1111010": "E", + "1110010": "F", "0000000": " ", "0000010": "-" + }, + "icon_regions": { + "default": { "bits": [24, 28, 60, 87], "icon_table": "11" }, + "frequency": { "bits": [24, 28, 60, 87], "icon_table": "freq" }, + "temp (c)": { "bits": [24, 28, 60, 87], "icon_table": "tmpc" } + }, + "icon_tables": { + "11": [ + "LowBattery","Delta","BT","BUZ","HOLD","ºF","ºC","DIODE", + "MAX","MIN","%","AC","F","u(F)","m(F)","n(F)","Hz", + "ohm","K(ohm)","M(ohm)","V","m(V)","DC","A","AUTO", + "?7","u(A)","m(A)","?8","?9","?10","?11" + ], + "freq": [ + "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "Hz", + "", "", "", "", "", "", "", "", "", "", "", "", "", "", "" + ], + "tmpc": [ + "", "", "", "", "", "", "ºC", "", "", "", "", "", "", "", "", "", "", + "", "", "", "", "", "", "", "", "", "", "", "", "", "", "" + ] + }, + "units": { + "frequency": "Hz", + "auto": "", + "diode": "", + "farads": "F", + "temp (c)": "°C", + "temp (f)": "°F", + "ef": "", + "hold": "" + }, + "notes": "Log level, mode and keys use lower case. Adjust log_level to info/warning/debug as desired." +} diff --git a/config/zt5b_minimal.json b/config/zt5b_minimal.json new file mode 100644 index 0000000..df5f949 --- /dev/null +++ b/config/zt5b_minimal.json @@ -0,0 +1,69 @@ +{ + "model": "ZT-5B", + "name": "ZOYI ZT-5B", + "manufacturer": "ZOYI", + "chipset": "BEKEN", + "decoder": "ZT5BDecoder", + "xorkey": "41217355a2c1327166aa3bd0e2a833142021aabb", + "char_uuid": "0000fff4-0000-1000-8000-00805f9b34fb", + "icon_regions": { + "default": [24, 28, 60, 87] + }, + + "icon_table": [ + "icon_battery", "icon_hold", "icon_lightning", "icon_buz", "icon_hold2", "icon_unknown1", "icon_unknown2", "icon_bt", + "icon_F", "icon_V", "icon_DC", "icon_AC", "icon_n", "icon_diode", "icon_mF", "icon_unknown6", "icon_ohm", + "icon_K", "icon_unknown3", "icon_M", "icon_unknown4", "icon_Hz", "icon_degF", "icon_degC", "icon_unknown5", + "icon_q7", "icon_uA", "icon_mA", "icon_q8", "icon_q9", "icon_q10", "icon_q11" + ], + + "descriptions": { + "icon_battery": "Battery", + "icon_hold": "Hold", + "icon_hold2": "Hold", + "icon_lightning": "Lightning", + "icon_buz": "Buzzer", + "icon_bt": "Bluetooth", + "icon_F": "Farad", + "icon_V": "Volt", + "icon_DC": "DC", + "icon_AC": "AC", + "icon_n": "Nano", + "icon_diode": "Diode", + "icon_mF": "MilliFarad", + "icon_ohm": "Ohm", + "icon_K": "Kilo", + "icon_M": "Mega", + "icon_Hz": "Hertz", + "icon_degF": "Fahrenheit", + "icon_degC": "Celsius", + "icon_uA": "MicroAmp", + "icon_mA": "MilliAmp" + }, + + "units": { + "icon_battery": "", + "icon_hold": "", + "icon_hold2": "", + "icon_lightning": "", + "icon_buz": "", + "icon_bt": "", + "icon_F": "F", + "icon_V": "V", + "icon_DC": "DC", + "icon_AC": "AC", + "icon_n": "n", + "icon_diode": "", + "icon_mF": "mF", + "icon_ohm": "Ω", + "icon_K": "k", + "icon_M": "M", + "icon_Hz": "Hz", + "icon_degF": "°F", + "icon_degC": "°C", + "icon_uA": "μA", + "icon_mA": "mA" + }, + + "debug": true +} diff --git a/dmm_decoder.py b/dmm_decoder.py new file mode 100644 index 0000000..0d0b976 --- /dev/null +++ b/dmm_decoder.py @@ -0,0 +1,78 @@ +# dmm_decoder.py + +import json + +class DMMDecoder: + """Base class for DMM screen/binary decoders.""" + def __init__(self, config): + self.xorkey = self.str2hexarray(config["xorkey"]) + self.icon_regions = config["icon_regions"]["default"] + self.icon_table = config["icon_table"] + + @staticmethod + def str2hexarray(string): + string = string.replace(' ', '').lower() + return [int(string[i:i+2], 16) for i in range(0, len(string), 2)] + + @staticmethod + def bytewise_XOR(array, xorkey): + return [array[x] ^ xorkey[x % len(xorkey)] for x in range(len(array))] + + @staticmethod + def hex2bin(array): + return [bin(x)[2:].zfill(8) for x in array] + + @staticmethod + def flip_bits(array): + return [b[::-1] for b in array] + + @staticmethod + def array2str(array): + return ''.join(array) + + def extract_icon_bits(self, bitstring): + bits = self.icon_regions + # Concatenate bit ranges + return bitstring[bits[0]:bits[1]] + bitstring[bits[2]:bits[3]] + + def decode_icons(self, icon_bits): + return [self.icon_table[i] for i, b in enumerate(icon_bits) if b == '1' and i < len(self.icon_table)] + + def decode_packet(self, hexstring): + encoded_array = self.str2hexarray(hexstring) + # pad xorkey if needed + xorkey = (self.xorkey * ((len(encoded_array) // len(self.xorkey)) + 1))[:len(encoded_array)] + xordecoded = self.bytewise_XOR(encoded_array, xorkey) + binary = self.hex2bin(xordecoded) + flipped = self.flip_bits(binary) + bitstring = self.array2str(flipped) + icon_bits = self.extract_icon_bits(bitstring) + icons = self.decode_icons(icon_bits) + return {'icons': icons} + +# Example derived decoder for ZT-5B (can be extended for other models) +class ZT5BDecoder(DMMDecoder): + def __init__(self, config): + super().__init__(config) + # add any ZT-5B-specific setup here if needed + +# --- Usage Example --- +if __name__ == "__main__": + # Minimal config example + config = { + "xorkey": "41217355a2c1327166aa3bd0e2a833142021aabb", + "icon_regions": { + "default": [24, 28, 60, 87] + }, + "icon_table": [ + "LowBattery","Delta","BT","BUZ","HOLD","ºF","ºC","DIODE", + "MAX","MIN","%","AC","F","u(F)","m(F)","n(F)","Hz", + "ohm","K(ohm)","M(ohm)","V","m(V)","DC","A","AUTO", + "?7","u(A)","m(A)","?8","?9","?10","?11" + ] + } + + decoder = ZT5BDecoder(config) + hexstring = "1b847195453ad9fa668a" + result = decoder.decode_packet(hexstring) + print(json.dumps(result, ensure_ascii=False, indent=2)) diff --git a/xxx.py b/xxx.py new file mode 100644 index 0000000..63f0c86 --- /dev/null +++ b/xxx.py @@ -0,0 +1,158 @@ +import asyncio +import datetime +import json +from bleak import BleakClient + +CONFIG_FILE = "zt5b.json" + +with open(CONFIG_FILE, "r") as f: + config = json.load(f) + +DEVICE = "9C:0C:35:03:C2:B7" # Replace with your actual meter's MAC if needed +CHAR_UUID = config["gatt"]["notify_char_uuid"] +LOG_LEVEL = config.get("log_level", "info").lower() + +HEADERS = {k.lower(): v for k, v in config["modes"].items()} +XORKEY_STR = config["xorkey"] +DIGIT_TABLE = config["digit_table"] +ICON_REGIONS = config["icon_regions"] +ICON_TABLES = config["icon_tables"] +UNITS = {k.lower(): v for k, v in config.get("units", {}).items()} + +current_mode = None +mode_code = None +last_display = None + +def get_time(): + return datetime.datetime.now().strftime("%H:%M:%S") + +def str2hexarray(string): + string = string.replace(" ", "").lower() + return [int(string[i:i+2], 16) for i in range(0, len(string), 2)] + +def bytewise_XOR(array, xorkey): + return ['{0:0{1}X}'.format(array[x] ^ xorkey[x % len(xorkey)], 2) for x in range(len(array))] + +def hex2bin(array): + return [bin(int(x,16))[2:].zfill(8) for x in array] + +def flip_bits(array): + return [b[::-1] for b in array] + +def array2str(array): + return "".join(array) + +def display_decoder(string): + digits_orig = DIGIT_TABLE + groups = [string[i:i+8] for i in range(0, len(string), 8)] + number = "" + for x in range(len(groups)): + if x == 0 and groups[0][0] == "1": + number += "-" + if x > 0 and groups[x][0] == "1": + number += "." + val = digits_orig.get(groups[x][1:], " ") + number += str(val) + try: + return float(number) + except Exception: + return number + +def extract_icon_info(bin_str, mode=None, typeID=None): + region = ICON_REGIONS.get(mode, ICON_REGIONS.get(typeID, ICON_REGIONS["default"])) + bits = region["bits"] + icon_bits = bin_str[bits[0]:bits[1]] + bin_str[bits[2]:bits[3]] + table_name = region.get("icon_table", "11") + icon_table = ICON_TABLES[table_name] + icons = [icon_table[i] for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)] + return [x for x in icons if x.strip() and x != " "] + +def binary2info(bin_str, mode=None): + typeID = bin_str[16:18] + display_number = display_decoder(bin_str[28:60]) + icons = extract_icon_info(bin_str, mode=mode, typeID=typeID) + value_type = type(display_number).__name__ + return { + "typeID": typeID, + "display": display_number, + "value_type": value_type, + "icons": icons + } + +def decode(hexstring, mode=None): + xorkey = str2hexarray(XORKEY_STR) + encoded_array = str2hexarray(hexstring) + if len(encoded_array) > len(xorkey): + xorkey = (xorkey * ((len(encoded_array) // len(xorkey)) + 1))[:len(encoded_array)] + xordecoded = bytewise_XOR(encoded_array, xorkey) + binary = hex2bin(xordecoded) + flipped = flip_bits(binary) + result = array2str(flipped) + obj = binary2info(result, mode=mode) + return obj + +def log_info(msg): + if LOG_LEVEL in ["info", "debug"]: + print(msg) + +def log_debug(msg): + if LOG_LEVEL == "debug": + print(msg) + +def log_warn(msg): + if LOG_LEVEL in ["warning", "info", "debug"]: + print(f"WARNING: {msg}") + + +# Trackers for mode and last value +current_mode = None +mode_code = None +last_display_per_mode = {} +last_printed_mode = None + +def notif_handler(sender, data): + global current_mode, mode_code, last_printed_mode + hexstr = data.hex().lower() + + # MODE CHANGE: only print on genuine mode switch, not on every packet! + if hexstr in HEADERS: + # Only print mode header if it's a new mode (not repeating) + new_mode = HEADERS[hexstr] + if new_mode != last_printed_mode: + print(f"{new_mode} : {get_time()}") + last_printed_mode = new_mode + current_mode = new_mode + mode_code = hexstr + return + + if current_mode and hexstr != mode_code: + decoded = decode(hexstr, mode=current_mode.lower()) + display = decoded.get("display") + icons = decoded.get("icons") + icon_str = f" [{' '.join(icons)}]" if icons else "" + unit = UNITS.get(current_mode.lower(), "") + unit_str = f" {unit}" if unit else "" + + # Only print if display value has changed for the current mode + last_val = last_display_per_mode.get(current_mode) + if display != last_val and display is not None and str(display).strip(): + print(f"{current_mode} : {display}{icon_str}{unit_str}") + last_display_per_mode[current_mode] = display + + +async def main(): + async with BleakClient(DEVICE) as client: + print("Connected:", client.is_connected) + await client.start_notify(CHAR_UUID, notif_handler) + print("Listening... Ctrl+C to stop/cleanup.") + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + print("\nInterrupted. Cleaning up...") + finally: + await client.stop_notify(CHAR_UUID) + print("Notifications stopped. Disconnecting...") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/xxx.txt b/xxx.txt new file mode 100644 index 0000000..e69de29 diff --git a/yyy.py b/yyy.py new file mode 100644 index 0000000..0ebce81 --- /dev/null +++ b/yyy.py @@ -0,0 +1,138 @@ +# main.py + +import asyncio +import json +from dmm_decoder import ZT5BDecoder +from bleak import BleakClient + +CONFIG_FILE = "config/zt5b_minimal.json" +DEVICE = "9C:0C:35:03:C2:B7" + +def load_config(path): + with open(path, "r") as f: + return json.load(f) + +def select_decoder(config): + if config.get("decoder") == "ZT5BDecoder": + return ZT5BDecoder(config) + raise ValueError(f"Unknown decoder specified: {config.get('decoder')}") + +def print_top_debug(config): + summary = { + "model": config.get("model"), + "name": config.get("name"), + "manufacturer": config.get("manufacturer"), + "chipset": config.get("chipset"), + "decoder": config.get("decoder"), + "char_uuid": config.get("char_uuid"), + "debug": config.get("debug"), + "mode_map": config.get("mode_map", {}) + } + print(json.dumps(summary, indent=2, ensure_ascii=False)) + +def compose_description_and_unit(active_indices, icon_table, descriptions, units): + """ + Given active icon indices and config tables, return a display description and full unit string. + """ + icon_keys = [icon_table[i] for i in active_indices] + description = ' '.join([descriptions.get(key, key) for key in icon_keys if descriptions.get(key, key)]) + unit = ''.join([units.get(key, '') for key in icon_keys]) + return description.strip(), unit.strip() + +def debug_trace(hexstring, decoder, config, result): + icon_table = config["icon_table"] + descriptions = config.get("descriptions", {}) + units = config.get("units", {}) + mode_map = config.get("mode_map", {}) + + # Decode internals + encoded_array = decoder.str2hexarray(hexstring) + xorkey = decoder.xorkey + if len(encoded_array) > len(xorkey): + xorkey = (xorkey * ((len(encoded_array) // len(xorkey)) + 1))[:len(encoded_array)] + xordecoded = decoder.bytewise_XOR(encoded_array, xorkey) + binary = decoder.hex2bin(xordecoded) + flipped = decoder.flip_bits(binary) + bitstring = decoder.array2str(flipped) + icon_bits = decoder.extract_icon_bits(bitstring) + + active_indices = [i for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)] + active_icons = [icon_table[i] for i in active_indices] + index_icon_table = {str(i): icon_table[i] for i in active_indices} + + mode_key = str(sorted(active_indices)) + mode_label = mode_map.get(mode_key, "unknown") + description, unit = compose_description_and_unit(active_indices, icon_table, descriptions, units) + + debug_block = { + "hexstring": hexstring, + "decoded_bytes": xordecoded, + "full_bitstring": bitstring, + "icon_region_bitmask": icon_bits, + "active_indices": active_indices, + "active_icons": active_icons, + "index_icon_table": index_icon_table, + "mode_key": mode_key, + "mode_label": mode_label, + "description": description, + "unit": unit, + "final_result": result + } + print(json.dumps(debug_block, ensure_ascii=False, indent=2)) + +def notification_handler_factory(decoder, config): + last_icons = {"icons": None} + icon_table = config["icon_table"] + descriptions = config.get("descriptions", {}) + units = config.get("units", {}) + def notif_handler(sender, data): + hexstr = data.hex().lower() + result = decoder.decode_packet(hexstr) + # Build the active_indices again for the output + icon_bits = decoder.extract_icon_bits( + decoder.array2str( + decoder.flip_bits( + decoder.hex2bin( + decoder.bytewise_XOR( + decoder.str2hexarray(hexstr), decoder.xorkey + ) + ) + ) + ) + ) + active_indices = [i for i, b in enumerate(icon_bits) if b == '1' and i < len(icon_table)] + description, unit = compose_description_and_unit(active_indices, icon_table, descriptions, units) + icons = result["icons"] + if icons != last_icons["icons"]: + if config.get("debug", False): + debug_trace(hexstr, decoder, config, result) + else: + output = { + **result, + "description": description, + "unit": unit + } + print(json.dumps(output, ensure_ascii=False, indent=2)) + last_icons["icons"] = icons + return notif_handler + +async def main(): + config = load_config(CONFIG_FILE) + decoder = select_decoder(config) + char_uuid = config.get("char_uuid") + if config.get("debug", True): + print_top_debug(config) + async with BleakClient(DEVICE) as client: + await client.start_notify(char_uuid, notification_handler_factory(decoder, config)) + print("Listening for BLE notifications. Ctrl+C to exit.") + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + pass + finally: + await client.stop_notify(char_uuid) + print("Stopped.") + +if __name__ == "__main__": + asyncio.run(main())