本帖最后由 coslight 于 2022-3-7 08:49 编辑
基于python-can的USB-CAN监视设备 本测试为AT-START-F425开发板的项目测试。测试想要达到的目的为,基于python环境完成一个USB-CAN监视设备的实现。试验主要完成如下几个关键内容: l USB虚拟串口的实现 l CAN总线驱动及缓冲区管理 l Python-can接口的协议解析 l 测试基于python的can总线数据首发 1、Python-can的介绍 python-can库为Python提供了控制器局域网的支持,为不同的硬件设备提供了通用的抽象,并提供了一套实用程序,用于在CAN总线上发送和接收消息。 python-可以在Python运行的任何地方运行;从具有商用CAN的高功率计算机到USB设备,再到运行Linux的低功率设备(例如BeagleBone或RaspberryPi)。 Python-can支持很多的接口,下表描述的他支持的接口类型。
接口应用 | | | | | | | | | | | IXXAT Virtual CAN Interface | | | | | | | | | | | | | | | | | | |
其中“slcan”接口是我们本次研究的对象,我们将让AT32F425支持python-can的slcan接口类型。 Python-can的库API主要对象是BusABC和Message。其中包括如下的API: l Bus l Thread safe bus l Message l Listeners l Asyncio support l Broadcast Manager l Internal API
2、USB-CAN实现 2.1 USB虚拟串口 虚拟串口CDC在官方的标准库例程中已经提供了。由于采用USB传输,所以对于测试设备的使用非常友好,而且速度很快,完全可以匹配CAN总线的处理能力。 通过采用官方提供的例程,在系统中可以看到两个串口设备,其中COM6为仿真器提供的虚拟串口,COM8为程序产生的虚拟串口。 实际测试,虚拟串口可以正确的工作。 2.2 CAN接口驱动及缓冲区管理 CAN总线的使用,标准库中也已经提供了例程,但是例程中是采用1M固定波特率的一个测试,并不适合我们使用,我们需要重新调整,具体的调整要结合python-can中slcan接口中的要求,为了增加总线的处理能力,还需要为CAN总线的首发提供缓冲区。因此我们重新修改了基于slcan要求的CAN总线操作接口函数: 1) CAN总线端口初始化 我们采用PB8和PB9两个引脚作为CAN总线的收发引脚,因此初始化如下。 static void can_gpio_config(void)
{
gpio_init_typegpio_init_struct;
/* enable the gpio clock */
crm_periph_clock_enable(CRM_GPIOB_PERIPH_CLOCK, TRUE);
gpio_default_para_init(&gpio_init_struct);
/* configure the can tx, rx pin */
gpio_init_struct.gpio_drive_strength = GPIO_DRIVE_STRENGTH_STRONGER;
gpio_init_struct.gpio_out_type = GPIO_OUTPUT_PUSH_PULL;
gpio_init_struct.gpio_mode = GPIO_MODE_MUX;
gpio_init_struct.gpio_pins = GPIO_PINS_9| GPIO_PINS_8;
gpio_init_struct.gpio_pull = GPIO_PULL_NONE;
gpio_init(GPIOB,&gpio_init_struct);
gpio_pin_mux_config(GPIOB, GPIO_PINS_SOURCE9, GPIO_MUX_4);
gpio_pin_mux_config(GPIOB,GPIO_PINS_SOURCE8, GPIO_MUX_4);
}
2) CAN总线的配置和打开 CAN总线打开的时候,需要完成波特率的配置,首先定义CAN总线波特率配置所需的结构,并初始化结构,这个结构中波特率的设定是根据slcan中定义的要求来的。 typedef struct
{
uint16_t baudrate_div;
uint8_t rsaw_size;
uint8_t bts1_size;
uint8_t bts2_size;
}_CAN_BAUD_CONFIG;
const _CAN_BAUD_CONFIG can_baud_conf[] =
{
//baudrate_div rsaw_size bts1_size bts2_size
{960, CAN_RSAW_1TQ, CAN_BTS1_7TQ, CAN_BTS2_2TQ}, //10000: "S0",
{480, CAN_RSAW_1TQ, CAN_BTS1_7TQ, CAN_BTS2_2TQ}, //20000: "S1",
{160, CAN_RSAW_1TQ, CAN_BTS1_8TQ, CAN_BTS2_3TQ}, //50000: "S2",
{80, CAN_RSAW_1TQ, CAN_BTS1_8TQ, CAN_BTS2_3TQ}, //100000:"S3",
{64, CAN_RSAW_1TQ, CAN_BTS1_8TQ, CAN_BTS2_3TQ}, //125000: "S4",
{32, CAN_RSAW_1TQ, CAN_BTS1_8TQ, CAN_BTS2_3TQ}, //250000:"S5",
{16, CAN_RSAW_1TQ, CAN_BTS1_8TQ, CAN_BTS2_3TQ}, //500000: "S6",
{8, CAN_RSAW_1TQ, CAN_BTS1_12TQ, CAN_BTS2_3TQ}, //750000: "S7",
{8, CAN_RSAW_1TQ, CAN_BTS1_8TQ, CAN_BTS2_3TQ}, //1000000:"S8",
//83300:"S9",
};
定义CAN总线的打开函数,其中包含CAN接口配置和中断配置。 int can_open(_SLCAN_CAN_STA slcan_sta)
{
can_base_typecan_base_struct;
can_baudrate_type can_baudrate_struct;
can_filter_init_type can_filter_init_struct;
/* enable the can clock */
crm_periph_clock_enable(CRM_CAN1_PERIPH_CLOCK, TRUE);
/* can base init */
can_default_para_init(&can_base_struct);
can_base_struct.mode_selection = CAN_MODE_COMMUNICATE;
can_base_struct.ttc_enable = FALSE;
can_base_struct.aebo_enable = TRUE;
can_base_struct.aed_enable = TRUE;
can_base_struct.prsf_enable = FALSE;
can_base_struct.mdrsel_selection = CAN_DISCARDING_FIRST_RECEIVED;
can_base_struct.mmssr_selection = CAN_SENDING_BY_ID;
can_base_init(CAN1, &can_base_struct);
/* can baudrate, set boudrate = pclk/(baudrate_div*(1 + bts1_size + bts2_size)) */
if(slcan_sta.baud <= 9)
{
can_baudrate_struct.baudrate_div =can_baud_conf[slcan_sta.baud].baudrate_div;
can_baudrate_struct.rsaw_size =can_baud_conf[slcan_sta.baud].rsaw_size;
can_baudrate_struct.bts1_size =can_baud_conf[slcan_sta.baud].bts1_size;
can_baudrate_struct.bts2_size =can_baud_conf[slcan_sta.baud].bts2_size;
can_baudrate_set(CAN1,&can_baudrate_struct);
}
else
{
/*disable the can clock */
crm_periph_clock_enable(CRM_CAN1_PERIPH_CLOCK, FALSE);
/*初始化错误*/
return -1;
}
/* can filter init */
can_filter_init_struct.filter_activate_enable = TRUE;
can_filter_init_struct.filter_mode = CAN_FILTER_MODE_ID_MASK;
can_filter_init_struct.filter_fifo = CAN_FILTER_FIFO0;
can_filter_init_struct.filter_number = 0;
can_filter_init_struct.filter_bit = CAN_FILTER_32BIT;
can_filter_init_struct.filter_id_high = 0;
can_filter_init_struct.filter_id_low = 0;
can_filter_init_struct.filter_mask_high = 0;
can_filter_init_struct.filter_mask_low = 0;
can_filter_init(CAN1, &can_filter_init_struct);
/* can interrupt config */
nvic_irq_enable(CAN1_IRQn, 0x01,0x00);
can_interrupt_enable(CAN1, CAN_RF0MIEN_INT, TRUE);
return 0;
}
3) CAN总线的关闭
关闭CAN总线时钟,同时关闭中断。
void can_close(void)
{
can_interrupt_enable(CAN1,CAN_RF0MIEN_INT, FALSE);
/* disable the can clock */
crm_periph_clock_enable(CRM_CAN1_PERIPH_CLOCK, FALSE);
}
4) CAN总线的收发缓冲区定义 为了提高CAN总线的瞬时数据吞吐率,定义了CAN总线的收发缓冲区。 #define_CAN_RX_MAX_FRAME 600
typedef struct
{
uint16_t head;
uint16_t tail;
can_rx_message_type buf[_CAN_RX_MAX_FRAME];
}_CAN_RX_STRUCT;
_CAN_RX_STRUCTcan_rx_buf;
#define_CAN_TX_MAX_FRAME 100
typedef struct
{
uint16_t head;
uint16_t tail;
can_tx_message_type buf[_CAN_TX_MAX_FRAME];
}_CAN_TX_STRUCT;
_CAN_TX_STRUCTcan_tx_buf;
在CAN总线的接收中断中,完成CAN接收缓冲区的填充,然后再慢慢的把它转为slcan协议支持的数据帧发送到CDC口。 3、slcan接口的协议解析 slcan接口时python-can库中一个基于串口的协议,我们采用的时USB虚拟串口,所以这个协议正好适合我们。Slcan接口协议采用ASCII码的方式进行数据交互,所以效率稍微低一点,不过再PC系统里面却比较好处理。 1)关闭can接口 发送字符“C”。 2) 打开can接口 发送字符“O” 3) can波特率设置 发送字符”S”+一个ASCII码数字。具体含义如下:
4) 接收数据帧 数据帧的构成包括:帧类型字符+CANID+数据长度+数据+“\r“ 扩展帧: “T1234567890123456789ABCDEF” 帧类型:“T” CANid:12345678 dlc: 9 data:01 23 45 67 89 AB CD EF
标准帧:“t12340123456789ABCDEF” 帧类型:“t” CANid:123 dlc: 4 data:01 23 45 67 89 AB CD EF
标准远程帧: “r1234” 帧类型:“r” CANid:123 dlc: 4
扩展远程帧:“R123456789” 帧类型:“R” CANid:12345678 dlc: 9
5)发送数据帧 格式同接收
4、测试环境 4.1 PC端CAN测试节点 为了测试整体运行效果,我们还需要一个其它的CAN总线收发器。 4.2 基于python的USB-CAN编程 当我们的程序可以正确的运行起来后,我们需要编写一个python的测试程序,前提是我们已经安装了python-can库。 安装方法为: pip install python-can Python的测试程序为: # import the library
import can
# create a bus instance
# many other interfaces are supported as well (see documentation)
bus = can.Bus(interface='slcan',
channel='COM8',
bitrate=125000,
receive_own_messages=True)
'''# send a message
message = can.Message(arbitration_id=123, is_extended_id=True,
data=[0x11, 0x22, 0x33])
bus.send(message, timeout=0.2)
# iterate over received messages
for msg in bus:
print(f"{msg.arbitration_id:X}: {msg.data}")
# or use an asynchronous notifier
notifier = can.Notifier(bus, [can.Logger("recorded.log"), can.Printer()])'''
''' 发送信息 '''
def send_one():
#bus = can.interface.Bus();
msg = can.Message(arbitration_id=0x7f,
data=[11, 25, 11, 1, 1, 2, 23, 18],
is_extended_id=False)
try:
''' 发送信息 '''
bus.send(msg)
print("Message sent on {}".format(bus.channel_info))
except can.CanError:
print("Message NOT sent")
def recv():
#bus = can.interface.Bus();
''' 接收信息 '''
msg = bus.recv(100)
try:
bus.send(msg)
print(msg)
# print(msg.data[0]) # 接收回来的第一个字节的数据
# print(msg.arbitration_id) # 接收回来的ID
return msg
except can.CanError:
print("Message NOT sent")
if __name__ == "__main__":
''' can_setup("can1"); '''
# send_one()
while True:
recv()
''' can_stop("can1"); '''
5、实际运行效果 搭建的基本环境,由于开发板没有CAN物理接口,所以外接了一个物理接口
PC端CAN节点工具的运行效果 Python在使用我们制作的USB-CAN监视设备进行数据收发测试数据,环境是基于VSCode搭建的。 数据收发及操作过程的演示视频
|