[PSOC™] 【英飞凌 CY8CKIT-062S2-AI评测】MQTT实现IMU数据上传

[复制链接]
47|0
傅沈骁 发表于 2025-11-15 18:14 | 显示全部楼层 |阅读模式
本篇文章我将基于MicroPython实现将板载IMU数据上传至MQTT Explore

虽然官方SDK中有IMU的驱动代码,但是本文基于MicroPython,因此需要借助MicroPython_BMI270,在这个库中包含了重力计、加速度计等的函数,其下载链接为https://github.com/ederjc/micropython_bmi270
接着下载MicroPython的MQTT库,在MicroPython的官方micropython-lib中找到umqtt,下载链接为https://github.com/micropython/micropython-lib/tree/master
首先定义一个config.py,用于存放IMU采样、WiFi、MQTT等的配置信息
  1. # config.py
  2. WIFI_SSID      = "***"
  3. WIFI_PASSWORD  = "***"

  4. # 非 TLS:1883;TLS:8883(需要 ca.crt)
  5. MQTT_BROKER    = "broker.emqx.io"     # 或你的私有 Broker
  6. MQTT_PORT      = 1883                 # 8883 for TLS
  7. MQTT_USERNAME  = None                 # 如需鉴权,填字符串
  8. MQTT_PASSWORD  = None

  9. CLIENT_ID      = "psoc6_cy8c62_ai_imu"
  10. TOPIC_PUB      = "/psoc6/ai/imu"
  11. SAMPLE_HZ      = 10                   # 采样频率
  12. PUBLISH_EVERY  = 1                    # 每多少样本打包发布一次
  13. USE_TLS        = False                # True 走 TLS,需 ca.crt
  14. CA_CERT_PATH   = "ca.crt"             # TLS 时 CA 证书路径
接着定义定义一个wifi_connect.py,用于定义WiFi连接的代码
  1. # wifi_connect.py
  2. import network, time
  3. from config import WIFI_SSID, WIFI_PASSWORD

  4. def wifi_connect(timeout_s=20):
  5.     wlan = network.WLAN(network.STA_IF)
  6.     wlan.active(True)
  7.     if not wlan.isconnected():
  8.         wlan.connect(WIFI_SSID, WIFI_PASSWORD)
  9.         t0 = time.ticks_ms()
  10.         while not wlan.isconnected():
  11.             if time.ticks_diff(time.ticks_ms(), t0) > timeout_s*1000:
  12.                 raise RuntimeError("WiFi connect timeout")
  13.             time.sleep(0.2)
  14.     print("WiFi OK:", wlan.ifconfig())
  15.     return wlan
在主程序中,首先构建SSLContext
  1. # ---- 构建 SSLContext(若 USE_TLS=True) ----
  2. def build_ssl_ctx():
  3.     try:
  4.         ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  5.     except:
  6.         ctx = ssl.SSLContext()
  7.     if CA_CERT_PATH:
  8.         try:
  9.             ctx.load_verify_locations(CA_CERT_PATH)
  10.         except:
  11.             # 兼容某些端口使用 cadata 方式
  12.             with open(CA_CERT_PATH, "rb") as f:
  13.                 ca = f.read()
  14.             try:
  15.                 ctx.load_verify_locations(cadata=ca)
  16.             except:
  17.                 ctx.set_cadata(ca)
  18.     return ctx
接着初始化IMU的相关配置
  1. # ---- 初始化 I2C 与 BMI270 ----
  2. def init_imu():
  3.     # P0.2=SCL, P0.3=SDA(板卡手册,I2C 引脚)
  4.     i2c = I2C(0, scl=Pin("P0_2"), sda=Pin("P0_3"), freq=400000)
  5.     imu = BMI270(i2c=i2c, address=0x68)  # 默认 0x68(可改 0x69)
  6.     imu.acceleration_range = ACCEL_RANGE_8G   # ±8g
  7.     imu.gyro_range         = GYRO_RANGE_1000  # ±1000 dps
  8.     imu.acceleration_operation_mode = ACCELERATOR_ENABLED
  9.     return imu
