【图像超分】python实现制作超分辨率数据集

前言

此博客针对于.264视频,其他视频文件逻辑相同

1、帧截取

将视频文件按照每x帧截取

import cv2
import os
import argparse
from pathlib import Path


def extract_h264_frames(file_path, output_dir, interval=30, fps=25):
    """
    从H.264视频文件中按指定间隔提取帧

    参数:
    file_path (str): .264文件路径
    output_dir (str): 输出图片目录
    interval (int): 帧间隔,默认30帧
    fps (int): 视频帧率,用于进度估算
    """
    # 创建输出目录
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # 使用FFmpeg后端打开H.264文件
    cap = cv2.VideoCapture(file_path, cv2.CAP_FFMPEG)

    # 检查文件是否成功打开
    if not cap.isOpened():
        print(f"无法打开H.264文件: {file_path}")
        print("请确保已安装FFmpeg并正确配置OpenCV")
        return

    frame_number = 0
    extracted_count = 0
    error_count = 0

    print(f"开始从 {file_path} 提取帧...")
    print(f"帧间隔: {interval},输出目录: {output_dir}")

    while True:
        # 读取一帧
        ret, frame = cap.read()

        # 检查是否读取到帧
        if not ret:
            break

        # 每interval帧提取一次
        if frame_number % interval == 0:
            # 生成文件名
            filename = f"frame_{extracted_count:06d}.jpg"
            output_path = output_dir / filename

            # 保存帧
            if cv2.imwrite(str(output_path), frame):
                extracted_count += 1
                # 每提取10张图片显示一次进度
                if extracted_count % 10 == 0:
                    print(f"已提取 {extracted_count} 张图片")

        frame_number += 1

        # 错误处理
        if frame is None:
            error_count += 1
            if error_count % 10 == 0:
                print(f"已遇到 {error_count} 个错误帧")

    # 释放资源
    cap.release()

    # 输出统计信息
    print(f"提取完成!")
    print(f"总处理帧数: {frame_number}")
    print(f"成功提取图片: {extracted_count} 张")
    print(f"错误帧数量: {error_count}")
    print(f"所有图片已保存至: {output_dir}")


if __name__ == "__main__":
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='从H.264视频中按间隔提取帧并保存为图片')
    parser.add_argument('file_path', help='.264文件的路径')
    parser.add_argument('output_dir', help='保存图片的目录')
    parser.add_argument('--interval', type=int, default=30, help='帧间隔,默认30帧')
    parser.add_argument('--fps', type=int, default=25, help='视频帧率,默认25fps')

    args = parser.parse_args()

    # 调用提取函数
    extract_h264_frames(args.file_path, args.output_dir, args.interval, args.fps)

多线程实现双三次插值缩小图像分辨率

缩小二倍,三倍,四倍,代码中有变量控制

import numpy as np
from PIL import Image
import math
import time
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed


def cubic_kernel(x, a=-0.5):
    """三次卷积核函数(线程安全)"""
    x = abs(x)
    if x <= 1:
        return (a + 2) * math.pow(x, 3) - (a + 3) * math.pow(x, 2) + 1
    elif x < 2:
        return a * math.pow(x, 3) - 5 * a * math.pow(x, 2) + 8 * a * x - 4 * a
    else:
        return 0


