Files
Serendipity d5c9baffe6 perf: 为PPO和DQN添加GPU优化——AMP混合精度、pinned memory、torch.compile
- PPO (CW1_id_name): 添加 AMP GradScaler + autocast 混合精度训练,pinned memory 加速 CPU→GPU 传输,torch.compile JIT 编译支持,调整默认超参适配 RTX 5090
- DQN (Atari): 添加 AMP 混合精度、pinned memory 回放缓冲区、向量化批量添加经验 (add_batch) 和批量动作选择 (batch_select_actions),消除 Python 循环
- train_parallel.py: 重写为无缓冲脚本,集成所有优化,64 并行环境 + 每步 4 次训练更新

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-05 00:50:16 +08:00

307 lines
10 KiB
Python

"""Dueling Double DQN - Space Invaders 并行训练脚本
使用 SyncVectorEnv 批量运行多个 Atari 环境(同一进程内依次步进),GPU 批量推理加速。
适合在 AutoDL 等多核服务器环境运行。
与 notebooks/train_parallel.ipynb 内容一致,但使用 Python 脚本直接运行,
确保 stdout 实时输出(无缓冲)。
"""
import sys
import os
import time
import numpy as np
import torch
from collections import deque
# Directory that contains this script; main() also uses it to locate the model output folder.
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# Put that directory first on sys.path so the `src.*` packages resolve when the
# script is launched directly from any working directory.
sys.path.insert(0, PROJECT_ROOT)
from src.network import QNetwork, DuelingQNetwork
from src.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer
from src.agent import DQNAgent
from src.utils import make_env, get_device
# Force line-buffered stdout so progress lines show up immediately.
# `reconfigure` is not available on every stream object, hence the guard.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)
# NOTE(review): this is set after interpreter start-up, so presumably it only
# influences child Python processes that inherit the environment — confirm.
os.environ['PYTHONUNBUFFERED'] = '1'
print("导入完成", flush=True)
# ── Environment factory ──
def _make_env_fn(env_id):
    """Return a zero-argument callable that builds one environment.

    The returned callable forwards ``env_id`` to ``make_env`` with grayscale,
    resize and 4-frame stacking enabled, which is the thunk form expected by
    gymnasium's vector environments.

    Before building the thunk, ALE registration with gymnasium is attempted
    on a best-effort basis: a missing ``ale_py`` / ``gymnasium`` install is
    ignored here and left for ``make_env`` to deal with.
    """
    try:
        import ale_py
        import gymnasium as gym
        gym.register_envs(ale_py)
    except ImportError:
        # Deliberately silent: registration is optional at this point.
        pass

    preprocessing = dict(gray_scale=True, resize=True, frame_stack=4)

    def _build():
        return make_env(env_id, **preprocessing)

    return _build


