feat: 添加强化学习项目报告及重构课程作业报告代码结构

- 新增强化学习个人项目报告,包含基于PyTorch从零实现的PPO算法
- 重构课程作业报告代码结构,提取运行时路径管理和notebook执行逻辑到独立模块
- 更新依赖文件requirements.txt,添加强化学习相关依赖
- 简化模型比较结果表格,仅保留基线逻辑回归模型数据
This commit is contained in:
2026-04-30 16:54:41 +08:00
parent 6ac02ba4fe
commit d353133b31
21 changed files with 1639 additions and 102 deletions
@@ -0,0 +1,6 @@
"""PPO Agent for CarRacing-v3 environment."""
from .network import Actor, Critic
from .replay_buffer import RolloutBuffer
from .trainer import PPOTrainer
__all__ = ['Actor', 'Critic', 'RolloutBuffer', 'PPOTrainer']
@@ -0,0 +1,92 @@
"""Evaluation script for trained PPO agent."""
import torch
import numpy as np
import gymnasium as gym
from src.utils import make_env, get_device
from src.network import Actor, Critic
def evaluate(actor, env, num_episodes=10, device=torch.device("cpu")):
    """Roll out the stochastic policy and summarise the episode returns.

    Args:
        actor: Callable policy returning ``(mu, std)`` for a batched
            ``(B, C, H, W)`` float tensor.
        env: Environment exposing the Gymnasium ``reset``/``step`` API.
            Observations are assumed to arrive channel-first ``(C, H, W)``
            -- TODO confirm against the environment factory.
        num_episodes: Number of episodes to run.
        device: Torch device the observation tensor is moved to.

    Returns:
        Tuple ``(mean_return, std_return)`` over the evaluated episodes.
    """
    # Remember the caller's mode so evaluation does not silently flip an
    # eval-mode network into training mode (objects without a ``training``
    # flag keep the historical "switch to train afterwards" behaviour).
    was_training = getattr(actor, "training", True)
    actor.eval()
    returns = []
    try:
        for ep in range(num_episodes):
            obs, _ = env.reset()
            # Keep the observation channel-last (H, W, C) between steps.
            obs = np.transpose(obs, (1, 2, 0))
            total_reward = 0
            done = False
            steps = 0
            # Hard cap of 1000 steps per episode.
            while not done and steps < 1000:
                with torch.no_grad():
                    # (H, W, C) -> (1, C, H, W) for the network.
                    obs_t = (
                        torch.from_numpy(obs)
                        .float()
                        .unsqueeze(0)
                        .permute(0, 3, 1, 2)
                        .to(device)
                    )
                    mu, std = actor(obs_t)
                    # Actions are sampled (not the mean) and clipped to [-1, 1].
                    dist = torch.distributions.Normal(mu, std)
                    action = dist.sample()
                    action = torch.clamp(action, -1, 1).squeeze(0).cpu().numpy()
                obs, reward, terminated, truncated, _ = env.step(action)
                # Back to channel-last (H, W, C) for the next iteration.
                obs = np.transpose(obs, (1, 2, 0))
                total_reward += reward
                done = terminated or truncated
                steps += 1
            returns.append(total_reward)
            print(f"Episode {ep+1}/{num_episodes}: return={total_reward:.1f}, steps={steps}")
    finally:
        # Restore the original mode even if the rollout raised.
        if was_training:
            actor.train()
    return np.mean(returns), np.std(returns)
def evaluate_render(actor, env, device):
    """Run a single rendered episode and print its total return.

    Args:
        actor: Callable policy returning ``(mu, std)`` for a batched
            ``(B, C, H, W)`` float tensor.
        env: Environment exposing the Gymnasium ``reset``/``step``/``render``
            API. Observations are assumed to be channel-first ``(C, H, W)``
            -- TODO confirm against the environment factory.
        device: Torch device the observation tensor is moved to.
    """
    # Remember the caller's mode so it can be restored afterwards (objects
    # without a ``training`` flag keep the historical behaviour).
    was_training = getattr(actor, "training", True)
    actor.eval()
    total_reward = 0
    try:
        obs, _ = env.reset()
        obs = np.transpose(obs, (1, 2, 0))  # keep channel-last between steps
        # NOTE(review): Gymnasium normally fixes the render mode when the
        # environment is constructed; assigning it here may have no effect or
        # may be rejected by wrapped environments -- confirm, and prefer
        # creating the env with render_mode="human".
        env.render_mode = "human"
        done = False
        # Unlike ``evaluate`` there is no step cap; the loop relies on the
        # environment reporting termination or truncation.
        while not done:
            with torch.no_grad():
                # (H, W, C) -> (1, C, H, W) for the network.
                obs_t = (
                    torch.from_numpy(obs)
                    .float()
                    .unsqueeze(0)
                    .permute(0, 3, 1, 2)
                    .to(device)
                )
                mu, std = actor(obs_t)
                dist = torch.distributions.Normal(mu, std)
                action = dist.sample()
                action = torch.clamp(action, -1, 1).squeeze(0).cpu().numpy()
            obs, reward, terminated, truncated, _ = env.step(action)
            obs = np.transpose(obs, (1, 2, 0))
            total_reward += reward
            done = terminated or truncated
            env.render()
    finally:
        # Restore the original mode even if the rollout raised.
        if was_training:
            actor.train()
    print(f"Final return: {total_reward:.1f}")
