- # wifi_connect.py
- import network, time
- from config import WIFI_SSID, WIFI_PASSWORD
def wifi_connect(timeout_s=20):
    """Activate the station interface and block until it is connected.

    Args:
        timeout_s: Maximum time, in seconds, to wait for the connection
            after ``connect()`` has been issued.

    Returns:
        The ``network.WLAN`` station object, active and connected.

    Raises:
        RuntimeError: If the interface is still not connected once more
            than ``timeout_s`` seconds have elapsed.
    """
    sta = network.WLAN(network.STA_IF)
    sta.active(True)

    if sta.isconnected():
        # Already connected: just report the interface configuration.
        print("WiFi OK:", sta.ifconfig())
        return sta

    sta.connect(WIFI_SSID, WIFI_PASSWORD)
    started_ms = time.ticks_ms()
    limit_ms = timeout_s * 1000
    while True:
        if sta.isconnected():
            break
        elapsed_ms = time.ticks_diff(time.ticks_ms(), started_ms)
        if elapsed_ms > limit_ms:
            raise RuntimeError("WiFi connect timeout")
        # Poll every 200 ms.
        time.sleep(0.2)

    print("WiFi OK:", sta.ifconfig())
    return sta
在主程序中,首先构建 SSLContext(仅在启用 TLS,即 USE_TLS=True 时使用):
# ---- Build the SSLContext (used when USE_TLS=True) ----
def build_ssl_ctx():
    """Create an SSL context and load the CA certificate if one is configured.

    The context is created with ``ssl.PROTOCOL_TLS_CLIENT`` when possible,
    falling back to a no-argument ``ssl.SSLContext()``. When ``CA_CERT_PATH``
    is truthy, the certificate is loaded by trying, in order:

    1. ``load_verify_locations(CA_CERT_PATH)``
    2. ``load_verify_locations(cadata=<file bytes>)``
    3. ``set_cadata(<file bytes>)``

    Returns:
        The configured SSL context.

    Raises:
        OSError: If the fallback path cannot open ``CA_CERT_PATH``.
        Exception: Whatever the last fallback (``set_cadata``) raises.
    """
    # NOTE: every handler below catches ``Exception`` rather than using a bare
    # ``except:`` so that KeyboardInterrupt / SystemExit are never swallowed.
    try:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    except Exception:
        # Fallback when the first form is not accepted by this ssl module.
        ctx = ssl.SSLContext()

    if not CA_CERT_PATH:
        return ctx

    try:
        ctx.load_verify_locations(CA_CERT_PATH)
    except Exception:
        # Compatibility path: some ports take the certificate as ``cadata``.
        with open(CA_CERT_PATH, "rb") as f:
            ca = f.read()
        try:
            ctx.load_verify_locations(cadata=ca)
        except Exception:
            ctx.set_cadata(ca)
    return ctx
接着初始化IMU的相关配置
# ---- Initialise the I2C bus and the BMI270 ----
def init_imu():
    """Create the I2C bus, configure the BMI270 and return the driver object.

    The bus is ``I2C(0)`` at 400 kHz on pins ``P0_2`` (SCL) and ``P0_3`` (SDA).
    The sensor is opened at address 0x68, then its accelerometer range, gyro
    range and accelerometer operation mode are set.

    Returns:
        The configured ``BMI270`` instance.
    """
    # Pin assignment per the board manual: P0.2 = SCL, P0.3 = SDA.
    scl_pin = Pin("P0_2")
    sda_pin = Pin("P0_3")
    bus = I2C(0, scl=scl_pin, sda=sda_pin, freq=400000)

    # Default address is 0x68 (can be changed to 0x69).
    sensor = BMI270(i2c=bus, address=0x68)
    sensor.acceleration_range = ACCEL_RANGE_8G  # +/-8 g
    sensor.gyro_range = GYRO_RANGE_1000  # +/-1000 dps
    sensor.acceleration_operation_mode = ACCELERATOR_ENABLED
    return sensor
