第 8 章:完整项目实战¶
核心比喻:项目 = 烹饪全流程 —— 从买菜(数据准备)到切菜(预处理)到烹饪(训练)到摆盘(评估可视化),每一步都有讲究。
8.1 项目概述¶
本章将带你完成一个 完整的图像分类项目:在 CIFAR-10 数据集上训练一个 CNN 分类器。
项目目标
- 数据集:CIFAR-10(10 类彩色图像,50000 训练 + 10000 测试)
- 模型:自定义 CNN(基于残差块,约 280 万参数)
- 目标准确率:> 85%(测试集)
- 技术栈:PyTorch + torchvision + matplotlib
8.2 完整项目代码¶
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import os
# ============================================================
# 1. 配置与超参数
# ============================================================
class Config:
    """Single source of truth for the hyper-parameters and paths of this script."""

    # --- data ---
    DATA_DIR = "./data"
    NUM_CLASSES = 10
    IMG_SIZE = 32

    # --- optimisation ---
    BATCH_SIZE = 128
    NUM_EPOCHS = 30
    LEARNING_RATE = 1e-3
    WEIGHT_DECAY = 1e-4

    # --- runtime ---
    # Use CUDA when torch reports it as available, otherwise run on the CPU.
    DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    NUM_WORKERS = 0  # set to 0 on Windows
    SEED = 42

    # --- checkpoint locations ---
    SAVE_DIR = "./checkpoints"
    BEST_MODEL_PATH = os.path.join(SAVE_DIR, "best_model.pth")
# Seed the NumPy and torch global RNGs from the configured seed.
run_seed = Config.SEED
np.random.seed(run_seed)
torch.manual_seed(run_seed)

# Report the selected device, plus the name of GPU 0 when CUDA is available.
print(f"使用设备: {Config.DEVICE}")
if torch.cuda.is_available():
    gpu_name = torch.cuda.get_device_name(0)
    print(f"GPU: {gpu_name}")
# ============================================================
# 2. 数据准备
# ============================================================
# Per-channel mean / std used to standardise the images in every split.
CIFAR10_MEAN = [0.4914, 0.4822, 0.4465]
CIFAR10_STD = [0.2470, 0.2435, 0.2616]

# Training pipeline: random augmentation, then tensor conversion + standardisation.
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=CIFAR10_MEAN, std=CIFAR10_STD),
])

# Evaluation pipeline (validation and test): standardisation only.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=CIFAR10_MEAN, std=CIFAR10_STD),
])

# Load the datasets. The training images are wrapped twice: once with the
# augmenting transform (for training) and once with the plain transform (for
# validation). The first call downloads the data if needed, so the second
# one does not have to.
full_train_dataset = datasets.CIFAR10(
    root=Config.DATA_DIR, train=True, download=True, transform=train_transform
)
val_source_dataset = datasets.CIFAR10(
    root=Config.DATA_DIR, train=True, download=False, transform=test_transform
)
test_dataset = datasets.CIFAR10(
    root=Config.DATA_DIR, train=False, download=True, transform=test_transform
)

