可以连接和写入:
PC端python:
- import asyncio
- import json
- from bleak import BleakClient, discover
- class BluetoothInterface:
- def __init__(self, device_name="ESP32-led", tx_uuid=None, rx_uuid=None):
- """
- 初始化蓝牙接口
-
- :param device_name: 目标设备名称,默认"ESP32-led"
- :param tx_uuid: 发送数据的Characteristic UUID
- :param rx_uuid: 接收数据的Characteristic UUID
- """
- self.device_name = device_name
- self.tx_uuid = tx_uuid or "00001101-0000-1000-8000-00805F9B34FB"
- self.rx_uuid = rx_uuid or "00001101-0000-1000-8000-00805F9B34FB"
- self.client = None
- self.connected = False
- self.receive_callback = None
- async def connect(self):
- """连接到蓝牙设备"""
- devices = await discover()
- for d in devices:
- if d.name == self.device_name:
- self.device_address = d.address
- break
- else:
- raise Exception(f"未找到设备: {self.device_name}")
- self.client = BleakClient(self.device_address)
- try:
- await self.client.connect()
- self.connected = True
- print("蓝牙连接成功")
- except Exception as e:
- print(f"连接失败: {str(e)}")
- self.connected = False
- async def disconnect(self):
- """断开蓝牙连接"""
- if self.connected:
- await self.client.disconnect()
- self.connected = False
- print("蓝牙已断开")
- async def send_data(self, data):
- """发送JSON数据"""
- if not self.connected:
- raise Exception("未连接设备")
-
- try:
- json_str = json.dumps(data) + "\r\n" # 添加帧结束符
- await self.client.write_gatt_char(self.tx_uuid, json_str.encode('utf-8'))
- print(f"发送数据: {json_str.strip()}")
- except Exception as e:
- print(f"发送失败: {str(e)}")
- def register_receive_callback(self, callback):
- """注册数据接收回调函数"""
- self.receive_callback = callback
- async def _receive_loop(self):
- """内部接收循环"""
- while self.connected:
- try:
- data = await self.client.read_gatt_char(self.rx_uuid)
- cleaned_data = data.decode('utf-8').strip('\x0d\x0a')
- if cleaned_data:
- json_data = json.loads(cleaned_data)
- if self.receive_callback:
- self.receive_callback(json_data)
- except Exception as e:
- print(f"接收错误: {str(e)}")
- await asyncio.sleep(1)
- async def start_receive(self):
- """启动数据接收"""
- if self.connected:
- asyncio.create_task(self._receive_loop())
- async def main():
- # 配置UUID(根据实际设备修改)
- DEVICE_NAME = "ESP32-led"
- TX_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb" # 替换为实际发送UUID
- RX_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb" # 替换为实际接收UUID
- interface = BluetoothInterface(
- device_name=DEVICE_NAME,
- tx_uuid=TX_UUID,
- rx_uuid=RX_UUID
- )
- await interface.connect()
- interface.register_receive_callback(print) # 打印接收数据
- try:
- await interface.start_receive()
-
- while True:
- status = input("请输入状态(start/listening/thinking/speaking/stop): ").strip()
- score = input("请输入评分(0-10): ").strip()
-
- # 构造数据包
- data_packet = {
- "status": status,
- "score": score
- }
-
- # 发送数据
- await interface.send_data(data_packet)
- await asyncio.sleep(0.5) # 防止快速发送导致丢包
-
- except KeyboardInterrupt:
- print("\n程序终止")
- finally:
- await interface.disconnect()
- if __name__ == "__main__":
- # 运行主程序
- asyncio.run(main())
esp32:
|