本文展示了一个基于 YOLOv11 深度学习模型的目标检测与跟踪系统,并通过 PyQt5 提供了直观易用的可视化界面。整个项目集成了 模型推理、文件夹自动监控、图像处理、结果可视化、UI交互 等功能,既可以进行单张图片的检测,也支持对文件夹进行实时监控和目标跟踪。

1. 前言

1.1yolov11源码+环境配置

官方地址:https://github.com/ultralytics/ultralytics

环境配置:https://docs.ultralytics.com/zh/quickstart/

1.2源码

https://pan.baidu.com/s/1ngV3dvUdGGYCCEDVKj2PBw?pwd=xqit

2. 系统架构与核心功能

2.1 模型推理模块

通过 ModelService 封装了 YOLOv11 模型加载与推理接口。

class ModelService:
    """封装 YOLO 模型的加载与推理"""

    def __init__(self):
        self.model = None

    def load_model(self, model_path: str):
        """加载 YOLO 模型"""
        self.model = YOLO(model_path)

    def is_loaded(self) -> bool:
        return self.model is not None

    def predict(self, image, save=False, track=False):
        """运行推理或跟踪"""
        if not self.is_loaded():
            raise RuntimeError("模型未加载")
        if track:
            return self.model.track(image, persist=True, save=save)
        return self.model.predict(image, save=save)
  • predict() —— 目标检测模式。
  • track() —— 目标跟踪模式,能在连续输入的图像序列中保持对象 ID 的一致性。
  • 支持是否保存推理结果到 runs/ 目录,方便后续分析。

2.2 检测线程

DetectionWorker 继承自 QThread,实现 后台任务调度:

  • class DetectionWorker(QThread):
    # ---------- 主循环 ----------
        def run(self):
            while self._running_flag.is_set():
                if self._paused_flag.is_set():
                    time.sleep(0.1)
                    continue
    
                try:
                    item = self.task_queue.get(timeout=0.2)
                except Empty:
                    continue
                if item is None:
                    break
    
                try:
                    if not self.model_service.is_loaded():
                        continue
    
                    image_for_infer = item
                    if not self.is_saving_results:
                        img_bgr = cv2.imread(item)
                        if img_bgr is not None:
                            image_for_infer = img_bgr
    
                    results = self.model_service.predict(
                        image_for_infer,
                        save=self.is_saving_results,
                        track=self.is_continuous_mode,
                    )
    
                    res0 = results[0]
                    orig_rgb = cv2.cvtColor(res0.orig_img, cv2.COLOR_BGR2RGB)
                    ann_rgb = cv2.cvtColor(res0.plot(), cv2.COLOR_BGR2RGB)
                    self.image_ready.emit(orig_rgb, ann_rgb)  # type: ignore
    
                    del results, res0, ann_rgb
                except Exception:
                    traceback.print_exc()
  • 支持 暂停恢复停止 等操作,保证检测可控。
  • 根据[任务模式]决定调用 YOLO 的 检测跟踪 功能。

2.3 文件夹监控

基于 watchdog 实现 ImageFolderWatcher,可以实时监听文件夹中新生成的图片。

class ImageFolderWatcher(QThread):
    """文件夹监控线程 (基于 watchdog)"""
    new_image_detected = pyqtSignal(str)

    def __init__(self, folder_path: str):
        super().__init__()
        self.folder_path = folder_path
        self.observer = Observer()
        self.event_handler = ImageFileEventHandler(self)
        self.observer.schedule(self.event_handler, self.folder_path, recursive=False)
        self._running_flag = threading.Event()
        self._running_flag.set()

    def run(self):
        self.observer.start()
        while self._running_flag.is_set():
            self.msleep(UI_CONFIG["FOLDER_WATCH_INTERVAL_MS"])

    def stop(self):
        self._running_flag.clear()
        try:
            self.observer.stop()
            self.observer.join(timeout=1.0)
        finally:
            self.quit()
            self.wait(1000)

3. 可视化界面设计(UI)

3.1 界面布局

界面分为两大区域:控制面板和图像展示面板

左侧控制面板(ControlPanel)

  • 支持 任务模式: [目标检测] / [目标跟踪]、输入源:[图片] / [文件夹]、执行方式:[手动] / [自动] 组合模式。
  • 提供操作按钮:[👆加载模型]、[📂选择文件]、[▶开始检测/暂停/继续]、[⏹停止]、[❌退出]、[♻恢复默认配置]。

右侧图像展示面板(ImagePanel)

  • 并排显示两张图片:原始输入图像、带检测/跟踪结果的标注图像。

3.2 交互逻辑

界面包含三个下拉框,用于配置检测方式:

  • 任务模式:[目标检测] / [目标跟踪]
  • 输入源:[图片] / [文件夹]
  • 执行方式:[手动] / [自动]

其中,在输入源为 [文件夹] 时“执行方式”不可操作。[文件夹]模式下会自动扫描已有图片并实时监控新增文件。

4. 运行流程示例

1.启动程序。

2.选择模型文件(或使用默认模型)。

3.选择文件(图片 / 文件夹,或使用默认文件或文件夹)。

4.点击[▶开始检测]按钮(不进行2、3操作会直接使用默认模型和文件)。