# Split the training images 90% / 10% into training and validation subsets.
train_size = int(0.9 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size
train_dataset, val_split = random_split(full_train_dataset, [train_size, val_size])

# Both subsets returned by random_split point at the SAME underlying dataset
# object, so assigning `val_dataset.dataset.transform = test_transform` would
# also strip the augmentation from the training subset. Instead, reuse the
# validation indices on the non-augmented view of the same images.
val_dataset = torch.utils.data.Subset(val_source_dataset, val_split.indices)

# Pinned host memory is only requested when the target device is CUDA.
use_pinned_memory = Config.DEVICE.type == 'cuda'

# Build the DataLoaders; only the training loader shuffles.
train_loader = DataLoader(
    train_dataset, batch_size=Config.BATCH_SIZE,
    shuffle=True, num_workers=Config.NUM_WORKERS, pin_memory=use_pinned_memory
)
val_loader = DataLoader(
    val_dataset, batch_size=Config.BATCH_SIZE,
    shuffle=False, num_workers=Config.NUM_WORKERS, pin_memory=use_pinned_memory
)
test_loader = DataLoader(
    test_dataset, batch_size=Config.BATCH_SIZE,
    shuffle=False, num_workers=Config.NUM_WORKERS, pin_memory=use_pinned_memory
)

print(f"\n训练集: {len(train_dataset)} 样本")
print(f"验证集: {len(val_dataset)} 样本")
print(f"测试集: {len(test_dataset)} 样本")

# CIFAR-10 class names, indexed by label id.
CLASSES = ['airplane', 'automobile', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck']
# ============================================================
# 3. 模型定义
# ============================================================
class ResidualBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus an additive shortcut.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the shortcut
            projection when one is needed).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path: only the first convolution applies `stride`.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut path: an empty Sequential (identity) when shapes already
        # match, otherwise a 1x1 convolution + batch norm to align them.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))
        skip = self.shortcut(x)
        return F.relu(main + skip)
class CIFAR10Classifier(nn.Module):
    """CNN classifier built from residual blocks.

    Layout: 3x3 stem convolution (3 -> 64 channels), three residual stages
    with 64 / 128 / 256 channels (two blocks each), global average pooling
    and a linear layer producing ``num_classes`` logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Stem convolution.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # Residual stages; the second and third start with a stride-2 block.
        self.layer1 = self._make_layer(64, 64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(64, 128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(128, 256, num_blocks=2, stride=2)

        # Classification head.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

        # Overwrite the default parameter initialisation.
        self._initialize_weights()

    def _make_layer(self, in_channels, out_channels, num_blocks, stride):
        """Stack residual blocks; only the first one changes channels / stride."""
        head = ResidualBlock(in_channels, out_channels, stride)
        tail = [
            ResidualBlock(out_channels, out_channels, 1)
            for _ in range(num_blocks - 1)
        ]
        return nn.Sequential(head, *tail)

    def _initialize_weights(self):
        """Initialise conv (Kaiming normal), batch-norm (1 / 0) and linear (N(0, 0.01), bias 0) layers."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                continue
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
                continue
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_classes)`` tensor of logits."""
        features = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            features = stage(features)
        pooled = self.avgpool(features)
        # Collapse the pooled 1x1 spatial dimensions before the linear layer.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)
# Build the network and move it to the configured device.
model = CIFAR10Classifier(num_classes=Config.NUM_CLASSES)
model = model.to(Config.DEVICE)

# Count all parameters, and separately those that require gradients.
param_info = [(p.numel(), p.requires_grad) for p in model.parameters()]
total_params = sum(count for count, _ in param_info)
trainable_params = sum(count for count, needs_grad in param_info if needs_grad)
print(f"\n模型参数量: {total_params:,} (可训练: {trainable_params:,})")
# ============================================================
# 4. 训练配置
# ============================================================
# Make sure the checkpoint directory exists before anything is saved into it.
os.makedirs(Config.SAVE_DIR, exist_ok=True)

# Loss function for multi-class classification.
criterion = nn.CrossEntropyLoss()

# AdamW over all model parameters, configured from Config.
adamw_options = {
    'lr': Config.LEARNING_RATE,
    'weight_decay': Config.WEIGHT_DECAY,
}
optimizer = optim.AdamW(model.parameters(), **adamw_options)

# Learning-rate schedule: cosine annealing with a period of NUM_EPOCHS steps.
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=Config.NUM_EPOCHS,
)
# ============================================================
# 5. 训练与评估函数
# ============================================================
def train_epoch(model, loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``loader`` and return epoch metrics.

    Args:
        model: Network to optimise; switched to training mode.
        loader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss callable. Assumed to return the *mean* loss of the
            batch (the default for ``nn.CrossEntropyLoss``); the epoch loss
            is the sample-weighted average of these values.
        optimizer: Optimizer stepping ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy_percent)`` over all samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    correct = 0
    total = 0

    pbar = tqdm(loader, desc='Training', leave=False)
    for data, target in pbar:
        data, target = data.to(device), target.to(device)
        batch_size = target.size(0)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()

        # Clip the global gradient norm to 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        # Read the scalar once; weight it by the batch size so that a short
        # final batch does not count as much as a full one.
        batch_loss = loss.item()
        loss_sum += batch_loss * batch_size
        total += batch_size
        correct += output.argmax(dim=1).eq(target).sum().item()

        pbar.set_postfix({
            'loss': f'{batch_loss:.3f}',
            'acc': f'{100.*correct/total:.1f}%'
        })

    if total == 0:
        raise ValueError("train_epoch: loader yielded no samples")
    return loss_sum / total, 100. * correct / total
@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss callable. Assumed to return the *mean* loss of the
            batch (the default for ``nn.CrossEntropyLoss``); the reported
            loss is the sample-weighted average of these values.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy_percent)`` over all samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0

    for data, target in loader:
        data, target = data.to(device), target.to(device)
        batch_size = target.size(0)
        output = model(data)

        # Weight each batch mean by its size so a short final batch is not
        # over-represented in the average.
        loss_sum += criterion(output, target).item() * batch_size
        total += batch_size
        correct += output.argmax(dim=1).eq(target).sum().item()

    if total == 0:
        raise ValueError("evaluate: loader yielded no samples")
    return loss_sum / total, 100. * correct / total
# ============================================================
# 6. 训练循环
# ============================================================
banner = "=" * 60
print("\n" + banner)
print("开始训练")
print(banner)

# One list per tracked quantity, appended to once per epoch.
history = {
    key: []
    for key in ('train_loss', 'train_acc', 'val_loss', 'val_acc', 'lr')
}
best_val_acc = 0.0

for epoch in range(Config.NUM_EPOCHS):
    # Learning rate in effect for this epoch (read before the scheduler steps).
    current_lr = optimizer.param_groups[0]['lr']

    # One pass over the training data, then a validation pass.
    train_loss, train_acc = train_epoch(
        model, train_loader, criterion, optimizer, Config.DEVICE
    )
    val_loss, val_acc = evaluate(model, val_loader, criterion, Config.DEVICE)

    # Advance the learning-rate schedule once per epoch.
    scheduler.step()

    # Record this epoch's metrics.
    epoch_metrics = {
        'train_loss': train_loss,
        'train_acc': train_acc,
        'val_loss': val_loss,
        'val_acc': val_acc,
        'lr': current_lr,
    }
    for key, value in epoch_metrics.items():
        history[key].append(value)

    # Keep a checkpoint of the model with the best validation accuracy so far.
    improved = val_acc > best_val_acc
    if improved:
        best_val_acc = val_acc
        best_state = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': val_acc,
        }
        torch.save(best_state, Config.BEST_MODEL_PATH)
    save_marker = ' *' if improved else ''

    print(f"Epoch {epoch+1:2d}/{Config.NUM_EPOCHS} | "
          f"LR: {current_lr:.6f} | "
          f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%{save_marker}")

print(f"\n最佳验证准确率: {best_val_acc:.2f}%")
# ============================================================
# 7. 测试评估
# ============================================================
separator = "=" * 60
print("\n" + separator)
print("测试集评估")
print(separator)

# Restore the weights of the best checkpoint saved during training.
checkpoint = torch.load(Config.BEST_MODEL_PATH, map_location=Config.DEVICE)
best_weights = checkpoint['model_state_dict']
model.load_state_dict(best_weights)

# Final evaluation on the held-out test set.
test_loss, test_acc = evaluate(model, test_loader, criterion, Config.DEVICE)

print(f"测试集 Loss: {test_loss:.4f}")
print(f"测试集 Accuracy: {test_acc:.2f}%")
# ============================================================
# 8. 结果可视化
# ============================================================
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Loss curves.
axes[0, 0].plot(history['train_loss'], label='Train Loss', color='#2196F3')
axes[0, 0].plot(history['val_loss'], label='Val Loss', color='#FF9800')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('Loss')
axes[0, 0].set_title('训练与验证损失曲线')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Accuracy curves, with the final test accuracy as a horizontal reference line.
axes[0, 1].plot(history['train_acc'], label='Train Acc', color='#2196F3')
axes[0, 1].plot(history['val_acc'], label='Val Acc', color='#FF9800')
axes[0, 1].axhline(y=test_acc, color='#4CAF50', linestyle='--', label=f'Test Acc ({test_acc:.1f}%)')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('Accuracy (%)')
axes[0, 1].set_title('训练与验证准确率曲线')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Learning-rate curve.
axes[1, 0].plot(history['lr'], color='#9C27B0')
axes[1, 0].set_xlabel('Epoch')
axes[1, 0].set_ylabel('Learning Rate')
axes[1, 0].set_title('学习率变化(余弦退火)')
axes[1, 0].grid(True, alpha=0.3)

# Prediction examples: classify the first images of the first test batch and
# draw them as a square grid (at most 4x4) inside the fourth panel.
model.eval()
test_images, test_labels = next(iter(test_loader))
grid_side = min(4, int(test_images.size(0) ** 0.5))
num_examples = grid_side * grid_side
test_images = test_images[:num_examples].to(Config.DEVICE)
test_labels = test_labels[:num_examples]
with torch.no_grad():
    outputs = model(test_images)
    _, predicted = outputs.max(1)
predicted = predicted.cpu()

# Undo the normalisation so the images can be displayed.
mean = torch.tensor([0.4914, 0.4822, 0.4465]).view(3, 1, 1)
std = torch.tensor([0.2470, 0.2435, 0.2616]).view(3, 1, 1)
test_images_denorm = test_images.cpu() * std + mean
test_images_denorm = test_images_denorm.clamp(0, 1)

# Tile the (N, 3, H, W) images into one (3, side*H, side*W) picture:
# (row, col, C, H, W) -> (C, row, H, col, W) -> merge row/H and col/W.
tile_h, tile_w = test_images_denorm.shape[-2:]
image_grid = (
    test_images_denorm
    .reshape(grid_side, grid_side, 3, tile_h, tile_w)
    .permute(2, 0, 3, 1, 4)
    .reshape(3, grid_side * tile_h, grid_side * tile_w)
)
axes[1, 1].imshow(image_grid.permute(1, 2, 0).numpy())

# Label every tile with the predicted class: green box when the prediction
# matches the ground truth, red box otherwise.
for i in range(num_examples):
    row, col = divmod(i, grid_side)
    predicted_idx = predicted[i].item()
    is_correct = predicted_idx == test_labels[i].item()
    axes[1, 1].text(
        col * tile_w + 1, row * tile_h + 1, CLASSES[predicted_idx],
        color='white', fontsize=8, va='top', ha='left',
        bbox={
            'facecolor': '#4CAF50' if is_correct else '#F44336',
            'edgecolor': 'none', 'alpha': 0.8, 'pad': 1,
        },
    )
axes[1, 1].axis('off')
axes[1, 1].set_title(f'测试集准确率: {test_acc:.1f}%')

plt.tight_layout()
plt.savefig('training_results.png', dpi=150, bbox_inches='tight')
plt.show()
print(f"\n训练结果图已保存为 training_results.png")
print(f"最佳模型已保存至 {Config.BEST_MODEL_PATH}")
8.3 项目结构总结¶
cifar10_project/
├── data/ # 数据集(自动下载)
├── checkpoints/ # 模型检查点
│ └── best_model.pth # 最佳模型
├── train.py # 训练脚本(上面的完整代码)
├── inference.py # 推理脚本
└── training_results.png # 训练结果可视化
8.4 推理脚本¶
# inference.py - 使用训练好的模型进行单张图像推理
import torch
from PIL import Image
from torchvision import transforms
def predict_image(image_path, model, device, classes):
    """Classify a single image file.

    Args:
        image_path: Path of the image to open.
        model: Network used for the prediction; switched to eval mode.
        device: Device the input tensor is moved to.
        classes: Sequence mapping a class index to its name.

    Returns:
        ``(class_name, confidence)`` where ``confidence`` is the softmax
        probability of the predicted class.
    """
    # Same resize / standardisation constants as the evaluation pipeline.
    normalize = transforms.Normalize(
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2470, 0.2435, 0.2616],
    )
    preprocess = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        normalize,
    ])

    # Force three channels, then add a leading batch dimension of size 1.
    rgb_image = Image.open(image_path).convert('RGB')
    batch = preprocess(rgb_image).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        logits = model(batch)
        class_probs = torch.softmax(logits, dim=1)
        confidence, predicted = class_probs.max(1)

    return classes[predicted.item()], confidence.item()
# Usage example, kept inside a string so that it is not executed.
# NOTE: `CIFAR10Classifier` and `CLASSES` must be defined or imported in this
# script. The training script runs its training code at import time (it has no
# `if __name__ == "__main__":` guard), so copy the definitions or add such a
# guard before importing from it.
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CIFAR10Classifier().to(device)
checkpoint = torch.load('checkpoints/best_model.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
class_name, confidence = predict_image('cat.jpg', model, device, CLASSES)
print(f'预测类别: {class_name}, 置信度: {confidence:.2%}')
"""
print("推理脚本结构已展示")
8.5 项目要点回顾¶
项目中的最佳实践
- 配置集中管理:使用 Config 类统一管理超参数
- 数据增强:训练集使用随机翻转、裁剪、颜色抖动
- 残差连接:使用 Residual Block 解决深层网络退化
- 学习率调度:余弦退火让训练后期更稳定
- 梯度裁剪:防止梯度爆炸
- 模型保存:保存最佳验证准确率的模型
- 结果可视化:损失曲线、准确率曲线、预测示例
要点总结¶
- 完整的深度学习项目包含:数据 → 模型 → 训练 → 评估 → 可视化
- 数据增强是提升泛化能力最有效的手段之一
- 残差连接(Residual Block)让深层网络更容易训练
- 余弦退火学习率调度比固定衰减更平滑
- 始终保存验证集上表现最好的模型
- 训练完成后在测试集上做最终评估
- 可视化训练曲线帮助诊断过拟合/欠拟合
课后练习¶
- 模型改进:尝试增加网络深度(更多 Residual Blocks),观察准确率变化。
- 数据增强实验:移除数据增强(只用 ToTensor + Normalize),观察过拟合程度。
- 迁移学习:使用 torchvision 的预训练 ResNet-18,在 CIFAR-10 上微调,比较与从头训练的效果。
- 模型部署:将训练好的模型导出为 ONNX 格式,尝试用 ONNX Runtime 进行推理。