"""Train PPO on CarRacing-v3 with vectorised envs (parallel rollout). Usage (Windows): python train_vec.py --n-envs 4 --total-steps 10000 --run-name vec_smoke python train_vec.py --n-envs 4 --total-steps 500000 --run-name vec_main \ --anneal-lr --anneal-ent --reward-clip 1.0 Usage (Linux server with RTX 5090): python train_vec.py --n-envs 16 --total-steps 2000000 --run-name vec_main \ --n-steps 512 --batch-size 512 --n-epochs 10 \ --anneal-lr --anneal-ent --reward-clip 1.0 --use-amp The ``if __name__ == "__main__"`` guard at the bottom is mandatory on Windows for AsyncVectorEnv (otherwise child processes infinite-spawn). """ import argparse import time from collections import deque from pathlib import Path import numpy as np import torch from torch.utils.tensorboard import SummaryWriter from src.ppo_agent import PPOAgent from src.utils import format_seconds, set_seed from src.vec_env_wrappers import make_vec_env from src.vec_rollout_buffer import VecRolloutBuffer def parse_args(): p = argparse.ArgumentParser() p.add_argument("--total-steps", type=int, default=2_000_000) p.add_argument("--n-envs", type=int, default=16) p.add_argument("--n-steps", type=int, default=512) p.add_argument("--n-epochs", type=int, default=10) p.add_argument("--batch-size", type=int, default=512) p.add_argument("--lr", type=float, default=2.5e-4) p.add_argument("--gamma", type=float, default=0.99) p.add_argument("--lam", type=float, default=0.95) p.add_argument("--clip", type=float, default=0.2) p.add_argument("--ent-coef", type=float, default=0.01) p.add_argument("--vf-coef", type=float, default=0.5) p.add_argument("--max-grad-norm", type=float, default=0.5) p.add_argument("--seed", type=int, default=42) p.add_argument("--run-name", type=str, default="ppo_vec_main") p.add_argument("--save-every-iters", type=int, default=20) p.add_argument("--anneal-lr", action="store_true") p.add_argument("--anneal-ent", action="store_true") p.add_argument("--reward-clip", type=float, default=None) p.add_argument("--ent-floor", type=float, default=0.0, help="Lower bound on ent_coef when --anneal-ent is on") p.add_argument("--clip-floor", type=float, default=None, help="Linearly anneal clip range to this floor (e.g. 0.05). " "None disables clip annealing.") p.add_argument("--target-kl", type=float, default=None, help="Stop the current update epoch early if mean approx_kl " "exceeds 1.5 * target_kl. None disables. SB3 default 0.015.") p.add_argument("--use-data-aug", action="store_true", help="Apply random-shift augmentation to obs during PPO update") p.add_argument("--sync-mode", action="store_true", help="Use SyncVectorEnv (debug mode)") p.add_argument("--use-amp", action="store_true", help="Use AMP mixed precision training for GPU acceleration") p.add_argument("--use-compile", action="store_true", help="Use torch.compile for JIT compilation acceleration") return p.parse_args() def main(): args = parse_args() project_root = Path(__file__).resolve().parent run_dir = project_root / "runs" / args.run_name ckpt_dir = project_root / "models" / args.run_name run_dir.mkdir(parents=True, exist_ok=True) ckpt_dir.mkdir(parents=True, exist_ok=True) set_seed(args.seed) # Throughput tweak: let cuDNN auto-pick the fastest conv algorithm # for our fixed (B, 4, 84, 84) input shape. 
    torch.backends.cudnn.benchmark = True

    vec_env = make_vec_env(
        n_envs=args.n_envs,
        seed=args.seed,
        async_mode=not args.sync_mode,
    )
    agent = PPOAgent(
        n_actions=5,
        lr=args.lr,
        clip=args.clip,
        vf_coef=args.vf_coef,
        ent_coef=args.ent_coef,
        max_grad_norm=args.max_grad_norm,
        n_epochs=args.n_epochs,
        batch_size=args.batch_size,
        anneal_lr=args.anneal_lr,
        anneal_ent=args.anneal_ent,
        clip_floor=args.clip_floor,
        target_kl=args.target_kl,
        use_data_aug=args.use_data_aug,
        use_amp=args.use_amp,
    )

    # Optional torch.compile JIT acceleration.
    if args.use_compile and hasattr(torch, "compile"):
        print("Applying torch.compile...")
        agent.net = torch.compile(agent.net)
        print("torch.compile done")

    buffer = VecRolloutBuffer(
        n_steps=args.n_steps,
        n_envs=args.n_envs,
        obs_shape=(4, 84, 84),
        device=agent.device,
    )
    writer = SummaryWriter(str(run_dir))

    samples_per_iter = args.n_steps * args.n_envs
    print("=" * 60)
    print(f"Run: {args.run_name}")
    mode_str = "sync" if args.sync_mode else "async"
    print(f"Mode: {mode_str} vec env, n_envs={args.n_envs}")
    print(f"Total steps: {args.total_steps:,}")
    print(f"Per-iter samples: {samples_per_iter} (n_steps={args.n_steps} x n_envs={args.n_envs})")
    print(f"lr={args.lr} gamma={args.gamma} lam={args.lam} clip={args.clip}")
    print(f"anneal_lr={args.anneal_lr} anneal_ent={args.anneal_ent} "
          f"ent_floor={args.ent_floor} reward_clip={args.reward_clip}")
    print(f"clip_floor={args.clip_floor} target_kl={args.target_kl} "
          f"use_data_aug={args.use_data_aug}")
    print(f"n_epochs={args.n_epochs} batch_size={args.batch_size}")
    print(f"AMP: {args.use_amp}")
    print(f"Compile: {args.use_compile}")
    print(f"Device: {agent.device}")
    print(f"Logs: {run_dir}")
    print(f"Ckpts: {ckpt_dir}")
    print("=" * 60)

    obs, _ = vec_env.reset(seed=args.seed)
    next_done = np.zeros(args.n_envs, dtype=np.float32)

    global_step = 0
    iteration = 0
    episode_returns = deque(maxlen=100)
    cur_ep_returns = np.zeros(args.n_envs, dtype=np.float32)
    cur_ep_lens = np.zeros(args.n_envs, dtype=np.int64)
    start_time = time.time()

    while global_step < args.total_steps:
        iteration += 1
        agent.step_schedule(
            global_step / args.total_steps,
            ent_floor=args.ent_floor,
        )

        # Rollout (n_steps per env, total samples = n_steps * n_envs)
        for step in range(args.n_steps):
            actions, log_probs, values = agent.act_batch(obs)
            next_obs, rewards, terms, truncs, _ = vec_env.step(actions)
            done = np.logical_or(terms, truncs).astype(np.float32)

            train_rewards = (
                np.maximum(rewards, -args.reward_clip)
                if args.reward_clip is not None
                else rewards
            )

            # CleanRL convention: dones[step] marks whether obs[step] was a fresh start.
            buffer.add(obs, actions, log_probs, train_rewards, values, next_done)

            cur_ep_returns += rewards
            cur_ep_lens += 1
            for i in range(args.n_envs):
                if done[i]:
                    episode_returns.append(float(cur_ep_returns[i]))
                    writer.add_scalar("episode/return", cur_ep_returns[i], global_step)
                    writer.add_scalar("episode/length", cur_ep_lens[i], global_step)
                    cur_ep_returns[i] = 0.0
                    cur_ep_lens[i] = 0

            obs = next_obs
            next_done = done
            global_step += args.n_envs

        # GAE
        last_value = agent.evaluate_value_batch(obs)
        buffer.compute_gae(
            last_value=last_value,
            last_done=next_done,
            gamma=args.gamma,
            lam=args.lam,
        )

        # Update
        losses = agent.update_vec(buffer)
        for k, v in losses.items():
            writer.add_scalar(f"losses/{k}", v, global_step)

        elapsed = time.time() - start_time
        steps_per_sec = global_step / max(elapsed, 1e-6)
        avg_ret = sum(episode_returns) / len(episode_returns) if episode_returns else 0.0
        writer.add_scalar("perf/steps_per_sec", steps_per_sec, global_step)
        writer.add_scalar("episode/avg_return_100", avg_ret, global_step)
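        # Also log the (possibly annealed) hyperparameters so the lr / ent_coef /
        # clip schedules can be inspected alongside the loss curves in TensorBoard.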
writer.add_scalar("hp/lr", agent.optim.param_groups[0]["lr"], global_step) writer.add_scalar("hp/ent_coef", agent.ent_coef, global_step) writer.add_scalar("hp/clip", agent.clip, global_step) epochs_done = int(losses.get("epochs_completed", args.n_epochs)) early = losses.get("early_stopped", 0.0) > 0.5 mark = "*" if early else " " print( f"iter {iteration:4d} | step {global_step:>9,} | " f"avg_ret(100) {avg_ret:7.2f} | " f"pg {losses['policy_loss']:+.4f} | " f"v {losses['value_loss']:7.3f} | " f"ent {losses['entropy']:.3f} | " f"kl {losses['approx_kl']:.4f} | " f"clip {agent.clip:.3f} | " f"clip% {losses['clip_frac']:.2%} | " f"ep {epochs_done}{mark}/{args.n_epochs} | " f"sps {steps_per_sec:5.0f} | " f"{format_seconds(elapsed)}" ) if iteration % args.save_every_iters == 0: agent.save(str(ckpt_dir / f"iter_{iteration:04d}.pt")) buffer.reset() final_path = ckpt_dir / "final.pt" agent.save(str(final_path)) print(f"\nTraining done. Final model: {final_path}") writer.close() vec_env.close() if __name__ == "__main__": main()