"""Dueling Double DQN - Space Invaders 并行训练脚本 使用 AsyncVectorEnv 并行运行多个 Atari 环境,GPU 批量推理加速。 适合在 AutoDL 等多核服务器环境运行。 与 notebooks/train_parallel.ipynb 内容一致,但使用 Python 脚本直接运行, 确保 stdout 实时输出(无缓冲)。 """ import sys import os import time import numpy as np import torch from collections import deque PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, PROJECT_ROOT) from src.network import QNetwork, DuelingQNetwork from src.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer from src.agent import DQNAgent from src.utils import make_env, get_device # 强制无缓冲输出 sys.stdout.reconfigure(line_buffering=True) if hasattr(sys.stdout, 'reconfigure') else None os.environ['PYTHONUNBUFFERED'] = '1' print("导入完成", flush=True) # ── 环境工厂 ── def _make_env_fn(env_id): try: import ale_py import gymnasium as gym gym.register_envs(ale_py) except ImportError: pass def _make(): return make_env(env_id, gray_scale=True, resize=True, frame_stack=4) return _make print("环境工厂就绪", flush=True) # ── 并行训练器 ── class ParallelTrainer: def __init__( self, agent, envs, eval_env, num_envs, save_dir="models", eval_freq=10000, save_freq=50000, num_eval_episodes=10, warmup_steps=10000, train_steps_per_update=1, ): self.agent = agent self.envs = envs self.eval_env = eval_env self.num_envs = num_envs self.save_dir = save_dir self.eval_freq = eval_freq self.save_freq = save_freq self.num_eval_episodes = num_eval_episodes self.warmup_steps = warmup_steps self.train_steps_per_update = train_steps_per_update self.episode_rewards = deque(maxlen=100) self.eval_rewards = [] self.best_eval_reward = -float("inf") def evaluate(self): rewards = [] for _ in range(self.num_eval_episodes): state, _ = self.eval_env.reset() ep_reward = 0 done = False while not done: action = self.agent.select_action(state, evaluate=True) state, reward, terminated, truncated, _ = self.eval_env.step(action) done = terminated or truncated ep_reward += reward rewards.append(ep_reward) return np.mean(rewards) def train(self, total_steps): n = self.num_envs device = self.agent.device envs = self.envs print(f"开始训练: {total_steps:,} 步, {n} 并行环境, 每步训练 {self.train_steps_per_update} 次", flush=True) print("=" * 60, flush=True) states, _ = envs.reset() ep_rewards = np.zeros(n, dtype=np.float32) ep_count = 0 start_time = time.time() step = 0 while step < total_steps: if step < self.warmup_steps: actions = np.array([envs.single_action_space.sample() for _ in range(n)]) else: actions = self.agent.batch_select_actions(states, self.agent.epsilon) next_states, rewards, terminateds, truncateds, _ = envs.step(actions) dones = np.logical_or(terminateds, truncateds) # 向量化批量添加经验 self.agent.replay_buffer.add_batch(states, actions, rewards, next_states, dones) ep_rewards += rewards for i in range(n): if dones[i]: self.episode_rewards.append(ep_rewards[i]) ep_count += 1 ep_rewards[i] = 0 step += n states = next_states if step >= self.warmup_steps: for _ in range(self.train_steps_per_update): self.agent.train_step() if ep_count > 0 and ep_count % 20 == 0: avg_r = np.mean(self.episode_rewards) if self.episode_rewards else 0 elapsed = time.time() - start_time fps = step / elapsed lr = self.agent.optimizer.param_groups[0]["lr"] print(f"Step:{step:>10,} | Ep:{ep_count:>5} | AvgR:{avg_r:>7.1f} | " f"Eps:{self.agent.epsilon:.3f} | LR:{lr:.2e} | FPS:{fps:.0f}", flush=True) if step % self.eval_freq == 0 and step > 0: eval_r = self.evaluate() self.eval_rewards.append((step, eval_r)) print(f"\n[评估] Step:{step:>10,} | 平均回报:{eval_r:.1f}\n", flush=True) if eval_r > self.best_eval_reward: self.best_eval_reward = eval_r self.agent.save(f"{self.save_dir}/dqn_best.pt") print(f"最佳模型已更新 (回报: {eval_r:.1f})", flush=True) if step % self.save_freq == 0 and step > 0: self.agent.save(f"{self.save_dir}/dqn_step_{step}.pt") total_time = time.time() - start_time print("\n" + "=" * 60, flush=True) print(f"训练完成!总时间: {total_time:.1f} 秒 | FPS: {total_steps/total_time:.0f}", flush=True) print(f"最佳评估回报: {self.best_eval_reward:.1f}", flush=True) self.agent.save(f"{self.save_dir}/dqn_final.pt") print("训练器就绪", flush=True) def main(): # ── 超参数 ── ENV_ID = "ALE/SpaceInvaders-v5" N_ENVS = 64 TOTAL_STEPS = 2_000_000 LR = 1e-4 GAMMA = 0.99 BATCH_SIZE = 2048 BUFFER_SIZE = 1_000_000 EPSILON_START = 1.0 EPSILON_END = 0.01 EPSILON_DECAY = 4_000_000 TARGET_UPDATE = 5000 LR_DECAY_STEPS = 5_000_000 LR_DECAY_FACTOR = 0.5 WARMUP_STEPS = 50_000 EVAL_FREQ = 200000 EVAL_EPISODES = 10 SAVE_FREQ = 500000 SEED = 42 SAVE_DIR = os.path.join(PROJECT_ROOT, "models") TRAIN_STEPS_PER_UPDATE = 4 USE_AMP = True USE_COMPILE = True USE_DUELING = True USE_DOUBLE = True USE_PER = True os.makedirs(SAVE_DIR, exist_ok=True) print(f"配置: {TOTAL_STEPS/1e6:.0f}M 步, {N_ENVS} 并行环境", flush=True) print(f"每步训练 {TRAIN_STEPS_PER_UPDATE} 次, Batch {BATCH_SIZE}", flush=True) print(f"AMP: {USE_AMP}, torch.compile: {USE_COMPILE}", flush=True) print(f"模型保存: {SAVE_DIR}", flush=True) torch.manual_seed(SEED) np.random.seed(SEED) import platform device = get_device() print(f"使用设备: {device}", flush=True) from gymnasium.vector import SyncVectorEnv env_fns = [_make_env_fn(ENV_ID) for _ in range(N_ENVS)] envs = SyncVectorEnv(env_fns) print(f"SyncVectorEnv: {envs.num_envs} 个环境", flush=True) eval_env = make_env(ENV_ID, gray_scale=True, resize=True, frame_stack=4) num_actions = envs.single_action_space.n print(f"动作空间: {num_actions}", flush=True) state_shape = (4, 84, 84) if USE_DUELING: q_network = DuelingQNetwork(state_shape, num_actions).to(device) target_network = DuelingQNetwork(state_shape, num_actions).to(device) print(f"Dueling DQN: {sum(p.numel() for p in q_network.parameters()):,} 参数", flush=True) else: q_network = QNetwork(state_shape, num_actions).to(device) target_network = QNetwork(state_shape, num_actions).to(device) print(f"标准 DQN: {sum(p.numel() for p in q_network.parameters()):,} 参数", flush=True) if USE_COMPILE and hasattr(torch, 'compile'): print("应用 torch.compile 加速...", flush=True) q_network = torch.compile(q_network) target_network = torch.compile(target_network) print("torch.compile 完成", flush=True) target_network.load_state_dict(q_network.state_dict()) target_network.eval() if USE_PER: replay_buffer = PrioritizedReplayBuffer(BUFFER_SIZE, state_shape, device) print("优先经验回放 (Pinned Memory)", flush=True) else: replay_buffer = ReplayBuffer(BUFFER_SIZE, state_shape, device) print("标准经验回放 (Pinned Memory)", flush=True) agent = DQNAgent( q_network=q_network, target_network=target_network, replay_buffer=replay_buffer, device=device, num_actions=num_actions, gamma=GAMMA, lr=LR, epsilon_start=EPSILON_START, epsilon_end=EPSILON_END, epsilon_decay_steps=EPSILON_DECAY, target_update_freq=TARGET_UPDATE, batch_size=BATCH_SIZE, double_dqn=USE_DOUBLE, lr_decay_steps=LR_DECAY_STEPS, lr_decay_factor=LR_DECAY_FACTOR, warmup_steps=WARMUP_STEPS, use_amp=USE_AMP, ) print(f"Agent 创建完成 (AMP: {USE_AMP})", flush=True) trainer = ParallelTrainer( agent=agent, envs=envs, eval_env=eval_env, num_envs=N_ENVS, save_dir=SAVE_DIR, eval_freq=EVAL_FREQ, save_freq=SAVE_FREQ, num_eval_episodes=EVAL_EPISODES, warmup_steps=WARMUP_STEPS, train_steps_per_update=TRAIN_STEPS_PER_UPDATE, ) print("\n" + "=" * 60, flush=True) print(f"开始 10M 步并行训练(全优化版)", flush=True) print(f" GPU: {device}", flush=True) print(f" 并行环境: {N_ENVS}", flush=True) print(f" Batch Size: {BATCH_SIZE}", flush=True) print(f" 每步训练: {TRAIN_STEPS_PER_UPDATE} 次", flush=True) print(f" AMP 混合精度: {USE_AMP}", flush=True) print(f" torch.compile: {USE_COMPILE}", flush=True) print(f" Dueling: {USE_DUELING}", flush=True) print(f" Double DQN: {USE_DOUBLE}", flush=True) print(f" PER: {USE_PER}", flush=True) print("=" * 60 + "\n", flush=True) trainer.train(TOTAL_STEPS) # ── 评估最佳模型 ── print("\n加载最佳模型...", flush=True) agent.load(f"{SAVE_DIR}/dqn_best.pt") print("\n评估中...", flush=True) all_rewards = [] for i in range(20): state, _ = eval_env.reset() ep_r = 0 done = False while not done: action = agent.select_action(state, evaluate=True) state, reward, terminated, truncated, _ = eval_env.step(action) done = terminated or truncated ep_r += reward all_rewards.append(ep_r) print(f" Episode {i+1:>2}: {ep_r:.1f}", flush=True) print(f"\n结果: 平均 {np.mean(all_rewards):.2f} +/- {np.std(all_rewards):.2f}", flush=True) print(f"最佳: {max(all_rewards):.1f} | 最差: {min(all_rewards):.1f}", flush=True) print(f"中位数: {np.median(all_rewards):.1f}", flush=True) if __name__ == "__main__": main()