# =========================================
# 全局配置
# =========================================
UI_CONFIG = {
    "APP_TITLE": "实例分割系统",
    "APP_ICON": "./icons/paimon.png",
    "WIN_GEOM": (100, 120, 1920, 900),
    "FONT_FAMILY": "Microsoft YaHei, Arial",

    "DEFAULT_MODEL_PATH": "./model/best.pt",
    "DEFAULT_IMAGE_PATH": "./image/mei_gan_93/93_0610.jpg",
    "DEFAULT_FOLDER_PATH": "./image/mei_gan_93",

    "THROTTLE_INTERVAL": 0.10,
    "FOLDER_WATCH_INTERVAL_MS": 200,
}

任务模式:[目标检测]

        输入源:[文件夹]

                执行方式:不可操作。

                功能:对文件夹中的所有图片进行检测。图片可以是 连续帧,也可以是 任意不连续的独立图片。

        输入源:[图片] + 执行方式:[手动]

                功能:图片检测完成后,需要重新选择[📂选择文件]要检测的图片,并点击[▶开始检测]按钮。

        输入源:[图片] + 执行方式:[自动]

                功能:点击[▶开始检测]按钮后,该按钮变为[➡下一张]。当前图片检测完成后,会自动加载该图片所在文件夹中的下一张图片。点击[➡下一张]按钮检测图片。

任务模式:[目标跟踪]

        输入源:[文件夹]

                执行方式:不可操作。

                功能:对文件夹中的所有图片进行 检测 + 跟踪。文件夹中的图片需要是 连续帧

        输入源:[图片] + 执行方式:[手动]

                功能:与[目标检测]模式一致,每次手动选择图片并点击[▶开始检测]按钮。

        输入源:[图片] + 执行方式:[自动]

                功能:与[目标检测]模式一致。

5.可随时[⏸暂停]、[▶继续]、[⏹停止]检测。

4.1完整代码

源码(含config_manager.py和config.json):

https://pan.baidu.com/s/1ngV3dvUdGGYCCEDVKj2PBw?pwd=xqit

# =========================================
# 导入依赖
# =========================================
import os
import sys
import threading
import time
import traceback
from queue import Queue, Empty

import cv2
import numpy as np
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTime, QTimer, QPropertyAnimation, QEasingCurve
from PyQt5.QtGui import QImage, QPixmap, QIcon, QFont
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QLabel, QVBoxLayout, QWidget,
    QPushButton, QHBoxLayout, QFileDialog, QScrollArea,
    QMessageBox, QFormLayout, QCheckBox, QFrame, QComboBox, QToolButton
)
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from config_manager import load_config, save_config, DEFAULT_CONFIG
from ultralytics import YOLO

# =========================================
# 全局配置
# =========================================
UI_CONFIG = {
    "APP_TITLE": "实例分割系统",
    "APP_ICON": "./icons/paimon.png",
    "WIN_GEOM": (100, 120, 1920, 900),
    "FONT_FAMILY": "Microsoft YaHei, Arial",

    "DEFAULT_MODEL_PATH": "./model/best.pt",
    "DEFAULT_IMAGE_PATH": "./image/mei_gan_93/93_0610.jpg",
    "DEFAULT_FOLDER_PATH": "./image/mei_gan_93",

    "THROTTLE_INTERVAL": 0.10,
    "FOLDER_WATCH_INTERVAL_MS": 200,
}


# =========================================
# 工具函数
# =========================================
def ndarray_to_qpixmap(np_image: np.ndarray, target_size) -> QPixmap:
    """
    将 OpenCV RGB 图像转换为 Qt Pixmap 并保持比例缩放
    """
    if np_image is None:
        return QPixmap()
    height, width, _ = np_image.shape
    qimage = QImage(np_image.data, width, height, 3 * width, QImage.Format_RGB888)
    return QPixmap.fromImage(qimage).scaled(
        target_size, Qt.KeepAspectRatio, Qt.SmoothTransformation
    )


# =========================================
# 文件选择工具
# =========================================
class FileDialogHelper:
    """封装文件/文件夹选择"""

    @staticmethod
    def select_image_file(parent=None) -> str:
        path, _ = QFileDialog.getOpenFileName(
            parent, "选择图片文件", "./", "图片文件 (*.jpg *.jpeg *.png)"
        )
        return path

    @staticmethod
    def select_image_folder(parent=None) -> str:
        return QFileDialog.getExistingDirectory(parent, "选择图片文件夹", "./")

    @staticmethod
    def select_model_file(parent=None) -> str:
        path, _ = QFileDialog.getOpenFileName(
            parent,
            "选择模型文件",
            "./",
            "模型文件 (*.pt);;All Files (*)",
            initialFilter="模型文件 (*.pt)"
        )
        return path


