Files
rl-atari/强化学习个人项目报告(Atari 游戏方向)/notebooks/train_parallel.ipynb
T
Serendipity d5c9baffe6 perf: 为PPO和DQN添加GPU优化——AMP混合精度、pinned memory、torch.compile
- PPO (CW1_id_name): 添加 AMP GradScaler + autocast 混合精度训练,pinned memory 加速 CPU→GPU 传输,torch.compile JIT 编译支持,调整默认超参适配 RTX 5090
- DQN (Atari): 添加 AMP 混合精度、pinned memory 回放缓冲区、向量化批量添加经验 (add_batch) 和批量动作选择 (batch_select_actions),消除 Python 循环
- train_parallel.py: 重写为无缓冲脚本,集成所有优化,64 并行环境 + 每步 4 次训练更新

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-05 00:50:16 +08:00

166 lines
14 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dueling Double DQN - Space Invaders 并行训练\n",
"\n",
"在 Linux 上使用 AsyncVectorEnv 并行运行多个 Atari 环境(其他平台自动回退到 SyncVectorEnv),GPU 批量推理加速。\n",
"适合在 AutoDL 等多核服务器环境运行。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "import sys\nimport os\nimport time\nimport numpy as np\nimport torch\nimport torch.nn.functional as F\nfrom collections import deque\nfrom multiprocessing import Process, Queue\n\n# notebooks/ 的上级目录即项目根目录\nsys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), \"..\")))\n\nfrom src.network import QNetwork, DuelingQNetwork\nfrom src.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer\nfrom src.agent import DQNAgent\nfrom src.utils import make_env, get_device\n\nprint(\"导入完成\")"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Environment factory ──\n",
"def _make_env_fn(env_id):\n",
"    \"\"\"Build a zero-argument factory that creates one preprocessed Atari env.\n",
"\n",
"    Vector-env constructors take one such factory per sub-environment.\n",
"    The original note asks for this function to stay at module level so the\n",
"    factories can be pickled for multiprocessing.\n",
"\n",
"    Args:\n",
"        env_id: Environment id forwarded unchanged to ``make_env``.\n",
"\n",
"    Returns:\n",
"        A callable taking no arguments that returns\n",
"        ``make_env(env_id, gray_scale=True, resize=True, frame_stack=4)``.\n",
"    \"\"\"\n",
"    def _register_ale():\n",
"        # Best effort: a missing ale_py / gymnasium is ignored here and left\n",
"        # for make_env to deal with.\n",
"        try:\n",
"            import ale_py\n",
"            import gymnasium as gym\n",
"            gym.register_envs(ale_py)\n",
"        except ImportError:\n",
"            pass\n",
"\n",
"    # Register in the process that builds the factory (previous behaviour).\n",
"    _register_ale()\n",
"\n",
"    def _make():\n",
"        # The factory itself is what an AsyncVectorEnv worker process calls,\n",
"        # so the registration has to be repeated here to reach that process.\n",
"        _register_ale()\n",
"        return make_env(env_id, gray_scale=True, resize=True, frame_stack=4)\n",
"    return _make\n",
"\n",
"print(\"环境工厂就绪\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Parallel trainer (optimized) ──\n",
"class ParallelTrainer:\n",
"    \"\"\"Run DQN training on a vectorized environment with periodic evaluation and checkpoints.\n",
"\n",
"    Only these agent members are used: ``select_action``, ``batch_select_actions``,\n",
"    ``train_step``, ``save``, ``epsilon``, ``optimizer`` and\n",
"    ``replay_buffer.add_batch``.\n",
"    \"\"\"\n",
"\n",
"    # A progress line is printed once each time this many more episodes have finished.\n",
"    LOG_EVERY_EPISODES = 20\n",
"\n",
"    def __init__(\n",
"        self, agent, envs, eval_env, num_envs,\n",
"        save_dir=\"models\", eval_freq=10000, save_freq=50000,\n",
"        num_eval_episodes=10, warmup_steps=10000,\n",
"        train_steps_per_update=1,\n",
"    ):\n",
"        \"\"\"Store the training configuration; nothing is run or created here.\n",
"\n",
"        Args:\n",
"            agent: Agent being trained (see the class docstring for the members used).\n",
"            envs: Vectorized training environment.\n",
"            eval_env: Single environment used by ``evaluate``.\n",
"            num_envs: Number of sub-environments in ``envs``; the step counter\n",
"                advances by this amount per loop iteration.\n",
"            save_dir: Directory that receives the checkpoints.\n",
"            eval_freq: Environment steps between evaluations.\n",
"            save_freq: Environment steps between periodic checkpoints.\n",
"            num_eval_episodes: Episodes played per evaluation.\n",
"            warmup_steps: Steps collected with random actions before any\n",
"                training update is performed.\n",
"            train_steps_per_update: ``agent.train_step()`` calls per loop iteration.\n",
"        \"\"\"\n",
"        self.agent = agent\n",
"        self.envs = envs\n",
"        self.eval_env = eval_env\n",
"        self.num_envs = num_envs\n",
"        self.save_dir = save_dir\n",
"        self.eval_freq = eval_freq\n",
"        self.save_freq = save_freq\n",
"        self.num_eval_episodes = num_eval_episodes\n",
"        self.warmup_steps = warmup_steps\n",
"        self.train_steps_per_update = train_steps_per_update\n",
"        self.episode_rewards = deque(maxlen=100)  # rolling window for the AvgR column\n",
"        self.eval_rewards = []  # (step, mean eval reward) pairs\n",
"        self.best_eval_reward = -float(\"inf\")\n",
"\n",
"    def evaluate(self):\n",
"        \"\"\"Play ``num_eval_episodes`` episodes on ``eval_env`` and return the mean episode reward.\n",
"\n",
"        Actions come from ``agent.select_action(state, evaluate=True)``.\n",
"        \"\"\"\n",
"        rewards = []\n",
"        for _ in range(self.num_eval_episodes):\n",
"            state, _ = self.eval_env.reset()\n",
"            ep_reward = 0\n",
"            done = False\n",
"            while not done:\n",
"                action = self.agent.select_action(state, evaluate=True)\n",
"                state, reward, terminated, truncated, _ = self.eval_env.step(action)\n",
"                done = terminated or truncated\n",
"                ep_reward += reward\n",
"            rewards.append(ep_reward)\n",
"        return np.mean(rewards)\n",
"\n",
"    def train(self, total_steps):\n",
"        \"\"\"Run the training loop until at least ``total_steps`` environment steps are collected.\n",
"\n",
"        Every loop iteration steps all ``num_envs`` environments once, so the\n",
"        step counter advances by ``num_envs`` at a time. Evaluation and\n",
"        checkpointing fire the first time the counter reaches or passes each\n",
"        multiple of ``eval_freq`` / ``save_freq``; the counter does not have to\n",
"        land exactly on a multiple. A final checkpoint ``dqn_final.pt`` is\n",
"        always written.\n",
"\n",
"        Args:\n",
"            total_steps: Minimum number of environment steps, summed over all\n",
"                parallel environments.\n",
"        \"\"\"\n",
"        n = self.num_envs\n",
"        envs = self.envs\n",
"\n",
"        # Make sure checkpoints have somewhere to go before hours of training are spent.\n",
"        os.makedirs(self.save_dir, exist_ok=True)\n",
"\n",
"        print(f\"开始训练: {total_steps:,} 步, {n} 并行环境, 每步训练 {self.train_steps_per_update} 次\")\n",
"        print(\"=\" * 60)\n",
"\n",
"        states, _ = envs.reset()\n",
"        ep_rewards = np.zeros(n, dtype=np.float32)\n",
"        ep_count = 0\n",
"        logged_bucket = 0  # ep_count // LOG_EVERY_EPISODES at the last progress line\n",
"        next_eval = self.eval_freq  # step thresholds for the next evaluation / checkpoint\n",
"        next_save = self.save_freq\n",
"        start_time = time.time()\n",
"        step = 0\n",
"\n",
"        while step < total_steps:\n",
"            # ── Action selection (vectorized) ──\n",
"            if step < self.warmup_steps:\n",
"                actions = np.array([envs.single_action_space.sample() for _ in range(n)])\n",
"            else:\n",
"                actions = self.agent.batch_select_actions(states, self.agent.epsilon)\n",
"\n",
"            # ── Step the environments ──\n",
"            next_states, rewards, terminateds, truncateds, _ = envs.step(actions)\n",
"            dones = np.logical_or(terminateds, truncateds)\n",
"\n",
"            # ── Store the whole batch of transitions in one call ──\n",
"            # NOTE(review): how a vector env reports observations around an\n",
"            # automatic reset depends on the gymnasium version; confirm that\n",
"            # next_states is the right successor for finished sub-environments.\n",
"            self.agent.replay_buffer.add_batch(states, actions, rewards, next_states, dones)\n",
"\n",
"            ep_rewards += rewards\n",
"\n",
"            # ── Close out finished episodes ──\n",
"            for i in np.flatnonzero(dones):\n",
"                self.episode_rewards.append(ep_rewards[i])\n",
"                ep_count += 1\n",
"                ep_rewards[i] = 0\n",
"\n",
"            step += n\n",
"            states = next_states\n",
"\n",
"            # ── Several gradient updates per environment step ──\n",
"            if step >= self.warmup_steps:\n",
"                for _ in range(self.train_steps_per_update):\n",
"                    self.agent.train_step()\n",
"\n",
"            # ── Progress line: once per LOG_EVERY_EPISODES finished episodes ──\n",
"            # Comparing buckets avoids reprinting on every iteration while\n",
"            # ep_count happens to sit on a multiple of the interval.\n",
"            bucket = ep_count // self.LOG_EVERY_EPISODES\n",
"            if bucket > logged_bucket:\n",
"                logged_bucket = bucket\n",
"                avg_r = np.mean(self.episode_rewards) if self.episode_rewards else 0\n",
"                elapsed = time.time() - start_time\n",
"                fps = step / max(elapsed, 1e-9)  # coarse clocks can report 0 elapsed\n",
"                lr = self.agent.optimizer.param_groups[0][\"lr\"]\n",
"                print(f\"Step:{step:>10,} | Ep:{ep_count:>5} | AvgR:{avg_r:>7.1f} | \"\n",
"                      f\"Eps:{self.agent.epsilon:.3f} | LR:{lr:.2e} | FPS:{fps:.0f}\")\n",
"\n",
"            # ── Periodic evaluation ──\n",
"            # step grows by n, so test for crossing the threshold rather than\n",
"            # for landing exactly on a multiple of eval_freq.\n",
"            if step >= next_eval:\n",
"                next_eval = (step // self.eval_freq + 1) * self.eval_freq\n",
"                eval_r = self.evaluate()\n",
"                self.eval_rewards.append((step, eval_r))\n",
"                print(f\"\\n[评估] Step:{step:>10,} | 平均回报:{eval_r:.1f}\\n\")\n",
"                if eval_r > self.best_eval_reward:\n",
"                    self.best_eval_reward = eval_r\n",
"                    self.agent.save(f\"{self.save_dir}/dqn_best.pt\")\n",
"\n",
"            # ── Periodic checkpoint (same threshold logic as evaluation) ──\n",
"            if step >= next_save:\n",
"                next_save = (step // self.save_freq + 1) * self.save_freq\n",
"                self.agent.save(f\"{self.save_dir}/dqn_step_{step}.pt\")\n",
"\n",
"        total_time = time.time() - start_time\n",
"        print(\"\\n\" + \"=\" * 60)\n",
"        print(f\"训练完成!总时间: {total_time:.1f} 秒 | FPS: {total_steps / max(total_time, 1e-9):.0f}\")\n",
"        print(f\"最佳评估回报: {self.best_eval_reward:.1f}\")\n",
"        self.agent.save(f\"{self.save_dir}/dqn_final.pt\")\n",
"\n",
"print(\"训练器就绪\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 配置参数\n",
"\n",
"根据 GPU 和预期训练时间调整以下参数。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Tunable hyperparameters ──\n",
"\n",
"# Environment / run length\n",
"ENV_ID = \"ALE/SpaceInvaders-v5\"\n",
"N_ENVS = 64                  # over-provisioned: 64 parallel envs to maximize CPU utilization\n",
"TOTAL_STEPS = 10_000_000     # total environment steps\n",
"SEED = 42\n",
"\n",
"# Optimization\n",
"LR = 1e-4                    # slightly higher lr to go with the large batch\n",
"GAMMA = 0.99                 # discount factor\n",
"BATCH_SIZE = 2048            # large batch to make full use of an RTX 5090\n",
"BUFFER_SIZE = 1_000_000      # replay buffer capacity\n",
"TARGET_UPDATE = 5000         # less frequent target-network updates\n",
"LR_DECAY_STEPS = 5_000_000\n",
"LR_DECAY_FACTOR = 0.5\n",
"WARMUP_STEPS = 50_000\n",
"TRAIN_STEPS_PER_UPDATE = 4   # 4 updates per env step to raise GPU utilization\n",
"\n",
"# Exploration schedule\n",
"EPSILON_START = 1.0\n",
"EPSILON_END = 0.01\n",
"EPSILON_DECAY = 4_000_000    # steps over which epsilon decays\n",
"\n",
"# Evaluation / checkpointing\n",
"EVAL_FREQ = 200000           # evaluate less often to reduce interruptions\n",
"EVAL_EPISODES = 10\n",
"SAVE_FREQ = 500000\n",
"SAVE_DIR = os.path.abspath(os.path.join(os.getcwd(), \"..\", \"models\"))\n",
"\n",
"# Feature switches\n",
"USE_AMP = True               # AMP mixed-precision training\n",
"USE_COMPILE = True           # torch.compile acceleration\n",
"USE_DUELING = True\n",
"USE_DOUBLE = True\n",
"USE_PER = True\n",
"\n",
"os.makedirs(SAVE_DIR, exist_ok=True)\n",
"\n",
"for summary_line in (\n",
"    f\"配置: {TOTAL_STEPS/1e6:.0f}M 步, {N_ENVS} 并行环境\",\n",
"    f\"每步训练 {TRAIN_STEPS_PER_UPDATE} 次, Batch {BATCH_SIZE}\",\n",
"    f\"AMP: {USE_AMP}, torch.compile: {USE_COMPILE}\",\n",
"    f\"模型保存: {SAVE_DIR}\",\n",
"):\n",
"    print(summary_line)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Environment setup ──\n",
"import platform\n",
"\n",
"torch.manual_seed(SEED)\n",
"np.random.seed(SEED)\n",
"\n",
"device = get_device()\n",
"\n",
"# One factory per sub-environment; both vector-env flavours below take the same list.\n",
"env_fns = [_make_env_fn(ENV_ID) for _ in range(N_ENVS)]\n",
"\n",
"# Per the original note, AsyncVectorEnv worker processes do not work under\n",
"# Windows Jupyter; every non-Linux host therefore uses SyncVectorEnv.\n",
"host_os = platform.system()\n",
"if host_os == \"Linux\":\n",
"    from gymnasium.vector import AsyncVectorEnv\n",
"    envs = AsyncVectorEnv(env_fns, shared_memory=True)\n",
"    print(f\"AsyncVectorEnv: {envs.num_envs} 个环境\")\n",
"else:\n",
"    from gymnasium.vector import SyncVectorEnv\n",
"    envs = SyncVectorEnv(env_fns)\n",
"    # Report the platform actually detected instead of always saying Windows.\n",
"    print(f\"SyncVectorEnv ({host_os}): {envs.num_envs} 个环境\")\n",
"\n",
"# The trainer draws warm-up actions with envs.single_action_space.sample();\n",
"# seed that space explicitly so the warm-up phase is repeatable as well.\n",
"envs.single_action_space.seed(SEED)\n",
"\n",
"# Evaluation environment\n",
"eval_env = make_env(ENV_ID, gray_scale=True, resize=True, frame_stack=4)\n",
"\n",
"num_actions = envs.single_action_space.n\n",
"print(f\"动作空间: {num_actions}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Networks + torch.compile ──\n",
"state_shape = (4, 84, 84)\n",
"\n",
"# Pick the architecture once; the online and target networks share it.\n",
"network_cls = DuelingQNetwork if USE_DUELING else QNetwork\n",
"network_label = \"Dueling DQN\" if USE_DUELING else \"标准 DQN\"\n",
"\n",
"q_network = network_cls(state_shape, num_actions).to(device)\n",
"target_network = network_cls(state_shape, num_actions).to(device)\n",
"param_count = sum(p.numel() for p in q_network.parameters())\n",
"print(f\"{network_label}: {param_count:,} 参数\")\n",
"\n",
"# torch.compile acceleration (PyTorch 2.x); skipped when torch has no `compile`.\n",
"compile_enabled = USE_COMPILE and hasattr(torch, 'compile')\n",
"if compile_enabled:\n",
"    print(\"应用 torch.compile 加速...\")\n",
"    q_network = torch.compile(q_network)\n",
"    target_network = torch.compile(target_network)\n",
"    print(\"torch.compile 完成\")\n",
"\n",
"# Start the target network from the online network's weights and put it in eval mode.\n",
"online_weights = q_network.state_dict()\n",
"target_network.load_state_dict(online_weights)\n",
"target_network.eval()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Replay buffer + agent ──\n",
"# Both buffer classes are constructed with the same three arguments.\n",
"buffer_cls = PrioritizedReplayBuffer if USE_PER else ReplayBuffer\n",
"replay_buffer = buffer_cls(BUFFER_SIZE, state_shape, device)\n",
"print(\"优先经验回放 (Pinned Memory)\" if USE_PER else \"标准经验回放 (Pinned Memory)\")\n",
"\n",
"# Collect the agent settings in one mapping so they can be inspected before construction.\n",
"agent_kwargs = dict(\n",
"    q_network=q_network,\n",
"    target_network=target_network,\n",
"    replay_buffer=replay_buffer,\n",
"    device=device,\n",
"    num_actions=num_actions,\n",
"    gamma=GAMMA,\n",
"    lr=LR,\n",
"    epsilon_start=EPSILON_START,\n",
"    epsilon_end=EPSILON_END,\n",
"    epsilon_decay_steps=EPSILON_DECAY,\n",
"    target_update_freq=TARGET_UPDATE,\n",
"    batch_size=BATCH_SIZE,\n",
"    double_dqn=USE_DOUBLE,\n",
"    lr_decay_steps=LR_DECAY_STEPS,\n",
"    lr_decay_factor=LR_DECAY_FACTOR,\n",
"    warmup_steps=WARMUP_STEPS,\n",
"    use_amp=USE_AMP,\n",
")\n",
"agent = DQNAgent(**agent_kwargs)\n",
"print(f\"Agent 创建完成 (AMP: {USE_AMP})\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Start training ──\n",
"trainer = ParallelTrainer(\n",
"    agent=agent,\n",
"    envs=envs,\n",
"    eval_env=eval_env,\n",
"    # Take the count from the vector env itself: if N_ENVS was edited after\n",
"    # the environments were built, the trainer's per-env arrays would no\n",
"    # longer match what envs.step() returns.\n",
"    num_envs=envs.num_envs,\n",
"    save_dir=SAVE_DIR,\n",
"    eval_freq=EVAL_FREQ,\n",
"    save_freq=SAVE_FREQ,\n",
"    num_eval_episodes=EVAL_EPISODES,\n",
"    warmup_steps=WARMUP_STEPS,\n",
"    train_steps_per_update=TRAIN_STEPS_PER_UPDATE,\n",
")\n",
"\n",
"print(\"\\n\" + \"=\" * 60)\n",
"# The step count in the banner follows TOTAL_STEPS instead of a fixed \"10M\".\n",
"print(f\"开始 {TOTAL_STEPS/1e6:.0f}M 步并行训练(全优化版)\")\n",
"print(f\" GPU: {device}\")\n",
"print(f\" 并行环境: {envs.num_envs}\")\n",
"print(f\" Batch Size: {BATCH_SIZE}\")\n",
"print(f\" 每步训练: {TRAIN_STEPS_PER_UPDATE} 次\")\n",
"print(f\" AMP 混合精度: {USE_AMP}\")\n",
"print(f\" torch.compile: {USE_COMPILE}\")\n",
"print(f\" Dueling: {USE_DUELING}\")\n",
"print(f\" Double DQN: {USE_DOUBLE}\")\n",
"print(f\" PER: {USE_PER}\")\n",
"print(\" Pinned Memory: 是\")\n",
"print(\" 向量化批量添加: 是\")\n",
"print(\"=\" * 60 + \"\\n\")\n",
"\n",
"trainer.train(TOTAL_STEPS)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 训练完成后:评估最佳模型"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Evaluate the best model ──\n",
"best_path = f\"{SAVE_DIR}/dqn_best.pt\"\n",
"final_path = f\"{SAVE_DIR}/dqn_final.pt\"\n",
"\n",
"# dqn_best.pt is only written when an evaluation ran during training, while\n",
"# dqn_final.pt is always written at the end of train(); fall back to it so a\n",
"# short run (TOTAL_STEPS below EVAL_FREQ) can still be evaluated.\n",
"if os.path.exists(best_path):\n",
"    print(\"加载最佳模型...\")\n",
"    agent.load(best_path)\n",
"else:\n",
"    print(\"未找到最佳模型,改为加载最终模型...\")\n",
"    agent.load(final_path)\n",
"\n",
"print(\"\\n评估中...\")\n",
"all_rewards = []\n",
"for i in range(20):\n",
"    state, _ = eval_env.reset()\n",
"    ep_r = 0\n",
"    done = False\n",
"    while not done:\n",
"        # evaluate=True is passed through to the agent's action selection.\n",
"        action = agent.select_action(state, evaluate=True)\n",
"        state, reward, terminated, truncated, _ = eval_env.step(action)\n",
"        done = terminated or truncated\n",
"        ep_r += reward\n",
"    all_rewards.append(ep_r)\n",
"    print(f\" Episode {i+1:>2}: {ep_r:.1f}\")\n",
"\n",
"print(f\"\\n结果: 平均 {np.mean(all_rewards):.2f} ± {np.std(all_rewards):.2f}\")\n",
"print(f\"最佳: {max(all_rewards):.1f} | 最差: {min(all_rewards):.1f}\")\n",
"print(f\"中位数: {np.median(all_rewards):.1f}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "my_env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.20"
}
},
"nbformat": 4,
"nbformat_minor": 4
}