def process_single_row(row_idx, img_array, scale_factor, height, width, channels, is_gray):
    """单线程处理一行像素(双三次插值核心)"""
    inv_scale = 1.0 / scale_factor
    new_width = int(width * scale_factor)
    row_pixels = np.zeros((new_width, channels), dtype=np.uint8) if not is_gray else np.zeros(new_width, dtype=np.uint8)

    # 计算垂直方向位置与权重
    y = row_idx * inv_scale
    y0 = math.floor(y)
    dy = y - y0
    v_weights = [cubic_kernel(1 + dy), cubic_kernel(dy), cubic_kernel(1 - dy), cubic_kernel(2 - dy)]

    # 逐列计算像素值
    for j in range(new_width):
        x = j * inv_scale
        x0 = math.floor(x)
        dx = x - x0
        h_weights = [cubic_kernel(1 + dx), cubic_kernel(dx), cubic_kernel(1 - dx), cubic_kernel(2 - dx)]

        if is_gray:
            pixel_value = 0.0
            for k in range(4):
                y_coord = max(0, min(y0 - 1 + k, height - 1))
                for l in range(4):
                    x_coord = max(0, min(x0 - 1 + l, width - 1))
                    pixel_value += img_array[y_coord, x_coord] * v_weights[k] * h_weights[l]
            row_pixels[j] = np.clip(pixel_value, 0, 255).astype(np.uint8)
        else:
            for c in range(channels):
                pixel_value = 0.0
                for k in range(4):
                    y_coord = max(0, min(y0 - 1 + k, height - 1))
                    for l in range(4):
                        x_coord = max(0, min(x0 - 1 + l, width - 1))
                        pixel_value += img_array[y_coord, x_coord, c] * v_weights[k] * h_weights[l]
                row_pixels[j, c] = np.clip(pixel_value, 0, 255).astype(np.uint8)

    return row_idx, row_pixels


def bicubic_interpolation_multi_thread(image, scale_factor, max_workers=None):
    """多线程双三次插值(单图处理核心)"""
    img_array = np.array(image)
    if len(img_array.shape) == 3:
        height, width, channels = img_array.shape
        is_gray = False
    else:
        height, width = img_array.shape
        channels = 1
        is_gray = True

    new_height = int(height * scale_factor)
    new_width = int(width * scale_factor)
    output = np.zeros((new_height, new_width), dtype=np.uint8) if is_gray else np.zeros((new_height, new_width, channels), dtype=np.uint8)

    # 线程池配置(默认CPU核心数)
    max_workers = max_workers or os.cpu_count() or 4

    # 并行处理所有行
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_single_row, idx, img_array, scale_factor, height, width, channels, is_gray): idx for idx in range(new_height)}
        for future in as_completed(futures):
            row_idx, row_pixels = future.result()
            output[row_idx, :] = row_pixels if is_gray else row_pixels[:, :]

    return Image.fromarray(output)


def process_single_image(input_img_path, output_dir, scale_factor, max_workers_per_img=None):
    """
    单图处理(核心保障:输出文件名 = 输入文件名)
    返回:(处理结果, 输入文件名, 输出文件名, 错误信息)
    """
    try:
        # 1. 提取输入图像的原始文件名(含后缀,如"frame_001.jpg")
        input_filename = os.path.basename(input_img_path)
        if not input_filename:
            raise ValueError("无法提取输入图像的文件名")

        # 2. 读取输入图像
        with Image.open(input_img_path) as img:
            img_copy = img.copy()  # 避免文件句柄占用
            img_format = img.format  # 保留原格式(确保输出格式与输入一致)

        # 3. 执行双三次插值缩放
        scaled_img = bicubic_interpolation_multi_thread(img_copy, scale_factor, max_workers_per_img)

        # 4. 构建输出路径(输出目录 + 原始文件名,保障名字完全一致)
        output_img_path = os.path.join(output_dir, input_filename)
        output_filename = os.path.basename(output_img_path)

        # 5. 验证:输出文件名必须与输入文件名完全一致(强化保障)
        if output_filename != input_filename:
            raise RuntimeError(f"文件名不一致错误!输入:{input_filename},输出:{output_filename}")

        # 6. 保存图像(保留原格式,避免格式转换导致的文件名后缀变化)
        scaled_img.save(output_img_path, format=img_format)

        return (True, input_filename, output_filename, "")

    except Exception as e:
        input_filename = os.path.basename(input_img_path) or "未知文件名"
        return (False, input_filename, "", str(e)[:150])  # 截取错误信息,避免日志过长


