OpenMV从入门到精通:嵌入式机器视觉的完整指南

第一部分:OpenMV简介与基础入门

1.1 什么是OpenMV?

OpenMV是一个开源、低功耗的机器视觉模块,它基于STM32微控制器,运行MicroPython解释器。这个项目旨在成为"机器视觉界的Arduino",为创客、学生和工程师提供易于使用的嵌入式视觉解决方案。

核心特性:

  • 集成图像传感器(不同型号分辨率不同,通常为30万到500万像素)

  • 内置MicroPython解释器,支持Python编程

  • 低功耗设计,适合电池供电应用

  • 丰富的I/O接口(UART、I2C、SPI、GPIO等)

  • 预置机器视觉算法库

1.2 OpenMV硬件家族

OpenMV有多个型号,满足不同需求:

OpenMV Cam H7(当前主流型号)

  • STM32H743VI ARM Cortex-M7处理器,主频480MHz

  • 1MB SRAM + 2MB Flash

  • 支持多种图像传感器(OV7725、MT9V034等)

  • 内置RGB LED和红外LED

OpenMV Cam M7(前代旗舰)

  • STM32F765VI ARM Cortex-M7处理器,主频216MHz

  • 512KB RAM + 2MB Flash

OpenMV Cam M4(入门级)

  • 成本优化版本,适合预算有限的项目

1.3 开发环境搭建

安装OpenMV IDE

OpenMV IDE是官方推荐的开发环境,支持Windows、macOS和Linux系统。

安装步骤:

  1. 访问OpenMV官方网站下载对应系统的IDE

  2. 安装IDE并启动

  3. 连接OpenMV Cam到计算机USB端口

  4. IDE会自动检测设备并建立连接

第一个OpenMV程序:Hello World

python

# Hello World示例 - 点亮内置LED
import pyb

led = pyb.LED(3)  # 蓝色LED

while True:
    led.on()
    pyb.delay(250)
    led.off()
    pyb.delay(250)

捕获第一张图像

python

import sensor, image, time

# 初始化摄像头
sensor.reset()  # 重置传感器
sensor.set_pixformat(sensor.RGB565)  # 设置像素格式为RGB565
sensor.set_framesize(sensor.QVGA)  # 设置分辨率为QVGA (320x240)
sensor.skip_frames(time=2000)  # 跳过一些帧,让相机自动调整

# 拍摄照片
img = sensor.snapshot()

# 保存到SD卡(如果有)
try:
    img.save("test.jpg")
except:
    print("SD卡未插入或不可用")

第二部分:OpenMV核心功能详解

2.1 图像采集与基础处理

图像采集模式

python

import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# 不同帧大小选择
# sensor.QQVGA: 160x120
# sensor.QVGA: 320x240  
# sensor.VGA: 640x480
# sensor.HD: 1280x720
# sensor.FHD: 1920x1080(取决于传感器支持)

clock = time.clock()

while True:
    clock.tick()
    img = sensor.snapshot()
    
    # 获取图像信息
    print("宽度:", img.width())
    print("高度:", img.height())
    print("帧率:", clock.fps())

图像基本操作

python

import sensor, image, time, pyb

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 绘制基本图形
    img.draw_circle(160, 120, 30, color=(255, 0, 0))  # 红色圆形
    img.draw_rectangle(50, 50, 100, 100, color=(0, 255, 0))  # 绿色矩形
    img.draw_line(0, 0, 320, 240, color=(0, 0, 255))  # 蓝色对角线
    img.draw_string(10, 10, "OpenMV Rocks!", color=(255, 255, 255))
    
    # 图像裁剪(copy 返回新图像,原图不变)
    cropped = img.copy(roi=(100, 100, 120, 120))
    
    # 图像缩放(用 copy 生成缩小副本;img.scale 会原地修改原图)
    scaled = img.copy(x_scale=0.5, y_scale=0.5)
    
    # 图像旋转校正(参数名为 z_rotation,单位为度,原地修改)
    rotated = img.rotation_corr(z_rotation=45)

2.2 颜色识别

颜色识别是OpenMV最常用的功能之一,适用于多种应用场景。

python

import sensor, image, time, pyb

# 颜色跟踪阈值定义 (L Min, L Max, A Min, A Max, B Min, B Max)
# LAB颜色空间通常比RGB更适合颜色识别
red_threshold = (30, 60, 25, 65, 10, 45)
green_threshold = (20, 50, -50, -20, 10, 40)
blue_threshold = (10, 40, 10, 30, -60, -30)

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)
sensor.set_auto_gain(False)  # 颜色跟踪需要关闭自动增益
sensor.set_auto_whitebal(False)  # 颜色跟踪需要关闭自动白平衡

while True:
    img = sensor.snapshot()
    
    # 寻找红色区域
    blobs = img.find_blobs([red_threshold], pixels_threshold=100, 
                          area_threshold=100, merge=True)
    
    if blobs:
        # 找到最大的红色区域
        largest_blob = max(blobs, key=lambda b: b.pixels())
        
        # 绘制矩形框
        img.draw_rectangle(largest_blob.rect(), color=(255, 0, 0))
        
        # 绘制中心十字
        img.draw_cross(largest_blob.cx(), largest_blob.cy(), color=(255, 0, 0))
        
        # 显示信息
        img.draw_string(10, 10, "Red Object Detected!", color=(255, 0, 0))
        img.draw_string(10, 25, "Size: %d pixels" % largest_blob.pixels(), color=(255, 0, 0))
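
上面的 LAB 阈值需要针对实际光照条件调节,OpenMV IDE 自带的阈值编辑器是最直接的工具。作为补充,下面是一个可以在普通 Python 环境中运行的 RGB 转 LAB 草图(假设 sRGB 输入、D65 白点,系数为标准色彩学常数),可用于离线估算目标颜色大致落在哪个 LAB 区间:

```python
def rgb_to_lab(r, g, b):
    """将 sRGB (0-255) 转换为 CIELAB,用于粗略估算颜色阈值"""
    def lin(c):
        # sRGB 伽马解码
        c /= 255.0
        return c / 12.92 if c <= 0.04045 else ((c + 0.055) / 1.055) ** 2.4

    rl, gl, bl = lin(r), lin(g), lin(b)
    # 线性RGB -> XYZ (D65 白点)
    x = rl * 0.4124 + gl * 0.3576 + bl * 0.1805
    y = rl * 0.2126 + gl * 0.7152 + bl * 0.0722
    z = rl * 0.0193 + gl * 0.1192 + bl * 0.9505

    def f(t):
        # CIELAB 非线性映射
        return t ** (1 / 3) if t > 0.008856 else 7.787 * t + 16.0 / 116.0

    fx, fy, fz = f(x / 0.95047), f(y / 1.0), f(z / 1.08883)
    return 116 * fy - 16, 500 * (fx - fy), 200 * (fy - fz)

# 纯红的 a 分量为正,纯绿的 a 分量为负
print(rgb_to_lab(255, 0, 0))
print(rgb_to_lab(0, 255, 0))
```

把目标颜色的若干采样点换算成 LAB 后,取各分量的最小/最大值再适当放宽,就得到了 find_blobs 可用的阈值元组。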

2.3 人脸检测

OpenMV内置了Haar级联分类器,可以用于人脸检测。

python

import sensor, image, time, pyb

# 加载人脸检测模型
face_cascade = image.HaarCascade("frontalface", stages=25)

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)  # 人脸检测使用灰度图
sensor.set_framesize(sensor.HQVGA)  # 使用合适的分辨率
sensor.skip_frames(time=2000)

clock = time.clock()

while True:
    clock.tick()
    img = sensor.snapshot()
    
    # 检测人脸
    objects = img.find_features(face_cascade, threshold=0.75, scale_factor=1.25)
    
    # 绘制检测到的人脸
    for r in objects:
        img.draw_rectangle(r, color=(255, 255, 255))
        
        # 在脸部绘制特征点
        img.draw_cross(r[0]+r[2]//2, r[1]+r[3]//2, color=(255, 255, 255))
    
    # 显示帧率
    img.draw_string(5, 5, "FPS: %.2f" % clock.fps(), color=(255, 255, 255))

2.4 二维码与条形码识别

python

import sensor, image, time, pyb

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.VGA)  # 更高分辨率有助于识别
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 二维码识别
    codes = img.find_qrcodes()
    for code in codes:
        img.draw_rectangle(code.rect(), color=(255, 0, 0))
        img.draw_string(code.x()+3, code.y()-20, code.payload(), color=(255, 0, 0))
        print("QR Code:", code.payload(), "Type:", code.type())
    
    # 条形码识别
    barcodes = img.find_barcodes()
    for barcode in barcodes:
        img.draw_rectangle(barcode.rect(), color=(0, 255, 0))
        img.draw_string(barcode.x()+3, barcode.y()-20, barcode.payload(), color=(0, 255, 0))
        print("Barcode:", barcode.payload(), "Type:", barcode.type())

第三部分:高级视觉算法

3.1 模板匹配

模板匹配用于在图像中寻找预定义的模板图案。

python

import sensor, image, time, pyb

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# 从文件加载模板(或从第一帧捕获)
template = image.Image("/template.pgm")  # 模板图像

clock = time.clock()

while True:
    clock.tick()
    img = sensor.snapshot()
    
    # 模板匹配
    # 参数: 模板图像, 阈值(0-1), step=2, search=SEARCH_EX
    r = img.find_template(template, 0.70, step=2, search=image.SEARCH_EX)
    
    if r:
        # 返回值是匹配区域的 (x, y, w, h) 元组,不包含匹配分数
        img.draw_rectangle(r, color=(255, 255, 255))
        img.draw_string(r[0], r[1]-20, "Match", color=(255, 255, 255))
    
    img.draw_string(5, 5, "FPS: %.2f" % clock.fps(), color=(255, 255, 255))

3.2 AprilTag标记检测

AprilTag是一种类似二维码的视觉基准标记系统,在更远的距离和更大的倾斜角度下仍能可靠检测。

python

import sensor, image, time, math

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QQVGA)  # AprilTag需要较小的分辨率
sensor.skip_frames(time=2000)

# AprilTag家族选择
# TAG16H5, TAG25H7, TAG25H9, TAG36H10, TAG36H11, ARTOOLKIT
tag_families = 0
tag_families |= image.TAG16H5
tag_families |= image.TAG36H11  # 推荐使用

while True:
    img = sensor.snapshot()
    
    tags = img.find_apriltags(families=tag_families)
    
    for tag in tags:
        # 绘制标记边界
        img.draw_rectangle(tag.rect(), color=(255, 0, 0))
        
        # 绘制标记中心
        img.draw_cross(tag.cx(), tag.cy(), color=(0, 255, 0))
        
        # 绘制标记轮廓
        for c in tag.corners():
            img.draw_circle(c[0], c[1], 5, color=(0, 0, 255))
        
        # 估算距离(粗略针孔模型)
        # 假设标记实际边长为0.1米;1080是按镜头视场角估算的等效焦距常数,实际使用需标定
        distance = 0.1 * 1080 / (tag.w() * math.tan(math.radians(70/2)))
        
        # 显示信息
        img.draw_string(tag.cx()+10, tag.cy()+5, "ID: %d" % tag.id(), color=(255, 255, 255))
        img.draw_string(tag.cx()+10, tag.cy()+20, "Dist: %.2fm" % distance, color=(255, 255, 255))
        
        # tag.rotation() 返回弧度,显示前转换为角度
        rotation_angle = math.degrees(tag.rotation())
        img.draw_string(tag.cx()+10, tag.cy()+35, "Rot: %.1f deg" % rotation_angle, color=(255, 255, 255))
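
上面代码中的距离公式本质上是一个针孔模型估计:等效焦距(像素)由水平视场角推出,距离与标记的像素宽度成反比。下面把这一推导抽成可在桌面 Python 中独立运行的函数(标记边长 0.1 米、图像宽 160 像素、HFOV 70° 都是示例假设,实际使用需按镜头标定):

```python
import math

def estimate_distance(tag_w_px, tag_size_m=0.1, img_w_px=160, hfov_deg=70):
    """针孔模型距离估计: distance = 实际边长 × 焦距(像素) / 标记像素宽度"""
    # 等效焦距(像素) = 图像宽度 / (2 * tan(HFOV / 2))
    f_px = img_w_px / (2 * math.tan(math.radians(hfov_deg / 2)))
    return tag_size_m * f_px / tag_w_px

# 标记在图像中越小,离相机越远;像素宽度减半则距离翻倍
print(estimate_distance(40))  # 较近
print(estimate_distance(10))  # 较远
```

这也解释了正文公式里那个常数的来源:它近似等于 tag_size 以外的 f_px 项合并后的结果,换镜头或换分辨率时必须重新计算。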

3.3 光流算法

光流用于估计图像序列中的运动。OpenMV 通过帧间相位相关(find_displacement)计算整幅图像的平移、旋转与缩放。

python

import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.B64X64)  # find_displacement 需要较小的方形分辨率
sensor.skip_frames(time=2000)

clock = time.clock()

# 分配一块额外的帧缓冲区,用于保存上一帧
extra_fb = sensor.alloc_extra_fb(sensor.width(), sensor.height(), sensor.GRAYSCALE)
extra_fb.replace(sensor.snapshot())

while True:
    clock.tick()
    img = sensor.snapshot()
    
    # 基于相位相关的全局光流:比较当前帧与上一帧
    displacement = extra_fb.find_displacement(img)
    extra_fb.replace(img)
    
    # response() 反映匹配质量,过低时结果不可信
    if displacement.response() > 0.1:
        x_motion = displacement.x_translation()
        y_motion = displacement.y_translation()
        rotation = displacement.rotation()
        
        # 显示运动信息
        img.draw_string(5, 5, "X: %.2f" % x_motion, color=255)
        img.draw_string(5, 15, "Y: %.2f" % y_motion, color=255)
        img.draw_string(5, 25, "R: %.2f" % rotation, color=255)
    
    img.draw_string(5, 35, "FPS: %.1f" % clock.fps(), color=255)

3.4 直线检测与霍夫变换

python

import sensor, image, time, math

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 边缘检测
    img.find_edges(image.EDGE_CANNY, threshold=(50, 80))
    
    # 霍夫变换检测直线
    lines = img.find_lines(threshold=1000, theta_margin=25, rho_margin=25)
    
    # 绘制检测到的直线
    for l in lines:
        # 按角度筛选:保留 theta 接近 0°/180°(法线接近水平,即直线接近竖直)的线
        if (l.theta() < 10 or l.theta() > 170):
            img.draw_line(l.line(), color=(255, 255, 255))
            print("直线: theta=%d, rho=%d" % (l.theta(), l.rho()))
    
    # 检测线段
    segments = img.find_line_segments()
    for s in segments:
        img.draw_line(s.line(), color=(0, 255, 0))
        img.draw_circle(s.x1(), s.y1(), 3, color=(0, 255, 0))
        img.draw_circle(s.x2(), s.y2(), 3, color=(0, 255, 0))
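
find_lines 返回的 (rho, theta) 是直线的法线参数化:theta 是法线与 x 轴的夹角,所以 theta 接近 0° 的直线接近竖直。下面用纯 Python 把这一参数化转换成可绘制的线段端点,帮助理解这层几何关系(仅为示意草图,OpenMV 的 l.line() 已直接给出端点):

```python
import math

def hough_to_segment(rho, theta_deg, length=200):
    """把 (rho, theta) 参数化的直线转成一段线段端点 (x1, y1, x2, y2)"""
    t = math.radians(theta_deg)
    # 直线上离原点最近的点(沿法线方向)
    x0, y0 = rho * math.cos(t), rho * math.sin(t)
    # 直线方向向量垂直于法线: (-sinθ, cosθ)
    dx, dy = -math.sin(t), math.cos(t)
    return (round(x0 - dx * length), round(y0 - dy * length),
            round(x0 + dx * length), round(y0 + dy * length))

# theta=0 -> 竖直线 x=rho;theta=90 -> 水平线 y=rho
print(hough_to_segment(80, 0))
print(hough_to_segment(60, 90))
```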

3.5 圆形检测

python

import sensor, image, time, math

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 高斯模糊降噪
    img.gaussian(1)
    
    # 检测圆形
    circles = img.find_circles(threshold=2000, x_margin=10, y_margin=10,
                              r_margin=10, r_min=10, r_max=100)
    
    # 绘制检测到的圆形
    for c in circles:
        img.draw_circle(c.x(), c.y(), c.r(), color=(255, 255, 255))
        img.draw_cross(c.x(), c.y(), color=(255, 255, 255))
        
        # 显示霍夫累加强度,可作为检测置信度参考
        magnitude = c.magnitude()
        img.draw_string(c.x()+10, c.y()+10, "M: %d" % magnitude, color=(255, 255, 255))

第四部分:机器学习与深度学习应用

4.1 使用OpenMV进行物体分类

OpenMV支持TensorFlow Lite Micro,可以运行轻量级神经网络模型。

python

import sensor, image, time, tf

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# 加载TensorFlow Lite模型
net = tf.load("trained_model.tflite", load_to_fb=True)

# 标签定义
labels = ['apple', 'banana', 'orange', 'unknown']

clock = time.clock()

while True:
    clock.tick()
    img = sensor.snapshot()
    
    # 对图像进行分类
    # 注意:输入图像大小需要与模型训练时一致
    for obj in net.classify(img, min_scale=1.0, scale_mul=0.5, 
                           x_overlap=0.0, y_overlap=0.0):
        
        # 获取预测结果
        predictions_list = list(zip(labels, obj.output()))
        
        # 按置信度排序
        predictions_list.sort(key=lambda x: x[1], reverse=True)
        
        # 显示最佳预测
        if predictions_list:
            best_label, best_score = predictions_list[0]
            
            img.draw_string(10, 10, "%s: %.2f" % (best_label, best_score), 
                          color=(255, 255, 255))
            
            # 如果置信度足够高,绘制边界框
            if best_score > 0.7:
                img.draw_rectangle(obj.rect(), color=(255, 0, 0))
    
    img.draw_string(5, img.height()-20, "FPS: %.1f" % clock.fps(), 
                   color=(255, 255, 255))

4.2 人脸识别

python

import sensor, image, time, pyb

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.HQVGA)
sensor.skip_frames(time=2000)

# 加载人脸识别模型
face_model = image.HaarCascade("frontalface", stages=25)

# 简单的人脸数据库
face_database = {
    0: {"name": "Alice", "features": None},
    1: {"name": "Bob", "features": None},
    2: {"name": "Charlie", "features": None}
}

# 训练样本计数器
training_samples = 5
current_person = 0
sample_count = 0

def extract_face_features(face_img):
    """从人脸图像中提取特征(简化版)"""
    # 这里用归一化灰度直方图作为特征
    # 实际应用建议使用LBP、HOG或小型神经网络
    return face_img.get_histogram().bins()

def hist_similarity(h1, h2):
    """直方图交集相似度,对归一化直方图取值0~1"""
    return sum(min(a, b) for a, b in zip(h1, h2))

def recognize_face(features):
    """识别人脸(简化版)"""
    best_match = None
    best_score = 0
    
    for person_id, data in face_database.items():
        if data["features"] is not None:
            score = hist_similarity(features, data["features"])
            if score > best_score and score > 0.8:  # 相似度阈值
                best_score = score
                best_match = person_id
    
    return best_match

while True:
    img = sensor.snapshot()
    
    faces = img.find_features(face_model, threshold=0.75, scale_factor=1.25)
    
    for face_rect in faces:
        img.draw_rectangle(face_rect, color=(255, 255, 255))
        
        # 提取人脸区域
        face_img = img.copy(roi=face_rect)
        
        # 训练模式
        if sample_count < training_samples:
            img.draw_string(face_rect[0], face_rect[1]-20, 
                          "Training %d/%d" % (sample_count+1, training_samples),
                          color=(255, 255, 255))
            
            if sample_count == 0:
                # 简化处理:只存储每人第一张样本的特征
                face_database[current_person]["features"] = extract_face_features(face_img)
            sample_count += 1
            
            if sample_count >= training_samples:
                sample_count = 0
                current_person += 1
                if current_person >= len(face_database):
                    current_person = 0
        
        # 识别模式
        else:
            features = extract_face_features(face_img)
            person_id = recognize_face(features)
            
            if person_id is not None:
                name = face_database[person_id]["name"]
                img.draw_string(face_rect[0], face_rect[1]-20, name, 
                              color=(255, 255, 255))
            else:
                img.draw_string(face_rect[0], face_rect[1]-20, "Unknown", 
                              color=(255, 255, 255))
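
上面的简化方案用直方图作为人脸特征。直方图相似度的行为可以先在桌面 Python 中独立验证,下面用手工构造的归一化直方图代替真实人脸数据,演示直方图交集的取值范围:

```python
def hist_similarity(h1, h2):
    """直方图交集:对归一化直方图,返回值在0(完全不同)到1(完全相同)之间"""
    return sum(min(a, b) for a, b in zip(h1, h2))

# 手工构造的归一化"特征"直方图(各分量之和为1)
alice = [0.1, 0.4, 0.3, 0.2]
alice_again = [0.1, 0.4, 0.3, 0.2]
bob = [0.4, 0.1, 0.1, 0.4]

print(hist_similarity(alice, alice_again))  # 相同分布,接近1
print(hist_similarity(alice, bob))          # 不同分布,明显更低
```

需要强调的是,灰度直方图丢弃了所有空间信息,只适合演示流程;可靠的人脸识别需要更强的特征。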

4.3 目标检测

python

import sensor, image, time, tf, math

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# 加载目标检测模型
# 需要专门为目标检测训练的TensorFlow Lite模型
net = tf.load("object_detection_model.tflite", load_to_fb=True)

# 类别标签
labels = ['person', 'bicycle', 'car', 'motorbike', 'cat', 'dog']

# 锚点框(根据模型训练时设置)
anchors = [3.50, 3.78, 4.41, 4.73, 4.95, 5.48, 5.86, 6.36, 6.76, 7.46]

clock = time.clock()

while True:
    clock.tick()
    img = sensor.snapshot()
    
    # 运行目标检测
    # 注意:detect() 的阈值与 anchors 参数格式随固件版本有所差异,请以所用固件文档为准
    for detection in net.detect(img, thresholds=[(math.ceil(0.6*255), math.ceil(0.7*255), 255)],
                               anchors=anchors):
        
        # 获取检测结果
        [x, y, w, h] = detection.rect()
        center_x = x + w // 2
        center_y = y + h // 2
        
        # 绘制边界框
        img.draw_rectangle([x, y, w, h], color=(255, 0, 0))
        
        # 绘制标签
        label = labels[detection.class_id()]
        score = detection.score()
        img.draw_string(x, y-15, "%s: %.2f" % (label, score), color=(255, 0, 0))
        
        # 绘制中心点
        img.draw_cross(center_x, center_y, color=(0, 255, 0))
        
        # 输出检测信息
        print("Detected:", label, "Score:", score, 
              "Position:", center_x, center_y)
    
    img.draw_string(5, 5, "FPS: %.1f" % clock.fps(), color=(255, 255, 255))

第五部分:通信与外部设备接口

5.1 串口通信

python

import sensor, image, time, pyb

# 初始化串口
uart = pyb.UART(3, 115200, timeout_char=1000)  # P4(TX)和P5(RX)

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 检测红色物体
    red_blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], pixels_threshold=100)
    
    if red_blobs:
        largest = max(red_blobs, key=lambda b: b.pixels())
        
        # 通过串口发送位置信息
        message = "RED,%d,%d,%d\n" % (largest.cx(), largest.cy(), largest.pixels())
        uart.write(message)
        
        img.draw_rectangle(largest.rect(), color=(255, 0, 0))
        img.draw_cross(largest.cx(), largest.cy(), color=(255, 0, 0))
    
    # 读取串口指令
    # 读取串口指令
    if uart.any():
        line = uart.readline()
        if line:  # readline 超时会返回 None
            command = line.decode().strip()
            if command == "CAPTURE":
                # 保存当前图像
                img.save("capture.jpg")
                uart.write("Image saved\n")
            elif command.startswith("THRESHOLD"):
                # 更新阈值(格式: THRESHOLD,LMin,LMax,AMin,AMax,BMin,BMax)
                parts = command.split(",")
                if len(parts) == 7:
                    new_threshold = tuple(map(int, parts[1:]))
                    uart.write("Threshold updated\n")
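
串口另一端(主控MCU或PC)需要解析 "RED,cx,cy,pixels" 这样的文本帧。下面是接收端解析逻辑的纯 Python 草图,帧格式与上文发送端一致,可以直接在桌面环境中测试:

```python
def parse_red_message(line):
    """解析 'RED,cx,cy,pixels' 帧,格式不符时返回 None"""
    fields = line.strip().split(",")
    if len(fields) != 4 or fields[0] != "RED":
        return None
    try:
        cx, cy, pixels = (int(v) for v in fields[1:])
    except ValueError:
        return None
    return cx, cy, pixels

print(parse_red_message("RED,160,120,1500\n"))  # (160, 120, 1500)
print(parse_red_message("GARBAGE"))             # None
```

用换行符做帧边界、逗号做字段分隔是最简单的文本协议;如需更强的抗干扰能力,可以再加上校验和字段。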

5.2 I2C通信

python

import sensor, image, time, pyb

# 初始化I2C(从机模式)
i2c = pyb.I2C(2, pyb.I2C.SLAVE, addr=0x42)

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 计算图像平均亮度
    stats = img.get_statistics()
    brightness = stats.mean()
    
    # 通过I2C发送数据
    data = bytearray([int(brightness)])
    try:
        i2c.send(data, timeout=100)
    except:
        pass
    
    # 尝试接收I2C数据
    try:
        received = i2c.recv(1, timeout=100)
        if received:
            command = received[0]
            if command == 0x01:  # 拍摄照片
                img.save("i2c_capture.jpg")
    except:
        pass
    
    img.draw_string(5, 5, "Brightness: %d" % brightness, color=(255, 255, 255))

5.3 SPI通信

python

import sensor, image, time, pyb

# 初始化SPI
spi = pyb.SPI(2, pyb.SPI.MASTER, baudrate=1000000, polarity=0, phase=0)

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()
    
    # 边缘检测
    img.find_edges(image.EDGE_CANNY, threshold=(50, 80))
    
    # 获取图像数据(用 copy 生成缩小副本,避免原地修改原图)
    small_img = img.copy(x_scale=0.25, y_scale=0.25)
    img_data = small_img.bytearray()
    
    # 通过SPI发送图像数据
    # 首先发送图像尺寸
    height = small_img.height()
    width = small_img.width()
    
    spi.send(bytearray([height, width]))
    spi.send(img_data)
    
    # 显示缩小后的图像
    img.draw_image(small_img, x=img.width()-small_img.width(), y=0)

5.4 WiFi通信(OpenMV Cam H7 Plus)

python

import sensor, image, time, network, socket

# WiFi配置
SSID = "your_wifi_ssid"
PASSWORD = "your_wifi_password"

# 连接WiFi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)

# 等待连接
max_wait = 10
while max_wait > 0:
    if wlan.status() < 0 or wlan.status() >= 3:
        break
    max_wait -= 1
    time.sleep(1)

if wlan.status() != 3:
    raise RuntimeError('WiFi连接失败')
else:
    print('WiFi连接成功')
    print('IP地址:', wlan.ifconfig()[0])

# 初始化摄像头
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# 创建HTTP服务器
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)

print('服务器启动,监听地址:', addr)

def generate_html(image_data):
    """生成HTML页面"""
    html = """<!DOCTYPE html>
    <html>
    <head>
        <title>OpenMV视频流</title>
        <meta http-equiv="refresh" content="1">
    </head>
    <body>
        <h1>OpenMV实时视频流</h1>
        <img src="data:image/jpeg;base64,%s" alt="实时视频">
    </body>
    </html>""" % image_data
    return html

while True:
    cl, addr = s.accept()
    print('客户端连接:', addr)
    
    # 捕获图像
    img = sensor.snapshot()
    
    # 转换为JPEG
    jpeg_buffer = img.compress(quality=60).bytearray()
    
    # 转换为base64
    import ubinascii
    base64_data = ubinascii.b2a_base64(jpeg_buffer).decode().strip()
    
    # 生成HTTP响应
    response = generate_html(base64_data)
    
    cl.send(b'HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
    cl.send(response.encode())
    cl.close()
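
网页以 data-URI 的形式内嵌 JPEG,核心是 base64 编码。下面在桌面 Python 中演示同样的转换(标准库 binascii 与 MicroPython 的 ubinascii 接口一致;示例字节串只是模拟的 JPEG 数据,并非真实图像):

```python
import binascii

def to_data_uri(jpeg_bytes):
    """把JPEG字节流编码为HTML <img> 可直接使用的 data-URI"""
    b64 = binascii.b2a_base64(jpeg_bytes).decode().strip()
    return "data:image/jpeg;base64," + b64

# 模拟数据:JPEG 以 FF D8 开头、FF D9 结尾
fake_jpeg = b"\xff\xd8\xff\xe0fakejpegdata\xff\xd9"
uri = to_data_uri(fake_jpeg)
print(uri[:40])
```

这种做法每次都要求浏览器整页刷新,只适合低帧率演示;若要流畅的视频流,应改用 MJPEG 流(multipart/x-mixed-replace)。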

第六部分:综合项目实战

6.1 智能追踪小车

python

"""
智能追踪小车项目
功能:识别并追踪特定颜色的物体,通过PWM控制电机
硬件:OpenMV Cam H7 + 电机驱动板 + 直流电机x2 + 小车底盘
"""

import sensor, image, time, pyb, math

# 电机引脚定义
MOTOR_A_PWM = pyb.Pin('P7', pyb.Pin.OUT_PP)  # 电机A PWM
MOTOR_A_DIR = pyb.Pin('P8', pyb.Pin.OUT_PP)  # 电机A方向
MOTOR_B_PWM = pyb.Pin('P9', pyb.Pin.OUT_PP)  # 电机B PWM
MOTOR_B_DIR = pyb.Pin('P10', pyb.Pin.OUT_PP) # 电机B方向

# 创建PWM对象(OpenMV 上 P7 为 TIM4_CH1,P9 为 TIM4_CH3)
pwm_a = pyb.Timer(4, freq=1000).channel(1, pyb.Timer.PWM, pin=MOTOR_A_PWM)
pwm_b = pyb.Timer(4, freq=1000).channel(3, pyb.Timer.PWM, pin=MOTOR_B_PWM)

# 目标颜色阈值(红色)
TARGET_COLOR_THRESHOLD = (30, 60, 25, 65, 10, 45)

# PID控制器参数
KP = 0.5  # 比例系数
KI = 0.01 # 积分系数
KD = 0.1  # 微分系数

# 初始化摄像头
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)
sensor.set_auto_gain(False)
sensor.set_auto_whitebal(False)

class PIDController:
    def __init__(self, kp, ki, kd):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.integral = 0
        self.previous_error = 0
        
    def update(self, error, dt):
        # 比例项
        proportional = self.kp * error
        
        # 积分项
        self.integral += error * dt
        integral = self.ki * self.integral
        
        # 微分项
        derivative = self.kd * (error - self.previous_error) / dt
        self.previous_error = error
        
        # 输出控制量
        output = proportional + integral + derivative
        
        # 限制输出范围
        output = max(-100, min(100, output))
        
        return output

def set_motor_speed(motor, speed):
    """设置电机速度,范围:-100到100"""
    speed = max(-100, min(100, speed))
    
    if motor == 'A':
        if speed >= 0:
            MOTOR_A_DIR.high()
            pwm_a.pulse_width_percent(speed)
        else:
            MOTOR_A_DIR.low()
            pwm_a.pulse_width_percent(-speed)
    elif motor == 'B':
        if speed >= 0:
            MOTOR_B_DIR.high()
            pwm_b.pulse_width_percent(speed)
        else:
            MOTOR_B_DIR.low()
            pwm_b.pulse_width_percent(-speed)

def stop_motors():
    """停止两个电机"""
    pwm_a.pulse_width_percent(0)
    pwm_b.pulse_width_percent(0)

def main():
    # 创建PID控制器
    pid = PIDController(KP, KI, KD)
    
    # 图像中心坐标
    img_center_x = sensor.width() // 2
    img_center_y = sensor.height() // 2
    
    # 目标距离阈值(像素)
    TARGET_DISTANCE_THRESHOLD = 50
    
    last_time = time.ticks_ms()
    
    while True:
        # 计算时间差
        current_time = time.ticks_ms()
        dt = (current_time - last_time) / 1000.0  # 转换为秒
        last_time = current_time
        
        # 捕获图像
        img = sensor.snapshot()
        
        # 寻找目标颜色
        blobs = img.find_blobs([TARGET_COLOR_THRESHOLD], 
                              pixels_threshold=100,
                              area_threshold=100,
                              merge=True)
        
        if blobs:
            # 找到最大的目标
            target = max(blobs, key=lambda b: b.pixels())
            
            # 绘制目标
            img.draw_rectangle(target.rect(), color=(255, 0, 0))
            img.draw_cross(target.cx(), target.cy(), color=(255, 0, 0))
            
            # 计算目标中心与图像中心的误差
            error_x = target.cx() - img_center_x
            
            # 计算目标距离(基于目标大小)
            target_distance = target.pixels()
            
            # 使用PID控制器计算转向控制量
            steer_output = pid.update(error_x, dt)
            
            # 基础速度和转向调整
            base_speed = 30
            if target_distance > 5000:  # 目标太近
                base_speed = -20  # 后退
            elif target_distance < 1000:  # 目标太远
                base_speed = 40   # 快速前进
            else:
                base_speed = 30   # 正常前进
            
            # 计算左右电机速度
            left_speed = base_speed - steer_output
            right_speed = base_speed + steer_output
            
            # 设置电机速度
            set_motor_speed('A', left_speed)
            set_motor_speed('B', right_speed)
            
            # 显示信息
            img.draw_string(10, 10, "Target Locked", color=(255, 0, 0))
            img.draw_string(10, 25, "Error: %d" % error_x, color=(255, 0, 0))
            img.draw_string(10, 40, "Distance: %d" % target_distance, color=(255, 0, 0))
            img.draw_string(10, 55, "L: %d, R: %d" % (left_speed, right_speed), color=(255, 0, 0))
        
        else:
            # 没有找到目标:缓慢原地旋转搜索
            img.draw_string(10, 10, "No Target", color=(255, 255, 255))
            set_motor_speed('A', 20)
            set_motor_speed('B', -20)
        
        # 显示帧率
        img.draw_string(sensor.width()-80, 10, "FPS: %.1f" % (1.0/dt if dt > 0 else 0), 
                       color=(255, 255, 255))

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        stop_motors()
        print("程序结束")
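
上面的 PIDController 可以脱离硬件先做离线验证。下面的草图复刻了与正文相同的 PID 更新逻辑,并用一个假设的一阶转向响应模型模拟误差收敛过程(被控模型与各系数均为演示假设,不代表真实小车动力学):

```python
class PIDController:
    # 与正文相同的PID实现
    def __init__(self, kp, ki, kd):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.integral = 0.0
        self.previous_error = 0.0

    def update(self, error, dt):
        self.integral += error * dt
        derivative = (error - self.previous_error) / dt
        self.previous_error = error
        output = self.kp * error + self.ki * self.integral + self.kd * derivative
        return max(-100.0, min(100.0, output))  # 限幅

# 模拟:目标初始偏离图像中心60像素,小车每步按输出比例修正
pid = PIDController(0.5, 0.01, 0.1)
error = 60.0
dt = 0.05  # 假设每帧50ms
for _ in range(300):
    output = pid.update(error, dt)
    error -= output * 0.1  # 假设的简化转向响应模型

print(abs(error))  # 误差应已收敛到接近0
```

这样的离线模拟可以在上车前粗调 KP/KI/KD 的量级,避免参数过大导致实机振荡。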

6.2 智能仓储管理系统

python

"""
智能仓储管理系统
功能:通过二维码识别管理库存,显示库存信息
硬件:OpenMV Cam H7 + OLED显示屏 + 蜂鸣器
"""

import sensor, image, time, pyb

# 初始化外设
# OLED显示屏 (I2C接口)
i2c = pyb.I2C(2, pyb.I2C.MASTER)
oled = None  # 这里需要根据具体OLED型号初始化

# 蜂鸣器
buzzer = pyb.Pin('P6', pyb.Pin.OUT_PP)

# 初始化摄像头
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.VGA)  # 更高分辨率便于识别二维码
sensor.skip_frames(time=2000)

# 库存数据库
inventory_db = {
    "ITEM001": {"name": "螺丝刀", "quantity": 15, "location": "A-01"},
    "ITEM002": {"name": "扳手", "quantity": 8, "location": "A-02"},
    "ITEM003": {"name": "钳子", "quantity": 12, "location": "B-01"},
    "ITEM004": {"name": "电钻", "quantity": 5, "location": "B-02"},
    "ITEM005": {"name": "螺丝", "quantity": 100, "location": "C-01"},
}

# 操作记录
operation_log = []

def beep(times=1, duration=100):
    """蜂鸣器提示音"""
    for _ in range(times):
        buzzer.high()
        pyb.delay(duration)
        buzzer.low()
        pyb.delay(duration)

def update_display(item_id, operation):
    """更新OLED显示"""
    if item_id in inventory_db:
        item = inventory_db[item_id]
        message = """
        %s
        物品: %s
        数量: %d
        位置: %s
        操作: %s
        """ % (item_id, item["name"], item["quantity"], item["location"], operation)
        
        # 这里添加OLED显示代码
        print(message)  # 临时用打印代替

def log_operation(item_id, operation, change=0):
    """记录操作日志"""
    timestamp = time.localtime()
    timestamp_str = "%04d-%02d-%02d %02d:%02d:%02d" % (
        timestamp[0], timestamp[1], timestamp[2],
        timestamp[3], timestamp[4], timestamp[5]
    )
    
    log_entry = {
        "timestamp": timestamp_str,
        "item_id": item_id,
        "operation": operation,
        "quantity_change": change,
        "new_quantity": inventory_db.get(item_id, {}).get("quantity", 0)
    }
    
    operation_log.append(log_entry)
    
    # 保存到文件
    try:
        with open("/sd/operation_log.txt", "a") as f:
            f.write("%s,%s,%s,%d,%d\n" % (
                log_entry["timestamp"],
                log_entry["item_id"],
                log_entry["operation"],
                log_entry["quantity_change"],
                log_entry["new_quantity"]
            ))
    except:
        print("无法保存日志到SD卡")

def add_inventory(item_id, quantity=1):
    """增加库存"""
    if item_id in inventory_db:
        inventory_db[item_id]["quantity"] += quantity
        update_display(item_id, "入库")
        log_operation(item_id, "IN", quantity)
        beep(2)  # 成功提示音
        return True
    else:
        # 新物品
        inventory_db[item_id] = {
            "name": "未知物品",
            "quantity": quantity,
            "location": "待分配"
        }
        update_display(item_id, "新物品入库")
        log_operation(item_id, "NEW", quantity)
        beep(3)
        return True

def remove_inventory(item_id, quantity=1):
    """减少库存"""
    if item_id in inventory_db:
        if inventory_db[item_id]["quantity"] >= quantity:
            inventory_db[item_id]["quantity"] -= quantity
            update_display(item_id, "出库")
            log_operation(item_id, "OUT", -quantity)
            beep(1)
            return True
        else:
            update_display(item_id, "库存不足")
            beep(4, 50)  # 错误提示音
            return False
    else:
        update_display(item_id, "物品不存在")
        beep(4, 50)
        return False

def check_inventory():
    """检查库存状态"""
    low_stock = []
    for item_id, item in inventory_db.items():
        if item["quantity"] < 10:  # 阈值设为10
            low_stock.append((item_id, item["name"], item["quantity"]))
    
    return low_stock

def main():
    last_scan_time = time.ticks_ms()
    scan_interval = 1000  # 扫描间隔1秒
    
    print("智能仓储管理系统启动")
    print("等待扫描二维码...")
    
    while True:
        current_time = time.ticks_ms()
        
        if time.ticks_diff(current_time, last_scan_time) > scan_interval:
            img = sensor.snapshot()
            
            # 识别二维码
            codes = img.find_qrcodes()
            
            for code in codes:
                payload = code.payload()
                print("扫描到二维码:", payload)
                
                # 解析指令 (格式: 操作:物品ID:数量)
                parts = payload.split(":")
                if len(parts) >= 2:
                    operation = parts[0].upper()
                    item_id = parts[1]
                    
                    if len(parts) >= 3:
                        try:
                            quantity = int(parts[2])
                        except:
                            quantity = 1
                    else:
                        quantity = 1
                    
                    # 执行操作
                    if operation == "IN":
                        add_inventory(item_id, quantity)
                    elif operation == "OUT":
                        remove_inventory(item_id, quantity)
                    elif operation == "CHECK":
                        if item_id in inventory_db:
                            update_display(item_id, "查询")
                        else:
                            update_display(item_id, "物品不存在")
                    else:
                        print("未知操作:", operation)
                    
                    # 绘制二维码区域
                    img.draw_rectangle(code.rect(), color=(0, 255, 0))
                    img.draw_string(code.x(), code.y()-20, payload, color=(0, 255, 0))
                
                last_scan_time = current_time
            
            # 定期检查库存
            if time.ticks_diff(current_time, last_scan_time) > 5000:  # 每5秒检查一次
                low_stock = check_inventory()
                if low_stock:
                    print("低库存警告:")
                    for item in low_stock:
                        print("  %s (%s): %d个" % item)
                
                last_scan_time = current_time
            
            # 屏幕提示(draw_string 默认字体不支持中文,这里用英文)
            img.draw_string(5, 5, "Inventory System", color=(255, 255, 255))
            img.draw_string(5, 20, "Waiting for scan...", color=(255, 255, 255))

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("系统关闭")
        print("操作日志:")
        for log in operation_log[-10:]:  # 显示最后10条日志
            print(log)
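
扫码指令采用 "操作:物品ID:数量" 的文本格式。解析部分可以抽成独立函数,便于在桌面 Python 中单独测试,逻辑与上文 main 循环一致:

```python
def parse_command(payload):
    """解析 '操作:物品ID:数量' 指令;数量缺省为1,格式不合法返回 None"""
    parts = payload.split(":")
    if len(parts) < 2:
        return None
    operation = parts[0].upper()
    item_id = parts[1]
    quantity = 1
    if len(parts) >= 3:
        try:
            quantity = int(parts[2])
        except ValueError:
            quantity = 1  # 数量字段非法时退回默认值
    return operation, item_id, quantity

print(parse_command("IN:ITEM001:5"))   # ('IN', 'ITEM001', 5)
print(parse_command("out:ITEM002"))    # ('OUT', 'ITEM002', 1)
print(parse_command("nonsense"))       # None
```

把解析与执行分离后,main 循环只需根据返回的三元组分发到 add_inventory / remove_inventory,更易维护。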

第七部分:性能优化与调试技巧

7.1 性能优化策略

python

"""
OpenMV性能优化技巧
"""

import sensor, image, time, pyb, gc

def optimize_performance():
    """
    性能优化演示
    """
    
    # 1. 选择合适的分辨率
    # 分辨率越高,处理越慢
    sensor.reset()
    sensor.set_pixformat(sensor.GRAYSCALE)  # 灰度图比RGB565快2倍
    sensor.set_framesize(sensor.QQVGA)  # 160x120 - 最快
    # sensor.set_framesize(sensor.QVGA)   # 320x240 - 平衡
    # sensor.set_framesize(sensor.VGA)    # 640x480 - 最慢
    
    # 2. 关闭自动功能
    sensor.set_auto_gain(False)
    sensor.set_auto_whitebal(False)
    
    # 3. 固定曝光时间,避免自动曝光造成的帧率波动
    sensor.set_auto_exposure(False, exposure_us=10000)  # 10ms
    
    # 4. 使用ROI(感兴趣区域)
    # 只处理图像的一部分
    sensor.set_windowing((40, 30, 80, 60))  # 只处理中心区域
    
    # 5. 跳帧处理
    sensor.skip_frames(time=2000)  # 让相机稳定
    
    clock = time.clock()
    
    while True:
        clock.tick()
        
        # 6. 减少图像操作
        img = sensor.snapshot()
        
        # 7. 只在需要时计算
        if clock.fps() < 15:  # 如果帧率太低
            # 减少处理复杂度
            # 例如:降低检测精度
            blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], 
                                  pixels_threshold=200,  # 提高阈值
                                  area_threshold=200)
        else:
            blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], 
                                  pixels_threshold=100,
                                  area_threshold=100)
        
        # 8. 手动垃圾回收
        if clock.fps() < 10:
            gc.collect()  # 手动触发垃圾回收
        
        # 9. 使用局部变量加速访问
        fps = clock.fps()
        img_width = img.width()
        img_height = img.height()
        
        # 10. 减少绘制操作
        if blobs:
            largest = max(blobs, key=lambda b: b.pixels())
            
            # 只绘制必要的图形
            img.draw_rectangle(largest.rect(), color=255)
            
            # 避免频繁的字符串操作
            if fps < 5:
                # 只在帧率很低时显示调试信息
                img.draw_string(5, 5, "Low FPS: %.1f" % fps, color=255)
        
        print("FPS:", fps)
        
        # 11. 动态调整处理频率
        if fps < 5:
            time.sleep_ms(50)  # 如果帧率太低,稍微休息

# 内存使用监控
def monitor_memory():
    import micropython
    while True:
        gc.collect()
        free_memory = gc.mem_free()
        allocated_memory = gc.mem_alloc()
        total_memory = free_memory + allocated_memory
        
        print("内存使用:")
        print("  可用: %d bytes" % free_memory)
        print("  已用: %d bytes" % allocated_memory)
        print("  总计: %d bytes" % total_memory)
        print("  使用率: %.1f%%" % (allocated_memory / total_memory * 100))
        
        # 堆栈信息
        micropython.mem_info()
        
        time.sleep(5)

# 运行优化示例
if __name__ == "__main__":
    optimize_performance()

7.2 调试技巧与故障排除

python

"""
OpenMV调试技巧
"""

import sensor, image, time, pyb, uos, json

class DebugLogger:
    """调试日志记录器"""
    
    def __init__(self, log_file="/sd/debug.log", max_size=10000):
        self.log_file = log_file
        self.max_size = max_size
        self.log_count = 0
        
    def log(self, message, level="INFO"):
        """记录日志"""
        timestamp = time.localtime()
        timestamp_str = "%04d-%02d-%02d %02d:%02d:%02d" % (
            timestamp[0], timestamp[1], timestamp[2],
            timestamp[3], timestamp[4], timestamp[5]
        )
        
        log_entry = "%s [%s] %s\n" % (timestamp_str, level, message)
        print(log_entry.strip())  # 同时输出到控制台
        
        try:
            # 检查文件大小
            try:
                size = uos.stat(self.log_file)[6]
                if size > self.max_size:
                    # 文件太大,清空
                    with open(self.log_file, "w") as f:
                        f.write("")
            except:
                pass
            
            # 写入日志
            with open(self.log_file, "a") as f:
                f.write(log_entry)
                
            self.log_count += 1
            
        except Exception as e:
            print("无法写入日志文件:", e)
    
    def dump_system_info(self):
        """输出系统信息"""
        self.log("=== 系统信息 ===")
        
        # 内存信息
        import gc
        gc.collect()
        self.log("可用内存: %d bytes" % gc.mem_free())
        self.log("已用内存: %d bytes" % gc.mem_alloc())
        
        # 文件系统信息
        try:
            fs_stats = uos.statvfs('/')
            total_space = fs_stats[0] * fs_stats[2]
            free_space = fs_stats[0] * fs_stats[3]
            self.log("SD卡总空间: %d KB" % (total_space // 1024))
            self.log("SD卡可用空间: %d KB" % (free_space // 1024))
        except:
            self.log("无法获取文件系统信息")
        
        # 摄像头信息
        self.log("摄像头分辨率: %dx%d" % (sensor.width(), sensor.height()))
        self.log("像素格式: %s" % ("RGB565" if sensor.get_pixformat() == sensor.RGB565 else "GRAYSCALE"))

def camera_diagnostics():
    """摄像头诊断"""
    
    logger = DebugLogger()
    logger.log("开始摄像头诊断")
    
    # 测试不同设置
    resolutions = [
        ("QQVGA", sensor.QQVGA),
        ("HQVGA", sensor.HQVGA),
        ("QVGA", sensor.QVGA),
        ("VGA", sensor.VGA)
    ]
    
    for name, res in resolutions:
        try:
            sensor.reset()
            sensor.set_pixformat(sensor.GRAYSCALE)
            sensor.set_framesize(res)
            sensor.skip_frames(100)
            
            clock = time.clock()
            frames = 0
            start_time = time.ticks_ms()
            
            # 测量帧率
            while frames < 30:
                clock.tick()
                img = sensor.snapshot()
                frames += 1
            
            elapsed = time.ticks_diff(time.ticks_ms(), start_time)
            fps = frames / (elapsed / 1000)
            
            logger.log("%s: %.1f FPS" % (name, fps))
            
        except Exception as e:
            logger.log("%s 测试失败: %s" % (name, str(e)), "ERROR")
    
    logger.log("摄像头诊断完成")

def image_processing_benchmark():
    """图像处理性能基准测试"""
    
    logger = DebugLogger()
    logger.log("开始图像处理性能测试")
    
    sensor.reset()
    sensor.set_pixformat(sensor.GRAYSCALE)
    sensor.set_framesize(sensor.QVGA)
    sensor.skip_frames(200)
    
    tests = [
        ("find_blobs", lambda img: img.find_blobs([(0, 255)], pixels_threshold=100)),
        ("find_edges", lambda img: img.find_edges(image.EDGE_CANNY)),
        ("find_lines", lambda img: img.find_lines(threshold=1000)),
        ("find_circles", lambda img: img.find_circles(threshold=2000)),
        ("find_qrcodes", lambda img: img.find_qrcodes()),
    ]
    
    for test_name, test_func in tests:
        try:
            # 预热
            for _ in range(5):
                img = sensor.snapshot()
                test_func(img)
            
            # 实际测试
            iterations = 10
            start_time = time.ticks_ms()
            
            for _ in range(iterations):
                img = sensor.snapshot()
                result = test_func(img)
            
            elapsed = time.ticks_diff(time.ticks_ms(), start_time)
            avg_time = elapsed / iterations
            
            logger.log("%s: 平均 %.1f ms" % (test_name, avg_time))
            
        except Exception as e:
            logger.log("%s 测试失败: %s" % (test_name, str(e)), "ERROR")
    
    logger.log("性能测试完成")

def main():
    """主调试函数"""
    
    logger = DebugLogger()
    
    # 记录启动信息
    logger.log("应用程序启动")
    logger.dump_system_info()
    
    try:
        # 运行摄像头诊断
        camera_diagnostics()
        
        # 运行性能测试
        image_processing_benchmark()
        
        # 主循环
        sensor.reset()
        sensor.set_pixformat(sensor.RGB565)
        sensor.set_framesize(sensor.QVGA)
        sensor.skip_frames(200)
        
        frame_count = 0
        last_log_time = time.ticks_ms()
        
        while True:
            frame_count += 1
            
            # 每5秒记录一次状态
            current_time = time.ticks_ms()
            if time.ticks_diff(current_time, last_log_time) > 5000:  # 5秒
                fps = frame_count / 5
                logger.log("运行状态: %.1f FPS, 帧数: %d" % (fps, frame_count))
                frame_count = 0
                last_log_time = current_time
                
                # 内存监控
                import gc
                gc.collect()
                if gc.mem_free() < 10000:  # 内存不足
                    logger.log("警告: 内存不足! 可用内存: %d bytes" % gc.mem_free(), "WARNING")
            
            # 主图像处理逻辑
            img = sensor.snapshot()
            
            # 示例处理:颜色跟踪
            blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], pixels_threshold=100)
            
            if blobs:
                largest = max(blobs, key=lambda b: b.pixels())
                img.draw_rectangle(largest.rect(), color=(255, 0, 0))
                
                # 记录检测到物体
                if frame_count % 10 == 0:
                    logger.log("检测到红色物体,大小: %d 像素" % largest.pixels())
    
    except Exception as e:
        logger.log("程序异常: %s" % str(e), "ERROR")
        logger.log("程序终止")
        
        # 尝试保存错误信息
        try:
            with open("/sd/error_report.txt", "w") as f:
                import sys
                sys.print_exception(e, f)
        except:
            pass
        
        # 重启系统
        pyb.hard_reset()

if __name__ == "__main__":
    main()

第八部分:扩展与进阶

8.1 自定义图像处理算法

python

"""
自定义图像处理算法示例
"""

import sensor, image, time, math

class CustomImageProcessor:
    """自定义图像处理器"""
    
    @staticmethod
    def enhance_contrast(img, contrast_factor=2.0):
        """
        增强图像对比度(基于直方图均衡化)
        contrast_factor: 对比度因子,1.0近似原始图像
        返回处理后的新图像,不修改原图
        """
        # 先复制一份图像:bytearray()返回的是图像缓冲区的引用,
        # 直接修改会改变原图
        img = img.copy(copy_to_fb=False)
        img_data = img.bytearray()
        
        # 计算直方图
        hist = [0] * 256
        for pixel in img_data:
            hist[pixel] += 1
        
        # 计算累积分布函数
        cdf = [0] * 256
        cdf[0] = hist[0]
        for i in range(1, 256):
            cdf[i] = cdf[i-1] + hist[i]
        
        # 归一化CDF
        cdf_min = min(filter(lambda x: x > 0, cdf))
        cdf_range = len(img_data) - cdf_min
        
        # 创建查找表
        lut = [0] * 256
        for i in range(256):
            if cdf[i] > 0:
                lut[i] = max(0, min(255, 
                    int(((cdf[i] - cdf_min) / cdf_range) * 255 * contrast_factor)))
        
        # 应用查找表:原地写回副本的像素缓冲区即可
        for i in range(len(img_data)):
            img_data[i] = lut[img_data[i]]
        
        return img
    
    @staticmethod
    def custom_edge_detection(img, threshold=50):
        """
        自定义边缘检测算法
        使用Sobel算子
        """
        width = img.width()
        height = img.height()
        
        # Sobel算子核
        sobel_x = [[-1, 0, 1],
                   [-2, 0, 2],
                   [-1, 0, 1]]
        
        sobel_y = [[-1, -2, -1],
                   [0, 0, 0],
                   [1, 2, 1]]
        
        # 创建输出图像
        output = image.Image(width, height, sensor.GRAYSCALE, copy_to_fb=False)
        
        # 获取图像数据
        img_data = img.bytearray()
        out_data = output.bytearray()
        
        # 应用Sobel算子
        for y in range(1, height-1):
            for x in range(1, width-1):
                # 计算x方向梯度
                gx = 0
                for ky in range(3):
                    for kx in range(3):
                        pixel = img_data[(y+ky-1)*width + (x+kx-1)]
                        gx += sobel_x[ky][kx] * pixel
                
                # 计算y方向梯度
                gy = 0
                for ky in range(3):
                    for kx in range(3):
                        pixel = img_data[(y+ky-1)*width + (x+kx-1)]
                        gy += sobel_y[ky][kx] * pixel
                
                # 计算梯度幅度
                magnitude = min(255, int(math.sqrt(gx*gx + gy*gy)))
                
                # 应用阈值
                if magnitude > threshold:
                    out_data[y*width + x] = 255
                else:
                    out_data[y*width + x] = 0
        
        return output
    
    @staticmethod
    def morphological_operations(img, operation='erode', kernel_size=3):
        """
        形态学操作:腐蚀和膨胀
        operation: 'erode' 或 'dilate'
        kernel_size: 核大小(奇数)
        """
        width = img.width()
        height = img.height()
        
        # 创建输出图像
        output = image.Image(width, height, sensor.GRAYSCALE, copy_to_fb=False)
        
        # 获取图像数据
        img_data = img.bytearray()
        out_data = output.bytearray()
        
        half_kernel = kernel_size // 2
        
        for y in range(half_kernel, height-half_kernel):
            for x in range(half_kernel, width-half_kernel):
                # 提取邻域
                neighbors = []
                for ky in range(-half_kernel, half_kernel+1):
                    for kx in range(-half_kernel, half_kernel+1):
                        pixel = img_data[(y+ky)*width + (x+kx)]
                        neighbors.append(pixel)
                
                if operation == 'erode':
                    # 腐蚀:取最小值
                    out_data[y*width + x] = min(neighbors)
                elif operation == 'dilate':
                    # 膨胀:取最大值
                    out_data[y*width + x] = max(neighbors)
        
        return output

def main():
    """主函数:演示自定义图像处理算法"""
    
    sensor.reset()
    sensor.set_pixformat(sensor.GRAYSCALE)
    sensor.set_framesize(sensor.QVGA)
    sensor.skip_frames(200)
    
    processor = CustomImageProcessor()
    
    while True:
        # 捕获原始图像
        original = sensor.snapshot()
        
        # 1. 对比度增强
        enhanced = processor.enhance_contrast(original, contrast_factor=2.0)
        
        # 2. 边缘检测
        edges = processor.custom_edge_detection(original, threshold=50)
        
        # 3. 形态学操作
        eroded = processor.morphological_operations(original, 'erode', 3)
        dilated = processor.morphological_operations(original, 'dilate', 3)
        
        # 显示结果
        # 注意:OpenMV IDE显示的是帧缓冲区(即snapshot返回的图像),
        # 堆上创建的图像需要绘制回快照才能在IDE中预览
        original.draw_image(edges, 0, 0)
        
        # 计算形态学梯度(膨胀 - 腐蚀),可突出物体轮廓
        eroded_data = eroded.bytearray()
        dilated_data = dilated.bytearray()
        for i in range(len(eroded_data)):
            # bytearray元素必须在0~255之间,差值为负时截断为0
            eroded_data[i] = max(0, dilated_data[i] - eroded_data[i])
        # 此时eroded中保存的即为形态学梯度图像
        # 如内存允许,也可用draw_image将多路结果拼接到一张更大的堆图像中对比查看

if __name__ == "__main__":
    main()

8.2 多摄像头系统

python

"""
多摄像头系统示例
注意:需要多个OpenMV模块和额外的硬件支持
"""

import sensor, image, time, pyb, network, socket, math

class MultiCameraSystem:
    """多摄像头协调系统"""
    
    def __init__(self, camera_id=0, server_ip='192.168.1.100', server_port=8080):
        self.camera_id = camera_id
        self.server_ip = server_ip
        self.server_port = server_port
        
        # 初始化摄像头
        self.init_camera()
        
        # 网络连接
        self.socket = None
        self.connect_to_server()
    
    def init_camera(self):
        """初始化摄像头设置"""
        sensor.reset()
        sensor.set_pixformat(sensor.RGB565)
        sensor.set_framesize(sensor.QVGA)
        
        # 根据摄像头ID调整参数
        if self.camera_id == 0:
            # 主摄像头
            sensor.set_brightness(0)
            sensor.set_saturation(0)
            sensor.set_gainceiling(16)
        elif self.camera_id == 1:
            # 辅助摄像头1
            sensor.set_brightness(1)
            sensor.set_contrast(1)
        elif self.camera_id == 2:
            # 辅助摄像头2
            sensor.set_brightness(-1)
            sensor.set_contrast(-1)
        
        sensor.skip_frames(500)
    
    def connect_to_server(self):
        """连接到中央服务器"""
        try:
            self.socket = socket.socket()
            self.socket.connect((self.server_ip, self.server_port))
            
            # 发送摄像头ID
            self.socket.send(b"CAMERA_ID:%d\n" % self.camera_id)
            print("已连接到服务器")
            
        except Exception as e:
            print("连接服务器失败:", e)
            self.socket = None
    
    def process_image(self, img):
        """图像处理流水线"""
        
        results = {
            'camera_id': self.camera_id,
            'timestamp': time.ticks_ms(),
            'objects': [],
            'features': {}
        }
        
        # 颜色检测
        color_results = self.detect_colors(img)
        if color_results:
            results['objects'].extend(color_results)
        
        # 形状检测
        shape_results = self.detect_shapes(img)
        if shape_results:
            results['objects'].extend(shape_results)
        
        # 特征提取
        features = self.extract_features(img)
        results['features'] = features
        
        return results
    
    def detect_colors(self, img):
        """检测颜色"""
        colors = []
        
        # 红色检测
        red_blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], 
                                  pixels_threshold=100)
        for blob in red_blobs[:3]:  # 只取前3个
            colors.append({
                'type': 'red',
                'x': blob.cx(),
                'y': blob.cy(),
                'size': blob.pixels()
            })
        
        # 蓝色检测
        blue_blobs = img.find_blobs([(10, 40, 10, 30, -60, -30)], 
                                   pixels_threshold=100)
        for blob in blue_blobs[:3]:
            colors.append({
                'type': 'blue',
                'x': blob.cx(),
                'y': blob.cy(),
                'size': blob.pixels()
            })
        
        return colors
    
    def detect_shapes(self, img):
        """检测形状"""
        shapes = []
        
        # 圆形检测
        circles = img.find_circles(threshold=2000, x_margin=10,
                                  y_margin=10, r_margin=10)
        for circle in circles[:5]:
            shapes.append({
                'type': 'circle',
                'x': circle.x(),
                'y': circle.y(),
                'radius': circle.r()
            })
        
        # 直线检测
        lines = img.find_lines(threshold=1000)
        for line in lines[:5]:
            shapes.append({
                'type': 'line',
                'x1': line.x1(),
                'y1': line.y1(),
                'x2': line.x2(),
                'y2': line.y2(),
                'theta': line.theta(),
                'rho': line.rho()
            })
        
        return shapes
    
    def extract_features(self, img):
        """提取图像特征"""
        features = {}
        
        # 转换为灰度图进行特征提取(copy=True避免原地修改帧缓冲区中的图像)
        gray = img.to_grayscale(copy=True)
        
        # 计算图像统计信息
        stats = gray.get_statistics()
        features['brightness'] = stats.mean()
        features['contrast'] = stats.stdev()
        
        # 计算图像熵(复杂度度量)
        features['entropy'] = self.calculate_entropy(gray)
        
        # 边缘密度
        edges = gray.find_edges(image.EDGE_CANNY)
        edge_pixels = sum(1 for p in edges.bytearray() if p > 128)
        features['edge_density'] = edge_pixels / (gray.width() * gray.height())
        
        return features
    
    def calculate_entropy(self, img):
        """计算图像熵"""
        hist = [0] * 256
        total_pixels = img.width() * img.height()
        
        # 计算直方图
        for pixel in img.bytearray():
            hist[pixel] += 1
        
        # 计算熵
        entropy = 0
        for count in hist:
            if count > 0:
                probability = count / total_pixels
                # 用换底公式计算log2(MicroPython的math.log不一定支持底数参数)
                entropy -= probability * math.log(probability) / math.log(2)
        
        return entropy
    
    def send_results(self, results):
        """发送结果到服务器"""
        if self.socket:
            try:
                import json
                data = json.dumps(results)
                self.socket.send(data.encode() + b'\n')
            except Exception as e:
                print("发送数据失败:", e)
                # 尝试重新连接
                self.connect_to_server()
    
    def run(self):
        """主循环"""
        frame_count = 0
        last_report_time = time.ticks_ms()
        
        while True:
            frame_count += 1
            
            # 捕获图像
            img = sensor.snapshot()
            
            # 处理图像
            results = self.process_image(img)
            
            # 每10帧发送一次结果
            if frame_count % 10 == 0:
                self.send_results(results)
            
            # 每分钟报告一次状态
            current_time = time.ticks_ms()
            if time.ticks_diff(current_time, last_report_time) > 60000:
                print("摄像头 %d: 已处理 %d 帧" % (self.camera_id, frame_count))
                last_report_time = current_time
            
            # 显示本地结果
            self.display_results(img, results)
            
            # 控制帧率
            time.sleep_ms(33)  # 约30FPS
    
    def display_results(self, img, results):
        """在图像上显示结果"""
        # 绘制检测到的对象
        for obj in results['objects']:
            if obj['type'] == 'red':
                color = (255, 0, 0)
            elif obj['type'] == 'blue':
                color = (0, 0, 255)
            elif obj['type'] == 'circle':
                color = (0, 255, 0)
                img.draw_circle(obj['x'], obj['y'], obj['radius'], color=color)
                continue
            elif obj['type'] == 'line':
                color = (255, 255, 0)
                img.draw_line(obj['x1'], obj['y1'], obj['x2'], obj['y2'], color=color)
                continue
            
            # 绘制点
            img.draw_circle(obj['x'], obj['y'], 5, color=color)
            img.draw_string(obj['x']+10, obj['y'], obj['type'], color=color)
        
        # 显示摄像头ID和帧数
        img.draw_string(5, 5, "Cam %d" % self.camera_id, color=(255, 255, 255))
        
        # 显示特征信息
        if results['features']:
            y_offset = 20
            for key, value in results['features'].items():
                if y_offset < img.height() - 20:
                    text = "%s: %.2f" % (key, value)
                    img.draw_string(5, y_offset, text, color=(255, 255, 255))
                    y_offset += 15

# 服务器端代码示例
class CameraServer:
    """多摄像头服务器"""
    
    def __init__(self, port=8080):
        self.port = port
        self.cameras = {}
        self.server_socket = None
        self.start_server()
    
    def start_server(self):
        """启动服务器"""
        self.server_socket = socket.socket()
        self.server_socket.bind(('0.0.0.0', self.port))
        self.server_socket.listen(5)
        print("服务器启动,端口:", self.port)
    
    def handle_client(self, client_socket, address):
        """处理客户端连接"""
        print("新连接:", address)
        camera_id = None  # 提前初始化,便于异常时在finally中安全引用
        
        try:
            # 接收摄像头ID
            data = client_socket.readline()
            if data.startswith(b"CAMERA_ID:"):
                camera_id = int(data.split(b":")[1].decode().strip())
                self.cameras[camera_id] = {
                    'socket': client_socket,
                    'address': address,
                    'last_update': time.ticks_ms()
                }
                print("摄像头 %d 注册成功" % camera_id)
            
            # 接收数据
            while True:
                data = client_socket.readline()
                if data:
                    try:
                        import json
                        results = json.loads(data.decode())
                        self.process_camera_data(camera_id, results)
                        
                        # 更新最后活动时间
                        self.cameras[camera_id]['last_update'] = time.ticks_ms()
                        
                    except Exception as e:
                        print("数据处理错误:", e)
                else:
                    break
        
        except Exception as e:
            print("客户端处理错误:", e)
        finally:
            # 清理
            if camera_id in self.cameras:
                del self.cameras[camera_id]
            client_socket.close()
            print("连接关闭:", address)
    
    def process_camera_data(self, camera_id, data):
        """处理摄像头数据"""
        print("摄像头 %d 数据:" % camera_id)
        print("  时间戳:", data['timestamp'])
        print("  对象数量:", len(data['objects']))
        print("  特征:", data['features'])
        
        # 这里可以添加数据融合、3D重建等高级处理
        
        # 示例:简单的位置融合
        if len(self.cameras) >= 2:
            self.fuse_positions()
    
    def fuse_positions(self):
        """融合多摄像头位置数据"""
        # 这里可以实现三角测量、多视角几何等算法
        pass
    
    def run(self):
        """服务器主循环"""
        while True:
            try:
                client_socket, address = self.server_socket.accept()
                # 创建新线程处理客户端
                # 注意:OpenMV的MicroPython可能不支持线程
                # 可以使用async或轮询方式处理多个连接
                self.handle_client(client_socket, address)
                
            except Exception as e:
                print("服务器错误:", e)
            
            # 清理超时连接
            self.cleanup_timeout_connections()
            
            time.sleep_ms(100)
    
    def cleanup_timeout_connections(self):
        """清理超时连接"""
        current_time = time.ticks_ms()
        timeout = 30000  # 30秒超时
        
        to_remove = []
        for camera_id, info in self.cameras.items():
            if time.ticks_diff(current_time, info['last_update']) > timeout:
                to_remove.append(camera_id)
        
        for camera_id in to_remove:
            try:
                self.cameras[camera_id]['socket'].close()
            except:
                pass
            del self.cameras[camera_id]
            print("摄像头 %d 超时断开" % camera_id)

# 使用示例
if __name__ == "__main__":
    # 判断当前是作为摄像头还是服务器运行
    # 注意:OpenMV直接运行main.py时没有命令行参数,
    # 实际部署时可在代码中直接指定camera_id和server_ip
    import sys
    if len(sys.argv) > 1:
        # 作为摄像头客户端运行
        camera_id = int(sys.argv[1])
        server_ip = sys.argv[2] if len(sys.argv) > 2 else '192.168.1.100'
        
        system = MultiCameraSystem(camera_id=camera_id, server_ip=server_ip)
        system.run()
    else:
        # 作为服务器运行
        server = CameraServer(port=8080)
        server.run()
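上面服务器端的 fuse_positions() 只留了一个占位实现。下面给出一个最简的视差三角测量草图,演示两摄像头位置融合的基本思路;其中"两摄像头平行放置"的前提以及 baseline_mm、focal_px 的取值均为示意用的假设值,实际使用前需要先对摄像头进行标定:

```python
# 两台平行摆放摄像头的最简三角测量示意(原理草图,非完整实现)
# 假设:两摄像头光轴平行,水平基线baseline_mm已知,
# focal_px为以像素为单位的焦距(需标定获得,此处为示例值)

def estimate_depth_mm(x_left, x_right, baseline_mm=100.0, focal_px=320.0):
    """根据同一目标在左右摄像头中的x坐标差(视差)估算深度(毫米)"""
    disparity = x_left - x_right
    if disparity <= 0:
        return None  # 视差无效,无法估算
    # 针孔相机模型:depth = focal * baseline / disparity
    return focal_px * baseline_mm / disparity

# 用法示例:同一色块在两台摄像头中的blob.cx()分别为180和140
depth = estimate_depth_mm(180, 140)
print("估计深度: %.1f mm" % depth)
```

实际系统中,x_left/x_right 可以取两台摄像头各自 find_blobs() 结果的 cx() 坐标,由服务器端在收到两路数据后配对计算。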

第九部分:最佳实践与资源推荐

9.1 OpenMV开发最佳实践

  1. 代码组织

    • 使用模块化设计,将功能分解为独立的函数和类

    • 为重要函数和类添加文档字符串

    • 保持代码简洁,避免过长的函数

  2. 内存管理

    • 及时释放不需要的对象

    • 避免在循环中创建大量临时对象

    • 使用gc.collect()定期清理内存

  3. 性能优化

    • 选择合适的分辨率和像素格式

    • 使用ROI减少处理区域

    • 预处理图像以减少后续操作复杂度

  4. 错误处理

    • 使用try-except处理可能失败的操作

    • 记录错误信息便于调试

    • 实现优雅降级,当高级功能失败时使用基本功能

  5. 电源管理

    • 使用sensor.sleep(True)在空闲时降低功耗

    • 调整帧率平衡性能和功耗

    • 关闭未使用的外设
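其中第4条提到的"优雅降级"可以用一个很小的包装函数来表达。下面是一个纯Python的示意草图,detect_advanced 和 detect_basic 都是假设的占位函数,实际中可分别对应 find_apriltags() 与 find_blobs() 这类"高级/基础"检测:

```python
# "优雅降级"的最小示意:高级检测连续失败后自动退回基础检测

def run_with_fallback(advanced, basic, max_failures=3):
    """返回一个检测函数:advanced连续失败max_failures次后改用basic"""
    state = {"failures": 0}
    def detect(frame):
        if state["failures"] < max_failures:
            try:
                return advanced(frame)
            except Exception:
                state["failures"] += 1  # 记录失败次数
        return basic(frame)  # 高级检测不可用时的兜底路径
    return detect

# 用法示例(占位函数,仅为演示)
def detect_advanced(frame):
    raise MemoryError("内存不足")  # 模拟高级算法失败

def detect_basic(frame):
    return "basic-result"

detect = run_with_fallback(detect_advanced, detect_basic, max_failures=2)
print(detect(None))  # 高级检测失败,自动降级到基础检测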

9.2 学习资源推荐

官方资源:

  • OpenMV官方文档(docs.openmv.io)

  • OpenMV GitHub仓库(github.com/openmv/openmv)

教程与课程:

  • OpenMV官方教程系列

  • YouTube上的OpenMV教程视频

  • 在线课程平台上的嵌入式视觉课程

社区与支持:

  • OpenMV官方论坛

  • GitHub Issues

  • 相关的Discord和Slack频道

扩展库与工具:

  • OpenMV IDE:官方开发环境

  • TensorFlow Lite for Microcontrollers

  • 各种图像处理和机器学习库

9.3 常见问题与解决方案

  1. 摄像头初始化失败

    • 检查硬件连接

    • 尝试重启设备

    • 更新固件到最新版本

  2. 内存不足错误

    • 降低图像分辨率

    • 减少同时处理的图像数量

    • 优化算法减少内存使用

  3. 帧率过低

    • 降低图像分辨率

    • 简化图像处理算法

    • 关闭不必要的图像增强功能

  4. 检测精度不高

    • 调整阈值参数

    • 改善光照条件

    • 使用更合适的颜色空间

  5. 通信问题

    • 检查接线和接口设置

    • 验证波特率和其他通信参数

    • 添加错误检测和重试机制
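针对第5条中"错误检测和重试机制",下面给出一个带校验和的简单帧协议草图。帧格式是自定义的假设,仅演示思路;组帧与解析是纯Python函数,可配合 pyb.UART 的 write()/read() 收发:

```python
# 简单串口帧协议示意:帧头 | 长度 | 数据 | 校验和
# 接收方校验失败返回None,调用方可据此请求对方重发

def build_frame(payload):
    """帧格式: 0xAA | 长度 | 数据 | 校验和(数据字节和的低8位)"""
    checksum = sum(payload) & 0xFF
    return bytes([0xAA, len(payload)]) + bytes(payload) + bytes([checksum])

def parse_frame(frame):
    """校验通过返回payload,否则返回None"""
    if len(frame) < 3 or frame[0] != 0xAA:
        return None  # 帧头错误或帧太短
    length = frame[1]
    if len(frame) != length + 3:
        return None  # 长度不符
    payload = frame[2:2 + length]
    if (sum(payload) & 0xFF) != frame[-1]:
        return None  # 校验失败
    return payload

# 用法示例
frame = build_frame([1, 2, 3])
assert parse_frame(frame) == bytes([1, 2, 3])
corrupted = frame[:-1] + bytes([frame[-1] ^ 0xFF])
assert parse_frame(corrupted) is None  # 校验不通过,应触发重发
```

发送端可在未收到应答帧时重发若干次,配合超时即可构成一个最基本的可靠传输层。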

结语

OpenMV作为一个强大的嵌入式机器视觉平台,为创客、学生和工程师提供了实现计算机视觉应用的便捷途径。通过本指南的学习,您应该已经掌握了从基础到高级的OpenMV开发技能,并能够实现各种实用的视觉应用。

记住,实践是最好的老师。不断尝试新的项目,解决实际问题,您将不断提高自己的技能。机器视觉领域正在快速发展,OpenMV为您打开了进入这个精彩世界的大门。

无论您是想要构建智能机器人、自动化系统,还是进行科学研究,OpenMV都能为您提供强大的支持。祝您在嵌入式视觉的开发道路上取得成功!


附录:常用函数速查表

功能 函数/方法 说明
摄像头初始化 sensor.reset() 重置摄像头
设置像素格式 sensor.set_pixformat() RGB565或GRAYSCALE
设置分辨率 sensor.set_framesize() QQVGA、QVGA、VGA等
捕获图像 sensor.snapshot() 捕获一帧图像
颜色识别 img.find_blobs() 查找颜色块
人脸检测 img.find_features() 检测人脸
二维码识别 img.find_qrcodes() 识别二维码
条形码识别 img.find_barcodes() 识别条形码
直线检测 img.find_lines() 霍夫变换检测直线
圆形检测 img.find_circles() 检测圆形
模板匹配 img.find_template() 匹配预定义模板
AprilTag检测 img.find_apriltags() 检测AprilTag标记
光流计算 img.find_displacement() 相位相关法计算帧间位移
图像保存 img.save() 保存图像到文件
图像显示 sensor.flush() 将帧缓冲区刷新到IDE预览
串口通信 pyb.UART() 串口通信
I2C通信 pyb.I2C() I2C通信
SPI通信 pyb.SPI() SPI通信
PWM输出 pyb.Timer().channel() PWM信号输出
时间延迟 pyb.delay() 毫秒级延迟
时间戳 time.ticks_ms() 获取毫秒时间戳

这个速查表涵盖了OpenMV开发中最常用的函数和方法,可以帮助您快速查找需要的功能。随着OpenMV的不断发展,建议您经常查阅官方文档以获取最新信息。
