OpenMV from Beginner to Expert: A Complete Guide to Embedded Machine Vision

OpenMV is an open-source machine vision module built around an STM32 microcontroller. It runs a MicroPython interpreter and provides a low-power, easy-to-use embedded vision solution. This guide walks through OpenMV development systematically: hardware overview and development-environment setup; core features (image capture, color tracking, face detection, QR-code recognition, and more); advanced vision algorithms (template matching, AprilTag detection, optical flow) and machine-learning applications (object classification, face recognition); communication interfaces (UART, I2C, SPI, WiFi); complete example projects; and performance-tuning and debugging techniques.
Part 1: Introduction to OpenMV and Getting Started
1.1 What Is OpenMV?
OpenMV is an open-source, low-power machine vision module based on an STM32 microcontroller, running a MicroPython interpreter. The project aims to be "the Arduino of machine vision," giving makers, students, and engineers an easy-to-use embedded vision solution.
Key features:
- Integrated image sensor (resolution varies by model, typically 0.3 to 5 megapixels)
- Built-in MicroPython interpreter with Python programming support
- Low-power design, suitable for battery-powered applications
- Rich I/O interfaces (UART, I2C, SPI, GPIO, etc.)
- Bundled machine vision algorithm library
1.2 The OpenMV Hardware Family
OpenMV comes in several models to suit different needs:

OpenMV Cam H7 (current mainstream model)
- STM32H743VI ARM Cortex-M7 processor at 480 MHz
- 1MB SRAM + 2MB Flash
- Supports multiple image sensors (OV7725, MT9V034, etc.)
- Onboard RGB LED and IR LEDs

OpenMV Cam M7 (previous flagship)
- STM32F765VI ARM Cortex-M7 processor at 216 MHz
- 512KB SRAM + 2MB Flash

OpenMV Cam M4 (entry level)
- Cost-optimized version for budget-constrained projects
1.3 Setting Up the Development Environment
Installing OpenMV IDE
OpenMV IDE is the officially recommended development environment and supports Windows, macOS, and Linux.
Installation steps:
1. Download the IDE for your operating system from the official OpenMV website
2. Install and launch the IDE
3. Connect the OpenMV Cam to your computer's USB port
4. The IDE automatically detects the device and establishes a connection
Your First OpenMV Program: Hello World

```python
# Hello World example - blink the onboard LED
import pyb

led = pyb.LED(3)  # blue LED

while True:
    led.on()
    pyb.delay(250)
    led.off()
    pyb.delay(250)
```
Capturing Your First Image

```python
import sensor, image, time

# Initialize the camera
sensor.reset()                       # reset the sensor
sensor.set_pixformat(sensor.RGB565)  # set pixel format to RGB565
sensor.set_framesize(sensor.QVGA)    # set resolution to QVGA (320x240)
sensor.skip_frames(time=2000)        # skip frames so the camera can auto-adjust

# Take a picture
img = sensor.snapshot()

# Save to the SD card (if present)
try:
    img.save("test.jpg")
except OSError:
    print("SD card not inserted or unavailable")
```
Part 2: OpenMV Core Features in Detail
2.1 Image Capture and Basic Processing
Image capture modes

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# Frame size options:
# sensor.QQVGA: 160x120
# sensor.QVGA:  320x240
# sensor.VGA:   640x480
# sensor.HD:    1280x720
# sensor.FHD:   1920x1080 (depends on sensor support)

clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()
    # Query image information
    print("Width:", img.width())
    print("Height:", img.height())
    print("FPS:", clock.fps())
```
Basic image operations

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # Draw basic shapes
    img.draw_circle(160, 120, 30, color=(255, 0, 0))         # red circle
    img.draw_rectangle(50, 50, 100, 100, color=(0, 255, 0))  # green rectangle
    img.draw_line(0, 0, 320, 240, color=(0, 0, 255))         # blue diagonal line
    img.draw_string(10, 10, "OpenMV Rocks!", color=(255, 255, 255))

    # Crop (copies the ROI into a new image)
    cropped = img.copy(roi=(100, 100, 120, 120))

    # Scale (copy at half size)
    scaled = img.copy(x_scale=0.5, y_scale=0.5)

    # Rotate (rotation correction around the z axis; there is no
    # "degrees" keyword - use z_rotation)
    rotated = img.copy().rotation_corr(z_rotation=45)
```
2.2 Color Recognition
Color tracking is one of OpenMV's most commonly used features and applies to a wide range of scenarios.

```python
import sensor, image, time

# Color tracking thresholds: (L Min, L Max, A Min, A Max, B Min, B Max)
# The LAB color space is usually better suited to color tracking than RGB
red_threshold = (30, 60, 25, 65, 10, 45)
green_threshold = (20, 50, -50, -20, 10, 40)
blue_threshold = (10, 40, 10, 30, -60, -30)

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)
sensor.set_auto_gain(False)      # must be off for color tracking
sensor.set_auto_whitebal(False)  # must be off for color tracking

while True:
    img = sensor.snapshot()

    # Look for red regions
    blobs = img.find_blobs([red_threshold], pixels_threshold=100,
                           area_threshold=100, merge=True)
    if blobs:
        # Pick the largest red blob
        largest_blob = max(blobs, key=lambda b: b.pixels())
        # Bounding box
        img.draw_rectangle(largest_blob.rect(), color=(255, 0, 0))
        # Center cross
        img.draw_cross(largest_blob.cx(), largest_blob.cy(), color=(255, 0, 0))
        # Status text
        img.draw_string(10, 10, "Red Object Detected!", color=(255, 0, 0))
        img.draw_string(10, 25, "Size: %d pixels" % largest_blob.pixels(), color=(255, 0, 0))
```
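To make the threshold tuples concrete, here is a small, board-independent sketch of the per-pixel membership test that blob finding applies (the real implementation is optimized C inside the firmware; the function name here is illustrative):

```python
def in_lab_threshold(l, a, b, threshold):
    """Return True if an (L, A, B) pixel falls inside a
    (l_min, l_max, a_min, a_max, b_min, b_max) threshold tuple."""
    l_min, l_max, a_min, a_max, b_min, b_max = threshold
    return (l_min <= l <= l_max and
            a_min <= a <= a_max and
            b_min <= b <= b_max)

red_threshold = (30, 60, 25, 65, 10, 45)
print(in_lab_threshold(45, 40, 20, red_threshold))   # bright red pixel -> True
print(in_lab_threshold(45, -30, 20, red_threshold))  # greenish pixel -> False
```

Pixels that pass this test are grouped into connected regions, which `pixels_threshold` and `area_threshold` then filter by size.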
2.3 Face Detection
OpenMV ships with Haar cascade classifiers that can be used for face detection.

```python
import sensor, image, time

# Load the face detection cascade
face_cascade = image.HaarCascade("frontalface", stages=25)

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)  # face detection works on grayscale
sensor.set_framesize(sensor.HQVGA)      # a moderate resolution keeps it fast
sensor.skip_frames(time=2000)

clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()

    # Detect faces
    objects = img.find_features(face_cascade, threshold=0.75, scale_factor=1.25)

    # Draw the detections
    for r in objects:
        img.draw_rectangle(r, color=(255, 255, 255))
        # Mark the face center
        img.draw_cross(r[0] + r[2] // 2, r[1] + r[3] // 2, color=(255, 255, 255))

    # Show the frame rate
    img.draw_string(5, 5, "FPS: %.2f" % clock.fps(), color=(255, 255, 255))
```
2.4 QR Code and Barcode Recognition

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.VGA)  # higher resolution helps recognition
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # QR codes
    for code in img.find_qrcodes():
        img.draw_rectangle(code.rect(), color=(255, 0, 0))
        img.draw_string(code.x() + 3, code.y() - 20, code.payload(), color=(255, 0, 0))
        print("QR Code:", code.payload(), "Type:", code.type())

    # Barcodes
    for barcode in img.find_barcodes():
        img.draw_rectangle(barcode.rect(), color=(0, 255, 0))
        img.draw_string(barcode.x() + 3, barcode.y() - 20, barcode.payload(), color=(0, 255, 0))
        print("Barcode:", barcode.payload(), "Type:", barcode.type())
```
Part 3: Advanced Vision Algorithms
3.1 Template Matching
Template matching searches an image for a predefined template pattern.

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)  # find_template requires grayscale
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# Load the template from a file (or capture it from an earlier frame)
template = image.Image("/template.pgm")

clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()

    # Template matching
    # Arguments: template image, correlation threshold (0-1), step, search mode
    r = img.find_template(template, 0.70, step=2, search=image.SEARCH_EX)
    if r:
        # r is the bounding box (x, y, w, h) of the best match;
        # it does not carry a score field
        img.draw_rectangle(r, color=(255, 255, 255))
        img.draw_string(r[0], r[1] - 20, "Match", color=(255, 255, 255))

    img.draw_string(5, 5, "FPS: %.2f" % clock.fps(), color=(255, 255, 255))
```
3.2 AprilTag Detection
AprilTag is a visual fiducial marker system similar to QR codes, but detectable at greater distances and steeper viewing angles.

```python
import sensor, image, time, math

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QQVGA)  # AprilTag detection needs a small resolution
sensor.skip_frames(time=2000)

# AprilTag family selection
# TAG16H5, TAG25H7, TAG25H9, TAG36H10, TAG36H11, ARTOOLKIT
tag_families = 0
tag_families |= image.TAG16H5
tag_families |= image.TAG36H11  # recommended family

while True:
    img = sensor.snapshot()
    for tag in img.find_apriltags(families=tag_families):
        # Tag boundary
        img.draw_rectangle(tag.rect(), color=(255, 0, 0))
        # Tag center
        img.draw_cross(tag.cx(), tag.cy(), color=(0, 255, 0))
        # Tag corners
        for c in tag.corners():
            img.draw_circle(c[0], c[1], 5, color=(0, 0, 255))

        # Rough pinhole-model distance estimate, assuming a 0.1 m wide
        # tag, a ~70 degree FOV, and an empirical focal constant
        distance = 0.1 * 1080 / (tag.w() * math.tan(math.radians(70 / 2)))

        # Overlay info
        img.draw_string(tag.cx() + 10, tag.cy() + 5, "ID: %d" % tag.id(), color=(255, 255, 255))
        img.draw_string(tag.cx() + 10, tag.cy() + 20, "Dist: %.2fm" % distance, color=(255, 255, 255))

        # tag.rotation() returns radians; convert for display
        rotation_angle = math.degrees(tag.rotation())
        img.draw_string(tag.cx() + 10, tag.cy() + 35, "Rot: %.1f deg" % rotation_angle, color=(255, 255, 255))
```
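The distance line above is a pinhole-camera approximation: an object of real width W that appears w pixels wide, viewed through a lens with focal length f (in pixels), sits at distance d ≈ W·f/w, where f can be derived from the horizontal field of view. A board-independent sketch (the default image width and FOV here are assumptions you should calibrate for your own lens):

```python
import math

def estimate_distance(real_width_m, apparent_width_px,
                      image_width_px=160, fov_deg=70.0):
    """Pinhole-model distance estimate.
    focal length in pixels: f = (image_width / 2) / tan(fov / 2)
    distance:               d = real_width * f / apparent_width
    """
    f_px = (image_width_px / 2) / math.tan(math.radians(fov_deg / 2))
    return real_width_m * f_px / apparent_width_px

# A 0.1 m tag appearing 20 px wide at QQVGA:
print("%.2f m" % estimate_distance(0.1, 20))  # -> 0.57 m
```

Note that halving the apparent width doubles the estimated distance, which is why the estimate degrades quickly for small, distant tags.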
3.3 Optical Flow
Optical flow detects motion between consecutive frames. Note that OpenMV's firmware does not expose an `OpticalFlow` class; instead, `img.find_displacement()` performs phase-correlation optical flow against a stored previous frame. The sketch below follows the official example pattern, which works best at small power-of-two resolutions:

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.B64X64)  # small power-of-2 size for phase correlation
sensor.skip_frames(time=2000)

# Keep a copy of the previous frame in an extra frame buffer
extra_fb = sensor.alloc_extra_fb(sensor.width(), sensor.height(), sensor.GRAYSCALE)
extra_fb.replace(sensor.snapshot())

clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()

    # Phase-correlation optical flow against the previous frame
    displacement = img.find_displacement(extra_fb)
    extra_fb.replace(img)

    x_motion = displacement.x_translation()
    y_motion = displacement.y_translation()
    rotation = displacement.rotation()

    # Only trust the result when the correlation response is strong
    if displacement.response() > 0.1:
        print("X: %.2f Y: %.2f R: %.2f FPS: %.1f" %
              (x_motion, y_motion, rotation, clock.fps()))
```

This yields a global motion estimate per frame; a dense per-pixel flow field would require running `find_displacement` blockwise over image tiles.
3.4 Line Detection with the Hough Transform

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # Hough transform line detection
    # (find_lines does its own edge filtering internally, so there is
    # no need to run find_edges first)
    lines = img.find_lines(threshold=1000, theta_margin=25, rho_margin=25)

    # Draw detected lines, keeping only those whose angle is near 0/180 degrees
    for l in lines:
        if l.theta() < 10 or l.theta() > 170:
            img.draw_line(l.line(), color=(255, 255, 255))
            print("Line: theta=%d, rho=%d" % (l.theta(), l.rho()))

    # Line segments
    for s in img.find_line_segments():
        img.draw_line(s.line(), color=(0, 255, 0))
        img.draw_circle(s.x1(), s.y1(), 3, color=(0, 255, 0))
        img.draw_circle(s.x2(), s.y2(), 3, color=(0, 255, 0))
```
3.5 Circle Detection

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # Gaussian blur to reduce noise
    img.gaussian(1)

    # Hough circle detection
    circles = img.find_circles(threshold=2000, x_margin=10, y_margin=10,
                               r_margin=10, r_min=10, r_max=100)

    for c in circles:
        img.draw_circle(c.x(), c.y(), c.r(), color=(255, 255, 255))
        img.draw_cross(c.x(), c.y(), color=(255, 255, 255))
        # magnitude() is the Hough accumulator strength of the detection
        # (circle objects have no roundness() method; that exists on blobs)
        img.draw_string(c.x() + 10, c.y() + 10, "M: %d" % c.magnitude(), color=(255, 255, 255))
```
Part 4: Machine Learning and Deep Learning Applications
4.1 Object Classification with OpenMV
OpenMV supports TensorFlow Lite Micro and can run lightweight neural network models.

```python
import sensor, image, time, tf

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# Load a TensorFlow Lite model (file name is an example)
net = tf.load("trained_model.tflite", load_to_fb=True)

# Class labels, in the order the model was trained with
labels = ['apple', 'banana', 'orange', 'unknown']

clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()

    # Classify the image
    # Note: the input is scaled to match the model's training input size
    for obj in net.classify(img, min_scale=1.0, scale_mul=0.5,
                            x_overlap=0.0, y_overlap=0.0):
        # Pair labels with output scores
        predictions_list = list(zip(labels, obj.output()))
        # Sort by confidence, highest first
        predictions_list.sort(key=lambda x: x[1], reverse=True)

        # Show the best prediction
        if predictions_list:
            best_label, best_score = predictions_list[0]
            img.draw_string(10, 10, "%s: %.2f" % (best_label, best_score),
                            color=(255, 255, 255))
            # Draw a bounding box when confidence is high enough
            if best_score > 0.7:
                img.draw_rectangle(obj.rect(), color=(255, 0, 0))

    img.draw_string(5, img.height() - 20, "FPS: %.1f" % clock.fps(),
                    color=(255, 255, 255))
```
4.2 Face Recognition
The sketch below combines Haar-cascade face detection with a very simple histogram-based matcher. It is illustrative only, not a robust face recognizer; a real system would compare embeddings from a small neural network instead.

```python
import sensor, image, time

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.HQVGA)
sensor.skip_frames(time=2000)

# Face detector
face_model = image.HaarCascade("frontalface", stages=25)

# Tiny in-memory face "database"
face_database = {
    0: {"name": "Alice", "features": None},
    1: {"name": "Bob", "features": None},
    2: {"name": "Charlie", "features": None},
}

# Training state
training_samples = 5
current_person = 0
sample_count = 0
training_done = False

def extract_face_features(face_img):
    """Extract features from a face crop (simplified).
    A real system would use LBP, HOG, or a small neural network;
    here we just use the normalized grayscale histogram bins."""
    return face_img.histogram().bins()

def histogram_similarity(h1, h2):
    """Similarity between two normalized histograms (1.0 = identical)."""
    return 1.0 - sum(abs(a - b) for a, b in zip(h1, h2)) / 2.0

def recognize_face(features):
    """Return the best-matching person id, or None (simplified)."""
    best_match = None
    best_score = 0.8  # similarity threshold
    for person_id, data in face_database.items():
        if data["features"] is not None:
            score = histogram_similarity(features, data["features"])
            if score > best_score:
                best_score = score
                best_match = person_id
    return best_match

while True:
    img = sensor.snapshot()
    faces = img.find_features(face_model, threshold=0.75, scale_factor=1.25)
    for face_rect in faces:
        img.draw_rectangle(face_rect, color=(255, 255, 255))
        # Crop the face region
        face_img = img.copy(roi=face_rect)

        if not training_done:
            # Training mode: enroll each person in turn
            img.draw_string(face_rect[0], face_rect[1] - 20,
                            "Training %d/%d" % (sample_count + 1, training_samples),
                            color=(255, 255, 255))
            if sample_count == 0:
                # Store the first sample's features for this person
                face_database[current_person]["features"] = extract_face_features(face_img)
            sample_count += 1
            if sample_count >= training_samples:
                sample_count = 0
                current_person += 1
                if current_person >= len(face_database):
                    training_done = True  # all persons enrolled
        else:
            # Recognition mode
            features = extract_face_features(face_img)
            person_id = recognize_face(features)
            if person_id is not None:
                name = face_database[person_id]["name"]
            else:
                name = "Unknown"
            img.draw_string(face_rect[0], face_rect[1] - 20, name,
                            color=(255, 255, 255))
```
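A histogram matcher of this kind can be verified off-board with plain Python. For two histograms normalized to sum to 1, the L1-distance score below ranges from 0 (completely disjoint) to 1 (identical):

```python
def histogram_similarity(h1, h2):
    """1 - (L1 distance)/2 for two normalized histograms."""
    return 1.0 - sum(abs(a - b) for a, b in zip(h1, h2)) / 2.0

a = [0.25, 0.25, 0.25, 0.25]
print(histogram_similarity(a, a))                     # identical -> 1.0
print(histogram_similarity(a, [1.0, 0.0, 0.0, 0.0]))  # -> 0.25
```

Because global histograms discard all spatial information, two different faces under similar lighting can score deceptively high; this is the main reason the approach is for study only.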
4.3 Object Detection
The `tf.detect` call below follows the FOMO-style detection API in recent OpenMV firmware, which returns one detection list per class; the `anchors` parameter from YOLO-style pipelines is not part of this API and is omitted.

```python
import sensor, image, time, math, tf

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# Load a TensorFlow Lite model trained for detection (file name is an example)
net = tf.load("object_detection_model.tflite", load_to_fb=True)

# Class labels; index 0 is the background class
labels = ['background', 'person', 'bicycle', 'car', 'motorbike', 'cat', 'dog']

min_confidence = 0.6

clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()

    # net.detect returns one detection list per class
    for class_id, detection_list in enumerate(
            net.detect(img, thresholds=[(math.ceil(min_confidence * 255), 255)])):
        if class_id == 0:
            continue  # skip the background class
        for detection in detection_list:
            x, y, w, h = detection.rect()
            center_x = x + w // 2
            center_y = y + h // 2

            # Bounding box and label
            img.draw_rectangle((x, y, w, h), color=(255, 0, 0))
            label = labels[class_id]
            score = detection.output()
            img.draw_string(x, y - 15, "%s: %.2f" % (label, score), color=(255, 0, 0))

            # Center point
            img.draw_cross(center_x, center_y, color=(0, 255, 0))
            print("Detected:", label, "Score:", score,
                  "Position:", center_x, center_y)

    img.draw_string(5, 5, "FPS: %.1f" % clock.fps(), color=(255, 255, 255))
```
Part 5: Communication and External Device Interfaces
5.1 Serial (UART) Communication

```python
import sensor, image, time, pyb

# Initialize UART 3 (P4 = TX, P5 = RX on OpenMV Cams)
uart = pyb.UART(3, 115200, timeout_char=1000)

sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # Detect red objects
    red_blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], pixels_threshold=100)
    if red_blobs:
        largest = max(red_blobs, key=lambda b: b.pixels())
        # Send the position over UART
        message = "RED,%d,%d,%d\n" % (largest.cx(), largest.cy(), largest.pixels())
        uart.write(message)
        img.draw_rectangle(largest.rect(), color=(255, 0, 0))
        img.draw_cross(largest.cx(), largest.cy(), color=(255, 0, 0))

    # Read incoming commands
    if uart.any():
        command = uart.readline().decode().strip()
        if command == "CAPTURE":
            # Save the current frame
            img.save("capture.jpg")
            uart.write("Image saved\n")
        elif command.startswith("THRESHOLD"):
            # Update the threshold
            # (format: THRESHOLD,l_min,l_max,a_min,a_max,b_min,b_max)
            parts = command.split(",")
            if len(parts) == 7:
                new_threshold = tuple(map(int, parts[1:]))
                uart.write("Threshold updated\n")
```
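On the receiving side (a PC or another MCU), the `RED,cx,cy,pixels` lines can be parsed with a few lines of plain Python; the field layout mirrors the message built above, and the function name is illustrative:

```python
def parse_blob_message(line):
    """Parse 'RED,cx,cy,pixels' into (tag, cx, cy, pixels), or None."""
    parts = line.strip().split(",")
    if len(parts) != 4:
        return None
    tag = parts[0]
    try:
        cx, cy, pixels = (int(p) for p in parts[1:])
    except ValueError:
        return None
    return tag, cx, cy, pixels

print(parse_blob_message("RED,160,120,2048\n"))  # ('RED', 160, 120, 2048)
print(parse_blob_message("garbage"))             # None
```

Returning `None` on malformed input matters here: UART lines can be truncated mid-message, so the reader should silently drop anything that does not parse.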
5.2 I2C Communication

```python
import sensor, image, time, pyb

# Initialize I2C bus 2 as a slave device at address 0x42
i2c = pyb.I2C(2, pyb.I2C.SLAVE, addr=0x42)

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # Average image brightness
    stats = img.get_statistics()
    brightness = stats.mean()

    # Send over I2C (ignore timeouts when no master is polling)
    data = bytearray([int(brightness)])
    try:
        i2c.send(data, timeout=100)
    except OSError:
        pass

    # Try to receive a command byte
    try:
        received = i2c.recv(1, timeout=100)
        if received and received[0] == 0x01:  # 0x01 = take a picture
            img.save("i2c_capture.jpg")
    except OSError:
        pass

    img.draw_string(5, 5, "Brightness: %d" % brightness, color=(255, 255, 255))
```
5.3 SPI Communication

```python
import sensor, image, time, pyb

# Initialize SPI bus 2 as master, 1 MHz
spi = pyb.SPI(2, pyb.SPI.MASTER, baudrate=1000000, polarity=0, phase=0)

sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

while True:
    img = sensor.snapshot()

    # Edge detection
    img.find_edges(image.EDGE_CANNY, threshold=(50, 80))

    # Downscale so the frame is small enough to transfer
    small_img = img.copy(x_scale=0.25, y_scale=0.25)
    img_data = small_img.bytearray()

    # Send the image size first (both dimensions fit in one byte at
    # this scale), then the pixel data
    spi.send(bytearray([small_img.height(), small_img.width()]))
    spi.send(img_data)

    # Show the thumbnail in the corner of the frame buffer
    img.draw_image(small_img, img.width() - small_img.width(), 0)
```
5.4 WiFi Communication
This example requires an OpenMV model or shield with WiFi support and firmware that exposes the standard MicroPython `network.WLAN` interface.

```python
import sensor, image, time, network, socket, ubinascii

# WiFi credentials (fill in your own)
SSID = "your_wifi_ssid"
PASSWORD = "your_wifi_password"

# Connect to WiFi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)

# Wait for the connection
max_wait = 10
while max_wait > 0:
    if wlan.isconnected():
        break
    max_wait -= 1
    time.sleep(1)

if not wlan.isconnected():
    raise RuntimeError("WiFi connection failed")
print("WiFi connected")
print("IP address:", wlan.ifconfig()[0])

# Initialize the camera
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)

# Create a simple HTTP server
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
print("Server listening on:", addr)

def generate_html(image_data):
    """Build an HTML page embedding the JPEG as base64."""
    return """<!DOCTYPE html>
<html>
<head>
<title>OpenMV Stream</title>
<meta http-equiv="refresh" content="1">
</head>
<body>
<h1>OpenMV Live Stream</h1>
<img src="data:image/jpeg;base64,%s" alt="live frame">
</body>
</html>""" % image_data

while True:
    cl, client_addr = s.accept()
    print("Client connected:", client_addr)

    # Capture and JPEG-compress a frame
    img = sensor.snapshot()
    jpeg_buffer = img.compress(quality=60).bytearray()

    # Base64-encode for embedding in the page
    base64_data = ubinascii.b2a_base64(jpeg_buffer).decode().strip()

    # Send the HTTP response
    response = generate_html(base64_data)
    cl.send('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
    cl.send(response)
    cl.close()
```
Part 6: Complete Example Projects
6.1 Color-Tracking Robot Car

```python
"""
Color-tracking robot car
Function: detect an object of a target color and follow it, driving
two DC motors via PWM with a PID steering loop.
Hardware: OpenMV Cam H7 + motor driver board + 2x DC motor + chassis
"""
import sensor, image, time, pyb

# Motor pins (check your board's pinout for the timer channel of each pin)
MOTOR_A_PWM = pyb.Pin('P7')                   # motor A PWM
MOTOR_A_DIR = pyb.Pin('P8', pyb.Pin.OUT_PP)   # motor A direction
MOTOR_B_PWM = pyb.Pin('P9')                   # motor B PWM
MOTOR_B_DIR = pyb.Pin('P10', pyb.Pin.OUT_PP)  # motor B direction

# PWM channels on timer 4
timer4 = pyb.Timer(4, freq=1000)
pwm_a = timer4.channel(1, pyb.Timer.PWM, pin=MOTOR_A_PWM)
pwm_b = timer4.channel(2, pyb.Timer.PWM, pin=MOTOR_B_PWM)

# Target color threshold (red, LAB)
TARGET_COLOR_THRESHOLD = (30, 60, 25, 65, 10, 45)

# PID gains
KP = 0.5   # proportional
KI = 0.01  # integral
KD = 0.1   # derivative

# Camera
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)
sensor.set_auto_gain(False)
sensor.set_auto_whitebal(False)

class PIDController:
    def __init__(self, kp, ki, kd):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.integral = 0
        self.previous_error = 0

    def update(self, error, dt):
        # Proportional term
        proportional = self.kp * error
        # Integral term
        self.integral += error * dt
        integral = self.ki * self.integral
        # Derivative term
        derivative = self.kd * (error - self.previous_error) / dt
        self.previous_error = error
        # Control output, clamped to the motor range
        output = proportional + integral + derivative
        return max(-100, min(100, output))

def set_motor_speed(motor, speed):
    """Set motor speed in the range -100..100."""
    speed = max(-100, min(100, speed))
    if motor == 'A':
        if speed >= 0:
            MOTOR_A_DIR.high()
            pwm_a.pulse_width_percent(speed)
        else:
            MOTOR_A_DIR.low()
            pwm_a.pulse_width_percent(-speed)
    elif motor == 'B':
        if speed >= 0:
            MOTOR_B_DIR.high()
            pwm_b.pulse_width_percent(speed)
        else:
            MOTOR_B_DIR.low()
            pwm_b.pulse_width_percent(-speed)

def stop_motors():
    """Stop both motors."""
    pwm_a.pulse_width_percent(0)
    pwm_b.pulse_width_percent(0)

def main():
    pid = PIDController(KP, KI, KD)

    # Image center
    img_center_x = sensor.width() // 2

    last_time = time.ticks_ms()
    while True:
        # Loop time in seconds
        current_time = time.ticks_ms()
        dt = time.ticks_diff(current_time, last_time) / 1000.0
        last_time = current_time
        if dt <= 0:
            dt = 0.001

        img = sensor.snapshot()

        # Look for the target color
        blobs = img.find_blobs([TARGET_COLOR_THRESHOLD],
                               pixels_threshold=100,
                               area_threshold=100,
                               merge=True)
        if blobs:
            # Largest blob is the target
            target = max(blobs, key=lambda b: b.pixels())
            img.draw_rectangle(target.rect(), color=(255, 0, 0))
            img.draw_cross(target.cx(), target.cy(), color=(255, 0, 0))

            # Horizontal error between the target and the image center
            error_x = target.cx() - img_center_x

            # Apparent size as a rough distance proxy
            target_distance = target.pixels()

            # PID steering output
            steer_output = pid.update(error_x, dt)

            # Base speed from apparent distance
            if target_distance > 5000:    # too close
                base_speed = -20          # back up
            elif target_distance < 1000:  # too far
                base_speed = 40           # speed up
            else:
                base_speed = 30           # cruise

            # Differential drive
            left_speed = base_speed - steer_output
            right_speed = base_speed + steer_output
            set_motor_speed('A', left_speed)
            set_motor_speed('B', right_speed)

            # Status overlay
            img.draw_string(10, 10, "Target Locked", color=(255, 0, 0))
            img.draw_string(10, 25, "Error: %d" % error_x, color=(255, 0, 0))
            img.draw_string(10, 40, "Distance: %d" % target_distance, color=(255, 0, 0))
            img.draw_string(10, 55, "L: %d, R: %d" % (left_speed, right_speed), color=(255, 0, 0))
        else:
            # No target: stop, then slowly spin to search
            stop_motors()
            img.draw_string(10, 10, "No Target", color=(255, 255, 255))
            set_motor_speed('A', 20)
            set_motor_speed('B', -20)

        # Frame rate
        img.draw_string(sensor.width() - 80, 10, "FPS: %.1f" % (1.0 / dt),
                        color=(255, 255, 255))

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        stop_motors()
        print("Program stopped")
```
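The PID controller above is plain Python and can be exercised off-board before wiring up motors. A minimal sketch stepping a constant error through two updates, using the same illustrative gains as the project:

```python
class PIDController:
    def __init__(self, kp, ki, kd):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.integral = 0.0
        self.previous_error = 0.0

    def update(self, error, dt):
        self.integral += error * dt
        derivative = (error - self.previous_error) / dt
        self.previous_error = error
        output = (self.kp * error + self.ki * self.integral
                  + self.kd * derivative)
        return max(-100, min(100, output))  # clamp to motor range

pid = PIDController(0.5, 0.01, 0.1)
# Target 40 px right of center, 50 ms loop time:
print(pid.update(40, 0.05))  # first step clamps at 100 (derivative kick)
print(pid.update(40, 0.05))  # steady error -> about 20.04
```

The first-step spike is the classic "derivative kick": `previous_error` starts at zero, so the D term briefly dominates. Many implementations suppress it by differentiating the measurement instead of the error.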
6.2 Smart Warehouse Management System

```python
"""
Smart warehouse management system
Function: manage inventory by scanning QR codes and display stock info.
Hardware: OpenMV Cam H7 + OLED display + buzzer
"""
import sensor, image, time, pyb

# Peripherals
# OLED display (I2C); initialization depends on the specific OLED model
i2c = pyb.I2C(2, pyb.I2C.MASTER)
oled = None  # initialize according to your OLED driver

# Buzzer
buzzer = pyb.Pin('P6', pyb.Pin.OUT_PP)

# Camera
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.VGA)  # higher resolution helps QR recognition
sensor.skip_frames(time=2000)

# Inventory database
inventory_db = {
    "ITEM001": {"name": "Screwdriver", "quantity": 15, "location": "A-01"},
    "ITEM002": {"name": "Wrench", "quantity": 8, "location": "A-02"},
    "ITEM003": {"name": "Pliers", "quantity": 12, "location": "B-01"},
    "ITEM004": {"name": "Drill", "quantity": 5, "location": "B-02"},
    "ITEM005": {"name": "Screws", "quantity": 100, "location": "C-01"},
}

# Operation log
operation_log = []

def beep(times=1, duration=100):
    """Buzzer feedback."""
    for _ in range(times):
        buzzer.high()
        pyb.delay(duration)
        buzzer.low()
        pyb.delay(duration)

def update_display(item_id, operation):
    """Update the OLED display."""
    if item_id in inventory_db:
        item = inventory_db[item_id]
        message = """
%s
Item: %s
Quantity: %d
Location: %s
Operation: %s
""" % (item_id, item["name"], item["quantity"], item["location"], operation)
        # Add OLED drawing code here
        print(message)  # print as a stand-in for the OLED

def log_operation(item_id, operation, change=0):
    """Append an entry to the operation log."""
    t = time.localtime()
    timestamp_str = "%04d-%02d-%02d %02d:%02d:%02d" % (
        t[0], t[1], t[2], t[3], t[4], t[5])
    log_entry = {
        "timestamp": timestamp_str,
        "item_id": item_id,
        "operation": operation,
        "quantity_change": change,
        "new_quantity": inventory_db.get(item_id, {}).get("quantity", 0)
    }
    operation_log.append(log_entry)
    # Persist to the SD card
    try:
        with open("/sd/operation_log.txt", "a") as f:
            f.write("%s,%s,%s,%d,%d\n" % (
                log_entry["timestamp"],
                log_entry["item_id"],
                log_entry["operation"],
                log_entry["quantity_change"],
                log_entry["new_quantity"]
            ))
    except OSError:
        print("Could not write log to SD card")

def add_inventory(item_id, quantity=1):
    """Stock in."""
    if item_id in inventory_db:
        inventory_db[item_id]["quantity"] += quantity
        update_display(item_id, "Stock in")
        log_operation(item_id, "IN", quantity)
        beep(2)  # success tone
        return True
    else:
        # New item
        inventory_db[item_id] = {
            "name": "Unknown item",
            "quantity": quantity,
            "location": "Unassigned"
        }
        update_display(item_id, "New item stocked")
        log_operation(item_id, "NEW", quantity)
        beep(3)
        return True

def remove_inventory(item_id, quantity=1):
    """Stock out."""
    if item_id in inventory_db:
        if inventory_db[item_id]["quantity"] >= quantity:
            inventory_db[item_id]["quantity"] -= quantity
            update_display(item_id, "Stock out")
            log_operation(item_id, "OUT", -quantity)
            beep(1)
            return True
        else:
            update_display(item_id, "Insufficient stock")
            beep(4, 50)  # error tone
            return False
    else:
        update_display(item_id, "Item not found")
        beep(4, 50)
        return False

def check_inventory():
    """Return items whose stock is below the threshold."""
    low_stock = []
    for item_id, item in inventory_db.items():
        if item["quantity"] < 10:  # low-stock threshold
            low_stock.append((item_id, item["name"], item["quantity"]))
    return low_stock

def main():
    scan_interval = 1000   # scan at most once per second
    check_interval = 5000  # low-stock check every 5 seconds
    last_scan_time = time.ticks_ms()
    last_check_time = time.ticks_ms()
    print("Smart warehouse system started")
    print("Waiting for QR codes...")
    while True:
        current_time = time.ticks_ms()
        img = sensor.snapshot()

        if time.ticks_diff(current_time, last_scan_time) > scan_interval:
            for code in img.find_qrcodes():
                payload = code.payload()
                print("Scanned QR code:", payload)
                # Parse the command (format: OPERATION:ITEM_ID:QUANTITY)
                parts = payload.split(":")
                if len(parts) >= 2:
                    operation = parts[0].upper()
                    item_id = parts[1]
                    quantity = 1
                    if len(parts) >= 3:
                        try:
                            quantity = int(parts[2])
                        except ValueError:
                            quantity = 1
                    # Execute the command
                    if operation == "IN":
                        add_inventory(item_id, quantity)
                    elif operation == "OUT":
                        remove_inventory(item_id, quantity)
                    elif operation == "CHECK":
                        if item_id in inventory_db:
                            update_display(item_id, "Query")
                        else:
                            update_display(item_id, "Item not found")
                    else:
                        print("Unknown operation:", operation)
                # Highlight the QR code
                img.draw_rectangle(code.rect(), color=(0, 255, 0))
                img.draw_string(code.x(), code.y() - 20, payload, color=(0, 255, 0))
            last_scan_time = current_time

        # Periodic low-stock check (separate timer from the scan timer)
        if time.ticks_diff(current_time, last_check_time) > check_interval:
            low_stock = check_inventory()
            if low_stock:
                print("Low stock warning:")
                for item in low_stock:
                    print("  %s (%s): %d" % item)
            last_check_time = current_time

        # On-screen status
        img.draw_string(5, 5, "Inventory System", color=(255, 255, 255))
        img.draw_string(5, 20, "Waiting for scan...", color=(255, 255, 255))

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("System shutting down")
        print("Operation log:")
        for log in operation_log[-10:]:  # last 10 entries
            print(log)
```
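The `OPERATION:ITEM_ID:QUANTITY` payload convention used above is easy to unit-test off-board. A stand-alone sketch of the same parsing rules (the function name is illustrative):

```python
def parse_command(payload):
    """Parse 'OP:ITEM_ID[:QTY]' into (op, item_id, qty); qty defaults to 1."""
    parts = payload.split(":")
    if len(parts) < 2:
        return None
    op = parts[0].upper()
    item_id = parts[1]
    qty = 1
    if len(parts) >= 3:
        try:
            qty = int(parts[2])
        except ValueError:
            qty = 1
    return op, item_id, qty

print(parse_command("IN:ITEM001:5"))  # ('IN', 'ITEM001', 5)
print(parse_command("out:ITEM002"))   # ('OUT', 'ITEM002', 1)
print(parse_command("bad"))           # None
```

Encoding the operation in the QR payload keeps the camera-side logic stateless: each scan is a self-contained transaction.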
Part 7: Performance Optimization and Debugging
7.1 Performance Optimization Strategies

```python
"""
OpenMV performance optimization techniques
"""
import sensor, image, time, gc

def optimize_performance():
    """Performance optimization walkthrough."""
    # 1. Pick an appropriate resolution: higher resolution = slower processing
    sensor.reset()
    sensor.set_pixformat(sensor.GRAYSCALE)  # grayscale is roughly 2x faster than RGB565
    sensor.set_framesize(sensor.QQVGA)      # 160x120 - fastest
    # sensor.set_framesize(sensor.QVGA)     # 320x240 - balanced
    # sensor.set_framesize(sensor.VGA)      # 640x480 - slowest

    # 2. Disable automatic adjustments
    sensor.set_auto_gain(False)
    sensor.set_auto_whitebal(False)

    # 3. Fix the exposure time
    sensor.set_auto_exposure(False, exposure_us=10000)  # 10 ms

    # 4. Use an ROI (region of interest): only process part of the frame
    sensor.set_windowing((40, 30, 80, 60))  # center region only

    # 5. Skip frames so the camera can stabilize
    sensor.skip_frames(time=2000)

    clock = time.clock()
    while True:
        clock.tick()
        # 6. Minimize image operations
        img = sensor.snapshot()

        # 7. Compute only when needed
        if clock.fps() < 15:  # frame rate too low
            # Reduce processing load, e.g. raise detection thresholds
            blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)],
                                   pixels_threshold=200,
                                   area_threshold=200)
        else:
            blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)],
                                   pixels_threshold=100,
                                   area_threshold=100)

        # 8. Manual garbage collection
        if clock.fps() < 10:
            gc.collect()

        # 9. Cache values in local variables to speed up access
        fps = clock.fps()

        # 10. Minimize drawing operations
        if blobs:
            largest = max(blobs, key=lambda b: b.pixels())
            img.draw_rectangle(largest.rect(), color=255)

        # Avoid frequent string formatting; only show debug text when slow
        if fps < 5:
            img.draw_string(5, 5, "Low FPS: %.1f" % fps, color=255)
            print("FPS:", fps)

        # 11. Throttle the loop when the frame rate is very low
        if fps < 5:
            time.sleep_ms(50)

def monitor_memory():
    """Memory usage monitor."""
    import micropython
    while True:
        gc.collect()
        free_memory = gc.mem_free()
        allocated_memory = gc.mem_alloc()
        total_memory = free_memory + allocated_memory
        print("Memory usage:")
        print("  Free: %d bytes" % free_memory)
        print("  Used: %d bytes" % allocated_memory)
        print("  Total: %d bytes" % total_memory)
        print("  Utilization: %.1f%%" % (allocated_memory / total_memory * 100))
        # Heap details
        micropython.mem_info()
        time.sleep(5)

if __name__ == "__main__":
    optimize_performance()
```
7.2 Debugging Techniques and Troubleshooting

```python
"""
OpenMV debugging techniques
"""
import sensor, image, time, pyb, uos

class DebugLogger:
    """Debug logger that writes to the console and the SD card."""
    def __init__(self, log_file="/sd/debug.log", max_size=10000):
        self.log_file = log_file
        self.max_size = max_size
        self.log_count = 0

    def log(self, message, level="INFO"):
        """Record one log entry."""
        t = time.localtime()
        timestamp_str = "%04d-%02d-%02d %02d:%02d:%02d" % (
            t[0], t[1], t[2], t[3], t[4], t[5])
        log_entry = "%s [%s] %s\n" % (timestamp_str, level, message)
        print(log_entry.strip())  # also echo to the console
        try:
            # Truncate the file if it has grown too large
            try:
                size = uos.stat(self.log_file)[6]
                if size > self.max_size:
                    with open(self.log_file, "w") as f:
                        f.write("")
            except OSError:
                pass
            # Append the entry
            with open(self.log_file, "a") as f:
                f.write(log_entry)
            self.log_count += 1
        except Exception as e:
            print("Could not write to log file:", e)

    def dump_system_info(self):
        """Log system information."""
        self.log("=== System info ===")
        # Memory
        import gc
        gc.collect()
        self.log("Free memory: %d bytes" % gc.mem_free())
        self.log("Used memory: %d bytes" % gc.mem_alloc())
        # File system
        try:
            fs_stats = uos.statvfs('/')
            total_space = fs_stats[0] * fs_stats[2]
            free_space = fs_stats[0] * fs_stats[3]
            self.log("SD card total: %d KB" % (total_space // 1024))
            self.log("SD card free: %d KB" % (free_space // 1024))
        except OSError:
            self.log("Could not read file system info")
        # Camera
        self.log("Camera resolution: %dx%d" % (sensor.width(), sensor.height()))
        self.log("Pixel format: %s" % ("RGB565" if sensor.get_pixformat() == sensor.RGB565 else "GRAYSCALE"))

def camera_diagnostics():
    """Measure the frame rate at several resolutions."""
    logger = DebugLogger()
    logger.log("Starting camera diagnostics")
    resolutions = [
        ("QQVGA", sensor.QQVGA),
        ("HQVGA", sensor.HQVGA),
        ("QVGA", sensor.QVGA),
        ("VGA", sensor.VGA)
    ]
    for name, res in resolutions:
        try:
            sensor.reset()
            sensor.set_pixformat(sensor.GRAYSCALE)
            sensor.set_framesize(res)
            sensor.skip_frames(time=100)
            frames = 0
            start_time = time.ticks_ms()
            # Measure over 30 frames
            while frames < 30:
                img = sensor.snapshot()
                frames += 1
            elapsed = time.ticks_diff(time.ticks_ms(), start_time)
            fps = frames / (elapsed / 1000)
            logger.log("%s: %.1f FPS" % (name, fps))
        except Exception as e:
            logger.log("%s test failed: %s" % (name, str(e)), "ERROR")
    logger.log("Camera diagnostics done")

def image_processing_benchmark():
    """Benchmark common image-processing calls."""
    logger = DebugLogger()
    logger.log("Starting image-processing benchmark")
    sensor.reset()
    sensor.set_pixformat(sensor.GRAYSCALE)
    sensor.set_framesize(sensor.QVGA)
    sensor.skip_frames(time=200)
    tests = [
        ("find_blobs", lambda img: img.find_blobs([(0, 255)], pixels_threshold=100)),
        ("find_edges", lambda img: img.find_edges(image.EDGE_CANNY)),
        ("find_lines", lambda img: img.find_lines(threshold=1000)),
        ("find_circles", lambda img: img.find_circles(threshold=2000)),
        ("find_qrcodes", lambda img: img.find_qrcodes()),
    ]
    for test_name, test_func in tests:
        try:
            # Warm up
            for _ in range(5):
                img = sensor.snapshot()
                test_func(img)
            # Timed runs
            iterations = 10
            start_time = time.ticks_ms()
            for _ in range(iterations):
                img = sensor.snapshot()
                test_func(img)
            elapsed = time.ticks_diff(time.ticks_ms(), start_time)
            avg_time = elapsed / iterations
            logger.log("%s: avg %.1f ms" % (test_name, avg_time))
        except Exception as e:
            logger.log("%s test failed: %s" % (test_name, str(e)), "ERROR")
    logger.log("Benchmark done")

def main():
    """Main debug entry point."""
    logger = DebugLogger()
    logger.log("Application started")
    logger.dump_system_info()
    try:
        # Diagnostics and benchmarks
        camera_diagnostics()
        image_processing_benchmark()

        # Main loop
        sensor.reset()
        sensor.set_pixformat(sensor.RGB565)
        sensor.set_framesize(sensor.QVGA)
        sensor.skip_frames(time=200)
        frame_count = 0
        last_log_time = time.ticks_ms()
        while True:
            frame_count += 1

            # Log the running state every 5 seconds
            current_time = time.ticks_ms()
            if time.ticks_diff(current_time, last_log_time) > 5000:
                fps = frame_count / 5
                logger.log("Status: %.1f FPS, frames: %d" % (fps, frame_count))
                frame_count = 0
                last_log_time = current_time

                # Memory watchdog
                import gc
                gc.collect()
                if gc.mem_free() < 10000:
                    logger.log("Warning: low memory! Free: %d bytes" % gc.mem_free(), "WARNING")

            # Main image-processing logic
            img = sensor.snapshot()
            # Example: color tracking
            blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)], pixels_threshold=100)
            if blobs:
                largest = max(blobs, key=lambda b: b.pixels())
                img.draw_rectangle(largest.rect(), color=(255, 0, 0))
                # Log detections occasionally
                if frame_count % 10 == 0:
                    logger.log("Red object detected, size: %d pixels" % largest.pixels())
    except Exception as e:
        logger.log("Program exception: %s" % str(e), "ERROR")
        logger.log("Program terminated")
        # Try to save an error report
        try:
            with open("/sd/error_report.txt", "w") as f:
                import sys
                sys.print_exception(e, f)
        except OSError:
            pass
        # Reboot
        pyb.hard_reset()

if __name__ == "__main__":
    main()
```
Part 8: Extensions and Advanced Topics
8.1 Custom Image-Processing Algorithms
python
"""
Custom image-processing algorithm examples.
Note: pure-Python per-pixel loops are very slow on the camera; they are
shown here to illustrate the algorithms. Prefer the built-in C
implementations (img.histeq(), img.erode(), img.dilate(), etc.) in production.
"""
import sensor, image, math

class CustomImageProcessor:
    """Custom image processor (grayscale images)"""

    @staticmethod
    def enhance_contrast(img, contrast_factor=2.0):
        """
        Enhance image contrast via histogram equalization.
        contrast_factor: contrast gain; 1.0 gives plain equalization.
        """
        # Access the raw pixel buffer
        img_data = img.bytearray()

        # Compute the histogram
        hist = [0] * 256
        for pixel in img_data:
            hist[pixel] += 1

        # Compute the cumulative distribution function (CDF)
        cdf = [0] * 256
        cdf[0] = hist[0]
        for i in range(1, 256):
            cdf[i] = cdf[i-1] + hist[i]

        # Normalize the CDF
        cdf_min = min(filter(lambda x: x > 0, cdf))
        cdf_range = len(img_data) - cdf_min
        if cdf_range <= 0:  # uniform image: nothing to equalize
            return img

        # Build the lookup table
        lut = [0] * 256
        for i in range(256):
            if cdf[i] > 0:
                lut[i] = max(0, min(255,
                    int(((cdf[i] - cdf_min) / cdf_range) * 255 * contrast_factor)))

        # Apply the lookup table into a new grayscale image
        enhanced_img = image.Image(img.width(), img.height(),
                                   sensor.GRAYSCALE, copy_to_fb=False)
        out_data = enhanced_img.bytearray()
        for i in range(len(img_data)):
            out_data[i] = lut[img_data[i]]
        return enhanced_img
    @staticmethod
    def custom_edge_detection(img, threshold=50):
        """
        Custom edge detection using the Sobel operator.
        """
        width = img.width()
        height = img.height()

        # Sobel kernels
        sobel_x = [[-1, 0, 1],
                   [-2, 0, 2],
                   [-1, 0, 1]]
        sobel_y = [[-1, -2, -1],
                   [0, 0, 0],
                   [1, 2, 1]]

        # Create the output image
        output = image.Image(width, height, sensor.GRAYSCALE, copy_to_fb=False)

        # Access the raw pixel buffers
        img_data = img.bytearray()
        out_data = output.bytearray()

        # Apply the Sobel operator to every interior pixel
        for y in range(1, height-1):
            for x in range(1, width-1):
                # Horizontal and vertical gradients
                gx = 0
                gy = 0
                for ky in range(3):
                    for kx in range(3):
                        pixel = img_data[(y+ky-1)*width + (x+kx-1)]
                        gx += sobel_x[ky][kx] * pixel
                        gy += sobel_y[ky][kx] * pixel

                # Gradient magnitude
                magnitude = min(255, int(math.sqrt(gx*gx + gy*gy)))

                # Threshold to a binary edge map
                out_data[y*width + x] = 255 if magnitude > threshold else 0

        return output
    @staticmethod
    def morphological_operations(img, operation='erode', kernel_size=3):
        """
        Morphological operations: erosion and dilation.
        operation: 'erode' or 'dilate'
        kernel_size: kernel size (an odd number)
        """
        width = img.width()
        height = img.height()

        # Create the output image
        output = image.Image(width, height, sensor.GRAYSCALE, copy_to_fb=False)

        # Access the raw pixel buffers
        img_data = img.bytearray()
        out_data = output.bytearray()

        half_kernel = kernel_size // 2
        for y in range(half_kernel, height-half_kernel):
            for x in range(half_kernel, width-half_kernel):
                # Collect the neighborhood
                neighbors = []
                for ky in range(-half_kernel, half_kernel+1):
                    for kx in range(-half_kernel, half_kernel+1):
                        neighbors.append(img_data[(y+ky)*width + (x+kx)])

                if operation == 'erode':
                    # Erosion: take the minimum
                    out_data[y*width + x] = min(neighbors)
                elif operation == 'dilate':
                    # Dilation: take the maximum
                    out_data[y*width + x] = max(neighbors)

        return output
def main():
    """Main function: demonstrate the custom algorithms"""
    sensor.reset()
    sensor.set_pixformat(sensor.GRAYSCALE)
    # QQVGA keeps the pure-Python loops and the extra buffers within RAM
    sensor.set_framesize(sensor.QQVGA)
    sensor.skip_frames(200)

    processor = CustomImageProcessor()

    while True:
        # Capture the original image
        original = sensor.snapshot()

        # 1. Contrast enhancement
        enhanced = processor.enhance_contrast(original, contrast_factor=2.0)

        # 2. Edge detection
        edges = processor.custom_edge_detection(original, threshold=50)

        # 3. Morphological operations
        eroded = processor.morphological_operations(original, 'erode', 3)
        dilated = processor.morphological_operations(original, 'dilate', 3)

        # Morphological gradient (dilate - erode) highlights object boundaries
        morph = image.Image(original.width(), original.height(),
                            sensor.GRAYSCALE, copy_to_fb=False)
        morph_data = morph.bytearray()
        eroded_data = eroded.bytearray()
        dilated_data = dilated.bytearray()
        for i in range(len(morph_data)):
            morph_data[i] = dilated_data[i] - eroded_data[i]

        # The OpenMV IDE displays the frame buffer, so tile the results back
        # into the snapshot as a 2x2 mosaic (each result downscaled by 2);
        # the original stays visible in the top-left quadrant.
        w, h = original.width() // 2, original.height() // 2
        original.draw_image(enhanced, w, 0, x_scale=0.5, y_scale=0.5)
        original.draw_image(edges, 0, h, x_scale=0.5, y_scale=0.5)
        original.draw_image(morph, w, h, x_scale=0.5, y_scale=0.5)

if __name__ == "__main__":
    main()
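Because `custom_edge_detection` is plain Python, its kernel arithmetic can be sanity-checked on a desktop interpreter before deploying to the camera. The sketch below (`sobel_magnitude` is an illustrative helper, not an OpenMV API) applies the same Sobel kernels to a tiny synthetic image containing a vertical black-to-white edge:

```python
import math

def sobel_magnitude(pixels, width, height, x, y):
    """Gradient magnitude at (x, y) using the same 3x3 Sobel
    kernels as custom_edge_detection above."""
    sobel_x = [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]]
    sobel_y = [[-1, -2, -1], [0, 0, 0], [1, 2, 1]]
    gx = gy = 0
    for ky in range(3):
        for kx in range(3):
            p = pixels[(y + ky - 1) * width + (x + kx - 1)]
            gx += sobel_x[ky][kx] * p
            gy += sobel_y[ky][kx] * p
    return min(255, int(math.sqrt(gx * gx + gy * gy)))

# 6x3 image: left half black (0), right half white (255)
width, height = 6, 3
pixels = [0, 0, 0, 255, 255, 255] * 3

print(sobel_magnitude(pixels, width, height, 1, 1))  # 0   (flat region)
print(sobel_magnitude(pixels, width, height, 3, 1))  # 255 (on the edge)
```

A flat region yields zero response while the edge column saturates at 255, which is exactly what the thresholding step in the method relies on.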
8.2 Multi-Camera Systems
python
"""
Multi-camera system example.
Note: requires several OpenMV modules plus network support
(e.g. a WiFi shield or an OpenMV model with built-in WiFi).
"""
import sensor, image, time, pyb, math, network, socket
class MultiCameraSystem:
    """Multi-camera coordination system"""

    def __init__(self, camera_id=0, server_ip='192.168.1.100', server_port=8080):
        self.camera_id = camera_id
        self.server_ip = server_ip
        self.server_port = server_port

        # Initialize the camera
        self.init_camera()

        # Network connection
        self.socket = None
        self.connect_to_server()

    def init_camera(self):
        """Initialize camera settings"""
        sensor.reset()
        sensor.set_pixformat(sensor.RGB565)
        sensor.set_framesize(sensor.QVGA)

        # Adjust parameters per camera ID
        if self.camera_id == 0:
            # Primary camera
            sensor.set_brightness(0)
            sensor.set_saturation(0)
            sensor.set_gainceiling(16)
        elif self.camera_id == 1:
            # Auxiliary camera 1
            sensor.set_brightness(1)
            sensor.set_contrast(1)
        elif self.camera_id == 2:
            # Auxiliary camera 2
            sensor.set_brightness(-1)
            sensor.set_contrast(-1)

        sensor.skip_frames(500)

    def connect_to_server(self):
        """Connect to the central server"""
        try:
            self.socket = socket.socket()
            self.socket.connect((self.server_ip, self.server_port))
            # Announce this camera's ID
            self.socket.send(("CAMERA_ID:%d\n" % self.camera_id).encode())
            print("Connected to server")
        except Exception as e:
            print("Failed to connect to server:", e)
            self.socket = None
    def process_image(self, img):
        """Image-processing pipeline"""
        results = {
            'camera_id': self.camera_id,
            'timestamp': time.ticks_ms(),
            'objects': [],
            'features': {}
        }

        # Color detection
        color_results = self.detect_colors(img)
        if color_results:
            results['objects'].extend(color_results)

        # Shape detection
        shape_results = self.detect_shapes(img)
        if shape_results:
            results['objects'].extend(shape_results)

        # Feature extraction
        results['features'] = self.extract_features(img)

        return results

    def detect_colors(self, img):
        """Detect colored blobs"""
        colors = []

        # Red (LAB thresholds)
        red_blobs = img.find_blobs([(30, 60, 25, 65, 10, 45)],
                                   pixels_threshold=100)
        for blob in red_blobs[:3]:  # keep at most 3
            colors.append({
                'type': 'red',
                'x': blob.cx(),
                'y': blob.cy(),
                'size': blob.pixels()
            })

        # Blue (LAB thresholds)
        blue_blobs = img.find_blobs([(10, 40, 10, 30, -60, -30)],
                                    pixels_threshold=100)
        for blob in blue_blobs[:3]:
            colors.append({
                'type': 'blue',
                'x': blob.cx(),
                'y': blob.cy(),
                'size': blob.pixels()
            })

        return colors
    def detect_shapes(self, img):
        """Detect shapes"""
        shapes = []

        # Circles
        circles = img.find_circles(threshold=2000, x_margin=10,
                                   y_margin=10, r_margin=10)
        for circle in circles[:5]:
            shapes.append({
                'type': 'circle',
                'x': circle.x(),
                'y': circle.y(),
                'radius': circle.r()
            })

        # Lines (Hough transform)
        lines = img.find_lines(threshold=1000)
        for line in lines[:5]:
            shapes.append({
                'type': 'line',
                'x1': line.x1(),
                'y1': line.y1(),
                'x2': line.x2(),
                'y2': line.y2(),
                'theta': line.theta(),
                'rho': line.rho()
            })

        return shapes
    def extract_features(self, img):
        """Extract global image features"""
        features = {}

        # Work on a grayscale copy so the RGB frame buffer is preserved
        gray = img.to_grayscale(copy=True)

        # Basic statistics
        stats = gray.get_statistics()
        features['brightness'] = stats.mean()
        features['contrast'] = stats.stdev()

        # Image entropy (a complexity measure)
        features['entropy'] = self.calculate_entropy(gray)

        # Edge density (find_edges modifies the grayscale copy in place)
        edges = gray.find_edges(image.EDGE_CANNY)
        edge_pixels = sum(1 for p in edges.bytearray() if p > 128)
        features['edge_density'] = edge_pixels / (gray.width() * gray.height())

        return features

    def calculate_entropy(self, img):
        """Compute the Shannon entropy of a grayscale image"""
        hist = [0] * 256
        total_pixels = img.width() * img.height()

        # Histogram
        for pixel in img.bytearray():
            hist[pixel] += 1

        # Entropy in bits
        entropy = 0
        for count in hist:
            if count > 0:
                probability = count / total_pixels
                entropy -= probability * math.log(probability) / math.log(2)

        return entropy
    def send_results(self, results):
        """Send results to the server"""
        if self.socket:
            try:
                import json
                data = json.dumps(results)
                self.socket.send(data.encode() + b'\n')
            except Exception as e:
                print("Failed to send data:", e)
                # Try to reconnect
                self.connect_to_server()

    def run(self):
        """Main loop"""
        frame_count = 0
        last_report_time = time.ticks_ms()

        while True:
            frame_count += 1

            # Capture a frame
            img = sensor.snapshot()

            # Process it
            results = self.process_image(img)

            # Send results every 10 frames
            if frame_count % 10 == 0:
                self.send_results(results)

            # Report status once a minute
            current_time = time.ticks_ms()
            if time.ticks_diff(current_time, last_report_time) > 60000:
                print("Camera %d: processed %d frames" % (self.camera_id, frame_count))
                last_report_time = current_time

            # Draw local results
            self.display_results(img, results)

            # Limit the frame rate (~30 FPS)
            time.sleep_ms(33)
    def display_results(self, img, results):
        """Draw results onto the image"""
        # Draw detected objects
        for obj in results['objects']:
            if obj['type'] == 'red':
                color = (255, 0, 0)
            elif obj['type'] == 'blue':
                color = (0, 0, 255)
            elif obj['type'] == 'circle':
                color = (0, 255, 0)
                img.draw_circle(obj['x'], obj['y'], obj['radius'], color=color)
                continue
            elif obj['type'] == 'line':
                color = (255, 255, 0)
                img.draw_line(obj['x1'], obj['y1'], obj['x2'], obj['y2'], color=color)
                continue
            else:
                continue  # skip unknown object types

            # Mark color blobs with a dot and a label
            img.draw_circle(obj['x'], obj['y'], 5, color=color)
            img.draw_string(obj['x']+10, obj['y'], obj['type'], color=color)

        # Show the camera ID
        img.draw_string(5, 5, "Cam %d" % self.camera_id, color=(255, 255, 255))

        # Show feature values
        if results['features']:
            y_offset = 20
            for key, value in results['features'].items():
                if y_offset < img.height() - 20:
                    text = "%s: %.2f" % (key, value)
                    img.draw_string(5, y_offset, text, color=(255, 255, 255))
                    y_offset += 15
# Server-side example.
# Note: the server normally runs on a PC or gateway board. The code uses
# the stream-style socket API available in MicroPython; on CPython, wrap
# the client socket with socket.makefile() to get readline().
class CameraServer:
    """Multi-camera server"""

    def __init__(self, port=8080):
        self.port = port
        self.cameras = {}
        self.server_socket = None
        self.start_server()

    def start_server(self):
        """Start listening"""
        self.server_socket = socket.socket()
        self.server_socket.bind(('0.0.0.0', self.port))
        self.server_socket.listen(5)
        print("Server started on port", self.port)

    def handle_client(self, client_socket, address):
        """Handle one client connection"""
        print("New connection:", address)
        camera_id = None
        try:
            # Receive the camera-ID handshake
            data = client_socket.readline()
            if data.startswith(b"CAMERA_ID:"):
                camera_id = int(data.split(b":")[1])
                self.cameras[camera_id] = {
                    'socket': client_socket,
                    'address': address,
                    'last_update': time.ticks_ms()
                }
                print("Camera %d registered" % camera_id)

            # Receive data frames
            while True:
                data = client_socket.readline()
                if data:
                    try:
                        import json
                        results = json.loads(data.decode())
                        self.process_camera_data(camera_id, results)
                        # Update the last-activity timestamp
                        self.cameras[camera_id]['last_update'] = time.ticks_ms()
                    except Exception as e:
                        print("Data processing error:", e)
                else:
                    break
        except Exception as e:
            print("Client handling error:", e)
        finally:
            # Clean up
            if camera_id is not None and camera_id in self.cameras:
                del self.cameras[camera_id]
            client_socket.close()
            print("Connection closed:", address)
    def process_camera_data(self, camera_id, data):
        """Process one camera's results"""
        print("Camera %d data:" % camera_id)
        print("  timestamp:", data['timestamp'])
        print("  object count:", len(data['objects']))
        print("  features:", data['features'])

        # Data fusion, 3D reconstruction, and other advanced processing
        # could be added here. Example: simple position fusion.
        if len(self.cameras) >= 2:
            self.fuse_positions()

    def fuse_positions(self):
        """Fuse position data from multiple cameras"""
        # Triangulation / multi-view geometry would go here
        pass

    def run(self):
        """Server main loop"""
        while True:
            try:
                client_socket, address = self.server_socket.accept()
                # Ideally each client would get its own thread; MicroPython
                # may lack threading, so use async I/O or polling to serve
                # several connections instead.
                self.handle_client(client_socket, address)
            except Exception as e:
                print("Server error:", e)

            # Drop timed-out connections
            self.cleanup_timeout_connections()
            time.sleep_ms(100)

    def cleanup_timeout_connections(self):
        """Remove connections that have been idle too long"""
        current_time = time.ticks_ms()
        timeout = 30000  # 30-second timeout

        to_remove = []
        for camera_id, info in self.cameras.items():
            if time.ticks_diff(current_time, info['last_update']) > timeout:
                to_remove.append(camera_id)

        for camera_id in to_remove:
            try:
                self.cameras[camera_id]['socket'].close()
            except:
                pass
            del self.cameras[camera_id]
            print("Camera %d timed out" % camera_id)
# Usage example
if __name__ == "__main__":
    # Decide whether to run as a camera client or as the server.
    # Note: sys.argv is only meaningful when this logic runs on a PC;
    # on the camera itself, set camera_id and server_ip directly.
    import sys
    if len(sys.argv) > 1:
        # Run as a camera client
        camera_id = int(sys.argv[1])
        server_ip = sys.argv[2] if len(sys.argv) > 2 else '192.168.1.100'
        system = MultiCameraSystem(camera_id=camera_id, server_ip=server_ip)
        system.run()
    else:
        # Run as the server
        server = CameraServer(port=8080)
        server.run()
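The camera-to-server protocol above is newline-delimited JSON following a one-line CAMERA_ID handshake. Its framing logic can be exercised on a desktop interpreter without sockets; `encode_results` and `decode_stream` below are illustrative helper names, not part of the example classes:

```python
import json

def encode_results(results):
    """Frame one result dict as a newline-terminated JSON message."""
    return json.dumps(results).encode() + b"\n"

def decode_stream(buffer):
    """Split a receive buffer into complete messages; the trailing
    partial message (if any) is returned as the new buffer."""
    messages = []
    while b"\n" in buffer:
        line, buffer = buffer.split(b"\n", 1)
        if line:
            messages.append(json.loads(line.decode()))
    return messages, buffer

# Two complete frames plus one partial frame arriving in a single recv()
buf = encode_results({"camera_id": 0, "objects": []})
buf += encode_results({"camera_id": 1, "objects": [{"type": "red"}]})
buf += b'{"camera_id": 2'  # incomplete frame, kept for the next recv()
msgs, rest = decode_stream(buf)
print(len(msgs), rest)  # 2 complete messages, partial frame left over
```

Buffering the leftover bytes this way is what makes the protocol robust when TCP delivers a message split across reads.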
Part 9: Best Practices and Recommended Resources
9.1 OpenMV Development Best Practices

1. Code organization
   - Use a modular design; split functionality into independent functions and classes
   - Add docstrings to important functions and classes
   - Keep code concise and avoid overly long functions

2. Memory management
   - Release objects you no longer need promptly
   - Avoid creating many temporary objects inside loops
   - Call gc.collect() periodically to reclaim memory

3. Performance optimization
   - Choose an appropriate resolution and pixel format
   - Use an ROI (region of interest) to shrink the area being processed
   - Pre-process images to reduce the complexity of later operations

4. Error handling
   - Wrap operations that may fail in try/except
   - Log error information to make debugging easier
   - Degrade gracefully: fall back to basic functionality when advanced features fail

5. Power management
   - Use sensor.sleep(True) to reduce power draw while idle
   - Tune the frame rate to balance performance against power consumption
   - Disable unused peripherals
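As a concrete illustration of the memory-management point above, the pattern below reuses one preallocated buffer across frames instead of allocating a new one each iteration. It is a desktop-runnable sketch; `process_into` and the threshold value are illustrative, not an OpenMV API:

```python
def process_into(src, dst):
    """Write a thresholded copy of src into the preallocated dst
    instead of building a new list or bytearray every frame."""
    for i in range(len(src)):
        dst[i] = 255 if src[i] > 128 else 0
    return dst

# Allocate the working buffer once, outside the loop
work = bytearray(8)
frame = bytes([0, 50, 100, 129, 200, 255, 10, 130])
for _ in range(3):          # simulated frames, zero allocations per frame
    process_into(frame, work)
print(list(work))  # [0, 0, 0, 255, 255, 255, 0, 255]
```

On the camera, the same idea applies to pixel buffers obtained with img.bytearray(): writing in place avoids heap churn and the fragmentation that triggers MemoryError on long runs.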
9.2 Recommended Learning Resources

Official resources:
- OpenMV official website: https://openmv.io/
- GitHub repository: https://github.com/openmv/openmv

Tutorials and courses:
- The official OpenMV tutorial series
- OpenMV tutorial videos on YouTube
- Embedded-vision courses on online learning platforms

Community and support:
- The official OpenMV forum
- GitHub Issues
- Related Discord and Slack channels

Libraries and tools:
- OpenMV IDE: the official development environment
- TensorFlow Lite for Microcontrollers
- Assorted image-processing and machine-learning libraries
9.3 Common Problems and Solutions

1. Camera fails to initialize
   - Check the hardware connections
   - Try rebooting the device
   - Update the firmware to the latest version

2. Out-of-memory errors
   - Lower the image resolution
   - Reduce the number of images processed at the same time
   - Optimize algorithms to use less memory

3. Frame rate too low
   - Lower the image resolution
   - Simplify the image-processing algorithms
   - Disable unnecessary image-enhancement features

4. Poor detection accuracy
   - Tune the threshold parameters
   - Improve the lighting conditions
   - Use a more suitable color space

5. Communication problems
   - Check the wiring and interface settings
   - Verify the baud rate and other communication parameters
   - Add error detection and retry mechanisms
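For the communication problems above, a common remedy is to frame each packet with a length byte and a checksum so the receiver can detect truncation or corruption and request a resend. This is a hedged sketch — `frame_packet` and `parse_packet` are illustrative helpers, not part of the OpenMV API — and it runs on any Python interpreter:

```python
def frame_packet(payload):
    """Prepend a length byte and append an XOR checksum so the
    receiver can validate the packet before trusting it."""
    checksum = 0
    for b in payload:
        checksum ^= b
    return bytes([len(payload)]) + payload + bytes([checksum])

def parse_packet(packet):
    """Return the payload, or None if the packet fails validation
    (wrong length or checksum mismatch) and should be re-requested."""
    if len(packet) < 2 or packet[0] != len(packet) - 2:
        return None
    payload, checksum = packet[1:-1], packet[-1]
    xor = 0
    for b in payload:
        xor ^= b
    return payload if xor == checksum else None

pkt = frame_packet(b"\x01\x42")
print(parse_packet(pkt))                 # valid round trip -> b'\x01B'
print(parse_packet(pkt[:-1] + b"\x00"))  # corrupted checksum -> None
```

On the camera, the sender would pass the framed bytes to a configured pyb.UART and retry a bounded number of times when the peer reports a bad packet.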
Conclusion
As a powerful embedded machine-vision platform, OpenMV gives makers, students, and engineers a convenient path to real computer-vision applications. Having worked through this guide, you should now have OpenMV development skills spanning the basics through advanced topics, and be able to build a range of practical vision applications.
Remember: practice is the best teacher. Keep trying new projects and solving real problems, and your skills will keep improving. Machine vision is evolving rapidly, and OpenMV opens the door to this exciting field.
Whether you want to build intelligent robots or automation systems, or to conduct scientific research, OpenMV can support you well. Good luck on your embedded-vision development journey!
Appendix: Quick Reference for Common Functions

| Task | Function/Method | Notes |
|---|---|---|
| Camera initialization | sensor.reset() | Reset the camera |
| Set pixel format | sensor.set_pixformat() | RGB565 or GRAYSCALE |
| Set resolution | sensor.set_framesize() | QQVGA, QVGA, VGA, etc. |
| Capture an image | sensor.snapshot() | Capture one frame |
| Color recognition | img.find_blobs() | Find color blobs |
| Face detection | img.find_features() | Detect faces (Haar cascade) |
| QR code recognition | img.find_qrcodes() | Decode QR codes |
| Barcode recognition | img.find_barcodes() | Decode barcodes |
| Line detection | img.find_lines() | Hough-transform line detection |
| Circle detection | img.find_circles() | Detect circles |
| Template matching | img.find_template() | Match a predefined template |
| AprilTag detection | img.find_apriltags() | Detect AprilTag markers |
| Optical flow | img.find_displacement() | Estimate frame-to-frame displacement |
| Save image | img.save() | Save the image to a file |
| Image display | sensor.snapshot() | The IDE shows the latest frame buffer |
| Serial communication | pyb.UART() | UART communication |
| I2C communication | pyb.I2C() | I2C communication |
| SPI communication | pyb.SPI() | SPI communication |
| PWM output | pyb.Timer().channel() | PWM signal output |
| Delay | pyb.delay() | Millisecond delay |
| Timestamp | time.ticks_ms() | Millisecond tick counter |
This quick-reference table covers the functions and methods used most often in OpenMV development. As OpenMV continues to evolve, consult the official documentation regularly for the latest information.