if __name__ == "__main__":
    # Command-line entry point: load a saved actor and report its mean return.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, required=True, help="Path to trained model")
    parser.add_argument("--episodes", type=int, default=5, help="Number of evaluation episodes")
    args = parser.parse_args()

    device = get_device()
    env = make_env()
    try:
        # Only the actor is needed for evaluation; no critic is built.
        actor = Actor().to(device)

        # NOTE(security): weights_only=False lets torch.load unpickle
        # arbitrary Python objects, which can execute code. Only load
        # checkpoints from trusted sources.
        checkpoint = torch.load(args.model, map_location=device, weights_only=False)
        actor.load_state_dict(checkpoint["actor"])
        print(f"Loaded model from {args.model}")

        mean_return, std_return = evaluate(actor, env, num_episodes=args.episodes, device=device)
        print(f"\nEvaluation: mean={mean_return:.2f}, std={std_return:.2f}")
    finally:
        # Release the environment even when loading or evaluation fails.
        env.close()
@@ -0,0 +1,78 @@
"""Neural network architectures for Actor and Critic."""
import torch
import torch.nn as nn
import torch.nn.functional as F
class Actor(nn.Module):
    """Actor network producing the parameters of a diagonal Gaussian policy.

    Args:
        state_shape: Observation shape given as ``(height, width, channels)``.
        action_dim: Number of action dimensions.
    """

    def __init__(self, state_shape=(84, 84, 4), action_dim=3):
        super().__init__()
        h, w, c = state_shape[0], state_shape[1], state_shape[2]

        self.conv = nn.Sequential(
            nn.Conv2d(c, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        )

        # Derive the flattened feature size from a dummy forward pass instead
        # of hard-coding it: for 84x84 input the conv stack yields 64x7x7
        # (20 -> 9 -> 7), not 64x20x20, and this also keeps other input sizes
        # working.
        with torch.no_grad():
            feat_size = self.conv(torch.zeros(1, c, h, w)).numel()

        self.fc = nn.Sequential(
            nn.Linear(feat_size, 512),
            nn.ReLU(),
        )
        self.mu_head = nn.Linear(512, action_dim)
        self.log_std_head = nn.Linear(512, action_dim)

        # Small-gain orthogonal init on both output heads.
        nn.init.orthogonal_(self.mu_head.weight, gain=0.01)
        nn.init.orthogonal_(self.log_std_head.weight, gain=0.01)

    def forward(self, x):
        """Return ``(mu, std)`` for a batch of observations.

        Args:
            x: Tensor of shape ``(batch, channels, height, width)``; values
                are divided by 255, so raw 8-bit pixel intensities are
                presumably expected.

        Returns:
            ``mu`` squashed into ``[-1, 1]`` by ``tanh`` and the standard
            deviation ``std = exp(log_std)`` with ``log_std`` clamped to
            ``[-20, 2]``; both have shape ``(batch, action_dim)``.
        """
        x = x / 255.0  # Normalize
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        mu = torch.tanh(self.mu_head(x))
        log_std = self.log_std_head(x)
        log_std = torch.clamp(log_std, -20, 2)
        return mu, log_std.exp()
class Critic(nn.Module):
    """Critic network estimating the state value V(s).

    Args:
        state_shape: Observation shape given as ``(height, width, channels)``.
    """

    def __init__(self, state_shape=(84, 84, 4)):
        super().__init__()
        h, w, c = state_shape[0], state_shape[1], state_shape[2]

        self.conv = nn.Sequential(
            nn.Conv2d(c, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        )

        # Derive the flattened feature size from a dummy forward pass; the
        # previous hard-coded 64*20*20 did not match the conv output
        # (64x7x7 for 84x84 input) and broke the first linear layer.
        with torch.no_grad():
            feat_size = self.conv(torch.zeros(1, c, h, w)).numel()

        self.fc = nn.Sequential(
            nn.Linear(feat_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1),
        )

    def forward(self, x):
        """Return V(s) with shape ``(batch, 1)``.

        Args:
            x: Tensor of shape ``(batch, channels, height, width)``; values
                are divided by 255, so raw 8-bit pixel intensities are
                presumably expected.
        """
        x = x / 255.0
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)
@@ -0,0 +1,64 @@
"""Rollout buffer for storing trajectories."""
import numpy as np
class RolloutBuffer:
    """Fixed-capacity store of rollout transitions used for PPO updates.

    Args:
        buffer_size: Maximum number of transitions held at once.
        state_shape: Shape of one stored state (states are kept as uint8).
        action_dim: Number of action dimensions; log-probs use the same width.
    """

    def __init__(self, buffer_size, state_shape, action_dim):
        capacity = buffer_size
        self.buffer_size = capacity
        self.ptr = 0   # next write position
        self.size = 0  # number of valid entries
        self.states = np.zeros((capacity,) + tuple(state_shape), dtype=np.uint8)
        self.actions = np.zeros((capacity, action_dim), dtype=np.float32)
        self.rewards = np.zeros(capacity, dtype=np.float32)
        self.dones = np.zeros(capacity, dtype=np.bool_)
        self.values = np.zeros(capacity, dtype=np.float32)
        self.log_probs = np.zeros((capacity, action_dim), dtype=np.float32)

    def add(self, state, action, reward, done, value, log_prob):
        """Write one transition at the cursor and advance it (wrapping)."""
        slot = self.ptr
        fields = (
            (self.states, state),
            (self.actions, action),
            (self.rewards, reward),
            (self.dones, done),
            (self.values, value),
            (self.log_probs, log_prob),
        )
        for storage, item in fields:
            storage[slot] = item
        self.ptr = (slot + 1) % self.buffer_size
        self.size = min(self.size + 1, self.buffer_size)

    def compute_returns(self, last_value, gamma=0.99, gae_lambda=0.95):
        """Compute GAE advantages and the matching returns.

        Args:
            last_value: Value estimate used to bootstrap after the final
                stored step.
            gamma: Discount factor.
            gae_lambda: GAE smoothing coefficient.

        Returns:
            Tuple ``(returns, advantages)`` of length ``self.size``.
        """
        steps = self.size
        advantages = np.zeros(steps, dtype=np.float32)
        running_gae = 0
        # Walk the trajectory from the last stored step to the first.
        for t in range(steps - 1, -1, -1):
            next_value = last_value if t == steps - 1 else self.values[t + 1]
            # A done flag cuts both the bootstrap and the GAE recursion.
            not_done = 1 - self.dones[t]
            delta = self.rewards[t] + gamma * next_value * not_done - self.values[t]
            running_gae = delta + gamma * gae_lambda * not_done * running_gae
            advantages[t] = running_gae
        returns = advantages + self.values[:steps]
        return returns, advantages

    def get(self):
        """Return the filled part of every array.

        Order: states, actions, rewards, dones, values, log_probs.
        """
        filled = self.size
        arrays = (
            self.states,
            self.actions,
            self.rewards,
            self.dones,
            self.values,
            self.log_probs,
        )
        return tuple(arr[:filled] for arr in arrays)

    def reset(self):
        """Mark the buffer as empty; stored arrays are not cleared."""
        self.ptr = 0
        self.size = 0
@@ -0,0 +1,123 @@
"""PPO Trainer with GAE advantage estimation."""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
class PPOTrainer:
    """Run clipped-surrogate PPO updates over a rollout buffer.

    Args:
        actor: Policy network returning ``(mu, std)`` for a batch of states.
        critic: Value network returning one value per state.
        rollout_buffer: Buffer exposing ``get()``, ``compute_returns()`` and
            ``reset()``.
        device: Torch device the training tensors are moved to.
        clip_eps: Clipping range of the probability ratio.
        gamma: Discount factor forwarded to the buffer.
        gae_lambda: GAE coefficient forwarded to the buffer.
        lr: Learning rate of both Adam optimizers.
        ent_coef: Weight of the entropy bonus.
        vf_coef: Weight of the value loss.
        max_grad_norm: Gradient-norm clipping threshold.
        ppo_epochs: Number of passes over the rollout per update.
        mini_batch_size: Mini-batch size of each gradient step.
    """

    def __init__(
        self,
        actor,
        critic,
        rollout_buffer,
        device,
        clip_eps=0.2,
        gamma=0.99,
        gae_lambda=0.95,
        lr=3e-4,
        ent_coef=0.01,
        vf_coef=0.5,
        max_grad_norm=0.5,
        ppo_epochs=4,
        mini_batch_size=64,
    ):
        self.actor = actor
        self.critic = critic
        self.buffer = rollout_buffer
        self.device = device

        self.clip_eps = clip_eps
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.ent_coef = ent_coef
        self.vf_coef = vf_coef
        self.max_grad_norm = max_grad_norm
        self.ppo_epochs = ppo_epochs
        self.mini_batch_size = mini_batch_size

        # Separate optimizers
        self.actor_optim = optim.Adam(actor.parameters(), lr=lr)
        self.critic_optim = optim.Adam(critic.parameters(), lr=lr)

        self.loss_history = {'actor': [], 'critic': [], 'entropy': [], 'total': []}

    def update(self, last_value):
        """Perform one PPO update and reset the buffer.

        Args:
            last_value: Bootstrap value for the state following the rollout.

        Returns:
            Tuple ``(avg_actor_loss, avg_critic_loss, avg_entropy)`` averaged
            over all mini-batch steps.

        Raises:
            ValueError: If the rollout buffer holds no transitions.
        """
        states, actions, _rewards, _dones, _values, log_probs_old = self.buffer.get()
        if len(states) == 0:
            raise ValueError("cannot run a PPO update on an empty rollout buffer")

        # Compute returns and advantages
        returns, advantages = self.buffer.compute_returns(
            last_value, self.gamma, self.gae_lambda
        )

        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Convert to tensors.
        # NOTE(review): states are passed to the networks as stored; this
        # assumes the buffer already holds them in the layout the actor and
        # critic expect -- confirm against the rollout collection code.
        states_t = torch.from_numpy(states).float().to(self.device)
        actions_t = torch.from_numpy(actions).float().to(self.device)
        log_probs_old_t = torch.from_numpy(log_probs_old).float().to(self.device)
        returns_t = torch.from_numpy(returns).float().to(self.device)
        advantages_t = torch.from_numpy(advantages).float().to(self.device)

        # The ratio needs one joint log-probability per sample. Old log-probs
        # stored with an action dimension are summed over it here; this
        # assumes the buffer holds per-dimension log-probs -- confirm against
        # the rollout collection code.
        if log_probs_old_t.dim() > 1:
            log_probs_old_t = log_probs_old_t.sum(dim=-1)

        dataset = TensorDataset(states_t, actions_t, log_probs_old_t, returns_t, advantages_t)
        loader = DataLoader(dataset, batch_size=self.mini_batch_size, shuffle=True)

        mse = nn.MSELoss()
        total_actor_loss = 0
        total_critic_loss = 0
        total_entropy = 0
        count = 0

        for _ in range(self.ppo_epochs):
            for s, a, log_pi_old, ret, adv in loader:
                # Current policy distribution; every per-sample quantity is
                # kept at shape (B,) so the products below are element-wise
                # rather than broadcast into (B, action_dim) or (B, B).
                mu, std = self.actor(s)
                dist = torch.distributions.Normal(mu, std)
                log_pi = dist.log_prob(a).sum(dim=-1)
                entropy = dist.entropy().sum(dim=-1).mean()

                # Probability ratio
                ratio = torch.exp(log_pi - log_pi_old)

                # Clipped surrogate objective
                surr1 = ratio * adv
                surr2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * adv
                actor_loss = -torch.min(surr1, surr2).mean()

                # Value loss; squeeze only the trailing axis so a mini-batch
                # of one keeps its batch dimension.
                value = self.critic(s).squeeze(-1)
                critic_loss = mse(value, ret)

                # Total loss
                loss = actor_loss + self.vf_coef * critic_loss - self.ent_coef * entropy

                # Update
                self.actor_optim.zero_grad()
                self.critic_optim.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm)
                nn.utils.clip_grad_norm_(self.critic.parameters(), self.max_grad_norm)
                self.actor_optim.step()
                self.critic_optim.step()

                total_actor_loss += actor_loss.item()
                total_critic_loss += critic_loss.item()
                total_entropy += entropy.item()
                count += 1

        avg_actor = total_actor_loss / count
        avg_critic = total_critic_loss / count
        avg_entropy = total_entropy / count

        self.loss_history['actor'].append(avg_actor)
        self.loss_history['critic'].append(avg_critic)
        self.loss_history['entropy'].append(avg_entropy)
        # 'total' is the plain sum of the two averages; it does not include
        # vf_coef or the entropy term of the optimised loss.
        self.loss_history['total'].append(avg_actor + avg_critic)

        self.buffer.reset()

        return avg_actor, avg_critic, avg_entropy