# =========================================
# 文件夹监控
# =========================================
class ImageFileEventHandler(FileSystemEventHandler):
    """
    监控文件夹中新生成的图像文件.
    - 确保文件写入稳定后才触发事件
    - 定期清理过期的文件记录
    """

    def __init__(self, watcher_thread):
        super().__init__()
        self.watcher_thread = watcher_thread
        self.seen_files = {}
        self.last_cleanup_time = time.time()
        self.file_count_since_cleanup = 0
        self._init_existing_files()

    def _init_existing_files(self):
        """扫描目录已有的图像文件, 并记录时间戳"""
        for f in os.listdir(self.watcher_thread.folder_path):
            if f.lower().endswith((".jpg", ".jpeg", ".png")):
                path = os.path.abspath(os.path.join(self.watcher_thread.folder_path, f))
                self.seen_files[path] = time.time()

    def _cleanup_old_files(self, max_age_seconds=600, max_count=1000):
        """清理 seen_files 中过期或过多的记录"""
        now = time.time()
        # 按时间清理
        to_remove = [p for p, last_seen in self.seen_files.items()
                     if now - last_seen > max_age_seconds]
        for p in to_remove:
            self.seen_files.pop(p, None)

        # 按数量清理
        if len(self.seen_files) > max_count:
            sorted_items = sorted(self.seen_files.items(), key=lambda x: x[1])
            self.seen_files = dict(sorted_items[-max_count:])

        self.last_cleanup_time = now
        self.file_count_since_cleanup = 0

    @staticmethod
    def _file_is_stable(path: str, max_wait_s: float = 2.0, probe_interval_s: float = 0.1) -> bool:
        """检测文件是否写入完成 (文件大小不再变化)"""
        start = time.time()
        try:
            last_size = os.path.getsize(path)
        except Exception:
            return False
        while time.time() - start < max_wait_s:
            time.sleep(probe_interval_s)
            try:
                new_size = os.path.getsize(path)
            except Exception:
                return False
            if new_size == last_size:
                return True
            last_size = new_size
        return False

    def on_created(self, event):
        if event.is_directory:
            return
        path = os.path.abspath(event.src_path)
        if not path.lower().endswith((".jpg", ".jpeg", ".png")):
            return

        self.file_count_since_cleanup += 1
        current_time = time.time()

        # 定期清理
        if (self.file_count_since_cleanup >= 100 or
                current_time - self.last_cleanup_time >= 300):
            self._cleanup_old_files()

        # 避免重复短时间触发
        if path in self.seen_files and current_time - self.seen_files[path] < 60:
            return

        # 文件必须写入稳定
        if not self._file_is_stable(path):
            return

        self.seen_files[path] = current_time
        self.watcher_thread.new_image_detected.emit(path)


class ImageFolderWatcher(QThread):
    """文件夹监控线程 (基于 watchdog)"""
    new_image_detected = pyqtSignal(str)

    def __init__(self, folder_path: str):
        super().__init__()
        self.folder_path = folder_path
        self.observer = Observer()
        self.event_handler = ImageFileEventHandler(self)
        self.observer.schedule(self.event_handler, self.folder_path, recursive=False)
        self._running_flag = threading.Event()
        self._running_flag.set()

    def run(self):
        self.observer.start()
        while self._running_flag.is_set():
            self.msleep(UI_CONFIG["FOLDER_WATCH_INTERVAL_MS"])

    def stop(self):
        self._running_flag.clear()
        try:
            self.observer.stop()
            self.observer.join(timeout=1.0)
        finally:
            self.quit()
            self.wait(1000)


# =========================================
# 模型服务
# =========================================
class ModelService:
    """封装 YOLO 模型的加载与推理"""

    def __init__(self):
        self.model = None

    def load_model(self, model_path: str):
        """加载 YOLO 模型"""
        self.model = YOLO(model_path)

    def is_loaded(self) -> bool:
        return self.model is not None

    def predict(self, image, save=False, track=False):
        """运行推理或跟踪"""
        if not self.is_loaded():
            raise RuntimeError("模型未加载")
        if track:
            return self.model.track(image, persist=True, save=save)
        return self.model.predict(image, save=save)


# =========================================
# 检测线程
# =========================================
class DetectionWorker(QThread):
    """后台检测线程, 负责调度模型推理"""
    image_ready = pyqtSignal(np.ndarray, np.ndarray)

    def __init__(self):
        super().__init__()
        self.task_queue = Queue()
        self._running_flag = threading.Event()
        self._running_flag.set()
        self._paused_flag = threading.Event()

        self.is_continuous_mode = True
        self.is_saving_results = False

        self.model_service = ModelService()

    # ---------- 配置 ----------
    def load_model(self, model_path: str):
        self.model_service.load_model(model_path)

    def set_continuous_mode(self, enabled: bool):
        self.is_continuous_mode = bool(enabled)

    def set_save_results_enabled(self, enabled: bool):
        self.is_saving_results = bool(enabled)

    # ---------- 队列 ----------
    def enqueue(self, image_path: str):
        if image_path:
            self.task_queue.put(image_path)

    def clear_queue(self):
        while not self.task_queue.empty():
            try:
                self.task_queue.get(False)
            except Empty:
                break

    def stop(self):
        self._running_flag.clear()
        self.task_queue.put(None)

    def pause(self):
        self._paused_flag.set()

    def resume(self):
        self._paused_flag.clear()

    # ---------- 主循环 ----------
    def run(self):
        while self._running_flag.is_set():
            if self._paused_flag.is_set():
                time.sleep(0.1)
                continue

            try:
                item = self.task_queue.get(timeout=0.2)
            except Empty:
                continue
            if item is None:
                break

            try:
                if not self.model_service.is_loaded():
                    continue

                image_for_infer = item
                if not self.is_saving_results:
                    img_bgr = cv2.imread(item)
                    if img_bgr is not None:
                        image_for_infer = img_bgr

                results = self.model_service.predict(
                    image_for_infer,
                    save=self.is_saving_results,
                    track=self.is_continuous_mode,
                )

                res0 = results[0]
                orig_rgb = cv2.cvtColor(res0.orig_img, cv2.COLOR_BGR2RGB)
                ann_rgb = cv2.cvtColor(res0.plot(), cv2.COLOR_BGR2RGB)
                self.image_ready.emit(orig_rgb, ann_rgb)  # type: ignore

                del results, res0, ann_rgb
            except Exception:
                traceback.print_exc()


