{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dueling Double DQN - Space Invaders Parallel Training\n",
"\n",
"Runs multiple Atari environments in parallel with AsyncVectorEnv and batches action selection on the GPU.\n",
"Intended for multi-core server environments such as AutoDL."
]
},
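{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before the real pipeline below, here is a minimal, self-contained sketch of the batched-inference idea: stack one observation per environment into a single tensor and pick every greedy action with one forward pass. The tiny network and random states are stand-ins for illustration only; the actual agent does this later with its trained Q-network via batch_select_actions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Sketch only: batched greedy action selection over N parallel environments.\n# _TinyQNet and the random states are stand-ins, not the project's classes.\nimport torch\nimport torch.nn as nn\n\nclass _TinyQNet(nn.Module):\n    def __init__(self, num_actions=6):\n        super().__init__()\n        self.net = nn.Sequential(nn.Flatten(), nn.Linear(4 * 84 * 84, num_actions))\n\n    def forward(self, x):\n        return self.net(x)\n\n_device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n_q = _TinyQNet().to(_device)\n_states = torch.rand(8, 4, 84, 84, device=_device)  # 8 envs, 4 stacked 84x84 frames each\nwith torch.no_grad():\n    _actions = _q(_states).argmax(dim=1)  # one forward pass yields an action per env\nprint(\"batched actions:\", _actions.cpu().numpy())"
},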
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "import sys\nimport os\nimport time\nimport numpy as np\nimport torch\nimport torch.nn.functional as F\nfrom collections import deque\nfrom multiprocessing import Process, Queue\n\n# The parent directory of notebooks/ is the project root\nsys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), \"..\")))\n\nfrom src.network import QNetwork, DuelingQNetwork\nfrom src.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer\nfrom src.agent import DQNAgent\nfrom src.utils import make_env, get_device\n\nprint(\"Imports complete\")"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Environment factory ──\n",
"def _make_env_fn(env_id):\n",
"    \"\"\"Environment factory - must live at module level so multiprocessing can pickle it.\"\"\"\n",
"    def _make():\n",
"        # Each AsyncVectorEnv worker process has its own Gymnasium registry,\n",
"        # so register the ALE environments inside the factory closure.\n",
"        try:\n",
"            import ale_py\n",
"            import gymnasium as gym\n",
"            gym.register_envs(ale_py)\n",
"        except ImportError:\n",
"            pass\n",
"        return make_env(env_id, gray_scale=True, resize=True, frame_stack=4)\n",
"    return _make\n",
"\n",
"print(\"Environment factory ready\")"
]
},
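{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check (an added sketch, not part of the original pipeline): build a 2-env vector environment from the factory and reset it once, to surface ALE-registration or pickling problems before launching the full worker set. SyncVectorEnv is used so the check also runs on Windows, and a literal environment id is used because ENV_ID is only defined in the configuration cell further down; the expected observation shape is an assumption based on the 84x84, frame_stack=4 preprocessing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Optional smoke test: two environments, one reset, then close.\nfrom gymnasium.vector import SyncVectorEnv\n\n_test_envs = SyncVectorEnv([_make_env_fn(\"ALE/SpaceInvaders-v5\") for _ in range(2)])\n_obs, _info = _test_envs.reset(seed=0)\nprint(\"stacked observation batch:\", _obs.shape)  # expected (2, 4, 84, 84)\n_test_envs.close()"
},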
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# ── Parallel trainer (optimized) ──\nclass ParallelTrainer:\n    def __init__(\n        self, agent, envs, eval_env, num_envs,\n        save_dir=\"models\", eval_freq=10000, save_freq=50000,\n        num_eval_episodes=10, warmup_steps=10000,\n        train_steps_per_update=1,\n    ):\n        self.agent = agent\n        self.envs = envs\n        self.eval_env = eval_env\n        self.num_envs = num_envs\n        self.save_dir = save_dir\n        self.eval_freq = eval_freq\n        self.save_freq = save_freq\n        self.num_eval_episodes = num_eval_episodes\n        self.warmup_steps = warmup_steps\n        self.train_steps_per_update = train_steps_per_update\n        self.episode_rewards = deque(maxlen=100)\n        self.eval_rewards = []\n        self.best_eval_reward = -float(\"inf\")\n\n    def evaluate(self):\n        \"\"\"Evaluate the agent with a greedy policy.\"\"\"\n        rewards = []\n        for _ in range(self.num_eval_episodes):\n            state, _ = self.eval_env.reset()\n            ep_reward = 0\n            done = False\n            while not done:\n                action = self.agent.select_action(state, evaluate=True)\n                state, reward, terminated, truncated, _ = self.eval_env.step(action)\n                done = terminated or truncated\n                ep_reward += reward\n            rewards.append(ep_reward)\n        return np.mean(rewards)\n\n    def train(self, total_steps):\n        n = self.num_envs\n        device = self.agent.device\n        envs = self.envs\n\n        print(f\"Starting training: {total_steps:,} steps, {n} parallel envs, {self.train_steps_per_update} updates per step\")\n        print(\"=\" * 60)\n\n        states, _ = envs.reset()\n        ep_rewards = np.zeros(n, dtype=np.float32)\n        ep_count = 0\n        next_log_ep = 20\n        start_time = time.time()\n        step = 0\n\n        while step < total_steps:\n            # ── Action selection (vectorized) ──\n            if step < self.warmup_steps:\n                actions = np.array([envs.single_action_space.sample() for _ in range(n)])\n            else:\n                actions = self.agent.batch_select_actions(states, self.agent.epsilon)\n\n            # ── Environment step ──\n            next_states, rewards, terminateds, truncateds, _ = envs.step(actions)\n            dones = np.logical_or(terminateds, truncateds)\n\n            # ── Vectorized batch insert into the replay buffer (no Python loop) ──\n            self.agent.replay_buffer.add_batch(states, actions, rewards, next_states, dones)\n\n            ep_rewards += rewards\n\n            # ── Handle finished episodes ──\n            for i in range(n):\n                if dones[i]:\n                    self.episode_rewards.append(ep_rewards[i])\n                    ep_count += 1\n                    ep_rewards[i] = 0\n\n            prev_step = step\n            step += n\n            states = next_states\n\n            # ── Several updates per environment step (better GPU utilization) ──\n            if step >= self.warmup_steps:\n                for _ in range(self.train_steps_per_update):\n                    self.agent.train_step()\n\n            # ── Progress logging (roughly every 20 finished episodes) ──\n            if ep_count >= next_log_ep:\n                next_log_ep = ep_count + 20\n                avg_r = np.mean(self.episode_rewards) if self.episode_rewards else 0\n                elapsed = time.time() - start_time\n                fps = step / elapsed\n                lr = self.agent.optimizer.param_groups[0][\"lr\"]\n                print(f\"Step:{step:>10,} | Ep:{ep_count:>5} | AvgR:{avg_r:>7.1f} | \"\n                      f\"Eps:{self.agent.epsilon:.3f} | LR:{lr:.2e} | FPS:{fps:.0f}\")\n\n            # ── Periodic evaluation (step advances by n, so trigger on crossing a boundary) ──\n            if step // self.eval_freq > prev_step // self.eval_freq:\n                eval_r = self.evaluate()\n                self.eval_rewards.append((step, eval_r))\n                print(f\"\\n[Eval] Step:{step:>10,} | mean return: {eval_r:.1f}\\n\")\n                if eval_r > self.best_eval_reward:\n                    self.best_eval_reward = eval_r\n                    self.agent.save(f\"{self.save_dir}/dqn_best.pt\")\n\n            # ── Periodic checkpoint ──\n            if step // self.save_freq > prev_step // self.save_freq:\n                self.agent.save(f\"{self.save_dir}/dqn_step_{step}.pt\")\n\n        total_time = time.time() - start_time\n        print(\"\\n\" + \"=\" * 60)\n        print(f\"Training finished! Total time: {total_time:.1f} s | FPS: {total_steps/total_time:.0f}\")\n        print(f\"Best evaluation return: {self.best_eval_reward:.1f}\")\n        self.agent.save(f\"{self.save_dir}/dqn_final.pt\")\n\nprint(\"Trainer ready\")"
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Configuration\n",
"\n",
"Adjust the parameters below to match your GPU and the training time you can afford."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# ── Tunable hyperparameters ──\n\nENV_ID = \"ALE/SpaceInvaders-v5\"\nN_ENVS = 64                    # 64 parallel environments to maximize CPU utilization\nTOTAL_STEPS = 10_000_000       # total environment steps\nLR = 1e-4                      # slightly higher lr to go with the large batch\nGAMMA = 0.99                   # discount factor\nBATCH_SIZE = 2048              # large batch to keep the RTX 5090 busy\nBUFFER_SIZE = 1_000_000        # replay buffer capacity\nEPSILON_START = 1.0\nEPSILON_END = 0.01\nEPSILON_DECAY = 4_000_000      # steps over which epsilon decays\nTARGET_UPDATE = 5000           # less frequent target-network updates\nLR_DECAY_STEPS = 5_000_000\nLR_DECAY_FACTOR = 0.5\nWARMUP_STEPS = 50_000\nEVAL_FREQ = 200000             # evaluate less often to reduce interruptions\nEVAL_EPISODES = 10\nSAVE_FREQ = 500000\nSEED = 42\nSAVE_DIR = os.path.join(os.path.abspath(os.path.join(os.getcwd(), \"..\")), \"models\")\n\nTRAIN_STEPS_PER_UPDATE = 4     # 4 gradient updates per vectorized step for better GPU utilization\nUSE_AMP = True                 # AMP mixed-precision training\nUSE_COMPILE = True             # torch.compile acceleration\n\nUSE_DUELING = True\nUSE_DOUBLE = True\nUSE_PER = True\n\nos.makedirs(SAVE_DIR, exist_ok=True)\n\nprint(f\"Config: {TOTAL_STEPS/1e6:.0f}M steps, {N_ENVS} parallel envs\")\nprint(f\"{TRAIN_STEPS_PER_UPDATE} updates per step, batch size {BATCH_SIZE}\")\nprint(f\"AMP: {USE_AMP}, torch.compile: {USE_COMPILE}\")\nprint(f\"Models saved to: {SAVE_DIR}\")"
},
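{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check worth doing before training (an added sketch, not part of the original pipeline): with one million transitions of stacked 84x84 frames, the replay buffer needs tens of gigabytes of host RAM even with uint8 storage, so BUFFER_SIZE may have to be lowered on smaller machines. The per-transition layout below is an assumption; the real ReplayBuffer in src/replay_buffer.py may store things differently (for example by not keeping next_states separately)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Rough replay-buffer memory estimate (assumed layout, see note above).\nstate_bytes = 4 * 84 * 84                      # one stacked observation, assuming uint8 storage\nper_transition = 2 * state_bytes + 16          # state + next_state + action/reward/done bookkeeping\ntotal_gib = BUFFER_SIZE * per_transition / 1024**3\nprint(f\"~{total_gib:.1f} GiB for {BUFFER_SIZE:,} transitions with uint8 frames\")\nprint(f\"~{total_gib * 4:.1f} GiB if frames were stored as float32\")"
},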
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Using GPU: NVIDIA GeForce RTX 4060 Laptop GPU\n",
"SyncVectorEnv (Windows): 16 environments\n",
"Action space: 6\n"
]
}
],
"source": [
"# ── Environment setup ──\n",
"torch.manual_seed(SEED)\n",
"np.random.seed(SEED)\n",
"import platform\n",
"\n",
"device = get_device()\n",
"\n",
"# Jupyter on Windows does not support AsyncVectorEnv subprocesses, so fall back to SyncVectorEnv\n",
"if platform.system() == \"Linux\":\n",
"    from gymnasium.vector import AsyncVectorEnv\n",
"    env_fns = [_make_env_fn(ENV_ID) for _ in range(N_ENVS)]\n",
"    envs = AsyncVectorEnv(env_fns, shared_memory=True)\n",
"    print(f\"AsyncVectorEnv: {envs.num_envs} environments\")\n",
"else:\n",
"    from gymnasium.vector import SyncVectorEnv\n",
"    env_fns = [_make_env_fn(ENV_ID) for _ in range(N_ENVS)]\n",
"    envs = SyncVectorEnv(env_fns)\n",
"    print(f\"SyncVectorEnv (Windows): {envs.num_envs} environments\")\n",
"\n",
"# Evaluation environment\n",
"eval_env = make_env(ENV_ID, gray_scale=True, resize=True, frame_stack=4)\n",
"\n",
"num_actions = envs.single_action_space.n\n",
"print(f\"Action space: {num_actions}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# ── Networks + torch.compile ──\nstate_shape = (4, 84, 84)\n\nif USE_DUELING:\n    q_network = DuelingQNetwork(state_shape, num_actions).to(device)\n    target_network = DuelingQNetwork(state_shape, num_actions).to(device)\n    print(f\"Dueling DQN: {sum(p.numel() for p in q_network.parameters()):,} parameters\")\nelse:\n    q_network = QNetwork(state_shape, num_actions).to(device)\n    target_network = QNetwork(state_shape, num_actions).to(device)\n    print(f\"Standard DQN: {sum(p.numel() for p in q_network.parameters()):,} parameters\")\n\n# torch.compile acceleration (PyTorch 2.x)\nif USE_COMPILE and hasattr(torch, 'compile'):\n    print(\"Applying torch.compile...\")\n    q_network = torch.compile(q_network)\n    target_network = torch.compile(target_network)\n    print(\"torch.compile done\")\n\ntarget_network.load_state_dict(q_network.state_dict())\ntarget_network.eval()"
},
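{
"cell_type": "markdown",
"metadata": {},
"source": [
"One caveat when USE_COMPILE is on (an added note): torch.compile wraps the module, so its state_dict keys carry an _orig_mod. prefix. A checkpoint written from a compiled network therefore loads cleanly into another compiled network, but not directly into a plain one. The sketch below shows one common way to strip the prefix when loading into an uncompiled model; it assumes the checkpoint is a raw state_dict, which agent.save may not guarantee."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Sketch: make a checkpoint saved from a torch.compile'd model loadable by a plain model.\ndef strip_compile_prefix(state_dict, prefix=\"_orig_mod.\"):\n    \"\"\"Remove the torch.compile wrapper prefix from state_dict keys, if present.\"\"\"\n    return {k[len(prefix):] if k.startswith(prefix) else k: v for k, v in state_dict.items()}\n\n# Example usage (not run here; assumes the file holds a raw state_dict):\n# raw = torch.load(f\"{SAVE_DIR}/dqn_best.pt\", map_location=\"cpu\")\n# plain_net = DuelingQNetwork(state_shape, num_actions)\n# plain_net.load_state_dict(strip_compile_prefix(raw))"
},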
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# ── Replay buffer + agent ──\nif USE_PER:\n    replay_buffer = PrioritizedReplayBuffer(BUFFER_SIZE, state_shape, device)\n    print(\"Prioritized experience replay (pinned memory)\")\nelse:\n    replay_buffer = ReplayBuffer(BUFFER_SIZE, state_shape, device)\n    print(\"Standard experience replay (pinned memory)\")\n\nagent = DQNAgent(\n    q_network=q_network,\n    target_network=target_network,\n    replay_buffer=replay_buffer,\n    device=device,\n    num_actions=num_actions,\n    gamma=GAMMA,\n    lr=LR,\n    epsilon_start=EPSILON_START,\n    epsilon_end=EPSILON_END,\n    epsilon_decay_steps=EPSILON_DECAY,\n    target_update_freq=TARGET_UPDATE,\n    batch_size=BATCH_SIZE,\n    double_dqn=USE_DOUBLE,\n    lr_decay_steps=LR_DECAY_STEPS,\n    lr_decay_factor=LR_DECAY_FACTOR,\n    warmup_steps=WARMUP_STEPS,\n    use_amp=USE_AMP,\n)\nprint(f\"Agent created (AMP: {USE_AMP})\")"
},
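{
"cell_type": "markdown",
"metadata": {},
"source": [
"For context on the pinned-memory mention above, a standalone illustration (independent of the project's buffer internals): keeping the sampled batch in page-locked host memory lets the CPU-to-GPU copy run with non_blocking=True, which is what keeps large-batch transfers cheap enough to do every step."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Standalone illustration of a pinned-memory transfer; not the project's buffer code.\nif torch.cuda.is_available():\n    host_batch = torch.empty(2048, 4, 84, 84, dtype=torch.uint8, pin_memory=True)  # page-locked host memory\n    gpu_batch = host_batch.to(\"cuda\", non_blocking=True)  # asynchronous copy, can overlap with compute\n    print(\"pinned:\", host_batch.is_pinned(), \"| device:\", gpu_batch.device)\nelse:\n    print(\"CUDA not available; pinned memory gives no benefit on CPU-only machines\")"
},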
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# ── Start training ──\ntrainer = ParallelTrainer(\n    agent=agent,\n    envs=envs,\n    eval_env=eval_env,\n    num_envs=N_ENVS,\n    save_dir=SAVE_DIR,\n    eval_freq=EVAL_FREQ,\n    save_freq=SAVE_FREQ,\n    num_eval_episodes=EVAL_EPISODES,\n    warmup_steps=WARMUP_STEPS,\n    train_steps_per_update=TRAIN_STEPS_PER_UPDATE,\n)\n\nprint(\"\\n\" + \"=\" * 60)\nprint(f\"Starting {TOTAL_STEPS/1e6:.0f}M-step parallel training (all optimizations enabled)\")\nprint(f\"  GPU: {device}\")\nprint(f\"  Parallel envs: {N_ENVS}\")\nprint(f\"  Batch size: {BATCH_SIZE}\")\nprint(f\"  Updates per step: {TRAIN_STEPS_PER_UPDATE}\")\nprint(f\"  AMP mixed precision: {USE_AMP}\")\nprint(f\"  torch.compile: {USE_COMPILE}\")\nprint(f\"  Dueling: {USE_DUELING}\")\nprint(f\"  Double DQN: {USE_DOUBLE}\")\nprint(f\"  PER: {USE_PER}\")\nprint(\"  Pinned memory: yes\")\nprint(\"  Vectorized batch insert: yes\")\nprint(\"=\" * 60 + \"\\n\")\n\ntrainer.train(TOTAL_STEPS)"
},
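{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once training has produced some evaluation points, the (step, mean return) tuples recorded in trainer.eval_rewards can be plotted to see learning progress. This plotting cell is an added sketch and assumes matplotlib is installed; it uses only data the trainer already records."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# Plot the evaluation curve recorded by ParallelTrainer (requires matplotlib).\nimport matplotlib.pyplot as plt\n\nif trainer.eval_rewards:\n    steps, returns = zip(*trainer.eval_rewards)\n    plt.figure(figsize=(8, 4))\n    plt.plot(steps, returns, marker=\"o\")\n    plt.xlabel(\"Environment steps\")\n    plt.ylabel(\"Mean evaluation return\")\n    plt.title(\"Space Invaders: evaluation during training\")\n    plt.grid(True)\n    plt.show()\nelse:\n    print(\"No evaluation points recorded yet\")"
},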
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## After training: evaluate the best model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "# ── Evaluate the best model ──\nprint(\"Loading best model...\")\nagent.load(f\"{SAVE_DIR}/dqn_best.pt\")\n\nprint(\"\\nEvaluating...\")\nall_rewards = []\nfor i in range(20):\n    state, _ = eval_env.reset()\n    ep_r = 0\n    done = False\n    while not done:\n        action = agent.select_action(state, evaluate=True)\n        state, reward, terminated, truncated, _ = eval_env.step(action)\n        done = terminated or truncated\n        ep_r += reward\n    all_rewards.append(ep_r)\n    print(f\"  Episode {i+1:>2}: {ep_r:.1f}\")\n\nprint(f\"\\nResults: mean {np.mean(all_rewards):.2f} ± {np.std(all_rewards):.2f}\")\nprint(f\"Best: {max(all_rewards):.1f} | Worst: {min(all_rewards):.1f}\")\nprint(f\"Median: {np.median(all_rewards):.1f}\")"
}
],
"metadata": {
"kernelspec": {
"display_name": "my_env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.20"
}
},
"nbformat": 4,
"nbformat_minor": 4
}