@@ -0,0 +1,87 @@
"""Utility functions for environment, device detection, and TensorBoard."""
import gymnasium as gym
import numpy as np
import torch
from collections import deque
class GrayScaleWrapper(gym.ObservationWrapper):
    """Convert RGB observation to grayscale."""

    def __init__(self, env):
        super().__init__(env)
        # The channel axis is dropped, so advertise the new (H, W) shape
        # instead of inheriting the wrapped environment's space unchanged.
        # Assumes the wrapped space is an (H, W, C) image -- TODO confirm.
        height, width = env.observation_space.shape[:2]
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(height, width),
            dtype=np.uint8
        )

    def observation(self, obs):
        """Return the weighted channel average of ``obs`` as a uint8 array.

        ``obs`` is indexed as ``obs[:, :, k]`` for ``k`` in 0..2, so it must
        have at least three channels on its last axis.
        """
        # RGB to grayscale: weighted average (fractions are truncated by the
        # uint8 cast, not rounded).
        gray = 0.299 * obs[:, :, 0] + 0.587 * obs[:, :, 1] + 0.114 * obs[:, :, 2]
        return gray.astype(np.uint8)
class ResizeWrapper(gym.ObservationWrapper):
    """Resize observation to target size.

    Args:
        env: Environment to wrap.
        size: Target size passed to ``cv2.resize`` as ``(width, height)``.
    """

    def __init__(self, env, size=(84, 84)):
        super().__init__(env)
        self.size = size
        # Advertise the resized shape instead of inheriting the wrapped
        # space; any trailing axes (e.g. channels) are kept. Assumes 8-bit
        # image observations -- TODO confirm.
        width, height = size
        trailing = tuple(env.observation_space.shape[2:])
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(height, width) + trailing,
            dtype=np.uint8
        )

    def observation(self, obs):
        """Return ``obs`` resized to ``self.size`` with area interpolation."""
        # Imported lazily so OpenCV is only required when resizing is used.
        import cv2
        return cv2.resize(obs, self.size, interpolation=cv2.INTER_AREA)