构建MQTT客户端
  1. # ---- 构建 MQTT 客户端 ----
  2. def make_mqtt_client():
  3.     cid = CLIENT_ID + "_" + ubinascii.hexlify(machine.unique_id()).decode()
  4.     if USE_TLS:
  5.         sslctx = build_ssl_ctx()
  6.         return MQTTClient(client_id=cid, server=MQTT_BROKER, port=MQTT_PORT,
  7.                           user=MQTT_USERNAME, password=MQTT_PASSWORD,
  8.                           ssl=sslctx, keepalive=60)
  9.     else:
  10.         return MQTTClient(client_id=cid, server=MQTT_BROKER, port=MQTT_PORT,
  11.                           user=MQTT_USERNAME, password=MQTT_PASSWORD,
  12.                           ssl=False, keepalive=60)
在主流程中,首先完成IMU、WiFi和MQTT的初始化,接着不断采集数据和发布数据
  1. # ---- 主流程:采集 + 发布 ----
  2. def main():
  3.     wifi_connect()
  4.     imu = init_imu()
  5.     client = None

  6.     # 采样与发布节流
  7.     period_ms     = int(1000 / max(1, SAMPLE_HZ))
  8.     batch_size    = max(1, PUBLISH_EVERY)
  9.     buf           = []
  10.     last_ping_ms  = time.ticks_ms()

  11.     while True:
  12.         try:
  13.             if client is None:
  14.                 client = make_mqtt_client()
  15.                 client.connect()
  16.                 print("MQTT connected:", MQTT_BROKER, MQTT_PORT)

  17.             # 采样
  18.             t0 = time.ticks_ms()
  19.             ax, ay, az = imu.acceleration  # 单位 m/s^2(库内部已换算)
  20.             gx, gy, gz = imu.gyro          # 单位 dps

  21.             sample = {
  22.                 "ts_ms": t0,
  23.                 "acc": [round(ax, 4), round(ay, 4), round(az, 4)],
  24.                 "gyro": [round(gx, 4), round(gy, 4), round(gz, 4)]
  25.             }
  26.             buf.append(sample)

  27.             # 批量发布(JSON 行)
  28.             if len(buf) >= batch_size:
  29.                 payload = ujson.dumps({"batch": buf})
  30.                 client.publish(TOPIC_PUB, payload, qos=0, retain=True)  # umqtt.simple 支持 QoS0/1
  31.                 print("PUB:", TOPIC_PUB, payload[:len(payload)])
  32.                 buf.clear()

  33.             # 心跳/维持长连接
  34.             now = time.ticks_ms()
  35.             if time.ticks_diff(now, last_ping_ms) > 30000:
  36.                 try:
  37.                     client.ping()
  38.                 except Exception as _:
  39.                     # 触发重连
  40.                     raise
  41.                 last_ping_ms = now

  42.             # 周期
  43.             dt = time.ticks_diff(time.ticks_ms(), t0)
  44.             sleep_ms = max(0, period_ms - dt)
  45.             time.sleep_ms(sleep_ms)

  46.         except Exception as e:
  47.             print("loop err:", e)
  48.             try:
  49.                 if client:
  50.                     client.disconnect()
  51.             except:
  52.                 pass
  53.             client = None
  54.             time.sleep_ms(2000)  # 简单退避后重连

  55. # 入口
  56. import ujson
  57. if __name__ == "__main__":
  58.     main()

串口中输出内容如下

在MQTT Explorer中输入相关配置信息



运行程序后即可看到相关数据的显示



工程代码如下



本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?注册

×
您需要登录后才可以回帖 登录 | 注册

本版积分规则

9

主题

22

帖子

0

粉丝
快速回复 在线客服 返回列表 返回顶部