主体代码

import os
import numpy as np
from PIL import Image
import cv2
from tqdm import tqdm
from torch.utils.data import Dataset
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
SRC_ROOT   = "data/stage1_train"        # 原始 data 根目录
OUT_IMG_DIR  = "processed/images"       # 处理后保存的位置
OUT_MASK_DIR = "processed/masks"        # 合并后 mask 保存位置
TARGET_SIZE  = (256, 256)               # 统一输出尺寸


def ensure_dir(path):
    """若目录不存在则创建"""
    if not os.path.exists(path): #判断路径是否存在
        os.makedirs(path) #创建路径

def read_and_resize(img_path, size):
    """读取并缩放图像到指定尺寸,返回 numpy 数组"""
    img = Image.open(img_path).convert("RGB") #Image.open 读取图片文件 .convert将数据改为所需的格式
    img = img.resize(size, Image.BILINEAR) #.resize(size, Image.BILINEAR) 缩放图片的大小 保证格式相同
    return np.array(img) #np.array 将数据的格式改为数组
#插值(interpolation):缩放图像时需要“估算”新像素值的方法。常见方式有:最近邻 (NEAREST),双线性插值 (BILINEAR),双三次插值 (BICUBIC)

def merge_instance_masks(mask_folder, size):
    """将一个病例下的所有 instance-mask 合并成一张二值 mask"""
    final = np.zeros(size[::-1], dtype=np.uint8)  # (H, W) 创建空的掩膜
    for fname in os.listdir(mask_folder): #os.listdir(...) 遍历文件 读取所有的mask图片
        if not fname.lower().endswith((".png", ".jpg", ".tif")): #跳过非图片文件 .endswith判断文件格式
            continue
        m = Image.open(os.path.join(mask_folder, fname)).convert("L") #.convert 将数据改为单通道数据 黑白图片
        m = m.resize(size, Image.NEAREST)          # 保持 label 不插值
        m = np.array(m)
        m = (m > 0).astype(np.uint8)               # 转成 0/1 二值化 (m > 0)布尔判断 生成一个和m形同形状的数组 并比较比0大的就是1 其他的就是0
        final = np.maximum(final, m)   #将符合条件的masks合并             # 像素级取最大值
    return final * 255      #最后输出的值为255\0 方便保存                       # 保存成 0/255

def process_one_case(case_dir, out_img_dir, out_mask_dir, size):
    """处理单个病例,输出 image.png & mask.png"""
    # 1. 读取原图(每个 images 文件夹只含一张)
    img_folder = os.path.join(case_dir, "images")
    #拼接路径,得到某病例的“images”文件夹路径
    img_name   = os.listdir(img_folder)[0]
    img        = read_and_resize(os.path.join(img_folder, img_name), size)

    # 2. 合并 mask
    mask_folder = os.path.join(case_dir, "masks")
    mask        = merge_instance_masks(mask_folder, size)

    # 3. 生成保存路径
    case_id = os.path.basename(case_dir)
    img_out_path  = os.path.join(out_img_dir,  f"{case_id}.png")
    mask_out_path = os.path.join(out_mask_dir, f"{case_id}.png")

    # 4. 保存
    Image.fromarray(img).save(img_out_path)
    Image.fromarray(mask).save(mask_out_path)

def main():
    ensure_dir(OUT_IMG_DIR)
    ensure_dir(OUT_MASK_DIR)

    case_dirs = [os.path.join(SRC_ROOT, d) for d in os.listdir(SRC_ROOT)
                 if os.path.isdir(os.path.join(SRC_ROOT, d))]

    for c in tqdm(case_dirs, desc="Processing cases"):
        process_one_case(c, OUT_IMG_DIR, OUT_MASK_DIR, TARGET_SIZE)

    print(f" 处理完成!{len(case_dirs)} 张图像已保存到 {OUT_IMG_DIR} / {OUT_MASK_DIR}")

if __name__ == "__main__":
    main()

class DSB2018Dataset(Dataset):
    def __init__(self,img_dir,mask_dir,transform=None): #将数据从文件中提取出来
        self.img_dir = img_dir #_dir 是指目录的意思
        self_mask_dir = mask_dir
        self.img_list = os.listdir(img_dir)
        self.transform = transform

    def __len__(self):
        return len(self.img_list) #输出数据集中数据的总数量

    def __getitem__(self, idx):
        img_name = self.img_list[idx] #为图片编码
        img_path = os.path.join(self.img_dir, img_name) #拼接出 图片和掩码的路径
        mask_path = os.path.join(self.mask_dir, img_name) #_path 指文件的路径

        img = Image.open(img_path).convert('RGB') #打开图片 以三通道的格式
        mask = Image.open(mask_path).convert('L')

        if self.transform:
            img = self.transform(img) #判断数据是不是tensor格式
        else:
            img = transforms.ToTensor()(img) #如果不是 转化为tensor形式 并归一化 transforms.ToTensor自带0到255值的归一化

        mask = np.array(mask) #将掩码转化为数组
        mask = (mask > 127).astype(np.float32)  # 二值化阈值127
        mask = torch.from_numpy(mask)  # 转tensor
        mask = mask.unsqueeze(0) #添加一个维度 保证数据的格式一致

        return img, mask

if __name__ == "__main__":
    img_dir = "./processed/images"
    mask_dir = "./processed/masks"
    dataset = DSB2018Dataset(img_dir, mask_dir)

    print(f"样本数量: {len(dataset)}")
    img, mask = dataset[0]
    print(f"图像尺寸: {img.shape}")   # (3, H, W)
    print(f"掩膜尺寸: {mask.shape}")  # (1, H, W)