class FrameStackWrapper(gym.ObservationWrapper):
    """Keep the most recent ``num_stack`` frames and emit them stacked.

    The stacked observation has the frame index as its leading axis.
    """

    def __init__(self, env, num_stack=4):
        super().__init__(env)
        self.num_stack = num_stack
        # Bounded deque: appending a new frame drops the oldest one.
        self.frames = deque(maxlen=num_stack)

        # NOTE(review): the declared shape uses the last two axes of the
        # wrapped env's observation space, so it is only accurate when that
        # space describes a single (H, W) frame -- confirm upstream wrappers.
        frame_dims = tuple(env.observation_space.shape[-2:])
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(num_stack,) + frame_dims,
            dtype=np.uint8,
        )

    def reset(self, **kwargs):
        """Reset the env and fill the whole stack with the first frame."""
        first_frame, info = self.env.reset(**kwargs)
        self.frames.extend(first_frame for _ in range(self.num_stack))
        return self._get_observation(), info

    def observation(self, obs):
        """Push the newest frame and return the current stack."""
        self.frames.append(obs)
        return self._get_observation()

    def _get_observation(self):
        """Stack the buffered frames along a new leading axis."""
        return np.stack(tuple(self.frames))
def make_env(env_id="CarRacing-v3", gray_scale=True, resize=True, frame_stack=4):
    """Build the environment and apply the optional preprocessing wrappers.

    Wrappers are applied in the order resize -> grayscale -> frame stack;
    frame stacking is only added when ``frame_stack`` is greater than 1.
    """
    env = gym.make(env_id, render_mode="rgb_array")
    env = ResizeWrapper(env, size=(84, 84)) if resize else env
    env = GrayScaleWrapper(env) if gray_scale else env
    if frame_stack <= 1:
        return env
    return FrameStackWrapper(env, num_stack=frame_stack)
def get_device():
    """Pick CUDA when available, otherwise CPU, and print the choice."""
    if not torch.cuda.is_available():
        print("Using CPU")
        return torch.device("cpu")

    gpu = torch.device("cuda")
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
    return gpu
def preprocess_obs(obs):
    """Give a 2-D observation a leading axis; return anything else as is."""
    is_single_channel = len(obs.shape) == 2
    if not is_single_channel:
        return obs
    return np.expand_dims(obs, axis=0)