【图像超分】python实现制作超分辨率数据集
本文介绍了使用Python实现超分辨率数据集制作的方法。主要内容包括:1) 从H.264视频中按指定间隔提取帧的完整代码,使用OpenCV和FFmpeg后端,支持帧间隔设置和错误处理;2) 多线程实现双三次插值图像降采样,可控制缩小倍数(2x/3x/4x),采用三次卷积核函数进行高质量降采样,利用线程池并行处理提高效率。代码提供了完整的参数解析、进度显示和错误处理机制,适合批量处理视频帧生成超分辨
·
【图像超分】python实现制作超分辨率数据集
前言
此博客针对于.264视频,其他视频文件逻辑相同
1、帧截取
将视频文件按照每x帧截取
import cv2
import os
import argparse
from pathlib import Path
def extract_h264_frames(file_path, output_dir, interval=30, fps=25):
"""
从H.264视频文件中按指定间隔提取帧
参数:
file_path (str): .264文件路径
output_dir (str): 输出图片目录
interval (int): 帧间隔,默认30帧
fps (int): 视频帧率,用于进度估算
"""
# 创建输出目录
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 使用FFmpeg后端打开H.264文件
cap = cv2.VideoCapture(file_path, cv2.CAP_FFMPEG)
# 检查文件是否成功打开
if not cap.isOpened():
print(f"无法打开H.264文件: {file_path}")
print("请确保已安装FFmpeg并正确配置OpenCV")
return
frame_number = 0
extracted_count = 0
error_count = 0
print(f"开始从 {file_path} 提取帧...")
print(f"帧间隔: {interval},输出目录: {output_dir}")
while True:
# 读取一帧
ret, frame = cap.read()
# 检查是否读取到帧
if not ret:
break
# 每interval帧提取一次
if frame_number % interval == 0:
# 生成文件名
filename = f"frame_{extracted_count:06d}.jpg"
output_path = output_dir / filename
# 保存帧
if cv2.imwrite(str(output_path), frame):
extracted_count += 1
# 每提取10张图片显示一次进度
if extracted_count % 10 == 0:
print(f"已提取 {extracted_count} 张图片")
frame_number += 1
# 错误处理
if frame is None:
error_count += 1
if error_count % 10 == 0:
print(f"已遇到 {error_count} 个错误帧")
# 释放资源
cap.release()
# 输出统计信息
print(f"提取完成!")
print(f"总处理帧数: {frame_number}")
print(f"成功提取图片: {extracted_count} 张")
print(f"错误帧数量: {error_count}")
print(f"所有图片已保存至: {output_dir}")
if __name__ == "__main__":
# 解析命令行参数
parser = argparse.ArgumentParser(description='从H.264视频中按间隔提取帧并保存为图片')
parser.add_argument('file_path', help='.264文件的路径')
parser.add_argument('output_dir', help='保存图片的目录')
parser.add_argument('--interval', type=int, default=30, help='帧间隔,默认30帧')
parser.add_argument('--fps', type=int, default=25, help='视频帧率,默认25fps')
args = parser.parse_args()
# 调用提取函数
extract_h264_frames(args.file_path, args.output_dir, args.interval, args.fps)
多线程实现双三次插值缩小图像分辨率
缩小二倍,三倍,四倍,代码中有变量控制
import numpy as np
from PIL import Image
import math
import time
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
def cubic_kernel(x, a=-0.5):
"""三次卷积核函数(线程安全)"""
x = abs(x)
if x <= 1:
return (a + 2) * math.pow(x, 3) - (a + 3) * math.pow(x, 2) + 1
elif x < 2:
return a * math.pow(x, 3) - 5 * a * math.pow(x, 2) + 8 * a * x - 4 * a
else:
return 0
def process_single_row(row_idx, img_array, scale_factor, height, width, channels, is_gray):
"""单线程处理一行像素(双三次插值核心)"""
inv_scale = 1.0 / scale_factor
new_width = int(width * scale_factor)
row_pixels = np.zeros((new_width, channels), dtype=np.uint8) if not is_gray else np.zeros(new_width, dtype=np.uint8)
# 计算垂直方向位置与权重
y = row_idx * inv_scale
y0 = math.floor(y)
dy = y - y0
v_weights = [cubic_kernel(1 + dy), cubic_kernel(dy), cubic_kernel(1 - dy), cubic_kernel(2 - dy)]
# 逐列计算像素值
for j in range(new_width):
x = j * inv_scale
x0 = math.floor(x)
dx = x - x0
h_weights = [cubic_kernel(1 + dx), cubic_kernel(dx), cubic_kernel(1 - dx), cubic_kernel(2 - dx)]
if is_gray:
pixel_value = 0.0
for k in range(4):
y_coord = max(0, min(y0 - 1 + k, height - 1))
for l in range(4):
x_coord = max(0, min(x0 - 1 + l, width - 1))
pixel_value += img_array[y_coord, x_coord] * v_weights[k] * h_weights[l]
row_pixels[j] = np.clip(pixel_value, 0, 255).astype(np.uint8)
else:
for c in range(channels):
pixel_value = 0.0
for k in range(4):
y_coord = max(0, min(y0 - 1 + k, height - 1))
for l in range(4):
x_coord = max(0, min(x0 - 1 + l, width - 1))
pixel_value += img_array[y_coord, x_coord, c] * v_weights[k] * h_weights[l]
row_pixels[j, c] = np.clip(pixel_value, 0, 255).astype(np.uint8)
return row_idx, row_pixels
def bicubic_interpolation_multi_thread(image, scale_factor, max_workers=None):
"""多线程双三次插值(单图处理核心)"""
img_array = np.array(image)
if len(img_array.shape) == 3:
height, width, channels = img_array.shape
is_gray = False
else:
height, width = img_array.shape
channels = 1
is_gray = True
new_height = int(height * scale_factor)
new_width = int(width * scale_factor)
output = np.zeros((new_height, new_width), dtype=np.uint8) if is_gray else np.zeros((new_height, new_width, channels), dtype=np.uint8)
# 线程池配置(默认CPU核心数)
max_workers = max_workers or os.cpu_count() or 4
# 并行处理所有行
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_single_row, idx, img_array, scale_factor, height, width, channels, is_gray): idx for idx in range(new_height)}
for future in as_completed(futures):
row_idx, row_pixels = future.result()
output[row_idx, :] = row_pixels if is_gray else row_pixels[:, :]
return Image.fromarray(output)
def process_single_image(input_img_path, output_dir, scale_factor, max_workers_per_img=None):
"""
单图处理(核心保障:输出文件名 = 输入文件名)
返回:(处理结果, 输入文件名, 输出文件名, 错误信息)
"""
try:
# 1. 提取输入图像的原始文件名(含后缀,如"frame_001.jpg")
input_filename = os.path.basename(input_img_path)
if not input_filename:
raise ValueError("无法提取输入图像的文件名")
# 2. 读取输入图像
with Image.open(input_img_path) as img:
img_copy = img.copy() # 避免文件句柄占用
img_format = img.format # 保留原格式(确保输出格式与输入一致)
# 3. 执行双三次插值缩放
scaled_img = bicubic_interpolation_multi_thread(img_copy, scale_factor, max_workers_per_img)
# 4. 构建输出路径(输出目录 + 原始文件名,保障名字完全一致)
output_img_path = os.path.join(output_dir, input_filename)
output_filename = os.path.basename(output_img_path)
# 5. 验证:输出文件名必须与输入文件名完全一致(强化保障)
if output_filename != input_filename:
raise RuntimeError(f"文件名不一致错误!输入:{input_filename},输出:{output_filename}")
# 6. 保存图像(保留原格式,避免格式转换导致的文件名后缀变化)
scaled_img.save(output_img_path, format=img_format)
return (True, input_filename, output_filename, "")
except Exception as e:
input_filename = os.path.basename(input_img_path) or "未知文件名"
return (False, input_filename, "", str(e)[:150]) # 截取错误信息,避免日志过长
def batch_process_images(input_dir, output_dir, scale_factor, max_workers_per_img=None, max_batch_workers=4):
"""
批量处理文件夹图像(核心规则:输入输出文件名完全一致)
"""
# 1. 基础合法性检查
if not os.path.isdir(input_dir):
print(f"❌ 输入路径不存在或不是文件夹:{input_dir}")
return
# 2. 创建输出目录(确保目录存在,避免保存失败)
os.makedirs(output_dir, exist_ok=True)
print(f"✅ 输出目录已准备:{os.path.abspath(output_dir)}")
print(f"📌 核心规则:输出图像文件名 = 输入图像文件名(含后缀)")
# 3. 筛选支持的图像文件(避免处理非图像文件)
supported_formats = (".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".gif") # 可扩展
img_files = []
for root, _, files in os.walk(input_dir): # 遍历所有子文件夹
for file in files:
if file.lower().endswith(supported_formats):
img_files.append(os.path.join(root, file))
# 4. 检查是否有可处理的图像
if len(img_files) == 0:
print(f"❌ 输入文件夹中未找到支持的图像文件(支持格式:{supported_formats})")
return
print(f"📊 批量任务信息:共{len(img_files)}张图像 | 缩放因子:{scale_factor}x")
print(f"🔧 线程配置:单图线程数={max_workers_per_img or os.cpu_count() or 4} | 并行处理图数={max_batch_workers}")
# 5. 批量并行处理(外层控制同时处理的图像数,避免内存过载)
start_total = time.time()
success_count = 0
fail_count = 0
fail_details = []
print(f"\n🚀 开始批量处理(输入输出文件名严格一致)...")
with ThreadPoolExecutor(max_workers=max_batch_workers) as batch_executor:
# 提交所有图像处理任务
futures = [batch_executor.submit(
process_single_image,
input_img_path=img_path,
output_dir=output_dir,
scale_factor=scale_factor,
max_workers_per_img=max_workers_per_img
) for img_path in img_files]
# 实时跟踪进度
with tqdm(total=len(futures), desc="批量处理进度", unit="张", ncols=100) as pbar:
for future in as_completed(futures):
success, input_name, output_name, error_msg = future.result()
if success:
success_count += 1
pbar.set_postfix({"当前成功": input_name}) # 显示当前成功的文件名
else:
fail_count += 1
fail_details.append((input_name, error_msg))
pbar.update(1)
# 6. 输出最终报告(明确显示文件名一致性情况)
total_time = time.time() - start_total
print(f"\n📋 批量处理完成!")
print(f"⏱️ 总耗时:{total_time:.2f}秒")
print(f"✅ 成功处理:{success_count}张(所有输出文件名与输入完全一致)")
print(f"❌ 处理失败:{fail_count}张")
# 显示失败详情(便于排查问题)
if fail_count > 0:
print(f"\n❌ 失败详情:")
for idx, (img_name, err) in enumerate(fail_details, 1):
print(f" {idx}. 文件名:{img_name} | 错误:{err}")
if idx >= 10: # 最多显示10条,避免日志过长
print(f" ... 还有{len(fail_details)-10}条失败记录未显示")
break
# -------------------------- 批量任务配置(根据需求修改) --------------------------
if __name__ == "__main__":
INPUT_DIR = "dataset/hongwai_image" # 输入图像文件夹(例:之前提取的.264帧文件夹)
OUTPUT_DIR = "dataset/scaled_hongwai" # 输出图像文件夹(自动创建,无需提前准备)
SCALE_FACTOR = 0.25 # 缩放因子(0.25=缩小到1/4,2=放大2倍,3=放大3倍)
MAX_WORKERS_PER_IMG = 8 # 单张图像处理的线程数(建议设为CPU核心数,如8/16)
MAX_BATCH_WORKERS = 4 # 同时处理的图像数量(避免内存过载,建议4-8)
# 执行批量处理(严格保障输入输出文件名一致)
batch_process_images(
input_dir=INPUT_DIR,
output_dir=OUTPUT_DIR,
scale_factor=SCALE_FACTOR,
max_workers_per_img=MAX_WORKERS_PER_IMG,
max_batch_workers=MAX_BATCH_WORKERS
)
批量RGB转单通道脚本
按需使用吧
import os
from PIL import Image
def convert_to_gray(input_dir, output_dir):
"""
将输入目录中的所有图片转换为单通道灰度图,并保存到输出目录
参数:
input_dir: 输入图片所在的文件夹路径
output_dir: 处理后图片的保存路径
"""
# 确保输出目录存在,如果不存在则创建
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 获取输入目录中所有的文件
file_list = os.listdir(input_dir)
# 支持的图片格式
supported_formats = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff')
# 遍历所有文件
for filename in file_list:
# 检查文件是否为图片
if filename.lower().endswith(supported_formats):
try:
# 构建完整的文件路径
input_path = os.path.join(input_dir, filename)
# 打开图像
img = Image.open(input_path)
# 转换为单通道灰度图
gray_img = img.convert('L') # 'L'表示8位灰度,单通道
# 构建输出文件路径
output_filename = f"{filename}" # 在原文件名前加上gray_前缀
output_path = os.path.join(output_dir, output_filename)
# 保存单通道图像
gray_img.save(output_path)
print(f"已处理: {filename} -> {output_filename}")
except Exception as e:
print(f"处理 {filename} 时出错: {str(e)}")
# 示例用法
if __name__ == "__main__":
# 输入文件夹路径 - 请根据实际情况修改
input_directory = "C:\\Users\\27879\\jiangguolong\\dataset\\DIV2K\\DIV2K\\DIV2K_train_LR_bicubic\\X3"
# 输出文件夹路径 - 请根据实际情况修改
output_directory = "C:\\Users\\27879\\jiangguolong\\dataset\\DIV2K_single\\DIV2K_train_LR_bicubic\\X3"
# 执行转换
convert_to_gray(input_directory, output_directory)
print("所有图片处理完成!")
PS:打开.264视频的脚本
import cv2
import argparse
import time
def play_h264_file(file_path, fps=25):
"""
播放H.264格式视频文件
参数:
file_path (str): .264文件路径
fps (int): 视频帧率,默认25fps
"""
# 使用FFmpeg后端打开H.264文件
# 需要系统中安装FFmpeg并确保OpenCV编译时支持FFmpeg
cap = cv2.VideoCapture(file_path, cv2.CAP_FFMPEG)
# 检查是否成功打开文件
if not cap.isOpened():
print(f"无法打开文件: {file_path}")
print("请确保已安装FFmpeg并正确配置OpenCV支持FFmpeg")
return
# 计算每帧之间的延迟时间(毫秒)
frame_delay = int(1000 / fps)
print("正在播放H.264视频...")
print("按 'q' 键退出播放")
while True:
# 读取一帧
ret, frame = cap.read()
# 检查是否读取成功
if not ret:
print("视频播放完毕或读取失败")
break
# 显示帧
cv2.imshow('H.264 Player', frame)
# 控制播放速度并检查按键
if cv2.waitKey(frame_delay) & 0xFF == ord('q'):
print("用户退出播放")
break
# 释放资源
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
# 设置命令行参数
parser = argparse.ArgumentParser(description='播放H.264格式视频文件')
parser.add_argument('file_path', help='.264文件的路径')
parser.add_argument('--fps', type=int, default=25, help='视频帧率,默认25fps')
# 解析参数
args = parser.parse_args()
# 播放视频
play_h264_file(args.file_path, args.fps)
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)