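# K210 (CanMV / MaixPy) vision script
#
# Mode 0 tracks red blobs in fixed regions of interest for line following; mode 1 runs a YOLOv2
# digit detector. Results are framed with upacker and sent to an MCU over UART1.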
import sensor, image, time, lcd, struct, ustruct
import gc
from machine import UART,Timer,PWM
from board import board_info
from fpioa_manager import fm
import KPU as kpu
from Maix import GPIO,utils
import gc
import machine
lcd.init() # Init lcd display
lcd.clear(lcd.RED) # Clear lcd screen.
# Reset the camera with double buffering enabled (higher frame rate, more memory used).
# NOTE(review): the original comment said 24M, but the code passes 22000000 — confirm the intended clock.
sensor.reset(freq=22000000, dual_buff=1)
sensor.set_auto_exposure(0) # Passes 0 to the auto-exposure setting (presumably turns it off — TODO confirm)
sensor.set_auto_gain(False) # Auto gain must be off for colour tracking
sensor.set_auto_whitebal(False) # Auto white balance must be off for colour tracking
sensor.set_pixformat(sensor.RGB565) # Set pixel format to RGB565 (or GRAYSCALE)
sensor.set_framesize(sensor.QVGA) # Set frame size to QVGA (320x240)
#kpu = KPU()
#kpu.load_kmodel("/sd/KPU/mnist/uint8_mnist_cnn_model.kmodel")
sensor.skip_frames(time = 2000) # Wait for settings take effect.
clock = time.clock() # Create a clock object to track the FPS.
sensor.set_auto_exposure(0) # Same auto-exposure call repeated after the settle delay
sensor.set_vflip(1) # Flip the image vertically
#______________________________________________________________________________________________________________________________
#打印内存分配情况
# Print the current memory allocation at start-up.
print(utils.gc_heap_size()) # Configured GC heap size
# NOTE(review): the "stack mem" label is attached to gc.mem_free(), i.e. free GC-managed memory — confirm the label.
print("stack mem:"+str(gc.mem_free() / 1024)) # stack mem
print("heap mem:"+str(utils.heap_free() / 1024)) # heap mem
#第一次用执行一次下面这两个语句
#utils.gc_heap_size(800*1024)
#machine.reset()
#______________________________________________________________________________________________________________________________
#程序运行选择
# Run-time debug switches (0 = off, non-zero = on).
is_need_debug = 0
is_patrol_line = 0
is_upacker_debug = 0 # Print frames handled by Upacker
is_stack_heap_mem_debug = 0 # Print free memory on every main-loop pass
is_upacker_recive_debug = 1 # Print every payload received over the UART
is_findflob_debug = 0 # Print blob positions / offsets in line-tracking mode
#______________________________________________________________________________________________________________________________
# Program run state
work_mode = 0 # 0 = line-tracking mode, 1 = digit-recognition mode
#______________________________________________________________________________________________________________________________
#串口配置区
# Route pins 6 and 7 to UART1 TX / RX.
fm.register(6, fm.fpioa.UART1_TX, force=True)
fm.register(7, fm.fpioa.UART1_RX, force=True)
# UART1 at 115200 baud; the positional 8, 0, 0 are presumably data bits, parity and stop bits — TODO confirm.
k210_uart = UART(UART.UART1, 115200, 8, 0, 0, timeout=1000, read_buf_len=4096)
uart_test_write_str = '1260808878' # Test string; not referenced elsewhere in the visible code
#______________________________________________________________________________________________________________________________
#按键蜂鸣器配置区
#注册IO,注意高速GPIO口才有中断
# Register the IOs; note that only high-speed GPIOs support interrupts.
fm.register(35, fm.fpioa.GPIO0)
fm.register(16, fm.fpioa.GPIOHS0)
# Create the key (button) object: input with pull-up.
KEY=GPIO(GPIO.GPIOHS0, GPIO.IN, GPIO.PULL_UP)
# The beeper PWM is driven from a timer channel.
# NOTE(review): the original comment said the PWM is wired to IO15, but the code passes pin=9 — confirm the wiring.
tim = Timer(Timer.TIMER1, Timer.CHANNEL0, mode=Timer.MODE_PWM)
beep = PWM(tim, freq=1000, duty=0, pin=9)
# Key state flags written by the interrupt callback.
key_node = 0
key_press_long = 0
#中断回调函数
def fun(KEY):
    """Key interrupt callback: measure how long the key is held and flag a long press.

    While the key reads 0 the handler counts 10 ms ticks. After 50 or more
    ticks it beeps once and sets ``key_press_long`` so the main loop can
    toggle the work mode.

    NOTE(review): the source indentation was lost; the nesting below (count
    inside the loop, long-press handling after release) is reconstructed —
    confirm against the original file.

    :param KEY: the GPIO object that triggered the interrupt.
    """
    global work_mode,key_node,key_press_long
    temp_count = 0
    time.sleep_ms(10) # Debounce
    while KEY.value()== 0:
        key_node = 1
        time.sleep_ms(10) # Long-press tick delay
        # Long-press detection counter
        temp_count=temp_count+1
        print(temp_count)
    if temp_count >= 50:
        # Long press: beep for 500 ms as feedback.
        beep.duty(50)
        time.sleep_ms(500)
        beep.duty(0)
        time.sleep_ms(100)
        print(temp_count)
        key_node = 0
        key_press_long = 1
#开启中断,下降沿触发
# Enable the key interrupt on the falling edge, handled by fun().
KEY.irq(fun, GPIO.IRQ_FALLING)
#______________________________________________________________________________________________________________________________
#要传给梁山派的数据
#work_mode:0是巡线模式,1是数字识别模式
#recognition:0是未知(巡线状态下),1是药房门口区域(停车线),十字路口和T字路口由
class uart_send_data_t:
    """Plain record of the values reported to the MCU over the UART.

    work_mode is 0 for line tracking and 1 for digit recognition.
    recognition is 0 for unknown (while line tracking) and 1 for the stop
    line. The four offsets are blob positions relative to the frame centre.
    left_number / right_number are the recognised digits.
    """

    def __init__( self, work_mode=0, recognition=0,top_block_offset=0,center_block_offset=0,left_block_offset=0,right_block_offset=0,left_number = 0,right_number = 0):
        # Mode and recognition state
        self.work_mode, self.recognition = work_mode, recognition
        # Horizontal-band offsets
        self.top_block_offset, self.center_block_offset = top_block_offset, center_block_offset
        # Vertical-band offsets
        self.left_block_offset, self.right_block_offset = left_block_offset, right_block_offset
        # Recognised digits
        self.left_number, self.right_number = left_number, right_number


# Shared instance filled in by the vision code and sent periodically.
global_uart_send_data = uart_send_data_t()
#______________________________________________________________________________________________________________________________
#感兴趣区配置
# Regions of interest, each as [x, y, w, h] in frame pixels.
roi_area_width = 30 # Thickness of each ROI band
roi_area_color = (0, 0, 200) # Colour used to outline the ROIs
roi_area_thickness = 2 # Outline thickness
# Vertical bands 50 px in from the left / right edges, starting 40 px from the top.
left_roi = [50,40,roi_area_width,sensor.height()-40]
right_roi = [sensor.width()-roi_area_width-50,40,roi_area_width,sensor.height()-40]
# Full-width horizontal bands: one near the top, one centred vertically.
top_roi = [0,10,sensor.width(),roi_area_width]
center_roi = [0,int(sensor.height()/2)-int(roi_area_width/2),sensor.width(),roi_area_width]
#bottom_roi = [0,sensor.height()- roi_area_width,sensor.width(),roi_area_width]
# Full-width band used for the (currently disabled) black-block search.
black_block_roi = [0,100,sensor.width(),100]
def draw_roi(img):
    """Outline every region of interest on *img*.

    The four tracking ROIs use ``roi_area_color``; the black-block ROI is
    drawn in green. All outlines are unfilled and share one thickness.
    """
    outlines = (
        (left_roi, roi_area_color),
        (right_roi, roi_area_color),
        (top_roi, roi_area_color),
        (center_roi, roi_area_color),
        (black_block_roi, (0, 255, 0)),
    )
    for rect, outline_color in outlines:
        img.draw_rectangle(rect, color=outline_color, thickness=roi_area_thickness, fill=False)
#______________________________________________________________________________________________________________________________
#寻找色块区配置
# Colour thresholds passed to find_blobs.
# Presumably LAB ranges in the order (L min, L max, A min, A max, B min, B max) — TODO confirm.
red_threshold =[12, 68, 11, 63, 5, 81]
black_threshold =[0, 50, -24,-1, -18, 6]
class red_blob_location_t:
    """Last known centre (x, y) and bounding-box area of a tracked blob."""

    def __init__( self, x=0, y=0,area = 0):
        self.x, self.y, self.area = x, y, area
#______________
#左部blob位置信息
# Per-ROI blob search settings. For each ROI:
#   *_pixels_threshold: blobs with fewer pixels than this are filtered out.
#   *_area_threshold:   blobs whose bounding box is smaller than this are filtered out.
#   *_x_stride / *_y_stride: pixels skipped in x / y while searching for a blob; once a blob is
#       found the fill algorithm is pixel-exact. Larger strides speed up the search when blobs
#       are known to be large.
# Left ROI blob position
left_blob_location = red_blob_location_t()
left_pixels_threshold = 20
left_area_threshold = 20
left_x_stride = 20
left_y_stride = 20
#______________
# Right ROI blob position
right_blob_location = red_blob_location_t()
right_pixels_threshold = 20
right_area_threshold = 20
right_x_stride = 20
right_y_stride = 20
#______________
# Top ROI blob position
top_blob_location = red_blob_location_t()
top_pixels_threshold = 20
top_area_threshold = 20
top_x_stride = 15
top_y_stride = 20
#______________
# Centre ROI blob position
center_blob_location = red_blob_location_t()
center_pixels_threshold = 20
center_area_threshold = 20
center_x_stride = 20
center_y_stride = 20
blob_location=[0,0] # Not referenced elsewhere in the visible code
def find_blob(img):
    """Search each ROI of *img* for red blobs and record where they are.

    The left/right positions are reset to -1 and the centre area to 0 before
    searching; the top position and the other areas keep their previous
    values when nothing is found. Every blob found is outlined and marked on
    *img*, and the last blob reported for an ROI wins. When the centre ROI
    holds no blob, ``recognition`` is set to 1 (and "tinche" is printed),
    otherwise it is set to 0.
    """
    # Invalidate the values that must not survive from the previous frame.
    left_blob_location.x = left_blob_location.y = -1
    right_blob_location.x = right_blob_location.y = -1
    center_blob_location.area = 0

    # (roi, x_stride, y_stride, pixels_threshold, area_threshold, merge, destination)
    # Only the left ROI is searched with merge=False.
    searches = (
        (left_roi, left_x_stride, left_y_stride, left_pixels_threshold, left_area_threshold, False, left_blob_location),
        (right_roi, right_x_stride, right_y_stride, right_pixels_threshold, right_area_threshold, True, right_blob_location),
        (top_roi, top_x_stride, top_y_stride, top_pixels_threshold, top_area_threshold, True, top_blob_location),
        (center_roi, center_x_stride, center_y_stride, center_pixels_threshold, center_area_threshold, True, center_blob_location),
    )
    for region, step_x, step_y, min_pixels, min_area, do_merge, dest in searches:
        for found in img.find_blobs([red_threshold], roi=region, x_stride=step_x, y_stride=step_y, pixels_threshold=min_pixels, area_threshold=min_area, merge=do_merge, margin=10):
            img.draw_rectangle(found.rect())
            img.draw_cross(found.cx(), found.cy())
            dest.x = found.cx()
            dest.y = found.cy()
            dest.area = found.area()

    # A populated centre ROI means the line is still under the camera.
    if center_blob_location.area != 0:
        global_uart_send_data.recognition = 0
        return
    print("tinche")
    global_uart_send_data.recognition = 1
#______________________________________________________________________________________________________________________________
#色块连线区配置
# Appearance of the lines drawn between blob centres.
lines_color = (0, 200, 0)
lines_thickness = 1
def draw_lines(img):
    """Draw the top-to-centre and left-to-right connecting lines on *img*."""
    segments = (
        (top_blob_location, center_blob_location),
        (left_blob_location, right_blob_location),
    )
    for start, end in segments:
        img.draw_line(start.x, start.y, end.x, end.y, color=lines_color, thickness=lines_thickness)
#______________________________________________________________________________________________________________________________
#upacker python实现代码 :https://github.com/aeo123/upacker/blob/master/python/upacker.py
#介绍请看这里:https://github.com/aeo123/upacker
class Upacker():
    """Framer / de-framer for the upacker serial protocol.

    Frame layout as produced by ``enpack``:
      byte 0: start byte 0x55
      byte 1: payload length, low 8 bits
      byte 2: payload length, high 6 bits | two header-check bits in bits 6-7
      byte 3: two header-check bits in bits 0-1 | payload check in bits 2-7
      byte 4..: payload
    """

    def __init__(self):
        self._STX_L = 0x55
        self._MAX_PACK_SIZE = 1024 + 4 + 128
        self._calc = 0   # running XOR over the bytes seen so far
        self._check = 0  # check byte (byte 3) of the frame being decoded
        self._cnt = 0    # payload bytes received so far
        self._flen = 0   # payload length (carries header-check bits until state 3)
        self._state = 0  # decoder state, 0 = waiting for the start byte
        self._data = bytearray()

    def _decode(self, d):
        """Feed one received byte into the decoder state machine.

        :param d: the byte value (int).
        :return: 0 when a complete, valid frame is available in ``self._data``,
                 -1 when the frame was rejected (too long or check mismatch),
                 1 when more bytes are needed.
        """
        if (self._state == 0 and d == self._STX_L):
            self._state = 1
            self._calc = self._STX_L
        elif self._state == 1:
            # Length, low byte.
            self._flen = d & 0xff
            self._calc ^= d & 0xff
            self._state = 2
        elif self._state == 2:
            # Length, high 6 bits; the top two bits belong to the header check.
            self._flen |= (d & 0xff) << 8
            self._calc ^= d & 0x3F
            # Drop the packet outright when it is longer than allowed.
            if ((self._flen & 0x3FFF) > self._MAX_PACK_SIZE):
                self._state = 0
                return -1
            self._data = bytearray(self._flen & 0x3FFF)
            self._state = 3
            self._cnt = 0
        elif self._state == 3:
            header_crc = ((d & 0x03) << 4) | ((self._flen & 0xC000) >> 12)
            self._check = d
            if (header_crc != (self._calc & 0X3C)):
                self._state = 0
                return -1
            self._flen &= 0x3FFF
            if self._flen == 0:
                # Empty payload: no data byte follows, so the frame ends here.
                # Entering state 4 would make the next received byte index
                # past the end of the zero-length buffer.
                self._state = 0
                if ((self._calc & 0xFC) == (self._check & 0XFC)):
                    return 0
                return -1
            self._state = 4
        elif self._state == 4:
            self._data[self._cnt] = d
            self._cnt += 1
            self._calc ^= d
            if self._cnt == self._flen:
                self._state = 0
                # Payload complete: verify the check bits.
                if ((self._calc & 0xFC) == (self._check & 0XFC)):
                    return 0
                else:
                    return -1
        else:
            self._state = 0
        return 1

    # Unpack
    def unpack(self, bytes_data, callback):
        """Decode a chunk of received bytes, invoking *callback* per valid frame.

        :param bytes_data: iterable of byte values as read from the UART.
        :param callback: called with the payload ``bytearray`` of each valid
            frame. The buffer is replaced when the next frame starts.
        """
        ret = 0
        for i in bytes_data:
            ret = self._decode(i)
            if ret == 0:
                callback(self._data)
                if(is_upacker_debug):
                    print(self._data)
            elif ret == -1:
                # callback(None)
                print("err")

    # Pack
    def enpack(self, data):
        """Wrap *data* in an upacker frame.

        :param data: payload bytes.
        :return: the complete frame as bytes (4-byte header + payload).
        """
        tmp = bytearray(4)
        tmp[0] = 0x55
        tmp[1] = len(data) & 0xff
        tmp[2] = (len(data) >> 8) & 0xff
        crc = tmp[0] ^ tmp[1] ^ tmp[2]
        # Header check bits are split across byte 2 (top two bits) and byte 3 (low two bits).
        tmp[2] |= (crc & 0x0c) << 4
        tmp[3] = 0x03 & (crc >> 4)
        for i in range(len(data)):
            crc ^= data[i]
        tmp[3] |= (crc & 0xfc)
        frame = struct.pack("BBBB%ds" % len(data), tmp[0], tmp[1], tmp[2],
                            tmp[3], data)
        # bytes has no hex() method before Python 3.5, so the frame is formatted by hand.
        #print(frame.hex())
        if(is_upacker_debug):
            print(''.join(map(lambda x:('' if len(hex(x))>=4 else '/x0')+hex(x)[2:],frame)))
        return frame
#____________
#串口命令切换模式
# Set to 1 by print_hex() when a UART command arrives; consumed and cleared by the main loop.
uart_cmd_need_change_mode = 0
def print_hex(bytes):
    """Handle one decoded UART payload: optionally dump it, then apply the mode command.

    The first payload byte selects the work mode (0x00 = line tracking,
    0x01 = digit recognition); other values leave the mode unchanged.
    ``uart_cmd_need_change_mode`` is then raised so the main loop
    reconfigures the sensor.

    NOTE(review): the source indentation was lost; setting the flag for every
    non-empty payload (not only for 0x01) is a reconstruction — confirm.

    :param bytes: payload of a decoded frame (bytes-like).
    """
    global work_mode,uart_cmd_need_change_mode
    if not bytes:
        # An empty payload carries no command byte; ignore it instead of
        # indexing past the end.
        return
    if is_upacker_recive_debug:
        hex_byte = [hex(i) for i in bytes]
        print("-----"+" ".join(hex_byte))
    command = bytes[0]
    if command == 0x00:
        work_mode = 0
    elif command == 0x01:
        work_mode = 1
    uart_cmd_need_change_mode = 1
#if __name__ == '__main__':
#buf = bytearray([0x00, 0x01, 0x02,0x03,0x77])
#pack = Upacker()
#pkt = pack.enpack(buf)
#pack.unpack(pkt, print_hex)
#______________________________________________________________________________________________________________________________
#upacker python实现代码结束
def upacker_init():
    """Create and return a fresh Upacker instance."""
    return Upacker()
# NOTE(review): not referenced anywhere in the visible code — confirm whether it is still needed.
yzh = 10
#______________________________________________________________________________________________________________________________
#发送数据到MCU,gd32是小端字节序
#pack各字母对应类型
#x pad byte no value 1
#c char string of length 1 1
#b signed char integer 1
#B unsigned char integer 1
#? _Bool bool 1
#h short integer 2
#H unsigned short integer 2
#i int integer 4
#I unsigned int integer or long 4
#l long integer 4
#L unsigned long long 4
#q long long long 8
#Q unsigned long long long 8
#f float float 4
#d double float 8
#s char[] string 1
#p char[] string 1
#P void * long
def send_data_to_mcu(pack,global_uart_send_data):
    """Serialise the report record and write it to the MCU as one upacker frame.

    The payload is little-endian "<bbhhhhbb": work_mode, recognition, the
    top / centre / left / right offsets (relative to the frame centre) and
    the left / right digits.

    :param pack: framer object providing ``enpack``.
    :param global_uart_send_data: record holding the values to send.
    """
    report = global_uart_send_data
    payload = ustruct.pack(
        "<bbhhhhbb",  # little-endian
        report.work_mode,
        report.recognition,
        report.top_block_offset,
        report.center_block_offset,
        report.left_block_offset,
        report.right_block_offset,
        report.left_number,
        report.right_number,
    )
    k210_uart.write(pack.enpack(payload))
#______________________________________________________________________________________________________________________________
#pack解包后的回调函数
#def data_callback(bytes):
#hex_byte = [hex(i) for i in bytes]
#print(" ".join(hex_byte))
#______________________________________________________________________________________________________________________________
#定时发送串口数据的定时器
#构造函数
# 定时器回调
# Shared Upacker instance used for both sending (timer callback) and receiving (main loop).
pack = upacker_init()
def on_timer(timer):
    """Periodic timer callback: refresh the report record and send it to the MCU.

    Each offset is the blob coordinate minus half the frame size, so 0 means
    centred. A coordinate of -1 means "no blob" and is reported as offset 0.
    The original code overwrote the top/centre offsets unconditionally after
    the guards, which made those guards dead code; the overwrite is removed.

    :param timer: the timer object passed in by the Timer driver (unused).
    """
    half_width = sensor.width() // 2
    half_height = sensor.height() // 2
    report = global_uart_send_data

    report.work_mode = work_mode
    # Horizontal bands report an x offset from the vertical centre line.
    report.top_block_offset = (
        top_blob_location.x - half_width if top_blob_location.x != -1 else 0)
    report.center_block_offset = (
        center_blob_location.x - half_width if center_blob_location.x != -1 else 0)
    # Vertical bands report a y offset from the horizontal centre line.
    report.left_block_offset = (
        left_blob_location.y - half_height if left_blob_location.y != -1 else 0)
    report.right_block_offset = (
        right_blob_location.y - half_height if right_blob_location.y != -1 else 0)
    send_data_to_mcu(pack, report)
# 配置定时器0通道0
# Timer 0 channel 0, periodic every 20 ms, calling on_timer.
# NOTE(review): this rebinds the name `tim`, which earlier held the PWM timer for the beeper — confirm that is intended.
tim = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PERIODIC, period=20, unit=Timer.UNIT_MS, callback=on_timer, arg=on_timer, start=False, priority=1, div=0)
# Start the periodic UART transmission.
tim.start()
#______________________________________________________________________________________________________________________________
#数字识别配置区域
# YOLOv2 anchors, consumed as (len(anchors)//2) pairs by init_yolo2 below.
anchors = [1.40625, 1.875, 1.09375, 1.1875, 1.25, 1.46875, 1.53125, 2.15625, 1.1875, 1.8125000000000002]
labels = ["1", "2", "3", "4", "5", "6", "7", "8"] # Display text per class id
actual_number = [1,2,3,4,5,6,7,8] # Numeric value per class id, sent to the MCU
# Load the detection model from the SD card; `KPU` holds the loaded task handle.
KPU = kpu.load("/sd/mx.kmodel")
xy = [0,0] # Scratch position for the label text
# 0.7 and 0.01 are presumably the detection threshold and NMS value — TODO confirm against the KPU docs.
kpu.init_yolo2(KPU,0.7,0.01,len(anchors)//2,anchors)
#______________________________________________________________________________________________________________________________
#主循环
# Main loop: grab a frame, apply any pending mode switch, run the active mode, then service the UART.
# NOTE(review): the source indentation was lost; the nesting below is reconstructed — confirm against the original file.
while (True):
    gc.collect()
    img = sensor.snapshot() # Take a picture and return the image.
    # img.draw_cross(sensor.width()//2, sensor.height()//2, color = (0, 255, 0), size = 10, thickness = 2)
    clock.tick()
    # Manual switch (key long press)
    if key_press_long ==1:
        key_press_long = 0
        # A long press toggles the K210 work mode
        if work_mode == 0:
            work_mode =1
            print("work mode to 1")
            sensor.set_windowing((224, 224))
        else:
            work_mode =0
            print("work mode to 0")
            img = img.resize(320, 240)
            sensor.set_windowing((320, 240))
    # Switch requested by the MCU (GD32)
    if uart_cmd_need_change_mode == 1:
        uart_cmd_need_change_mode = 0
        # work_mode was already set by the UART command; only the sensor window is updated here
        if work_mode == 1:
            print("work mode to 1")
            sensor.set_windowing((224, 224))
        else:
            print("work mode to 0")
            img = img.resize(320, 240)
            sensor.set_windowing((320, 240))
    if work_mode == 0: # Line-tracking mode
        find_blob(img)
        draw_roi(img)
        draw_lines(img)
        if is_findflob_debug:
            print(clock.fps(), top_blob_location.x,center_blob_location.x, left_blob_location.y, right_blob_location.y) # Note: CanMV Cam runs about half as fast when connected,int(top.x),int(top.y)
            print(clock.fps(), global_uart_send_data.top_block_offset,global_uart_send_data.center_block_offset, global_uart_send_data.left_block_offset, global_uart_send_data.right_block_offset)
            print(global_uart_send_data.recognition)
        fps = clock.fps() # Update the FPS clock.
        img.draw_string(2, 2, ("%2.1ffps" % (fps)), color=(0, 128, 0), scale=2)
        lcd.display(img)
    else: # Digit-recognition mode
        # NOTE(review): a second snapshot is taken here, discarding the frame grabbed at the top of the loop.
        img = sensor.snapshot() # Take a picture and return the image.
        code = kpu.run_yolo2(KPU, img)
        fps = clock.fps() # Update the FPS clock.
        img.draw_string(2, 2, ("%2.1ffps" % (fps)), color=(0, 128, 0), scale=2)
        #print(clock.fps())
        if bool(code):
            # Annotate every detection with its box, label and confidence.
            for i in code:
                img = img.draw_rectangle(i.rect(), (0, 255, 0), 2, 0)
                xy[0] = i.x()
                xy[1] = (i.y() - 20)
                img = img.draw_string(xy[0], xy[1], (labels[i.classid()]), (0, 255, 0), scale=2)
                img = img.draw_string(xy[0], xy[1] + 24, '%.3f' % i.value(), color=(255, 0, 0), scale=2)
            # Of the first and last detections, the one with the smaller x is reported as the left digit.
            if(code[0].x()<code[-1].x()):
                global_uart_send_data.left_number = actual_number[code[0].classid()]
                global_uart_send_data.right_number = actual_number[(code[-1].classid())]
            else:
                global_uart_send_data.left_number = actual_number[(code[-1].classid())]
                global_uart_send_data.right_number = actual_number[(code[0].classid())]
            #print(global_uart_send_data.left_number,global_uart_send_data.right_number)
        lcd.display(img)
    # Service the UART: decode whatever arrived and let print_hex apply the commands.
    read_data = k210_uart.read()
    if read_data:
        read_str = pack.unpack(read_data, print_hex)
    # NOTE(review): both mode branches already call lcd.display(img); this repeats it.
    lcd.display(img) # Display image on lcd.
    if is_stack_heap_mem_debug:
        print("stack mem:"+str(gc.mem_free() / 1024)) # stack mem
        print("heap mem:"+str(utils.heap_free() / 1024)) # heap mem