# =========================================
# UI: 图像显示面板
# =========================================
class ImagePanel(QWidget):
    def __init__(self):
        super().__init__()
        self._build_ui()

    def _build_ui(self):
        layout = QVBoxLayout(self)
        layout.setSpacing(16)

        img_row = QHBoxLayout()
        img_row.setSpacing(16)

        self.original_image_label = QLabel()
        self.original_image_label.setObjectName("imageHolder")
        self.original_image_label.setAlignment(Qt.AlignCenter)
        self.original_image_label.setMinimumSize(860, 700)

        self.segmented_image_label = QLabel()
        self.segmented_image_label.setObjectName("imageHolder")
        self.segmented_image_label.setAlignment(Qt.AlignCenter)
        self.segmented_image_label.setMinimumSize(860, 700)

        img_row.addWidget(self.original_image_label)
        img_row.addWidget(self.segmented_image_label)

        layout.addLayout(img_row)

    def show_images(self, orig_rgb: np.ndarray, ann_rgb: np.ndarray):
        pm1 = ndarray_to_qpixmap(orig_rgb, self.original_image_label.size())
        self.original_image_label.setPixmap(pm1)
        pm2 = ndarray_to_qpixmap(ann_rgb, self.segmented_image_label.size())
        self.segmented_image_label.setPixmap(pm2)


# =========================================
# UI: 控制面板
# =========================================
class CollapsibleGroupBox(QWidget):
    """可折叠分组面板"""

    def __init__(self, title="", expanded=True, parent=None):
        super().__init__(parent)
        # expanded 控制默认展开或折叠
        self.toggle_button = QToolButton(text=title, checkable=True, checked=expanded)
        self.toggle_button.setToolButtonStyle(Qt.ToolButtonTextBesideIcon)
        self.toggle_button.setArrowType(Qt.DownArrow if expanded else Qt.RightArrow)
        self.toggle_button.clicked.connect(self.on_toggled)  # type: ignore

        self.content_area = QFrame()
        self.content_area.setObjectName("expanded")
        self.content_layout = QFormLayout(self.content_area)
        self.content_layout.setLabelAlignment(Qt.AlignRight)
        self.content_layout.setHorizontalSpacing(24)
        self.content_layout.setVerticalSpacing(12)
        self.content_layout.setContentsMargins(12, 12, 12, 12)

        # 如果默认折叠, 则高度设为 0
        if not expanded:
            self.content_area.setMaximumHeight(0)

        self.toggle_animation = QPropertyAnimation(self.content_area, b"maximumHeight")
        self.toggle_animation.setDuration(150)
        self.toggle_animation.setEasingCurve(QEasingCurve.InOutQuad)

        main_layout = QVBoxLayout(self)
        main_layout.setContentsMargins(0, 0, 0, 0)
        main_layout.addWidget(self.toggle_button)
        main_layout.addWidget(self.content_area)

    def on_toggled(self, checked):
        self.toggle_button.setArrowType(Qt.DownArrow if checked else Qt.RightArrow)
        if checked:
            self.toggle_animation.setStartValue(0)
            self.toggle_animation.setEndValue(self.content_area.sizeHint().height())
        else:
            self.toggle_animation.setStartValue(self.content_area.height())
            self.toggle_animation.setEndValue(0)
        self.toggle_animation.start()

    def addRow(self, label, widget):
        self.content_layout.addRow(label, widget)


