【工业树莓派CM0 Dev Board】Home Assistant 物联网智能家居终端

本文介绍了树莓派 CM0 Dev Board 的实现 Home Assistant 物联网智能家居终端的项目设计,包括准备工作、环境搭建、驱动传感器、MQTT 消息上传、流程图、关键代码以及效果演示等。

项目介绍

  • 准备工作:包括所需 Python 环境、软件包的安装部署、EMQX 服务器搭建、Home Assistant 部署等;
  • 传感器驱动:通过 smbus 库驱动 IIC 协议的 AHT10 温湿度传感器模块,获取环境温湿度数据;
  • MQTT 消息上传:结合板载 WiFi 无线网络通信功能,实现 MQTT 消息发送、HA 终端显示等;
  • Home Assistant 智能家居平台:平台登录、MQTT 配置和连接、传感器卡片添加、APP 连接等。

MQTT

消息队列遥测传输协议(Message Queuing Telemetry Transport,MQTT)是一种基于发布/订阅模式的轻量级通讯协议,广泛应用于物联网领域,特别是在带宽低、网络延迟高、网络通信不稳定的环境。

在这里插入图片描述

MQTT协议中定义了三种角色:发布者(Publish)、代理(Broker)和订阅者(Subscribe)。其中,代理是MQTT服务器,负责接收、存储和转发消息。

这里使用 EMQX 作为 MQTT 服务器。

Home Assistant

Home Assistant 是一款开源的智能家居物联网平台,专注于本地控制和隐私保护;

可集成多种智能设备和服务,将不同品牌、不同协议的设备统一接入并自由联动;

为用户提供全面的家庭环境监控和自动化控制,提升生活便利性、安全性和节能效果。

在这里插入图片描述

详见:Home Assistant .

准备工作

系统安装及环境搭建详见:【工业树莓派CM0 Dev Board】介绍、镜像烧录、系统测试 .

硬件连接

  • 若采用 SSH 远程登录操作,则仅需连接电源供电即可;
  • 若采用本地登录,则需连接 HDMI 视频流传输线、USB 键盘连接线等;
  • 连接 AHT10 传感器模块,其中 SDA 和 SCL 引脚分别连接板载 40pin 排针的 3 号和 5 号引脚;

在这里插入图片描述

  • AHT10 模块接线方式如下
AHT10 模块 树莓派 CM0 (40Pin GPIO)
VCC 3.3V
GND GND
SDA GPIO2 (SDA)
SCL GPIO3 (SCL)
引脚定义

板载 40pin GPIO 排针序号

在这里插入图片描述

IIC 引脚定义

在这里插入图片描述

详见:I2C - 树莓派引脚定义导航站 .

库安装

  • 若要实现传感器模块驱动,需安装 RPi.GPIOsmbus 库;

  • 终端执行如下代码

sudo apt-get update
sudo apt-get install python3-smbus
sudo apt-get install python3-RPi.GPIO
  • 完成安装

在这里插入图片描述

  • 终端执行 i2cdetect -y 1 检测已连接的 IIC 设备,获得 IIC 设备地址,如 0x38 .

传感器驱动

使用精度较高的 AHT10 温湿度传感器模块。

代码

终端执行 touch aht10_print.py 指令新建文件,并 nano aht10_print.py 添加如下代码

#!/usr/bin/env python3
import smbus
import time

# AHT10配置
AHT10_ADDR = 0x38
bus = smbus.SMBus(1)

def aht10_init():
    """初始化传感器"""
    try:
        bus.write_i2c_block_data(AHT10_ADDR, 0xE1, [0x08, 0x00])
        time.sleep(0.01)
        return True
    except:
        return False

