Hardware / IOT / SmartHome · 2024-12-22 0

如何使用BP35A1获取电表数据

1. 前言

在日本居住过的朋友,大多知道家里都会安装 智能电表(Smart Meter),这类电表通常内置了 ECHONET Lite 协议和 Wi-SUN 通信功能。通过向电力公司申请 B Route ID 和 B Route密码,我们可以自己动手采集自家实时用电信息,用于做能源监控、家居自动化或者数据可视化。

本文是笔者在家利用 BP35A1 这类 Wi-SUN 模组,通过 B Route 方式从智能电表获取瞬时用电量、定时积算电力量等数据的记录。硬件和流程主要参考了 maky-ba.hatenablog.com 的博文,这里把整个过程梳理一遍,并加入一些自己的实践心得。

2. 硬件与环境准备

2.1 Wi-SUN 通信模组

  • BP35A1 或者类似的 Wi-SUN USB Dongle / UART 模组。
  • 这是常见的与日本智能电表进行 Wi-SUN 通信的硬件,支持 ECHONET Lite 协议。
  • 通过 USB 或者串口(UART) 接口与电脑或树莓派相连。

目前查询都能过之处Wi-SUN协议的模组并不多。BP35A1算是相对比较便宜的一款。我从mercari花费不到1万日元购入。

2.2 B Route密钥和 ID

  • 向电力公司申请获取的 B Route ID 与 B Route 密码(Bルート パスワード)。
  • 这通常是一串数字和字母组合的 ID 以及密码,用于与自家电表建立加密连接。
  • 需要在代码或配置文件中进行设置。

账号密码的申请,参考:https://www.tepco.co.jp/pg/consignment/liberalization/smartmeter-broute.html

基本只需要填写个form表达,3个工作日内会收到邮件通知。随后一周内东京电力会把密码通过普通信件寄到你的住址。

2.3 开发/运行环境

  • 主机:PC 或者树莓派(Raspberry Pi 等),安装 Python 3。(之后会介绍如何继承进入home-assistant)
  • 操作系统:本文示例在 macOS / Ubuntu / Raspberry Pi OS 上均可运行。
  • Python 库:pyserial, dateutil 等,用于串口通信和时间处理。

3. 整体流程与思路

1. 串口连接

用 USB 线将 BP35A1 连接到计算机;若是 UART 接口也需要跳线到树莓派 GPIO 或其他串口。

在系统中识别出对应的串口设备号(macOS 下类似 /dev/tty.usbserial-xxx;树莓派下类似 /dev/ttyAMA0;Windows 下是 COMx)。

2. 设置 B Route

通过 SKSETPWD 指令把 B Route密码设置到模组里。

通过 SKSETRBID 指令把 B Route ID 也设置进去。

3. 扫描并连接智能电表

用 SKSCAN 扫描附近的 Wi-SUN 信道。

解析 EVENT 22 返回的数据,找到电表所在的 Channel、Pan ID、Addr 等信息。

用 SKSREG S2 / SKSREG S3 设置 Channel 和 PAN ID。

调用 SKLL64 … 获取电表的 IPv6 地址。

最后 SKJOIN <IPv6> 发起 PANA 认证连接,成功后会返回 EVENT 25。

4. 请求数据 / 监听通知

发送 SKSENDTO 指令到电表对应的 IPv6 地址,以及需要查询的 ECHONET Lite 帧(比如读取瞬时电力 EPC=0xE7)。

解析从 ERXUDP 返回的数据帧,提取其中的 ECHONET Lite 负载,并逐字节解析 EHD、ESV、EPC 等字段。

5. 数据解析及存储

可将解析到的瞬时电力、定时积算电力量等数据存入数据库(如 InfluxDB)或直接发布到 MQTT,也可以在命令行中直接打印调试输出,实现简易监控。

4. 代码示例

以下展示一个 Python 脚本完整,示例如何完成 BP35A1 初始化连接和发送 ECHONET Lite 请求,解析 ERXUDP 响应的数据。

#! /usr/bin/env python3

import logging
import sys
import time
from datetime import datetime

import serial
from dateutil import tz

# ========== Logging Configuration ==========
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # 保留主 Logger 级别为 DEBUG