class ControlPanel(QWidget):
    loadModelRequested = pyqtSignal()
    chooseFileRequested = pyqtSignal()
    startDetectionRequested = pyqtSignal()
    pauseRequested = pyqtSignal()
    resumeRequested = pyqtSignal()
    stopRequested = pyqtSignal()
    exitRequested = pyqtSignal()
    restoreDefaultsRequested = pyqtSignal()

    continuousChanged = pyqtSignal(bool)
    detectTypeChanged = pyqtSignal(str)
    imageModeChanged = pyqtSignal(str)
    saveResultsToggled = pyqtSignal(bool)

    def __init__(self):
        super().__init__()
        self.detect_state = "idle"
        self._build_ui()

    def _build_ui(self):
        scroll = QScrollArea()
        scroll.setMinimumWidth(600)
        scroll.setWidgetResizable(True)

        container = QFrame()
        container.setObjectName("card")
        container_layout = QVBoxLayout(container)
        container_layout.setSpacing(8)

        # ---------- 设备状态 ----------
        status_group = CollapsibleGroupBox("⚙ 设备状态", expanded=True)
        self.runtime_label = QLabel('00:00:00')
        status_group.addRow('运行时间:', self.runtime_label)
        container_layout.addWidget(status_group)

        # ---------- 检测设置 ----------
        detect_group = CollapsibleGroupBox("🔍 检测设置", expanded=True)
        self.continuous_combo = QComboBox()
        self.continuous_combo.addItems(["目标跟踪", "目标检测"])
        self.continuous_combo.currentIndexChanged.connect(  # type: ignore
            lambda idx: self.continuousChanged.emit(idx == 0)  # type: ignore
        )

        self.detect_type_combo = QComboBox()
        self.detect_type_combo.addItems(["文件夹", "图片"])
        self.detect_type_combo.currentIndexChanged.connect(  # type: ignore
            lambda idx: self.detectTypeChanged.emit("folder" if idx == 0 else "image")  # type: ignore
        )

        self.image_mode_combo = QComboBox()
        self.image_mode_combo.addItems(["手动", "自动"])
        self.image_mode_combo.currentIndexChanged.connect(  # type: ignore
            lambda idx: self.imageModeChanged.emit("manual" if idx == 0 else "auto")  # type: ignore
        )

        self.save_results_checkbox = QCheckBox("保存推理结果到 runs/ 目录")
        self.save_results_checkbox.stateChanged.connect(  # type: ignore
            lambda s: self.saveResultsToggled.emit(s == Qt.Checked)  # type: ignore
        )

        detect_group.addRow('任务模式:', self.continuous_combo)
        detect_group.addRow("输入源:", self.detect_type_combo)
        detect_group.addRow("执行方式:", self.image_mode_combo)
        detect_group.addRow('', self.save_results_checkbox)
        container_layout.addWidget(detect_group)

        # ---------- 操作按钮 ----------
        button_group = CollapsibleGroupBox("🎯 操作", expanded=True)
        self.load_model_btn = QPushButton("👆 选择模型")
        self.load_model_btn.clicked.connect(self.loadModelRequested)  # type: ignore

        self.choose_file_btn = QPushButton("📂 选择文件")
        self.choose_file_btn.clicked.connect(self.chooseFileRequested)  # type: ignore

        self.start_detection_btn = QPushButton("▶ 开始检测")
        self.start_detection_btn.clicked.connect(self._on_start_pause_resume_clicked)  # type: ignore

        self.stop_btn = QPushButton("⏹ 停止")
        self.stop_btn.clicked.connect(lambda: self.stopRequested.emit())  # type: ignore

        self.exit_btn = QPushButton("❌ 退出")
        self.exit_btn.clicked.connect(self.exitRequested)  # type: ignore

        self.restore_defaults_btn = QPushButton("♻ 恢复默认配置")
        self.restore_defaults_btn.clicked.connect(lambda: self.restoreDefaultsRequested.emit())  # type: ignore

        button_group.addRow('', self.load_model_btn)
        button_group.addRow('', self.choose_file_btn)
        button_group.addRow('', self.start_detection_btn)
        button_group.addRow('', self.stop_btn)
        button_group.addRow('', self.exit_btn)
        button_group.addRow('', self.restore_defaults_btn)
        container_layout.addWidget(button_group)

        container_layout.addStretch()
        scroll.setWidget(container)

        root = QVBoxLayout(self)
        root.addWidget(scroll)

    def _on_start_pause_resume_clicked(self):
        detect_type = self.detect_type_combo.currentText()
        if detect_type == "图片":
            self.startDetectionRequested.emit()  # type: ignore
            return
        if self.detect_state == 'idle':
            self.startDetectionRequested.emit()  # type: ignore
            self.detect_state = 'running'
            self.start_detection_btn.setText("⏸ 暂停")
        elif self.detect_state == 'running':
            self.pauseRequested.emit()  # type: ignore
            self.detect_state = 'paused'
            self.start_detection_btn.setText("▶ 继续")
        elif self.detect_state == 'paused':
            self.resumeRequested.emit()  # type: ignore
            self.detect_state = 'running'
            self.start_detection_btn.setText("⏸ 暂停")

    def reset_start_button(self):
        self.detect_state = 'idle'
        self.start_detection_btn.setText("▶ 开始检测")

    def _decorate_status(self, label: QLabel):
        label.setObjectName("status")
        label.setAlignment(Qt.AlignCenter)
        label.setMinimumSize(180, 40)

    def set_runtime_text(self, text: str):
        self.runtime_label.setText(text)