print("环境工厂就绪", flush=True)
# ── Parallel trainer ──
class ParallelTrainer:
    """Run the DQN training loop over a vectorized environment.

    Each loop iteration steps all ``num_envs`` environments once, stores the
    resulting batch of transitions in ``agent.replay_buffer`` and, once the
    warm-up is over, calls ``agent.train_step()`` ``train_steps_per_update``
    times. Evaluation and checkpointing happen periodically.

    Args:
        agent: Object providing ``batch_select_actions``, ``select_action``,
            ``train_step``, ``save``, ``epsilon``, ``optimizer`` and
            ``replay_buffer.add_batch``.
        envs: Vectorized training environment (``reset``/``step`` and
            ``single_action_space``).
        eval_env: Single environment used by :meth:`evaluate`.
        num_envs: Number of sub-environments in ``envs``; the global step
            counter advances by this amount per iteration.
        save_dir: Directory that receives the checkpoints.
        eval_freq: Evaluate every time this many environment steps have
            passed. ``0``/``None`` disables periodic evaluation.
        save_freq: Save a checkpoint every time this many environment steps
            have passed. ``0``/``None`` disables periodic checkpoints.
        num_eval_episodes: Episodes averaged per evaluation.
        warmup_steps: Environment steps during which actions are sampled
            randomly and no training update is performed.
        train_steps_per_update: ``agent.train_step()`` calls per iteration.
        log_interval_episodes: Print one progress line each time the number
            of finished episodes crosses a multiple of this value.
            ``0`` disables progress lines.
    """

    def __init__(
        self, agent, envs, eval_env, num_envs,
        save_dir="models", eval_freq=10000, save_freq=50000,
        num_eval_episodes=10, warmup_steps=10000,
        train_steps_per_update=1, log_interval_episodes=20,
    ):
        self.agent = agent
        self.envs = envs
        self.eval_env = eval_env
        self.num_envs = num_envs
        self.save_dir = save_dir
        self.eval_freq = eval_freq
        self.save_freq = save_freq
        self.num_eval_episodes = num_eval_episodes
        self.warmup_steps = warmup_steps
        self.train_steps_per_update = train_steps_per_update
        self.log_interval_episodes = log_interval_episodes
        # Rolling window of the most recent training episode returns.
        self.episode_rewards = deque(maxlen=100)
        # (step, mean evaluation return) pairs, in chronological order.
        self.eval_rewards = []
        self.best_eval_reward = -float("inf")

    @staticmethod
    def _first_due(freq):
        """Return the first step at which a periodic action fires, or None if disabled."""
        return freq if freq and freq > 0 else None

    @staticmethod
    def _next_due(step, freq):
        """Return the smallest multiple of ``freq`` strictly greater than ``step``."""
        return (step // freq + 1) * freq

    def evaluate(self):
        """Play ``num_eval_episodes`` episodes on ``eval_env`` and return the mean return.

        Actions come from ``agent.select_action(state, evaluate=True)``.
        """
        rewards = []
        for _ in range(self.num_eval_episodes):
            state, _ = self.eval_env.reset()
            ep_reward = 0
            done = False
            while not done:
                action = self.agent.select_action(state, evaluate=True)
                state, reward, terminated, truncated, _ = self.eval_env.step(action)
                done = terminated or truncated
                ep_reward += reward
            rewards.append(ep_reward)
        return np.mean(rewards)

    def train(self, total_steps):
        """Train until at least ``total_steps`` environment steps were taken.

        Side effects: fills the replay buffer, updates the agent, appends to
        ``episode_rewards`` / ``eval_rewards``, and writes ``dqn_best.pt``,
        ``dqn_step_<step>.pt`` and ``dqn_final.pt`` into ``save_dir``.
        """
        n = self.num_envs
        envs = self.envs
        agent = self.agent
        log_every = self.log_interval_episodes

        # Fail early rather than at the first checkpoint if the directory is missing.
        if self.save_dir:
            os.makedirs(self.save_dir, exist_ok=True)

        print(f"开始训练: {total_steps:,} 步, {n} 并行环境, 每步训练 {self.train_steps_per_update}", flush=True)
        print("=" * 60, flush=True)

        states, _ = envs.reset()
        ep_rewards = np.zeros(n, dtype=np.float32)
        ep_count = 0
        logged_bucket = 0
        # The step counter moves in increments of `n`, so it may never land
        # exactly on a multiple of the frequency. Thresholds are therefore
        # tracked explicitly instead of testing `step % freq == 0`.
        next_eval = self._first_due(self.eval_freq)
        next_save = self._first_due(self.save_freq)
        start_time = time.time()
        step = 0

        while step < total_steps:
            if step < self.warmup_steps:
                actions = np.array([envs.single_action_space.sample() for _ in range(n)])
            else:
                actions = agent.batch_select_actions(states, agent.epsilon)

            next_states, rewards, terminateds, truncateds, _ = envs.step(actions)
            # NOTE(review): terminated and truncated are merged, so truncated
            # episodes are presumably stored as terminal transitions; and
            # depending on the vector env's autoreset behaviour, next_states of
            # finished envs may already be a reset observation — confirm
            # against the replay buffer / agent target computation.
            dones = np.logical_or(terminateds, truncateds)

            # Vectorized insertion of the whole batch of transitions.
            agent.replay_buffer.add_batch(states, actions, rewards, next_states, dones)

            ep_rewards += rewards
            finished = np.flatnonzero(dones)
            if finished.size:
                self.episode_rewards.extend(ep_rewards[finished])
                ep_rewards[finished] = 0
                ep_count += int(finished.size)

            step += n
            states = next_states

            if step >= self.warmup_steps:
                for _ in range(self.train_steps_per_update):
                    agent.train_step()

            # Log once per crossed multiple of `log_every`, even when several
            # episodes finish in the same iteration or none finish for a while.
            if log_every and log_every > 0 and ep_count // log_every > logged_bucket:
                logged_bucket = ep_count // log_every
                avg_r = np.mean(self.episode_rewards) if self.episode_rewards else 0
                elapsed = time.time() - start_time
                fps = step / max(elapsed, 1e-9)
                lr = agent.optimizer.param_groups[0]["lr"]
                print(f"Step:{step:>10,} | Ep:{ep_count:>5} | AvgR:{avg_r:>7.1f} | "
                      f"Eps:{agent.epsilon:.3f} | LR:{lr:.2e} | FPS:{fps:.0f}", flush=True)

            if next_eval is not None and step >= next_eval:
                next_eval = self._next_due(step, self.eval_freq)
                eval_r = self.evaluate()
                self.eval_rewards.append((step, eval_r))
                print(f"\n[评估] Step:{step:>10,} | 平均回报:{eval_r:.1f}\n", flush=True)
                if eval_r > self.best_eval_reward:
                    self.best_eval_reward = eval_r
                    agent.save(f"{self.save_dir}/dqn_best.pt")
                    print(f"最佳模型已更新 (回报: {eval_r:.1f})", flush=True)

            if next_save is not None and step >= next_save:
                next_save = self._next_due(step, self.save_freq)
                agent.save(f"{self.save_dir}/dqn_step_{step}.pt")

        total_time = time.time() - start_time
        print("\n" + "=" * 60, flush=True)
        # Report throughput from the steps actually taken (may overshoot total_steps).
        print(f"训练完成!总时间: {total_time:.1f} 秒 | FPS: {step/max(total_time, 1e-9):.0f}", flush=True)
        print(f"最佳评估回报: {self.best_eval_reward:.1f}", flush=True)
        agent.save(f"{self.save_dir}/dqn_final.pt")


print("训练器就绪", flush=True)
def main():
    """Build the environments, networks and agent, train, then evaluate.

    All hyperparameters are defined as local constants below. After training,
    the best checkpoint written by the trainer (if any) is reloaded and
    evaluated for 20 episodes; summary statistics are printed to stdout.
    The environments are closed on exit, including on error.
    """
    # ── Hyperparameters ──
    ENV_ID = "ALE/SpaceInvaders-v5"
    N_ENVS = 64
    TOTAL_STEPS = 2_000_000
    LR = 1e-4
    GAMMA = 0.99
    BATCH_SIZE = 2048
    BUFFER_SIZE = 1_000_000
    EPSILON_START = 1.0
    EPSILON_END = 0.01
    # NOTE(review): EPSILON_DECAY and LR_DECAY_STEPS are larger than
    # TOTAL_STEPS, so presumably neither schedule completes within one run —
    # confirm against how DQNAgent counts steps.
    EPSILON_DECAY = 4_000_000
    TARGET_UPDATE = 5000
    LR_DECAY_STEPS = 5_000_000
    LR_DECAY_FACTOR = 0.5
    WARMUP_STEPS = 50_000
    EVAL_FREQ = 200000
    EVAL_EPISODES = 10
    SAVE_FREQ = 500000
    SEED = 42
    SAVE_DIR = os.path.join(PROJECT_ROOT, "models")
    TRAIN_STEPS_PER_UPDATE = 4
    USE_AMP = True
    USE_COMPILE = True
    USE_DUELING = True
    USE_DOUBLE = True
    USE_PER = True
    FINAL_EVAL_EPISODES = 20

    os.makedirs(SAVE_DIR, exist_ok=True)
    print(f"配置: {TOTAL_STEPS/1e6:.0f}M 步, {N_ENVS} 并行环境", flush=True)
    print(f"每步训练 {TRAIN_STEPS_PER_UPDATE} 次, Batch {BATCH_SIZE}", flush=True)
    print(f"AMP: {USE_AMP}, torch.compile: {USE_COMPILE}", flush=True)
    print(f"模型保存: {SAVE_DIR}", flush=True)

    # NOTE(review): only torch and numpy are seeded; the environments and
    # their action spaces are not, so runs are presumably not fully reproducible.
    torch.manual_seed(SEED)
    np.random.seed(SEED)

    device = get_device()
    print(f"使用设备: {device}", flush=True)

    from gymnasium.vector import SyncVectorEnv
    env_fns = [_make_env_fn(ENV_ID) for _ in range(N_ENVS)]
    envs = SyncVectorEnv(env_fns)
    eval_env = None
    try:
        print(f"SyncVectorEnv: {envs.num_envs} 个环境", flush=True)
        eval_env = make_env(ENV_ID, gray_scale=True, resize=True, frame_stack=4)
        num_actions = envs.single_action_space.n
        print(f"动作空间: {num_actions}", flush=True)

        # Matches the frame_stack=4 preprocessing requested above.
        # NOTE(review): the 84x84 size is assumed to be what make_env's resize produces.
        state_shape = (4, 84, 84)
        network_cls = DuelingQNetwork if USE_DUELING else QNetwork
        network_label = "Dueling DQN" if USE_DUELING else "标准 DQN"
        q_network = network_cls(state_shape, num_actions).to(device)
        target_network = network_cls(state_shape, num_actions).to(device)
        print(f"{network_label}: {sum(p.numel() for p in q_network.parameters()):,} 参数", flush=True)

        if USE_COMPILE and hasattr(torch, 'compile'):
            print("应用 torch.compile 加速...", flush=True)
            q_network = torch.compile(q_network)
            target_network = torch.compile(target_network)
            print("torch.compile 完成", flush=True)

        # Both networks are wrapped the same way, so their state-dict keys line up.
        target_network.load_state_dict(q_network.state_dict())
        target_network.eval()

        if USE_PER:
            replay_buffer = PrioritizedReplayBuffer(BUFFER_SIZE, state_shape, device)
            print("优先经验回放 (Pinned Memory)", flush=True)
        else:
            replay_buffer = ReplayBuffer(BUFFER_SIZE, state_shape, device)
            print("标准经验回放 (Pinned Memory)", flush=True)

        agent = DQNAgent(
            q_network=q_network,
            target_network=target_network,
            replay_buffer=replay_buffer,
            device=device,
            num_actions=num_actions,
            gamma=GAMMA,
            lr=LR,
            epsilon_start=EPSILON_START,
            epsilon_end=EPSILON_END,
            epsilon_decay_steps=EPSILON_DECAY,
            target_update_freq=TARGET_UPDATE,
            batch_size=BATCH_SIZE,
            double_dqn=USE_DOUBLE,
            lr_decay_steps=LR_DECAY_STEPS,
            lr_decay_factor=LR_DECAY_FACTOR,
            warmup_steps=WARMUP_STEPS,
            use_amp=USE_AMP,
        )
        print(f"Agent 创建完成 (AMP: {USE_AMP})", flush=True)

        trainer = ParallelTrainer(
            agent=agent,
            envs=envs,
            eval_env=eval_env,
            num_envs=N_ENVS,
            save_dir=SAVE_DIR,
            eval_freq=EVAL_FREQ,
            save_freq=SAVE_FREQ,
            num_eval_episodes=EVAL_EPISODES,
            warmup_steps=WARMUP_STEPS,
            train_steps_per_update=TRAIN_STEPS_PER_UPDATE,
        )

        print("\n" + "=" * 60, flush=True)
        # Derive the banner from TOTAL_STEPS so it cannot drift from the config.
        print(f"开始 {TOTAL_STEPS/1e6:.0f}M 步并行训练(全优化版)", flush=True)
        print(f" GPU: {device}", flush=True)
        print(f" 并行环境: {N_ENVS}", flush=True)
        print(f" Batch Size: {BATCH_SIZE}", flush=True)
        print(f" 每步训练: {TRAIN_STEPS_PER_UPDATE}", flush=True)
        print(f" AMP 混合精度: {USE_AMP}", flush=True)
        print(f" torch.compile: {USE_COMPILE}", flush=True)
        print(f" Dueling: {USE_DUELING}", flush=True)
        print(f" Double DQN: {USE_DOUBLE}", flush=True)
        print(f" PER: {USE_PER}", flush=True)
        print("=" * 60 + "\n", flush=True)

        trainer.train(TOTAL_STEPS)

        # ── Evaluate the best model ──
        print("\n加载最佳模型...", flush=True)
        best_path = f"{SAVE_DIR}/dqn_best.pt"
        if os.path.exists(best_path):
            agent.load(best_path)
        else:
            # No evaluation produced a best checkpoint; keep the weights the
            # agent holds at the end of training instead of crashing.
            print("未找到最佳模型,使用训练结束时的权重进行评估", flush=True)

        print("\n评估中...", flush=True)
        all_rewards = []
        for i in range(FINAL_EVAL_EPISODES):
            state, _ = eval_env.reset()
            ep_r = 0
            done = False
            while not done:
                action = agent.select_action(state, evaluate=True)
                state, reward, terminated, truncated, _ = eval_env.step(action)
                done = terminated or truncated
                ep_r += reward
            all_rewards.append(ep_r)
            print(f" Episode {i+1:>2}: {ep_r:.1f}", flush=True)

        print(f"\n结果: 平均 {np.mean(all_rewards):.2f} +/- {np.std(all_rewards):.2f}", flush=True)
        print(f"最佳: {max(all_rewards):.1f} | 最差: {min(all_rewards):.1f}", flush=True)
        print(f"中位数: {np.median(all_rewards):.1f}", flush=True)
    finally:
        # Release emulator resources even when training or evaluation fails.
        envs.close()
        if eval_env is not None:
            eval_env.close()


if __name__ == "__main__":
    main()