构建MQTT客户端
# ---- Build the MQTT client ----
def make_mqtt_client():
    """Construct (but do not connect) an ``MQTTClient`` for the configured broker.

    The client id is ``CLIENT_ID`` followed by ``_`` and the hex form of
    ``machine.unique_id()``. When ``USE_TLS`` is truthy the ``ssl`` argument
    is the context from ``build_ssl_ctx()``; otherwise it is ``False``.

    Returns:
        A new ``MQTTClient`` with a 60-second keepalive.
    """
    device_hex = ubinascii.hexlify(machine.unique_id()).decode()
    cid = CLIENT_ID + "_" + device_hex

    # Only build the SSL context when TLS is actually requested.
    ssl_arg = build_ssl_ctx() if USE_TLS else False

    return MQTTClient(
        client_id=cid,
        server=MQTT_BROKER,
        port=MQTT_PORT,
        user=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        ssl=ssl_arg,
        keepalive=60,
    )
在主流程中,首先完成 WiFi 和 IMU 的初始化,随后在循环中建立(或重建)MQTT 连接,并不断采集数据和发布数据:
# ---- Main flow: sample + publish ----
def main():
    """Connect WiFi, initialise the IMU, then sample and publish forever.

    Each loop iteration:
      1. (Re)creates and connects the MQTT client if there is none.
      2. Reads one accelerometer + gyro sample and appends it to a buffer.
      3. Once the buffer holds at least ``PUBLISH_EVERY`` samples, publishes
         them as one JSON document ``{"batch": [...]}`` to ``TOPIC_PUB``.
      4. Sends an MQTT ping if more than 30 s have passed since the last one.
      5. Sleeps for the remainder of the sampling period (``1000 / SAMPLE_HZ`` ms).

    Any ``Exception`` inside the loop is printed, the client is dropped
    (best-effort disconnect) and the loop retries after a 2 s back-off.
    This function does not return.
    """
    wifi_connect()
    imu = init_imu()
    client = None

    # Sampling / publishing throttles.
    period_ms = int(1000 / max(1, SAMPLE_HZ))
    batch_size = max(1, PUBLISH_EVERY)
    # Upper bound on buffered samples. The buffer is only cleared after a
    # successful publish, so without a cap it would grow on every iteration
    # while publishing keeps failing, until memory runs out.
    max_buffered = batch_size * 4
    buf = []
    last_ping_ms = time.ticks_ms()

    while True:
        try:
            if client is None:
                client = make_mqtt_client()
                client.connect()
                print("MQTT connected:", MQTT_BROKER, MQTT_PORT)
                # A fresh connection restarts the ping interval.
                last_ping_ms = time.ticks_ms()

            # Sample.
            t0 = time.ticks_ms()
            ax, ay, az = imu.acceleration  # m/s^2 (converted inside the library, per the original note)
            gx, gy, gz = imu.gyro  # dps
            buf.append({
                "ts_ms": t0,
                "acc": [round(ax, 4), round(ay, 4), round(az, 4)],
                "gyro": [round(gx, 4), round(gy, 4), round(gz, 4)],
            })
            if len(buf) > max_buffered:
                # Drop the oldest sample; only one is appended per iteration.
                buf.pop(0)

            # Batch publish (one JSON document per batch).
            if len(buf) >= batch_size:
                payload = ujson.dumps({"batch": buf})
                # umqtt.simple supports QoS 0/1; the message is sent retained.
                client.publish(TOPIC_PUB, payload, qos=0, retain=True)
                print("PUB:", TOPIC_PUB, payload)
                buf.clear()

            # Heartbeat to keep the long-lived connection open. A failing
            # ping propagates to the handler below, which triggers a reconnect.
            now = time.ticks_ms()
            if time.ticks_diff(now, last_ping_ms) > 30000:
                client.ping()
                last_ping_ms = now

            # Sleep for whatever is left of this sampling period.
            dt = time.ticks_diff(time.ticks_ms(), t0)
            time.sleep_ms(max(0, period_ms - dt))
        except Exception as e:
            print("loop err:", e)
            if client:
                try:
                    client.disconnect()
                except Exception:
                    # Best-effort: the connection is probably already dead.
                    pass
            client = None
            time.sleep_ms(2000)  # simple back-off before reconnecting
# Entry point
import ujson

if __name__ == "__main__":
    # Run the sampling/publishing loop only when executed as a script.
    main()
串口中输出内容如下