# =========================================
# 主窗口 (协调器)
# =========================================
class SortingAppWindow(QMainWindow):
    """主窗口, 负责整合 UI、检测线程 """

    def __init__(self):
        super().__init__()
        self.current_config = load_config()
        self.model_path = ""
        self.input_path = ""
        self.image_mode = "manual"
        self.detect_type = "folder"

        self.image_list = []
        self.current_image_index = -1

        font = QFont()
        font.setPointSize(12)
        font.setFamily(UI_CONFIG["FONT_FAMILY"])
        QApplication.setFont(font)

        self.detector = DetectionWorker()
        self.folder_watcher = None

        self._setup_window()
        self._build_layout()
        self._apply_config_to_ui()
        self._connect_ui_signals()
        self._connect_detector_signals()
        self._apply_config_to_runtime()
        self._setup_timers()

        self.detector.start()

    # ---------------- UI 结构 ----------------
    def _setup_window(self):
        self.setWindowTitle(UI_CONFIG["APP_TITLE"])
        try:
            self.setWindowIcon(QIcon(UI_CONFIG["APP_ICON"]))
        except Exception:
            pass
        x, y, w, h = UI_CONFIG["WIN_GEOM"]
        self.setGeometry(x, y, w, h)

        self.setStyleSheet(f"""
            QWidget {{
                color: #222222;
                background: #f7f7fb;
                font-size: 12pt;
            }}
            QPushButton {{
                background: white; 
                border: 2px solid #6950a1;
                border-radius: 12px; 
                padding: 10px 16px; 
                font-weight: 600;
            }}
            QPushButton:hover {{
                color: white;
                background: #6950a1; 
            }}
            QLabel#status {{
                border: 1px solid #e9e7f3; 
                background: white;
                border-radius: 10px;
            }}          
            QFrame#card {{
                background: white; 
                border: 2px solid #e9e7f3; 
                border-radius: 12px;
            }}
            QFrame#expanded {{
                background: white; 
                border: 2px solid #e9e7f3; 
                border-radius: 6px;
            }}
            QScrollArea {{ 
                border: 2px solid #e9e7f3; 
                border-radius: 12px; 
            }}
            QLabel#imageHolder {{ 
                border: 3px solid #6950a1; 
                border-radius: 18px; 
                background: #000000; 
            }}
            QToolButton {{
                border: none; 
                font-weight: bold;
            }}
        """)

    def _build_layout(self):
        root = QHBoxLayout()
        root.setContentsMargins(16, 16, 16, 16)
        root.setSpacing(16)

        self.ctrl_panel = ControlPanel()
        self.image_panel = ImagePanel()

        root.addWidget(self.ctrl_panel)
        root.addWidget(self.image_panel)

        central = QWidget()
        central.setLayout(root)
        self.setCentralWidget(central)

    # ---------------- 配置同步 ----------------
    def _apply_config_to_ui(self):
        cfg = self.current_config
        # 🔍 检测设置
        self._block_set(self.ctrl_panel.continuous_combo, self.ctrl_panel.continuous_combo.setCurrentIndex,
                        0 if cfg["CONTINUOUS_MODE"] else 1)
        self._block_set(self.ctrl_panel.detect_type_combo, self.ctrl_panel.detect_type_combo.setCurrentIndex,
                        0 if cfg["DETECT_TYPE"] == "folder" else 1)
        self._block_set(self.ctrl_panel.image_mode_combo, self.ctrl_panel.image_mode_combo.setCurrentIndex,
                        0 if cfg["IMAGE_MODE"] == "manual" else 1)
        self._block_set(self.ctrl_panel.save_results_checkbox, self.ctrl_panel.save_results_checkbox.setChecked,
                        cfg["SAVE_RESULTS"])

        self._update_ui_enable_state()

    def _block_set(self, widget, method, value):
        widget.blockSignals(True)
        method(value)
        widget.blockSignals(False)

    # ---------------- 信号绑定 ----------------
    def _connect_ui_signals(self):
        # 按钮
        self.ctrl_panel.loadModelRequested.connect(self._on_choose_model)  # type: ignore
        self.ctrl_panel.chooseFileRequested.connect(self._on_choose_file)  # type: ignore
        self.ctrl_panel.startDetectionRequested.connect(self._on_start_detection)  # type: ignore
        self.ctrl_panel.pauseRequested.connect(self._on_pause_detection)  # type: ignore
        self.ctrl_panel.resumeRequested.connect(self._on_resume_detection)  # type: ignore
        self.ctrl_panel.stopRequested.connect(self._on_stop_detection)  # type: ignore
        self.ctrl_panel.exitRequested.connect(self._close_app)  # type: ignore
        self.ctrl_panel.restoreDefaultsRequested.connect(self._restore_defaults)  # type: ignore

        self.ctrl_panel.detectTypeChanged.connect(  # type: ignore
            lambda v: self._update_config("DETECT_TYPE", v, self._on_detect_type_changed))
        self.ctrl_panel.imageModeChanged.connect(  # type: ignore
            lambda v: self._update_config("IMAGE_MODE", v, self._on_image_mode_changed))

        self.ctrl_panel.continuous_combo.currentIndexChanged.connect(  # type: ignore
            lambda _: self._update_ui_enable_state())
        self.ctrl_panel.detect_type_combo.currentIndexChanged.connect(  # type: ignore
            lambda _: self._update_ui_enable_state())

    def _update_ui_enable_state(self):
        is_image_type = self.ctrl_panel.detect_type_combo.currentIndex() == 1
        self.ctrl_panel.image_mode_combo.setEnabled(is_image_type)

    def _connect_detector_signals(self):
        self.detector.image_ready.connect(self._on_image_ready)  # type: ignore
        self.ctrl_panel.continuousChanged.connect(  # type: ignore
            lambda v: self._update_config("CONTINUOUS_MODE", v, self.detector.set_continuous_mode))
        self.ctrl_panel.saveResultsToggled.connect(  # type: ignore
            lambda v: self._update_config("SAVE_RESULTS", v, self.detector.set_save_results_enabled))

    def _update_config(self, key, value, apply_fn):
        self.current_config[key] = value
        save_config(self.current_config)
        apply_fn(value)

    def _apply_config_to_runtime(self):
        cfg = self.current_config

        self.detector.set_continuous_mode(cfg["CONTINUOUS_MODE"])
        self.detector.set_save_results_enabled(cfg["SAVE_RESULTS"])

        self.detect_type = cfg["DETECT_TYPE"]
        self.image_mode = cfg["IMAGE_MODE"]

    # ---------------- 定时器 ----------------
    def _setup_timers(self):
        self._start_time = QTime(0, 0)
        self.runtime_timer = QTimer(self)
        self.runtime_timer.timeout.connect(self._tick_runtime)  # type: ignore
        self.runtime_timer.start(1000)

    def _tick_runtime(self):
        self._start_time = self._start_time.addSecs(1)
        self.ctrl_panel.set_runtime_text(self._start_time.toString("hh:mm:ss"))

    # ---------------- 状态显示 ----------------
    def show_status(self, text: str, timeout: int = 0):
        self.statusBar().showMessage(text, timeout)

    # ---------------- 事件处理 ----------------
    def _on_choose_model(self):
        path = FileDialogHelper.select_model_file(self)
        if path:
            self.model_path = path
            self.show_status(f"🟢 已选择模型: {path}", 5000)

    def _on_choose_file(self):
        if self.detect_type == "image":
            path = FileDialogHelper.select_image_file(self)
        else:
            path = FileDialogHelper.select_image_folder(self)

        if path:
            self.input_path = path
            file_type = "图片" if self.detect_type == "image" else "文件夹"
            self.show_status(f"🟢 已选择{file_type}: {path}", 5000)

    def _on_pause_detection(self):
        self.detector.pause()
        self.show_status("⏸ 已暂停检测", 5000)

    def _on_resume_detection(self):
        self.detector.resume()
        self.show_status("▶ 恢复检测", 5000)

    def _on_stop_detection(self, show_message=True):
        if self.folder_watcher:
            self.folder_watcher.stop()
            self.folder_watcher = None

        try:
            self.detector.clear_queue()
        except Exception:
            pass

        self.image_list.clear()
        self.current_image_index = -1

        if self.detector.isRunning():
            self.detector.stop()
            self.detector.wait(500)

        self.detector = DetectionWorker()
        self._connect_detector_signals()
        self._apply_config_to_runtime()
        self.detector.start()

        self.ctrl_panel.reset_start_button()
        if show_message:
            self.show_status("⏹ 检测已停止", 5000)
        else:
            self.show_status("✅ 自动模式完成", 5000)

        black_img = np.zeros((700, 860, 3), dtype=np.uint8)
        self.image_panel.show_images(black_img, black_img)

    def _restore_defaults(self):
        self.current_config = DEFAULT_CONFIG.copy()
        save_config(self.current_config)
        self._apply_config_to_ui()
        self._apply_config_to_runtime()
        self.show_status("♻ 恢复默认配置完成", 5000)

    def _on_image_mode_changed(self, mode: str):
        self.image_mode = mode
        self.show_status(f"🖼 当前执行方式: {'手动模式' if mode == 'manual' else '自动模式'}", 5000)

    def _on_detect_type_changed(self, detect_type: str):
        self.detect_type = detect_type
        self.show_status(f"🔍 输入源: {'图片' if detect_type == 'image' else '文件夹'}", 5000)

    def _on_image_ready(self, orig_rgb: np.ndarray, ann_rgb: np.ndarray):
        self.image_panel.show_images(orig_rgb, ann_rgb)

    # ----------- 开始检测 -----------
    def _on_start_detection(self):
        if not self.model_path:
            if os.path.exists(UI_CONFIG["DEFAULT_MODEL_PATH"]):
                self.model_path = UI_CONFIG["DEFAULT_MODEL_PATH"]
                self.show_status(f"🟢 使用默认模型: {UI_CONFIG['DEFAULT_MODEL_PATH']}", 5000)
            else:
                QMessageBox.warning(self, "错误", "未选择模型且默认模型不存在!")
                return

        # ========= 加载模型 =========
        if not self.detector.model_service.is_loaded():
            try:
                self.detector.load_model(self.model_path)
                self.show_status('🟢 模型已加载', 5000)
            except Exception as e:
                traceback.print_exc()
                QMessageBox.warning(self, "错误", f"加载模型失败: {e}")
                self.show_status('🔴 加载失败', 5000)
                return

        # ========= 检测流程 =========
        if self.detect_type == "image":
            self._start_image_detection()
        else:
            self._start_folder_detection()

    def _start_image_detection(self):
        if os.path.isdir(self.input_path):
            self.input_path = ""

        if self.image_mode == "manual":
            if not self.input_path:
                if os.path.exists(UI_CONFIG["DEFAULT_IMAGE_PATH"]):
                    self.input_path = UI_CONFIG["DEFAULT_IMAGE_PATH"]
                    self.show_status(f"🟢 使用默认图片: {UI_CONFIG['DEFAULT_IMAGE_PATH']}", 5000)
                else:
                    path = FileDialogHelper.select_image_file(self)
                    if not path:
                        return
                    self.input_path = path

            if not os.path.isfile(self.input_path):
                QMessageBox.warning(self, "错误", f"选择的路径不是有效的图片文件: {self.input_path}")
                self.input_path = ""
                return

            self.detector.enqueue(self.input_path)
            self.show_status(f"📷 手动模式: {os.path.basename(self.input_path)}", 5000)
            return
        else:
            if not self.image_list:
                if not self.input_path:
                    if os.path.exists(UI_CONFIG["DEFAULT_IMAGE_PATH"]):
                        self.input_path = UI_CONFIG["DEFAULT_IMAGE_PATH"]
                        self.show_status(f"🟢 使用默认图片: {UI_CONFIG['DEFAULT_IMAGE_PATH']}", 5000)
                    else:
                        path = FileDialogHelper.select_image_file(self)
                        if not path:
                            return
                        self.input_path = path

                if not os.path.isfile(self.input_path):
                    QMessageBox.warning(self, "错误", f"选择的路径不是有效的图片文件: {self.input_path}")
                    self.input_path = ""
                    return

                folder = os.path.dirname(self.input_path)
                self.image_list = sorted([
                    os.path.abspath(os.path.join(folder, f))
                    for f in os.listdir(folder)
                    if f.lower().endswith((".jpg", ".jpeg", ".png")) and os.path.isfile(os.path.join(folder, f))
                ])

                abs_input = os.path.abspath(self.input_path).lower()
                image_list_lower = [p.lower() for p in self.image_list]

                if abs_input in image_list_lower:
                    self.current_image_index = image_list_lower.index(abs_input)
                else:
                    QMessageBox.warning(self, "错误", "所选图片不在该文件夹列表中!")
                    return

            if 0 <= self.current_image_index < len(self.image_list):
                self.input_path = self.image_list[self.current_image_index]
                self.detector.enqueue(self.input_path)
                self.show_status(
                    f"📷 自动模式: 第 {self.current_image_index + 1} / {len(self.image_list)} 张 → {os.path.basename(self.input_path)}",
                    5000
                )
                self.ctrl_panel.start_detection_btn.setText("➡ 下一张")
                self.current_image_index += 1
            else:
                QMessageBox.information(self, "提示", "✅ 已到最后一张图片")
                self._on_stop_detection(show_message=False)  # 清理并复位
                return

    def _start_folder_detection(self):
        if os.path.isfile(self.input_path):
            self.input_path = ""

        if not self.input_path:
            if os.path.exists(UI_CONFIG["DEFAULT_FOLDER_PATH"]):
                self.input_path = UI_CONFIG["DEFAULT_FOLDER_PATH"]
                self.show_status(f"🟢 使用默认文件夹: {UI_CONFIG['DEFAULT_FOLDER_PATH']}", 5000)
            else:
                QMessageBox.warning(self, "错误", "未选择文件夹且默认文件夹不存在!")
                return
        self._run_folder_detection(self.input_path)

    def _run_folder_detection(self, folder):
        """文件夹检测/监控"""
        if not folder:
            return
        self._setup_folder_watcher(folder)
        image_paths = [
            os.path.join(folder, f) for f in os.listdir(folder)
            if f.lower().endswith((".jpg", ".jpeg", ".png"))
        ]
        image_paths.sort()
        for p in image_paths:
            self.detector.enqueue(p)

    # ----------- 文件夹监控 -----------
    def _setup_folder_watcher(self, folder_path: str):
        if self.folder_watcher is not None:
            try:
                self.folder_watcher.stop()
                self.folder_watcher.new_image_detected.disconnect()
            except Exception:
                pass
            self.folder_watcher = None
        self.folder_watcher = ImageFolderWatcher(folder_path)
        self.folder_watcher.new_image_detected.connect(self._on_new_file)  # type: ignore
        self.folder_watcher.start()
        self.show_status('🟡 监控中', 5000)

    def _on_new_file(self, image_path: str):
        self.detector.enqueue(image_path)

    # ----------- 退出 -----------
    def _close_app(self):
        try:
            if self.folder_watcher and self.folder_watcher.isRunning():
                self.folder_watcher.stop()
            if self.detector and self.detector.isRunning():
                self.detector.stop()
                self.detector.wait(1000)
        finally:
            QApplication.quit()

    def closeEvent(self, event):
        self._close_app()
        event.accept()


