1. 前言
在日本居住过的朋友,大多知道家里都会安装 智能电表(Smart Meter),这类电表通常内置了 ECHONET Lite 协议和 Wi-SUN 通信功能。通过向电力公司申请 B Route ID 和 B Route密码,我们可以自己动手采集自家实时用电信息,用于做能源监控、家居自动化或者数据可视化。
本文是笔者在家利用 BP35A1 这类 Wi-SUN 模组,通过 B Route 方式从智能电表获取瞬时用电量、定时积算电力量等数据的记录。硬件和流程主要参考了 maky-ba.hatenablog.com 的博文,这里把整个过程梳理一遍,并加入一些自己的实践心得。
2. 硬件与环境准备
2.1 Wi-SUN 通信模组
- BP35A1 或者类似的 Wi-SUN USB Dongle / UART 模组。
- 这是常见的与日本智能电表进行 Wi-SUN 通信的硬件,支持 ECHONET Lite 协议。
- 通过 USB 或者串口(UART) 接口与电脑或树莓派相连。
目前查询都能过之处Wi-SUN协议的模组并不多。BP35A1算是相对比较便宜的一款。我从mercari花费不到1万日元购入。
2.2 B Route密钥和 ID
- 向电力公司申请获取的 B Route ID 与 B Route 密码(Bルート パスワード)。
- 这通常是一串数字和字母组合的 ID 以及密码,用于与自家电表建立加密连接。
- 需要在代码或配置文件中进行设置。
账号密码的申请,参考:https://www.tepco.co.jp/pg/consignment/liberalization/smartmeter-broute.html
基本只需要填写个form表达,3个工作日内会收到邮件通知。随后一周内东京电力会把密码通过普通信件寄到你的住址。
2.3 开发/运行环境
- 主机:PC 或者树莓派(Raspberry Pi 等),安装 Python 3。(之后会介绍如何继承进入home-assistant)
- 操作系统:本文示例在 macOS / Ubuntu / Raspberry Pi OS 上均可运行。
- Python 库:pyserial, dateutil 等,用于串口通信和时间处理。
3. 整体流程与思路
1. 串口连接
用 USB 线将 BP35A1 连接到计算机;若是 UART 接口也需要跳线到树莓派 GPIO 或其他串口。
在系统中识别出对应的串口设备号(macOS 下类似 /dev/tty.usbserial-xxx;树莓派下类似 /dev/ttyAMA0;Windows 下是 COMx)。
2. 设置 B Route
通过 SKSETPWD 指令把 B Route密码设置到模组里。
通过 SKSETRBID 指令把 B Route ID 也设置进去。
3. 扫描并连接智能电表
用 SKSCAN 扫描附近的 Wi-SUN 信道。
解析 EVENT 22 返回的数据,找到电表所在的 Channel、Pan ID、Addr 等信息。
用 SKSREG S2 / SKSREG S3 设置 Channel 和 PAN ID。
调用 SKLL64 … 获取电表的 IPv6 地址。
最后 SKJOIN <IPv6> 发起 PANA 认证连接,成功后会返回 EVENT 25。
4. 请求数据 / 监听通知
发送 SKSENDTO 指令到电表对应的 IPv6 地址,以及需要查询的 ECHONET Lite 帧(比如读取瞬时电力 EPC=0xE7)。
解析从 ERXUDP 返回的数据帧,提取其中的 ECHONET Lite 负载,并逐字节解析 EHD、ESV、EPC 等字段。
5. 数据解析及存储
可将解析到的瞬时电力、定时积算电力量等数据存入数据库(如 InfluxDB)或直接发布到 MQTT,也可以在命令行中直接打印调试输出,实现简易监控。
4. 代码示例
以下展示一个 Python 脚本完整,示例如何完成 BP35A1 初始化连接和发送 ECHONET Lite 请求,解析 ERXUDP 响应的数据。
#! /usr/bin/env python3
import logging
import sys
import time
from datetime import datetime
import serial
from dateutil import tz
# ========== Logging Configuration ==========
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # 保留主 Logger 级别为 DEBUG
log_formatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# FileHandler:记录 DEBUG及以上 到日志文件
file_handler = logging.FileHandler("smart_meter.log", encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(log_formatter)
# StreamHandler:只在控制台输出 INFO及以上
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(log_formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# タイムゾーン
JST = tz.gettz("Asia/Tokyo")
UTC = tz.gettz("UTC")
# ===============================
# スマートメーターの設定
# ===============================
SLEEP_SEC_FOR_REQUEST = 0 # 发送请求后,两次请求之间的等待时间(秒)
SERIAL_TIMEOUT = 2 # 串口读取超时时间(秒)
# BルートのID/パスワード(要自分の情報に置き換え)
ROUTE_B_ID = "xxxxxxxxxxxx"
ROUTE_B_PWD = "xxxxxxxxxx"
# シリアルデバイス(例)
SERIAL_PORT_DEV = "/dev/tty.usbserial-120" # REPLACE
def waitOK(serial_port):
"""
等到读取到"OK"才结束等待.
如果超时或连续多次空读, 则抛出异常,让上层处理.
"""
empty_count = 0
max_empty_read = 5 # 连续5次空读就放弃
while True:
line = serial_port.readline()
if not line:
empty_count += 1
logger.debug(f"waitOK: empty read count={empty_count}")
if empty_count >= max_empty_read:
raise TimeoutError("waitOK() reached max empty read count.")
continue
if line.startswith(b"OK"):
logger.debug("Received 'OK'")
break
else:
logger.debug(f"waitOK: ignoring line={line}")
def initialize():
"""
初始化串口并执行 ECHONET Lite 连接流程:
1. 设置 B 路由密码和 ID
2. 扫描信道
3. 连接到智能电表
发生超时或多次空读时会抛出异常
"""
logger.info("Initializing serial port...")
serial_port = serial.Serial(
port=SERIAL_PORT_DEV, baudrate=115200, timeout=SERIAL_TIMEOUT
)
logger.info("Serial port opened.")
# Bルートのパスワードを設定
logger.info("Setting B-route password...")
cmd_pwd = f"SKSETPWD C {ROUTE_B_PWD}\r\n"
serial_port.write(cmd_pwd.encode())
waitOK(serial_port)
# BルートのIDを設定
logger.info("Setting B-route ID...")
cmd_id = f"SKSETRBID {ROUTE_B_ID}\r\n"
serial_port.write(cmd_id.encode())
waitOK(serial_port)
# ネットワークのスキャン
logger.info("Scanning channel for Smart Meter...")
scanDuration = 5
scanRes = {}
while "Channel" not in scanRes:
command_str = f"SKSCAN 2 FFFFFFFF {scanDuration}\r\n"
logger.debug(f"Sending scan command: {command_str.strip()}")
serial_port.write(command_str.encode())
scanEnd = False
while not scanEnd:
raw_line = serial_port.readline()
if not raw_line:
logger.debug("Empty read while scanning channel...")
continue
if raw_line.startswith(b"EVENT 22"):
# スキャン終了
scanEnd = True
elif raw_line.startswith(b" "):
try:
line_decoded = raw_line.decode("utf-8", errors="ignore")
cols = line_decoded.strip().split(":")
if len(cols) == 2:
scanRes[cols[0]] = cols[1]
except Exception as e:
logger.warning(f"scan parse error: {e}")
scanDuration += 1
if scanDuration > 14 and "Channel" not in scanRes:
logger.error("Could not find valid channel within scan duration.")
sys.exit()
logger.info(f"Channel found: {scanRes['Channel']}")
logger.info(f"Pan ID found: {scanRes['Pan ID']}")
logger.info("Setting channel and Pan ID...")
# (スキャンで取得した)チャネルを設定
cmd_s2 = f"SKSREG S2 {scanRes['Channel']}\r\n"
serial_port.write(cmd_s2.encode())
waitOK(serial_port)
# (スキャンで取得した)PAN IDを設定
cmd_s3 = f"SKSREG S3 {scanRes['Pan ID']}\r\n"
serial_port.write(cmd_s3.encode())
waitOK(serial_port)
# IPv6アドレスを取得
logger.info("Getting IPv6 address...")
cmd_addr = f"SKLL64 {scanRes['Addr']}\r\n"
serial_port.write(cmd_addr.encode())
# 先读一行(可能空)
serial_port.readline()
ipv6Addr = serial_port.readline().decode("utf-8", errors="ignore").strip()
logger.info(f"IPv6 address: {ipv6Addr}")
# PANA認証
logger.info("Starting PANA authentication...")
cmd_join = f"SKJOIN {ipv6Addr}\r\n"
serial_port.write(cmd_join.encode())
waitOK(serial_port)
bConnected = False
while not bConnected:
raw_line = serial_port.readline()
if not raw_line:
logger.debug("Empty read while waiting for PANA EVENT 25...")
continue
if raw_line.startswith(b"EVENT 24"):
logger.error("PANA authentication failed. (EVENT 24)")
sys.exit(1)
elif raw_line.startswith(b"EVENT 25"):
logger.info("PANA authentication success. (EVENT 25)")
bConnected = True
return serial_port, ipv6Addr
def parse_echonet_lite_frame(echonet_bytes):
"""
根据 ECHONET Lite 协议格式解析一帧的关键字段,并返回一个 dict.
"""
frame_info = {}
if len(echonet_bytes) < 12:
return frame_info # 长度不够
EHD = echonet_bytes[0:2] # b"\x10\x81"
TID = echonet_bytes[2:4]
SEOJ = echonet_bytes[4:7]
DEOJ = echonet_bytes[7:10]
ESV = echonet_bytes[10]
OPC = echonet_bytes[11]
frame_info["EHD"] = EHD
frame_info["TID"] = TID
frame_info["SEOJ"] = SEOJ
frame_info["DEOJ"] = DEOJ
frame_info["ESV"] = ESV
frame_info["OPC"] = OPC
frame_info["properties"] = []
offset = 12
for _ in range(OPC):
if offset + 2 > len(echonet_bytes):
break
EPC = echonet_bytes[offset]
PDC = echonet_bytes[offset + 1]
offset += 2
if offset + PDC > len(echonet_bytes):
break
EDT = echonet_bytes[offset : offset + PDC]
offset += PDC
frame_info["properties"].append((EPC, PDC, EDT))
return frame_info
def parse_and_print_properties(frame):
"""
针对常见 EPC(E7: 瞬時電力, E8: 瞬時電流, E9: 瞬時電圧, EA: 正方向累積, EB: 逆方向累積)
做示例级解析。实际单位/字节数需要参考电表说明。
"""
ESV = frame.get("ESV")
props = frame.get("properties", [])
# 0x72: 对 Read Request 的应答
# 0x73: 设备主动上报(通知)
# 这里统一处理,只要遇到相应 EPC 就解析。
for EPC, PDC, EDT in props:
if EPC == 0xE7 and PDC == 4:
# 瞬时電力
val = int.from_bytes(EDT, byteorder="big", signed=False)
# 若最高位表示正负,需要特殊处理
if (val >> 31) & 0x01 == 1:
val = (val ^ 0xFFFFFFFF) * (-1) - 1
logger.info(f"[E7] Instantaneous Power = {val} W")
elif EPC == 0xE8:
# 瞬時電流
# 常见家庭(单相/二相)有时返回4字节(2+2),具体看电表手册
if PDC == 4:
i1 = int.from_bytes(EDT[0:2], "big", signed=False)
i2 = int.from_bytes(EDT[2:4], "big", signed=False)
# 假设0.1 A计量
logger.info(
"[E8] Instantaneous Current: "
f"Phase1={i1/10:.1f} A, Phase2={i2/10:.1f} A"
)
else:
logger.info(f"[E8] Unexpected current format. PDC={PDC}, raw={EDT}")
elif EPC == 0xE9:
# 瞬時電圧
if PDC == 4:
v1 = int.from_bytes(EDT[0:2], "big", signed=False)
v2 = int.from_bytes(EDT[2:4], "big", signed=False)
logger.info(f"[E9] Instantaneous Voltage: Phase1={v1} V, Phase2={v2} V")
elif PDC == 0:
logger.info(
"[E9] No voltage data returned (PDC=0). The meter may not support E9."
)
else:
logger.info(f"[E9] Unexpected voltage format. PDC={PDC}, raw={EDT}")
elif EPC in [0xEA, 0xEB] and PDC >= 10:
# 定時積算電力量 (EA:正方向, EB:逆方向)
year = int.from_bytes(EDT[0:2], "big")
month = EDT[2]
day = EDT[3]
hour = EDT[4]
minute = EDT[5]
second = EDT[6]
energy_raw = int.from_bytes(EDT[7:11], "big", signed=False)
# 假设 0.1 kWh /count
energy_val = energy_raw / 10
dt_str = "InvalidDate"
try:
dt_jst = datetime(year, month, day, hour, minute, second, tzinfo=JST)
dt_utc = dt_jst.astimezone(UTC)
dt_str = dt_utc.replace(tzinfo=None).isoformat()
except ValueError:
logger.warning("Could not parse EA/EB date/time in the property EDT.")
direction = "Forward" if EPC == 0xEA else "Reverse"
logger.info(
f"[E{EPC:02X}] Cumulative {direction} Energy = {energy_val:.1f} kWh, Time={dt_str}"
)
else:
logger.debug(f"Unhandled EPC=0x{EPC:02X}, PDC={PDC}, EDT={EDT}")
def main_loop():
"""
主循环:
1. 初始化并连接智能电表
2. 发送Read Request(一次性读取E7/E8/E9/EA/EB)
3. 解析返回, 如空读过多或无响应则重新初始化
"""
serial_port, ipv6Addr = initialize()
logger.info("Successfully initialized and connected to Smart Meter.")
# 一次性请求5个 EPC: E7/E8/E9/EA/EB
# ESV=0x62=读请求, OPC=5
echonetLiteFrame = b"\x10\x81" # EHD
echonetLiteFrame += b"\x00\x01" # TID (随意编号)
echonetLiteFrame += b"\x05\xFF\x01" # SEOJ(控制器)
echonetLiteFrame += b"\x02\x88\x01" # DEOJ(低压スマートメーター对象)
echonetLiteFrame += b"\x62" # ESV=0x62=Read Request
echonetLiteFrame += b"\x05" # OPC=5(请求5个属性)
# 依次请求 E7(瞬时电力), E8(瞬时电流), E9(瞬时电压), EA(正向积算), EB(逆向积算)
for epc_code in [0xE7, 0xE8, 0xE9, 0xEA, 0xEB]:
echonetLiteFrame += epc_code.to_bytes(1, "big") # EPC
echonetLiteFrame += b"\x00" # PDC=0 (无附加数据)
cmd_payload_len = len(echonetLiteFrame)
command = (
f"SKSENDTO 1 {ipv6Addr} 0E1A 1 {cmd_payload_len:04X} ".encode()
+ echonetLiteFrame
+ b"\r\n"
)
serial_port.timeout = SERIAL_TIMEOUT
sleep_sec = SLEEP_SEC_FOR_REQUEST
not_res_flag = False
not_res_count = 0
while True:
# 如果上一次无响应, 计数+1; 否则清零
if not_res_flag:
not_res_count += 1
logger.warning(f"No response detected, not_res_count={not_res_count}")
not_res_flag = False
else:
not_res_count = 0
if not_res_count >= 5:
logger.warning(
"Too many no-responses. Re-initializing serial connection..."
)
break
logger.debug(f"Sending command to Smart Meter: {command}")
serial_port.write(command)
max_line_read = 12 # 本次请求后, 最多读12行就放弃
empty_count = 0
for line_idx in range(max_line_read):
raw_line = serial_port.readline()
if not raw_line:
empty_count += 1
logger.debug(f"Line#{line_idx+1} empty. empty_count={empty_count}")
# 连续3行空, 视为无响应
if empty_count >= 3:
not_res_flag = True
break
continue
logger.debug(f"Raw line {line_idx+1}: {raw_line}")
line_decoded = raw_line.decode("utf-8", errors="ignore").strip()
logger.debug(f"Decoded line {line_idx+1}: {line_decoded}")
if line_decoded.startswith("SKSENDTO"):
pass
elif line_decoded.startswith("EVENT 21"):
pass
elif line_decoded.startswith("OK"):
pass
elif line_decoded.startswith("ERXUDP"):
tokens = raw_line.split(b" ", 8)
if len(tokens) < 9:
logger.warning("ERXUDP line format unexpected.")
break
# 第9个token(索引8)是 ECHONET Lite 原始帧
echonet_payload = tokens[8].rstrip(b"\r\n")
frame_data = parse_echonet_lite_frame(echonet_payload)
parse_and_print_properties(frame_data)
break
elif b"EVENT 29" in raw_line:
logger.warning("Session lifetime expired. Trying to re-connect...")
# 等待 EVENT 25 重新连上
while True:
sub_line = serial_port.readline()
if not sub_line:
logger.debug("Empty read while re-connecting session...")
continue
if b"EVENT 25" in sub_line:
logger.info("Session re-connected (EVENT 25).")
break
time.sleep(sleep_sec)
sleep_sec = SLEEP_SEC_FOR_REQUEST
def run_forever():
"""
整体外层循环,如果出现需要重启连接的情况,就等待一会儿再重来
"""
while True:
try:
main_loop()
except TimeoutError as e:
logger.error(f"Timeout in initialize or waitOK: {e}")
except Exception as e:
logger.error(f"Unhandled exception in main_loop: {type(e).__name__}: {e}")
logger.debug("Sleeping 30s before re-initializing...")
time.sleep(30)
if __name__ == "__main__":
try:
run_forever()
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received, exiting script...")
sys.exit(0)
以上是完整本人修改测试成功的完整代码。运行结果如下图所示:
5. 集成home-assistant
参考这个repo进行安装https://github.com/yufeikang/b-route-meter