def batch_process_images(input_dir, output_dir, scale_factor, max_workers_per_img=None, max_batch_workers=4):
    """
    批量处理文件夹图像(核心规则:输入输出文件名完全一致)
    """
    # 1. 基础合法性检查
    if not os.path.isdir(input_dir):
        print(f"❌ 输入路径不存在或不是文件夹:{input_dir}")
        return

    # 2. 创建输出目录(确保目录存在,避免保存失败)
    os.makedirs(output_dir, exist_ok=True)
    print(f"✅ 输出目录已准备:{os.path.abspath(output_dir)}")
    print(f"📌 核心规则:输出图像文件名 = 输入图像文件名(含后缀)")

    # 3. 筛选支持的图像文件(避免处理非图像文件)
    supported_formats = (".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".gif")  # 可扩展
    img_files = []
    for root, _, files in os.walk(input_dir):  # 遍历所有子文件夹
        for file in files:
            if file.lower().endswith(supported_formats):
                img_files.append(os.path.join(root, file))

    # 4. 检查是否有可处理的图像
    if len(img_files) == 0:
        print(f"❌ 输入文件夹中未找到支持的图像文件(支持格式:{supported_formats})")
        return
    print(f"📊 批量任务信息:共{len(img_files)}张图像 | 缩放因子:{scale_factor}x")
    print(f"🔧 线程配置:单图线程数={max_workers_per_img or os.cpu_count() or 4} | 并行处理图数={max_batch_workers}")

    # 5. 批量并行处理(外层控制同时处理的图像数,避免内存过载)
    start_total = time.time()
    success_count = 0
    fail_count = 0
    fail_details = []

    print(f"\n🚀 开始批量处理(输入输出文件名严格一致)...")
    with ThreadPoolExecutor(max_workers=max_batch_workers) as batch_executor:
        # 提交所有图像处理任务
        futures = [batch_executor.submit(
            process_single_image,
            input_img_path=img_path,
            output_dir=output_dir,
            scale_factor=scale_factor,
            max_workers_per_img=max_workers_per_img
        ) for img_path in img_files]

        # 实时跟踪进度
        with tqdm(total=len(futures), desc="批量处理进度", unit="张", ncols=100) as pbar:
            for future in as_completed(futures):
                success, input_name, output_name, error_msg = future.result()
                if success:
                    success_count += 1
                    pbar.set_postfix({"当前成功": input_name})  # 显示当前成功的文件名
                else:
                    fail_count += 1
                    fail_details.append((input_name, error_msg))
                pbar.update(1)

    # 6. 输出最终报告(明确显示文件名一致性情况)
    total_time = time.time() - start_total
    print(f"\n📋 批量处理完成!")
    print(f"⏱️  总耗时:{total_time:.2f}秒")
    print(f"✅ 成功处理:{success_count}张(所有输出文件名与输入完全一致)")
    print(f"❌ 处理失败:{fail_count}张")

    # 显示失败详情(便于排查问题)
    if fail_count > 0:
        print(f"\n❌ 失败详情:")
        for idx, (img_name, err) in enumerate(fail_details, 1):
            print(f"  {idx}. 文件名:{img_name} | 错误:{err}")
            if idx >= 10:  # 最多显示10条,避免日志过长
                print(f"  ... 还有{len(fail_details)-10}条失败记录未显示")
                break


# -------------------------- 批量任务配置(根据需求修改) --------------------------
if __name__ == "__main__":
    INPUT_DIR = "dataset/hongwai_image"  # 输入图像文件夹(例:之前提取的.264帧文件夹)
    OUTPUT_DIR = "dataset/scaled_hongwai"  # 输出图像文件夹(自动创建,无需提前准备)
    SCALE_FACTOR = 0.25                 # 缩放因子(0.25=缩小到1/4,2=放大2倍,3=放大3倍)
    MAX_WORKERS_PER_IMG = 8             # 单张图像处理的线程数(建议设为CPU核心数,如8/16)
    MAX_BATCH_WORKERS = 4               # 同时处理的图像数量(避免内存过载,建议4-8)

    # 执行批量处理(严格保障输入输出文件名一致)
    batch_process_images(
        input_dir=INPUT_DIR,
        output_dir=OUTPUT_DIR,
        scale_factor=SCALE_FACTOR,
        max_workers_per_img=MAX_WORKERS_PER_IMG,
        max_batch_workers=MAX_BATCH_WORKERS
    )

批量RGB转单通道脚本

按需使用吧

import os
from PIL import Image


def convert_to_gray(input_dir, output_dir):
    """
    将输入目录中的所有图片转换为单通道灰度图,并保存到输出目录

    参数:
    input_dir: 输入图片所在的文件夹路径
    output_dir: 处理后图片的保存路径
    """
    # 确保输出目录存在,如果不存在则创建
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 获取输入目录中所有的文件
    file_list = os.listdir(input_dir)

    # 支持的图片格式
    supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')

    # 遍历所有文件
    for filename in file_list:
        # 检查文件是否为图片
        if filename.lower().endswith(supported_formats):
            try:
                # 构建完整的文件路径
                input_path = os.path.join(input_dir, filename)

                # 打开图像
                img = Image.open(input_path)

                # 转换为单通道灰度图
                gray_img = img.convert('L')  # 'L'表示8位灰度,单通道

                # 构建输出文件路径
                output_filename = f"{filename}"  # 在原文件名前加上gray_前缀
                output_path = os.path.join(output_dir, output_filename)

                # 保存单通道图像
                gray_img.save(output_path)

                print(f"已处理: {filename} -> {output_filename}")

            except Exception as e:
                print(f"处理 {filename} 时出错: {str(e)}")


# 示例用法
if __name__ == "__main__":
    # 输入文件夹路径 - 请根据实际情况修改
    input_directory = "C:\\Users\\27879\\jiangguolong\\dataset\\DIV2K\\DIV2K\\DIV2K_train_LR_bicubic\\X3"

    # 输出文件夹路径 - 请根据实际情况修改
    output_directory = "C:\\Users\\27879\\jiangguolong\\dataset\\DIV2K_single\\DIV2K_train_LR_bicubic\\X3"

    # 执行转换
    convert_to_gray(input_directory, output_directory)
    print("所有图片处理完成!")

PS:打开.264视频的脚本

import cv2
import argparse
import time


def play_h264_file(file_path, fps=25):
    """
    播放H.264格式视频文件

    参数:
    file_path (str): .264文件路径
    fps (int): 视频帧率,默认25fps
    """
    # 使用FFmpeg后端打开H.264文件
    # 需要系统中安装FFmpeg并确保OpenCV编译时支持FFmpeg
    cap = cv2.VideoCapture(file_path, cv2.CAP_FFMPEG)

    # 检查是否成功打开文件
    if not cap.isOpened():
        print(f"无法打开文件: {file_path}")
        print("请确保已安装FFmpeg并正确配置OpenCV支持FFmpeg")
        return

    # 计算每帧之间的延迟时间(毫秒)
    frame_delay = int(1000 / fps)

    print("正在播放H.264视频...")
    print("按 'q' 键退出播放")

    while True:
        # 读取一帧
        ret, frame = cap.read()

        # 检查是否读取成功
        if not ret:
            print("视频播放完毕或读取失败")
            break

        # 显示帧
        cv2.imshow('H.264 Player', frame)

        # 控制播放速度并检查按键
        if cv2.waitKey(frame_delay) & 0xFF == ord('q'):
            print("用户退出播放")
            break

    # 释放资源
    cap.release()
    cv2.destroyAllWindows()


if __name__ == "__main__":
    # 设置命令行参数
    parser = argparse.ArgumentParser(description='播放H.264格式视频文件')
    parser.add_argument('file_path', help='.264文件的路径')
    parser.add_argument('--fps', type=int, default=25, help='视频帧率,默认25fps')

    # 解析参数
    args = parser.parse_args()

    # 播放视频
    play_h264_file(args.file_path, args.fps)

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