# =========================================
# 程序入口
# =========================================
if __name__ == '__main__':
    app = QApplication(sys.argv)
    win = SortingAppWindow()
    win.show()
    sys.exit(app.exec_())

5结语

目前代码还不能直接选择视频文件进行检测或者跟踪,后续再加吧。

6增加视频检测(20260319)

文件:yolov11_pyqt5_v2.py
链接: https://pan.baidu.com/s/1Gg68TSJbLoP3JgoSSGUZUw?pwd=4499 提取码: 4499

 ①DetectionWorker.run() 中增加视频判断

找到:

item = self.task_queue.get(timeout=0.2)

在下面插入视频处理逻辑:

# ====== 新增:视频处理 ======
if isinstance(item, str) and item.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
    cap = cv2.VideoCapture(item)
    while cap.isOpened() and self._running_flag.is_set():
        ret, frame = cap.read()
        if not ret:
            break

        results = self.model_service.predict(
            frame,
            save=False,
            track=self.is_continuous_mode,
        )

        res0 = results[0]
        orig_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        ann_rgb = cv2.cvtColor(res0.plot(), cv2.COLOR_BGR2RGB)

        self.image_ready.emit(orig_rgb, ann_rgb)

        time.sleep(0.03)

    cap.release()
    continue

② 文件选择支持视频

