本帖最后由 keer_zu 于 2014-5-12 11:34 编辑
最近一个项目,处理CAN收发的一个线程:
这个是主从结构中,主设备的程序,为了接收处理不同地址从设备发过来的分包数据,将接收缓冲区做成哈希结构,通过地址索引之前收到的同一地址的数据,等收到末尾分包后,完成整帧数据接收并处理之。。。。。分包数据存在每个哈希节点下的双链表里面。
处理完毕后,将缓冲区片还回空闲缓冲区队列。如果收到新的分包,将会从空闲缓冲区片队列中取出一片,填上数据,初始化链表项,插入相关链表中。
对于帧不完整的,做超时处理。
测试结果:系统平稳。
static int can_thread_func(void *para)
{
int ret,i;
struct can_comx *can;
struct can_frame frame;
unsigned char *buf;
unsigned short addr;
unsigned char dir;
unsigned char attr;
unsigned char index;
unsigned char ack;
struct can_frame_list *pos = NULL,*can_frame = NULL;
struct list_head *pos_h;
unsigned char flag = 0;
struct aioi_proto *proto;
proto = &g_aioi_proto;
struct return_data *ret_d;
unsigned char *node_buf;
can_data_t ack_update;
unsigned char ack_data[10];
struct recv_packet_node *packet_node;
struct recv_hlist_head *h_head;
struct recv_h_node *h_node;
can = (struct can_comx *)para;
memset(ack_data,0,10);
while(1){
ret = can->can.recv(&can->can,&frame);
if(ret < 0){
printf("############### can frame recv err!\n");
can->is_thread_alive == 0;
return ERR;
}
if(ret == 0){
usleep(100);
continue;
}
addr = _id_get_addr(frame.can_id);
attr = _id_get_attr(frame.can_id);
ack = _id_get_ack(frame.can_id);
index = _id_get_index(frame.can_id);
dir = _id_get_dir(frame.can_id);
if(dir != 0x01)
continue;
flag = 0;
if(ack == 1){
h_head = can->recv_head_array_ack + DATA_MOD(addr,H_HEAD_ARRY_SIZE);
h_node = recv_hlist_get(h_head,addr);
if(h_node != NULL){
flag = 1;
}
}else if(ack == 0){
h_head = can->recv_head_array_req + DATA_MOD(addr,H_HEAD_ARRY_SIZE);
h_node = recv_hlist_get(h_head,addr);
if(h_node != NULL){
flag = 1;
}
}else{
printf("!!! wrong ack!\n");
}
if(flag == 0 && attr == 0){
if(ack == 1){
buf = get_free_para_buf(can->answer_q);
if(buf != NULL){
memset(buf,0,PARA_BUF_SIZE);
*(unsigned short *)buf = addr;
buf[2] += frame.can_dlc;
memcpy(buf + 3,frame.data,frame.can_dlc);
ret = para_process_buf_insert(can->answer_q);
if(ret != OK){
printf("############### Can ack q insert err! line:%d\n",__LINE__);
return ret;
}
}
}else{
buf = get_free_para_buf(proto->para_buf_q);
if(buf != NULL){
memset(buf,0,PARA_BUF_SIZE);
ret_d = (struct return_data *)buf;
ret_d->type = frame.data[0];//DATA_TYPE_DISP;
ret_d->len = frame.can_dlc;
ret_d->id = addr;
memcpy(buf + sizeof(struct return_data),frame.data + 1,ret_d->len - 1);
ret = para_process_buf_insert(proto->para_buf_q);
if(ret != OK){
printf("############### Can update q insert err! line:%d\n",__LINE__);
return ret;
}
ack_update.id = _create_can_id(0,addr,0,0,0,1);
ack_data[0] = 0x80 + 0x32;
ack_data[1] = 'o';
ack_update.data = ack_data;
ack_update.can_dlc = 2;
ret = can->can.send(&can->can,&ack_update);
if(ret != OK){
printf("############### can send err!\n");
return ERR;
}
}
}
}else if(flag == 0 && attr == 1){
_get_recv_h_node1:
h_node = (struct recv_h_node *)get_q_buf(can->h_elem_q);
if(h_node == NULL){
usleep(100);
goto _get_recv_h_node1;
}
h_node->ack = ack;
h_node->addr = addr;
gettimeofday(&h_node->tv_now,NULL);
INIT_LIST_HEAD(&h_node->packet_head);
_get_recv_packet_node1:
packet_node = (struct recv_packet_node *)get_q_buf(can->packet_elem_q);
if(packet_node == NULL){
usleep(100);
goto _get_recv_packet_node1;
}
packet_node->dlc = frame.can_dlc;
packet_node->index = index;
memcpy(packet_node->data,frame.data,packet_node->dlc);
list_add_tail(&packet_node->pack_list,&h_node->packet_head);
if(ack == 1)
recv_hlist_add(can->recv_head_array_ack,h_node);
else
recv_hlist_add(can->recv_head_array_req,h_node);
}else if(flag == 1 && attr == 0){
_get_recv_packet_node2:
packet_node = (struct recv_packet_node *)get_q_buf(can->packet_elem_q);
if(packet_node == NULL){
usleep(100);
goto _get_recv_packet_node2;
}
packet_node->dlc = frame.can_dlc;
packet_node->index = index;
memcpy(packet_node->data,frame.data,packet_node->dlc);
list_add_tail(&packet_node->pack_list,&h_node->packet_head);
if(ack == 1){
buf = get_free_para_buf(can->answer_q);
if(buf != NULL){
*(unsigned short *)buf = h_node->addr;
buf[2] = 0;
list_for_each(pos_h,&h_node->packet_head){
packet_node = list_entry(pos_h,struct recv_packet_node,pack_list);
memcpy(buf + 3 + buf[2],packet_node->data,packet_node->dlc);
buf[2] += packet_node->dlc;
__list_del(packet_node->pack_list.prev,packet_node->pack_list.next);
q_buf_insert(can->packet_elem_q,(unsigned char *)packet_node);
}
hlist_del(&h_node->node);
q_buf_insert(can->h_elem_q,(unsigned char*)h_node);
para_process_buf_insert(can->answer_q);
}
}else{
buf = get_free_para_buf(proto->para_buf_q);
if(buf != NULL){
ret_d = (struct return_data *)buf;
ret_d->len = 0;
list_for_each(pos_h,&h_node->packet_head){
packet_node = list_entry(pos_h,struct recv_packet_node,pack_list);
memcpy(buf + sizeof(struct return_data) + ret_d->len,packet_node->data,packet_node->dlc);
ret_d->len += packet_node->dlc;
__list_del(packet_node->pack_list.prev,packet_node->pack_list.next);
q_buf_insert(can->packet_elem_q,(unsigned char *)packet_node);
}
ret_d->type = (char)(*(buf + sizeof(struct return_data)));
ret_d->id = addr;
hlist_del(&h_node->node);
q_buf_insert(can->h_elem_q,(unsigned char*)h_node);
para_process_buf_insert(proto->para_buf_q);
}
}
}else if(flag == 1 && attr == 1){
_get_recv_packet_node3:
packet_node = (struct recv_packet_node *)get_q_buf(can->packet_elem_q);
if(packet_node == NULL){
usleep(100);
goto _get_recv_packet_node3;
}
packet_node->dlc = frame.can_dlc;
packet_node->index = index;
memcpy(packet_node->data,frame.data,packet_node->dlc);
list_add_tail(&packet_node->pack_list,&h_node->packet_head);
gettimeofday(&h_node->tv_now,NULL);
}
if(can->is_thread_alive == 0)
return OK;
usleep(10);
}
return OK;
} |