def aht10_read():
    """读取温湿度数据"""
    try:
        # 触发测量
        bus.write_i2c_block_data(AHT10_ADDR, 0xAC, [0x33, 0x00])
        time.sleep(0.08)
        
        # 读取数据
        data = bus.read_i2c_block_data(AHT10_ADDR, 0x00, 6)
        
        # 检查状态
        if data[0] & 0x80:
            return None, None
        
        # 转换数据
        hum_raw = (data[1] << 12) | (data[2] << 4) | (data[3] >> 4)
        temp_raw = ((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5]
        
        humidity = (hum_raw * 100.0) / 1048576.0
        temperature = (temp_raw * 200.0) / 1048576.0 - 50.0
        
        return round(temperature, 1), round(humidity, 1)
        
    except Exception as e:
        print(f"错误: {e}")
        return None, None

# 主程序
if aht10_init():
    print("AHT10初始化成功!")
    print("开始读取温湿度数据...")
    
    try:
        while True:
            temp, hum = aht10_read()
            if temp is not None and hum is not None:
                print(f"温度: {temp}°C  湿度: {hum}%")
            else:
                print("读取失败")
            time.sleep(2)
    except KeyboardInterrupt:
        print("\n程序结束")
else:
    print("AHT10初始化失败,请检查连接")

保存代码。

效果

  • 终端执行 i2cdetect -y 1 指令,检索 AHT10 传感器设备,识别对应的 IIC 设备地址 0x38
  • 执行指令 python aht10_print.py 打印传感器数据

在这里插入图片描述

Home Assistant

介绍了树莓派 CM0 结合 AHT10 传感器采集环境温湿度,并将数据通过 MQTT 协议上传至 Home Assistant 平台,实现工业物联网温湿度监控的项目设计流程。

环境搭建

包括 Docker 容器安装、HA 和 EMQX 部署等。

Docker
  • 电脑主机或服务器下载并安装 Docker Desktop 软件;
  • 可更换或添加镜像源,便于后期实现 EMQX 和 HA 文件的快速拉取。

详见:Home Assistant 助手 .

HA 部署
  • 打开命令行终端并执行指令
docker pull homeassistant/home-assistant:latest
  • 待拉取 HA 镜像完成;

  • 磁盘根目录创建 homeassistant 文件夹,新建 docker-compose.yaml 文件,并添加如下代码

version: '3'
services:
  homeassistant:
    image: homeassistant/home-assistant:latest
    container_name: homeassistant
    restart: always
    volumes:
      - /data/homeassistant/config:/config
    environment:
      - TZ=Asia/Shanghai
    ports:
      - "8123:8123"
  • 保存文件,并在终端打开该文件夹,执行 docker compose up -d 指令,完成 HA 容器创建。

  • 进入 Containers 容器页面,点击 homeassistant 端口链接,进入 HA 浏览器页面,创建并登录账户;

EMQX 部署

在添加软件镜像源的基础上,终端执行指令

docker pull emqx/emqx:latest

拉取最新版 emqx 镜像

终端执行指令

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest

创建并运行 emqx 容器;

若提示端口被占用报错,可删除已创建的 emqx 容器,进入 Images 标签页,点击 emqx/emqx 镜像对应的启动按钮,手动配置容器名称、Host端口等参数,端口填写 0 ,系统随机分配可用端口。

EMQX 配置

(1)进入 Containers 容器页面,点击 emqx 端口链接,进入 emqx 浏览器页面,初始登录账户名 admin 密码 public

(2)依次打开 访问控制 - 客户端认证 - 创建 - Password-Based - 内置数据库 - (默认配置)- 创建

(3)用户管理 - 新建用户 - 自定义用户名和密码 .

MQTT 配置
  • 命令行终端输入 ipconfig 获取本地计算机 IPv4 地址,如 192.168.31.160

  • 配置 Home Assistant ,依次点击设置 - 设备与服务 - 添加集成 - 搜索 MQTT - 填写代理信息。

代理栏输入计算机 IP 地址,端口 1883,用户名和密码为 EMQX 中创建的用户信息。

代码

为了实现模块化,将 AHT10 传感器的驱动代码进行封装,便于调用。

AHT10 驱动
  • 终端执行 touch aht10.py 新建传感器驱动文件,执行 nano ah10.py 并添加如下代码
#!/usr/bin/env python3
import smbus
import time

AHT10_I2C_ADDR   = 0x38
AHT10_CMD_INIT   = 0xE1
AHT10_CMD_MEAS   = 0xAC
AHT10_CMD_RESET  = 0xBA
AHT10_STATUS_BUSY = 0x80


class AHT10:
    def __init__(self, bus=1):
        self._bus = smbus.SMBus(bus)
        self._addr = AHT10_I2C_ADDR
        self._init_sensor()

    # ---------- 底层封装 ----------
    def _write(self, buf):
        self._bus.write_i2c_block_data(self._addr, buf[0], buf[1:])

    def _read(self, length):
        return self._bus.read_i2c_block_data(self._addr, 0x00, length)

    def _init_sensor(self):
        self._write([AHT10_CMD_INIT, 0x08, 0x00])
        time.sleep(0.01)
        if self._is_busy():
            raise RuntimeError("AHT10 init fail")

    def _is_busy(self):
        return (self._read(1)[0] & AHT10_STATUS_BUSY) != 0

    def _trigger_measure(self):
        self._write([AHT10_CMD_MEAS, 0x33, 0x00])

    # ---------- 对外接口 ----------
    def read(self):
        """
        返回 (temperature, humidity)
        温度 ℃,湿度 %RH
        """
        self._trigger_measure()
        time.sleep(0.08)          # 等待测量完成
        while self._is_busy():
            time.sleep(0.01)

        data = self._read(6)
        # 解析 20bit 湿度 + 20bit 温度
        hum_raw  = ((data[1] << 16) | (data[2] << 8) | data[3]) >> 4
        temp_raw = ((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5]
        humidity = (hum_raw / 0x100000) * 100
        temperature = (temp_raw / 0x100000) * 200 - 50
        return round(temperature, 2), round(humidity, 2)


# 自检
if __name__ == "__main__":
    sensor = AHT10()
    print("AHT10 self-test:", sensor.read())

保存代码。

MQTT 上传

终端执行 touch aht10_ha.py 指令新建文件,并 nano aht10_ha.py 添加如下代码

#!/usr/bin/env python3
import json
import time
import signal
import sys
import paho.mqtt.client as mqtt
from aht10 import AHT10

# ---------------- MQTT 配置 ----------------
MQTT_BROKER = "192.168.1.103"
MQTT_PORT   = 1883
CLIENT_ID   = "rpi-cm0-aht10"
AUTH        = {"username": "xxx", "password": "xxx"}
PUBLISH_INTERVAL = 2          # 发布间隔(秒)

DISCOVERY_PREFIX = "homeassistant"
NODE_ID          = "rpi_cm0_aht10"
# ---------------------------------------

sensor = AHT10()
running = True

def sig_handler(sig, frame):
    global running
    print("\nCtrl+C pressed, exiting…")
    running = False
    sys.exit(0)

signal.signal(signal.SIGINT, sig_handler)

def on_connect(client, userdata, flags, rc):
    print("[MQTT] Connected" if rc == 0 else f"[MQTT] Connect failed, rc={rc}")
    if rc == 0:
        send_discovery(client)

def send_discovery(client):
    """发送 HA 自动发现配置,Retain=True"""
    for (obj, name, unit, dev_cla) in [
        ("temperature", "温度", "°C", "temperature"),
        ("humidity",    "湿度", "%",  "humidity")
    ]:
        topic = f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/{obj}/config"
        payload = {
            "name": f"{name}",
            "state_topic": f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/state",
            "unit_of_measurement": unit,
            "device_class": dev_cla,
            "value_template": f"{{{{ value_json.{obj} }}}}",
            "unique_id": f"{NODE_ID}_{obj}",
            "device": {
                "identifiers": [NODE_ID],
                "name": "AHT10 Sensor",
                "model": "AHT10",
                "manufacturer": "LJL"
            }
        }
        client.publish(topic, json.dumps(payload), retain=True)
        print(f"[HA] 发现消息已发送 -> {topic}")

def publish_state(client):
    temp, hum = sensor.read()
    payload = {"temperature": temp, "humidity": hum, "timestamp": int(time.time())}
    topic = f"{DISCOVERY_PREFIX}/sensor/{NODE_ID}/state"
    client.publish(topic, json.dumps(payload))
    print("[PUB]", payload)

def main():
    client = mqtt.Client(CLIENT_ID)
    client.username_pw_set(**AUTH)
    client.on_connect = on_connect
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
    except Exception as e:
        print("[MQTT] Exception:", e)
        sys.exit(2)

    while running:
        if client.is_connected():
            publish_state(client)
        time.sleep(PUBLISH_INTERVAL)

    client.loop_stop()
    client.disconnect()

if __name__ == "__main__":
    main()

保存代码。

效果

终端执行指令 python aht10_print.py 运行程序,终端打印已发送的 MQTT 消息

在这里插入图片描述

MQTTX

使用 MQTTX 软件测试接收到的 MQTT 消息。

  • 下载安装并运行 MQTTX 软件;
  • 新建连接,配置 MQTT 服务器地址、用户名等信息;
  • 添加主题 homeassistant/sensor/rpi_cm0_aht10/state ,即可实时接收 CM0 上传的 MQTT 消息;

在这里插入图片描述

网页端

  • 登录 HA 平台网页,配置 MQTT 参数;
  • 进入 概览 页面,编辑仪表盘,添加 AHT10 温湿度卡片;

在这里插入图片描述

APP 端

结合 HA 完善的生态建设,手机安装 Home Assistant 应用,通过 IP 地址接入服务器并登录账户

在这里插入图片描述

总结

本文介绍了树莓派 CM0 Dev Board 的实现 Home Assistant 智能家居终端的项目设计,包括准备工作、环境搭建、驱动传感器、MQTT 消息上传、流程图、关键代码以及效果演示等,为相关产品在工业物联网领域的开发设计和快速应用提供了参考。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