找到:

def select_image_file(parent=None) -> str:

改成:

def select_image_file(parent=None) -> str:
    path, _ = QFileDialog.getOpenFileName(
        parent,
        "选择文件",
        "./",
        "媒体文件 (*.jpg *.jpeg *.png *.mp4 *.avi *.mov *.mkv)"
    )
    return path

③ _start_image_detection() 不限制图片

找到:

if not os.path.isfile(self.input_path):
    QMessageBox.warning(self, "错误", f"选择的路径不是有效的图片文件: {self.input_path}")
    self.input_path = ""
    return

改成:

if not os.path.isfile(self.input_path):
    QMessageBox.warning(self, "错误", f"路径无效: {self.input_path}")
    self.input_path = ""
    return

④视频走检测流程

找到:

def _start_image_detection(self):

在函数最前面加这个判断

# ====== 新增:视频直接走检测,不进入图片列表逻辑 ======
def _start_image_detection(self):
    # 👇 放在最前面
    if self.input_path.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
        self.detector.enqueue(self.input_path)
        self.show_status(f"🎬 视频检测: {os.path.basename(self.input_path)}", 5000)
        return

⑤小结

增加了检测视频的功能,修改仅供参考。

新增功能只是在原有逻辑基础上做了简单的改动,把视频当作图片来进行处理,在视频处理逻辑上存在一些问题。不能保存,没有暂停功能。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