#卷积模块
class ConvBlock(nn.Module):
    """两层卷积 + BN + ReLU"""
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch)
    #前向传播
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        return x


#由于模型主体太繁琐 这里使用了循环和递归 是函数更加简洁
class UNetPlusPlus(nn.Module):
    def __init__(self, in_channels=3, out_channels=1, filters=[64,128,256,512,1024]):
        super().__init__()
        self.depth = 4  # 编码器深度
        self.filters = filters #通道数

        # 编码器卷积块
        self.conv0_0 = ConvBlock(in_channels, filters[0])
        self.conv_blocks = nn.ModuleDict() #nn.ModuleDict() 用来储存模型的神经层
        for i in range(1, self.depth+1):
            self.conv_blocks[f'conv{i}_0'] = ConvBlock(filters[i-1], filters[i])

        # 解码器卷积块,动态存储所有跳跃连接的卷积层
        self.decoder_blocks = nn.ModuleDict()
        for d in range(self.depth):
            for s in range(1, self.depth - d + 1):
                in_ch = filters[d] * (s + 1)  # 组合通道数,s+1个输入拼接
                out_ch = filters[d]
                self.decoder_blocks[f'conv{d}_{s}'] = ConvBlock(in_ch, out_ch)

        # 输出层
        self.final = nn.Conv2d(filters[0], out_channels, kernel_size=1)

    def forward(self, x):
        # 编码器
        x_dict = {}
        x_dict[(0,0)] = self.conv0_0(x)
        for i in range(1, self.depth+1):
            x_dict[(i,0)] = self.conv_blocks[f'conv{i}_0'](
                F.max_pool2d(x_dict[(i-1,0)], 2)
            )

        # 解码器 动态计算
        for s in range(1, self.depth+1):  # stage层数
            for d in range(self.depth - s + 1):  # 深度
                upsamples = [F.interpolate(x_dict[(d+1, s-1)], scale_factor=2, mode='bilinear', align_corners=True)] #F.interpolate 上卷积 一个函数方法可以将数据转化为指定的通道大小 使高卷积层与低放卷积层的数据格式一致,方便拼接
                for k in range(s):
                    upsamples.append(x_dict[(d, k)])
                x_dict[(d,s)] = self.decoder_blocks[f'conv{d}_{s}'](torch.cat(upsamples, dim=1))

        # 输出
        out = self.final(x_dict[(0, self.depth)])
        return out

#定义模型 损失函数 优化器
model = UNetPlusPlus()
criterion = nn.BCEWithLogitsLoss()  # 二分类交叉熵(带 sigmoid)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

def train_one_epoch(model , dataloader , criterion , optimizer , device):
    model.train()
    running_loss = 0.0

    for images , masks in dataloader:
        images = images.to(device, dtype = torch.float32)
        masks = masks.to(device , dtype = torch.float32)

        outputs = model(images)
        loss = criterion(outputs,masks)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss = running_loss / len(dataloader.dataset)
        return epoch_loss

def validate(model, dataloader, criterion, device):
    model.eval()  # 设置为验证模式
    running_loss = 0.0

    with torch.no_grad():
        for images, masks in dataloader:
            images = images.to(device, dtype=torch.float32)
            masks = masks.to(device, dtype=torch.float32)

            outputs = model(images)
            loss = criterion(outputs, masks)

            running_loss += loss.item() * images.size(0)

    epoch_loss = running_loss / len(dataloader.dataset)
    return epoch_loss



def train_unetpp(model, train_dataset, val_dataset, batch_size, num_epochs, device, model_save_path):
    # 使用 DataLoader 加载数据
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # 初始化损失函数和优化器
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # 学习率调整器
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    best_val_loss = float('inf')
    best_model_wts = None

    # 训练和验证循环
    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")

        # 1. 训练阶段
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        print(f"Train Loss: {train_loss:.4f}")

        # 2. 验证阶段
        val_loss = validate(model, val_loader, criterion, device)
        print(f"Validation Loss: {val_loss:.4f}")

        # 3. 保存最优模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_wts = model.state_dict()
            torch.save(best_model_wts, model_save_path)
            print(f"Best model saved at epoch {epoch + 1}.")

     
        scheduler.step()

    print(f"Training complete. Best validation loss: {best_val_loss:.4f}")
总结经验: 这个数据集比较复杂,处理这个数据集时,学习到了掩膜合并的相关知识和代码,由于UNet++这个模型的复杂性,每层解码器都需要和多层编码器拼接,导致代码写起来太过冗杂,所以用AI询问了一些方法和代码,使用递归算法简化了模型主体的代码。同时还学习到了新的损失函数Dice + BCE , IoU Loss的简单原理和代码的实现,优化模型的性能。目前通过复现这些模型,使我对训练搭建模型的流程更加熟练,同时对一些模型代码中常用的函数方法也更加熟悉。虽然说目前只靠自己搭建一个模型还是有些困难,但是模型的代码已经大部分都能看懂了。也总结出了在实际项目中搭建训练模型的具体流程,大概是先明确项目的目的和要求,再观察数据集,对数据集进行预处理,搭建模型的主体,选择合适的优化器和损失函数,模型训练过程和中间量的可视化。通过观察到的数据变化,再选择不同的优化方法和自定义模块 最后得到的就是训练好的能应用到实际项目中的模型。
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