log_formatter = logging.Formatter(
    fmt="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# FileHandler:记录 DEBUG及以上 到日志文件
file_handler = logging.FileHandler("smart_meter.log", encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(log_formatter)

# StreamHandler:只在控制台输出 INFO及以上
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(log_formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)

# タイムゾーン
JST = tz.gettz("Asia/Tokyo")
UTC = tz.gettz("UTC")


# ===============================
# スマートメーターの設定
# ===============================
SLEEP_SEC_FOR_REQUEST = 0  # 发送请求后,两次请求之间的等待时间(秒)
SERIAL_TIMEOUT = 2  # 串口读取超时时间(秒)

# BルートのID/パスワード(要自分の情報に置き換え)
ROUTE_B_ID = "xxxxxxxxxxxx"
ROUTE_B_PWD = "xxxxxxxxxx"

# シリアルデバイス(例)
SERIAL_PORT_DEV = "/dev/tty.usbserial-120"      # REPLACE


def waitOK(serial_port):
    """
    等到读取到"OK"才结束等待.
    如果超时或连续多次空读, 则抛出异常,让上层处理.
    """
    empty_count = 0
    max_empty_read = 5  # 连续5次空读就放弃

    while True:
        line = serial_port.readline()
        if not line:
            empty_count += 1
            logger.debug(f"waitOK: empty read count={empty_count}")
            if empty_count >= max_empty_read:
                raise TimeoutError("waitOK() reached max empty read count.")
            continue

        if line.startswith(b"OK"):
            logger.debug("Received 'OK'")
            break
        else:
            logger.debug(f"waitOK: ignoring line={line}")


def initialize():
    """
    初始化串口并执行 ECHONET Lite 连接流程:
      1. 设置 B 路由密码和 ID
      2. 扫描信道
      3. 连接到智能电表
    发生超时或多次空读时会抛出异常
    """
    logger.info("Initializing serial port...")
    serial_port = serial.Serial(
        port=SERIAL_PORT_DEV, baudrate=115200, timeout=SERIAL_TIMEOUT
    )
    logger.info("Serial port opened.")

    # Bルートのパスワードを設定
    logger.info("Setting B-route password...")
    cmd_pwd = f"SKSETPWD C {ROUTE_B_PWD}\r\n"
    serial_port.write(cmd_pwd.encode())
    waitOK(serial_port)

    # BルートのIDを設定
    logger.info("Setting B-route ID...")
    cmd_id = f"SKSETRBID {ROUTE_B_ID}\r\n"
    serial_port.write(cmd_id.encode())
    waitOK(serial_port)

    # ネットワークのスキャン
    logger.info("Scanning channel for Smart Meter...")
    scanDuration = 5
    scanRes = {}
    while "Channel" not in scanRes:
        command_str = f"SKSCAN 2 FFFFFFFF {scanDuration}\r\n"
        logger.debug(f"Sending scan command: {command_str.strip()}")
        serial_port.write(command_str.encode())

        scanEnd = False
        while not scanEnd:
            raw_line = serial_port.readline()
            if not raw_line:
                logger.debug("Empty read while scanning channel...")
                continue

            if raw_line.startswith(b"EVENT 22"):
                # スキャン終了
                scanEnd = True
            elif raw_line.startswith(b"  "):
                try:
                    line_decoded = raw_line.decode("utf-8", errors="ignore")
                    cols = line_decoded.strip().split(":")
                    if len(cols) == 2:
                        scanRes[cols[0]] = cols[1]
                except Exception as e:
                    logger.warning(f"scan parse error: {e}")

        scanDuration += 1
        if scanDuration > 14 and "Channel" not in scanRes:
            logger.error("Could not find valid channel within scan duration.")
            sys.exit()

    logger.info(f"Channel found: {scanRes['Channel']}")
    logger.info(f"Pan ID found: {scanRes['Pan ID']}")
    logger.info("Setting channel and Pan ID...")

    # (スキャンで取得した)チャネルを設定
    cmd_s2 = f"SKSREG S2 {scanRes['Channel']}\r\n"
    serial_port.write(cmd_s2.encode())
    waitOK(serial_port)

    # (スキャンで取得した)PAN IDを設定
    cmd_s3 = f"SKSREG S3 {scanRes['Pan ID']}\r\n"
    serial_port.write(cmd_s3.encode())
    waitOK(serial_port)

    # IPv6アドレスを取得
    logger.info("Getting IPv6 address...")
    cmd_addr = f"SKLL64 {scanRes['Addr']}\r\n"
    serial_port.write(cmd_addr.encode())
    # 先读一行(可能空)
    serial_port.readline()
    ipv6Addr = serial_port.readline().decode("utf-8", errors="ignore").strip()
    logger.info(f"IPv6 address: {ipv6Addr}")

    # PANA認証
    logger.info("Starting PANA authentication...")
    cmd_join = f"SKJOIN {ipv6Addr}\r\n"
    serial_port.write(cmd_join.encode())
    waitOK(serial_port)

    bConnected = False
    while not bConnected:
        raw_line = serial_port.readline()
        if not raw_line:
            logger.debug("Empty read while waiting for PANA EVENT 25...")
            continue

        if raw_line.startswith(b"EVENT 24"):
            logger.error("PANA authentication failed. (EVENT 24)")
            sys.exit(1)
        elif raw_line.startswith(b"EVENT 25"):
            logger.info("PANA authentication success. (EVENT 25)")
            bConnected = True

    return serial_port, ipv6Addr


def parse_echonet_lite_frame(echonet_bytes):
    """
    根据 ECHONET Lite 协议格式解析一帧的关键字段,并返回一个 dict.
    """
    frame_info = {}
    if len(echonet_bytes) < 12:
        return frame_info  # 长度不够

    EHD = echonet_bytes[0:2]  # b"\x10\x81"
    TID = echonet_bytes[2:4]
    SEOJ = echonet_bytes[4:7]
    DEOJ = echonet_bytes[7:10]
    ESV = echonet_bytes[10]
    OPC = echonet_bytes[11]

    frame_info["EHD"] = EHD
    frame_info["TID"] = TID
    frame_info["SEOJ"] = SEOJ
    frame_info["DEOJ"] = DEOJ
    frame_info["ESV"] = ESV
    frame_info["OPC"] = OPC
    frame_info["properties"] = []

    offset = 12
    for _ in range(OPC):
        if offset + 2 > len(echonet_bytes):
            break
        EPC = echonet_bytes[offset]
        PDC = echonet_bytes[offset + 1]
        offset += 2
        if offset + PDC > len(echonet_bytes):
            break
        EDT = echonet_bytes[offset : offset + PDC]
        offset += PDC
        frame_info["properties"].append((EPC, PDC, EDT))

    return frame_info


def parse_and_print_properties(frame):
    """
    针对常见 EPC(E7: 瞬時電力, E8: 瞬時電流, E9: 瞬時電圧, EA: 正方向累積, EB: 逆方向累積)
    做示例级解析。实际单位/字节数需要参考电表说明。
    """
    ESV = frame.get("ESV")
    props = frame.get("properties", [])

    # 0x72: 对 Read Request 的应答
    # 0x73: 设备主动上报(通知)
    # 这里统一处理,只要遇到相应 EPC 就解析。
    for EPC, PDC, EDT in props:
        if EPC == 0xE7 and PDC == 4:
            # 瞬时電力
            val = int.from_bytes(EDT, byteorder="big", signed=False)
            # 若最高位表示正负,需要特殊处理
            if (val >> 31) & 0x01 == 1:
                val = (val ^ 0xFFFFFFFF) * (-1) - 1
            logger.info(f"[E7] Instantaneous Power = {val} W")

        elif EPC == 0xE8:
            # 瞬時電流
            # 常见家庭(单相/二相)有时返回4字节(2+2),具体看电表手册
            if PDC == 4:
                i1 = int.from_bytes(EDT[0:2], "big", signed=False)
                i2 = int.from_bytes(EDT[2:4], "big", signed=False)
                # 假设0.1 A计量
                logger.info(
                    "[E8] Instantaneous Current: "
                    f"Phase1={i1/10:.1f} A, Phase2={i2/10:.1f} A"
                )
            else:
                logger.info(f"[E8] Unexpected current format. PDC={PDC}, raw={EDT}")

        elif EPC == 0xE9:
            # 瞬時電圧
            if PDC == 4:
                v1 = int.from_bytes(EDT[0:2], "big", signed=False)
                v2 = int.from_bytes(EDT[2:4], "big", signed=False)
                logger.info(f"[E9] Instantaneous Voltage: Phase1={v1} V, Phase2={v2} V")
            elif PDC == 0:
                logger.info(
                    "[E9] No voltage data returned (PDC=0). The meter may not support E9."
                )
            else:
                logger.info(f"[E9] Unexpected voltage format. PDC={PDC}, raw={EDT}")

        elif EPC in [0xEA, 0xEB] and PDC >= 10:
            # 定時積算電力量 (EA:正方向, EB:逆方向)
            year = int.from_bytes(EDT[0:2], "big")
            month = EDT[2]
            day = EDT[3]
            hour = EDT[4]
            minute = EDT[5]
            second = EDT[6]
            energy_raw = int.from_bytes(EDT[7:11], "big", signed=False)
            # 假设 0.1 kWh /count
            energy_val = energy_raw / 10

            dt_str = "InvalidDate"
            try:
                dt_jst = datetime(year, month, day, hour, minute, second, tzinfo=JST)
                dt_utc = dt_jst.astimezone(UTC)
                dt_str = dt_utc.replace(tzinfo=None).isoformat()
            except ValueError:
                logger.warning("Could not parse EA/EB date/time in the property EDT.")

            direction = "Forward" if EPC == 0xEA else "Reverse"
            logger.info(
                f"[E{EPC:02X}] Cumulative {direction} Energy = {energy_val:.1f} kWh, Time={dt_str}"
            )

        else:
            logger.debug(f"Unhandled EPC=0x{EPC:02X}, PDC={PDC}, EDT={EDT}")


def main_loop():
    """
    主循环:
      1. 初始化并连接智能电表
      2. 发送Read Request(一次性读取E7/E8/E9/EA/EB)
      3. 解析返回, 如空读过多或无响应则重新初始化
    """
    serial_port, ipv6Addr = initialize()
    logger.info("Successfully initialized and connected to Smart Meter.")

    # 一次性请求5个 EPC: E7/E8/E9/EA/EB
    # ESV=0x62=读请求, OPC=5
    echonetLiteFrame = b"\x10\x81"  # EHD
    echonetLiteFrame += b"\x00\x01"  # TID (随意编号)
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ(控制器)
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ(低压スマートメーター对象)
    echonetLiteFrame += b"\x62"  # ESV=0x62=Read Request
    echonetLiteFrame += b"\x05"  # OPC=5(请求5个属性)

    # 依次请求 E7(瞬时电力), E8(瞬时电流), E9(瞬时电压), EA(正向积算), EB(逆向积算)
    for epc_code in [0xE7, 0xE8, 0xE9, 0xEA, 0xEB]:
        echonetLiteFrame += epc_code.to_bytes(1, "big")  # EPC
        echonetLiteFrame += b"\x00"  # PDC=0 (无附加数据)

    cmd_payload_len = len(echonetLiteFrame)
    command = (
        f"SKSENDTO 1 {ipv6Addr} 0E1A 1 {cmd_payload_len:04X} ".encode()
        + echonetLiteFrame
        + b"\r\n"
    )

    serial_port.timeout = SERIAL_TIMEOUT
    sleep_sec = SLEEP_SEC_FOR_REQUEST
    not_res_flag = False
    not_res_count = 0

    while True:
        # 如果上一次无响应, 计数+1; 否则清零
        if not_res_flag:
            not_res_count += 1
            logger.warning(f"No response detected, not_res_count={not_res_count}")
            not_res_flag = False
        else:
            not_res_count = 0

        if not_res_count >= 5:
            logger.warning(
                "Too many no-responses. Re-initializing serial connection..."
            )
            break

        logger.debug(f"Sending command to Smart Meter: {command}")
        serial_port.write(command)

        max_line_read = 12  # 本次请求后, 最多读12行就放弃
        empty_count = 0

        for line_idx in range(max_line_read):
            raw_line = serial_port.readline()
            if not raw_line:
                empty_count += 1
                logger.debug(f"Line#{line_idx+1} empty. empty_count={empty_count}")
                # 连续3行空, 视为无响应
                if empty_count >= 3:
                    not_res_flag = True
                    break
                continue

            logger.debug(f"Raw line {line_idx+1}: {raw_line}")
            line_decoded = raw_line.decode("utf-8", errors="ignore").strip()
            logger.debug(f"Decoded line {line_idx+1}: {line_decoded}")

            if line_decoded.startswith("SKSENDTO"):
                pass
            elif line_decoded.startswith("EVENT 21"):
                pass
            elif line_decoded.startswith("OK"):
                pass
            elif line_decoded.startswith("ERXUDP"):
                tokens = raw_line.split(b" ", 8)
                if len(tokens) < 9:
                    logger.warning("ERXUDP line format unexpected.")
                    break

                # 第9个token(索引8)是 ECHONET Lite 原始帧
                echonet_payload = tokens[8].rstrip(b"\r\n")
                frame_data = parse_echonet_lite_frame(echonet_payload)
                parse_and_print_properties(frame_data)
                break

            elif b"EVENT 29" in raw_line:
                logger.warning("Session lifetime expired. Trying to re-connect...")
                # 等待 EVENT 25 重新连上
                while True:
                    sub_line = serial_port.readline()
                    if not sub_line:
                        logger.debug("Empty read while re-connecting session...")
                        continue
                    if b"EVENT 25" in sub_line:
                        logger.info("Session re-connected (EVENT 25).")
                        break

        time.sleep(sleep_sec)
        sleep_sec = SLEEP_SEC_FOR_REQUEST


def run_forever():
    """
    整体外层循环,如果出现需要重启连接的情况,就等待一会儿再重来
    """
    while True:
        try:
            main_loop()
        except TimeoutError as e:
            logger.error(f"Timeout in initialize or waitOK: {e}")
        except Exception as e:
            logger.error(f"Unhandled exception in main_loop: {type(e).__name__}: {e}")

        logger.debug("Sleeping 30s before re-initializing...")
        time.sleep(30)


if __name__ == "__main__":
    try:
        run_forever()
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received, exiting script...")
        sys.exit(0)

以上是完整本人修改测试成功的完整代码。运行结果如下图所示:

5. 集成home-assistant

参考这个repo进行安装https://github.com/yufeikang/b-route-meter