"""Improved training script for CarRacing-v3 PPO with reward shaping.""" import os import time import argparse import numpy as np import torch import torch.nn as nn from torch.utils.tensorboard import SummaryWriter from collections import deque import gymnasium as gym import cv2 class RewardShapingWrapper(gym.Wrapper): def __init__(self, env): super().__init__(env) self.steps_on_track = 0 def reset(self, **kwargs): obs, info = self.env.reset(**kwargs) self.steps_on_track = 0 return obs, info def step(self, action): obs, reward, terminated, truncated, info = self.env.step(action) done = terminated or truncated shaped_reward = reward if info.get("speed", 0) > 0.1: shaped_reward += info["speed"] * 0.1 if not info.get("offtrack", False): shaped_reward += 0.1 self.steps_on_track += 1 else: shaped_reward -= 0.5 self.steps_on_track = 0 if info.get("lap_complete", False): shaped_reward += 100 return obs, shaped_reward, terminated, truncated, info class GrayScaleWrapper(gym.ObservationWrapper): def __init__(self, env): super().__init__(env) def observation(self, obs): gray = 0.299 * obs[:, :, 0] + 0.587 * obs[:, :, 1] + 0.114 * obs[:, :, 2] return gray.astype(np.uint8) class ResizeWrapper(gym.ObservationWrapper): def __init__(self, env, size=(84, 84)): super().__init__(env) self.size = size def observation(self, obs): return cv2.resize(obs, self.size, interpolation=cv2.INTER_AREA) class FrameStackWrapper(gym.ObservationWrapper): def __init__(self, env, num_stack=4): super().__init__(env) self.num_stack = num_stack self.frames = deque(maxlen=num_stack) obs_shape = env.observation_space.shape self.observation_space = gym.spaces.Box( low=0, high=255, shape=(num_stack, *obs_shape[-2:]), dtype=np.uint8 ) def reset(self, **kwargs): obs, info = self.env.reset(**kwargs) for _ in range(self.num_stack): self.frames.append(obs) return self._get_observation(), info def observation(self, obs): self.frames.append(obs) return self._get_observation() def _get_observation(self): return np.stack(list(self.frames), axis=0) def make_env(env_id="CarRacing-v3", gray_scale=True, resize=True, frame_stack=4): env = gym.make(env_id, render_mode="rgb_array") if resize: env = ResizeWrapper(env, size=(84, 84)) if gray_scale: env = GrayScaleWrapper(env) if frame_stack > 1: env = FrameStackWrapper(env, num_stack=frame_stack) env = RewardShapingWrapper(env) return env def get_device(): if torch.cuda.is_available(): device = torch.device("cuda") print(f"Using GPU: {torch.cuda.get_device_name(0)}") else: device = torch.device("cpu") print("Using CPU") return device class Actor(nn.Module): def __init__(self, state_shape=(84, 84, 4), action_dim=3): super().__init__() c, h, w = state_shape[2], state_shape[0], state_shape[1] self.conv = nn.Sequential( nn.Conv2d(c, 32, kernel_size=8, stride=4), nn.LeakyReLU(0.2), nn.BatchNorm2d(32), nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.LeakyReLU(0.2), nn.BatchNorm2d(64), nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.LeakyReLU(0.2), ) out_h = (h - 8) // 4 + 1 out_h = (out_h - 4) // 2 + 1 out_h = (out_h - 3) // 1 + 1 feat_size = 64 * out_h * out_h self.fc = nn.Sequential( nn.Linear(feat_size, 512), nn.LeakyReLU(0.2), ) self.mu_head = nn.Linear(512, action_dim) self.log_std_head = nn.Linear(512, action_dim) for m in self.modules(): if isinstance(m, (nn.Conv2d, nn.Linear)): nn.init.orthogonal_(m.weight, gain=np.sqrt(2)) if m.bias is not None: nn.init.constant_(m.bias, 0) nn.init.orthogonal_(self.mu_head.weight, gain=0.01) nn.init.orthogonal_(self.log_std_head.weight, gain=0.01) def forward(self, 
        # Pixel inputs are stored as uint8; scale to [0, 1] before the convs.
        x = x / 255.0
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        mu = torch.tanh(self.mu_head(x))
        # Clamp log-std for numerical stability before exponentiating.
        log_std = torch.clamp(self.log_std_head(x), -20, 2)
        return mu, log_std.exp()


class Critic(nn.Module):
    """State-value network sharing the Actor's conv architecture."""

    def __init__(self, state_shape=(84, 84, 4)):
        super().__init__()
        c, h, w = state_shape[2], state_shape[0], state_shape[1]
        self.conv = nn.Sequential(
            nn.Conv2d(c, 32, kernel_size=8, stride=4),
            nn.LeakyReLU(0.2),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.LeakyReLU(0.2),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.LeakyReLU(0.2),
        )
        out_h = (h - 8) // 4 + 1
        out_h = (out_h - 4) // 2 + 1
        out_h = (out_h - 3) // 1 + 1
        feat_size = 64 * out_h * out_h
        self.fc = nn.Sequential(nn.Linear(feat_size, 512), nn.LeakyReLU(0.2), nn.Linear(512, 1))
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.orthogonal_(m.weight, gain=np.sqrt(2))
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = x / 255.0
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)


class RolloutBuffer:
    """Fixed-size on-policy buffer with GAE advantage computation."""

    def __init__(self, buffer_size, state_shape, action_dim):
        self.buffer_size = buffer_size
        self.ptr = 0
        self.size = 0
        self.states = np.zeros((buffer_size, *state_shape), dtype=np.uint8)
        self.actions = np.zeros((buffer_size, action_dim), dtype=np.float32)
        self.rewards = np.zeros(buffer_size, dtype=np.float32)
        self.dones = np.zeros(buffer_size, dtype=np.bool_)
        self.values = np.zeros(buffer_size, dtype=np.float32)
        self.log_probs = np.zeros(buffer_size, dtype=np.float32)

    def add(self, state, action, reward, done, value, log_prob):
        self.states[self.ptr] = state
        self.actions[self.ptr] = action
        self.rewards[self.ptr] = reward
        self.dones[self.ptr] = done
        self.values[self.ptr] = value
        self.log_probs[self.ptr] = log_prob
        self.ptr = (self.ptr + 1) % self.buffer_size
        self.size = min(self.size + 1, self.buffer_size)

    def compute_returns(self, last_value, gamma=0.99, gae_lambda=0.98):
        """Generalized Advantage Estimation, bootstrapped with `last_value`."""
        advantages = np.zeros(self.size, dtype=np.float32)
        last_gae = 0
        for t in reversed(range(self.size)):
            next_value = last_value if t == self.size - 1 else self.values[t + 1]
            delta = self.rewards[t] + gamma * next_value * (1 - self.dones[t]) - self.values[t]
            last_gae = delta + gamma * gae_lambda * (1 - self.dones[t]) * last_gae
            advantages[t] = last_gae
        returns = advantages + self.values[: self.size]
        return returns, advantages

    def get(self):
        return (
            self.states[: self.size],
            self.actions[: self.size],
            self.rewards[: self.size],
            self.dones[: self.size],
            self.values[: self.size],
            self.log_probs[: self.size],
        )

    def reset(self):
        self.ptr = 0
        self.size = 0


class PPOTrainer:
    def __init__(
        self,
        actor,
        critic,
        rollout_buffer,
        device,
        clip_eps=0.1,
        gamma=0.99,
        gae_lambda=0.98,
        lr=3e-4,
        ent_coef=0.005,
        vf_coef=0.75,
        max_grad_norm=0.5,
        ppo_epochs=10,
        mini_batch_size=128,
    ):
        self.actor = actor
        self.critic = critic
        self.buffer = rollout_buffer
        self.device = device
        self.clip_eps = clip_eps
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.ent_coef = ent_coef
        self.vf_coef = vf_coef
        self.max_grad_norm = max_grad_norm
        self.ppo_epochs = ppo_epochs
        self.mini_batch_size = mini_batch_size
        self.actor_optim = torch.optim.Adam(actor.parameters(), lr=lr, eps=1e-5)
        self.critic_optim = torch.optim.Adam(critic.parameters(), lr=lr, eps=1e-5)

    def update(self, last_value):
        states, actions, rewards, dones, values, log_probs_old = self.buffer.get()
        returns, advantages = self.buffer.compute_returns(last_value, self.gamma, self.gae_lambda)
        # Normalize advantages per update for a better-conditioned objective.
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # States are stored HWC; the networks expect NCHW.
        states_t = torch.from_numpy(states).float().permute(0, 3, 1, 2).to(self.device)
        actions_t = torch.from_numpy(actions).float().to(self.device)
        log_probs_old_t = torch.from_numpy(log_probs_old).float().to(self.device)
        returns_t = torch.from_numpy(returns).float().to(self.device)
        advantages_t = torch.from_numpy(advantages).float().to(self.device)

        dataset = torch.utils.data.TensorDataset(
            states_t, actions_t, log_probs_old_t, returns_t, advantages_t
        )
        loader = torch.utils.data.DataLoader(dataset, batch_size=self.mini_batch_size, shuffle=True)

        # BatchNorm layers should use batch statistics during the update.
        self.actor.train()
        self.critic.train()

        total_actor_loss = 0
        total_critic_loss = 0
        total_entropy = 0
        count = 0
        for _ in range(self.ppo_epochs):
            for s, a, log_pi_old, ret, adv in loader:
                mu, std = self.actor(s)
                dist = torch.distributions.Normal(mu, std)
                log_pi = dist.log_prob(a).sum(dim=-1)
                entropy = dist.entropy().sum(dim=-1)

                # Clipped surrogate objective.
                ratio = torch.exp(log_pi - log_pi_old)
                surr1 = ratio * adv
                surr2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * adv
                actor_loss = -torch.min(surr1, surr2).mean()

                value = self.critic(s)
                critic_loss = nn.MSELoss()(value.squeeze(), ret)

                # Joint backward pass; the two optimizers own disjoint parameters.
                loss = actor_loss + self.vf_coef * critic_loss - self.ent_coef * entropy.mean()
                self.actor_optim.zero_grad()
                self.critic_optim.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm)
                nn.utils.clip_grad_norm_(self.critic.parameters(), self.max_grad_norm)
                self.actor_optim.step()
                self.critic_optim.step()

                total_actor_loss += actor_loss.item()
                total_critic_loss += critic_loss.item()
                total_entropy += entropy.mean().item()
                count += 1

        avg_actor = total_actor_loss / count
        avg_critic = total_critic_loss / count
        avg_entropy = total_entropy / count
        self.buffer.reset()
        return avg_actor, avg_critic, avg_entropy
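
# Worked illustration of the clipped surrogate in PPOTrainer.update (the
# numbers are made up; this helper was added for this write-up and is not part
# of training). With clip_eps=0.1, a ratio of 1.3 on a positive advantage is
# capped at 1.1, so the objective stops rewarding further probability
# increases; a ratio of 0.7 is taken unclipped because the min already
# penalizes it.
def _clipped_surrogate_example(clip_eps=0.1):
    ratio = torch.tensor([1.3, 0.7])
    adv = torch.tensor([1.0, 1.0])
    surr1 = ratio * adv
    surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * adv
    objective = torch.min(surr1, surr2)  # tensor([1.1000, 0.7000])
    assert torch.allclose(objective, torch.tensor([1.1, 0.7]))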
def collect_rollout(actor, critic, env, buffer, device, rollout_steps):
    """Collects `rollout_steps` transitions; returns the final observation
    (HWC layout) so the caller can bootstrap the value of the last state."""
    # Use running BatchNorm statistics for consistent single-frame inference.
    actor.eval()
    critic.eval()
    obs, _ = env.reset()
    # FrameStackWrapper returns CHW; store HWC to match the buffer layout.
    obs = np.transpose(obs, (1, 2, 0))
    for _ in range(rollout_steps):
        obs_t = torch.from_numpy(obs).float().unsqueeze(0).permute(0, 3, 1, 2).to(device)
        with torch.no_grad():
            mu, std = actor(obs_t)
            dist = torch.distributions.Normal(mu, std)
            action = dist.sample()
            action = torch.clamp(action, -1, 1)
            log_prob = dist.log_prob(action).sum(dim=-1)
            value = critic(obs_t).squeeze(0).item()
        action_np = action.squeeze(0).cpu().numpy()
        log_prob_np = log_prob.squeeze(0).cpu().numpy()
        next_obs, reward, terminated, truncated, _ = env.step(action_np)
        done = terminated or truncated
        buffer.add(obs.copy(), action_np, reward, done, value, log_prob_np)
        obs = np.transpose(next_obs, (1, 2, 0))
        if done:
            obs, _ = env.reset()
            obs = np.transpose(obs, (1, 2, 0))
    return obs
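
# Toy check of the GAE recursion in RolloutBuffer.compute_returns
# (illustrative only; this helper was added for this write-up with made-up
# numbers). With all state values at zero every delta equals the reward, so
# with gamma = gae_lambda = 0.5 the advantages work out by hand to
# 1 + 0.25 * (1 + 0.25 * 1), 1 + 0.25 * 1, and 1.
def _gae_toy_example():
    buf = RolloutBuffer(buffer_size=3, state_shape=(84, 84, 4), action_dim=3)
    state = np.zeros((84, 84, 4), dtype=np.uint8)
    for _ in range(3):
        buf.add(state, np.zeros(3, dtype=np.float32), 1.0, False, 0.0, 0.0)
    returns, advantages = buf.compute_returns(last_value=0.0, gamma=0.5, gae_lambda=0.5)
    assert np.allclose(advantages, [1.3125, 1.25, 1.0])
    # With zero value estimates, the returns coincide with the advantages.
    assert np.allclose(returns, advantages)
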
def train(
    total_steps=2000000,
    rollout_steps=2048,
    eval_interval=10,
    save_interval=50,
    device=None,
):
    if device is None:
        device = get_device()

    env = make_env()
    eval_env = make_env()
    state_shape = (84, 84, 4)
    action_dim = 3

    actor = Actor(state_shape=state_shape, action_dim=action_dim).to(device)
    critic = Critic(state_shape=state_shape).to(device)
    buffer = RolloutBuffer(
        buffer_size=rollout_steps,
        state_shape=state_shape,
        action_dim=action_dim,
    )
    trainer = PPOTrainer(
        actor=actor,
        critic=critic,
        rollout_buffer=buffer,
        device=device,
        clip_eps=0.1,
        gamma=0.99,
        gae_lambda=0.98,
        lr=3e-4,
        ent_coef=0.005,
        vf_coef=0.75,
        max_grad_norm=0.5,
        ppo_epochs=10,
        mini_batch_size=128,
    )

    log_dir = os.path.join("logs", "tensorboard", f"run_improved_{int(time.time())}")
    writer = SummaryWriter(log_dir)
    print(f"Training on {device}")
    print(f"Log directory: {log_dir}")
    print("Improvements: LeakyReLU, BatchNorm, orthogonal init, reward shaping, more PPO epochs")

    # Each "episode" below is one rollout/update cycle, not a single env episode.
    episode = 0
    total_timesteps = 0
    episode_rewards = []
    best_eval = -float("inf")

    while total_timesteps < total_steps:
        obs = collect_rollout(actor, critic, env, buffer, device, rollout_steps)

        with torch.no_grad():
            obs_t = torch.from_numpy(obs).float().unsqueeze(0).permute(0, 3, 1, 2).to(device)
            last_value = critic(obs_t).squeeze(0).item()

        # Read the rollout reward before the update, which clears the buffer.
        ep_reward = buffer.rewards[: buffer.size].sum()

        actor_loss, critic_loss, entropy = trainer.update(last_value)
        writer.add_scalar("Loss/Actor", actor_loss, total_timesteps)
        writer.add_scalar("Loss/Critic", critic_loss, total_timesteps)
        writer.add_scalar("Loss/Entropy", entropy, total_timesteps)

        total_timesteps += rollout_steps
        episode += 1
        episode_rewards.append(ep_reward)
        # Slicing already handles lists shorter than 10 entries.
        avg_reward = np.mean(episode_rewards[-10:])
        writer.add_scalar("Reward/Episode", ep_reward, total_timesteps)
        writer.add_scalar("Reward/AvgLast10", avg_reward, total_timesteps)
        print(
            f"Episode {episode}, steps {total_timesteps}, ep_reward={ep_reward:.1f}, avg_10={avg_reward:.1f}"
        )

        if episode % eval_interval == 0:
            # Deterministic evaluation: act with the policy mean, no sampling.
            actor.eval()
            eval_returns = []
            for _ in range(5):
                eval_obs, _ = eval_env.reset()
                eval_obs = np.transpose(eval_obs, (1, 2, 0))
                eval_reward = 0
                done = False
                while not done:
                    with torch.no_grad():
                        eval_obs_t = (
                            torch.from_numpy(eval_obs)
                            .float()
                            .unsqueeze(0)
                            .permute(0, 3, 1, 2)
                            .to(device)
                        )
                        mu, std = actor(eval_obs_t)
                    action = torch.clamp(mu, -1, 1).squeeze(0).cpu().numpy()
                    eval_obs, reward, terminated, truncated, _ = eval_env.step(action)
                    eval_obs = np.transpose(eval_obs, (1, 2, 0))
                    eval_reward += reward
                    done = terminated or truncated
                eval_returns.append(eval_reward)
            mean_eval = np.mean(eval_returns)
            writer.add_scalar("Eval/MeanReturn", mean_eval, episode)
            print(f"  Eval: mean_return={mean_eval:.2f}")

            if mean_eval > best_eval:
                best_eval = mean_eval
                os.makedirs("models", exist_ok=True)
                torch.save(
                    {
                        "actor": actor.state_dict(),
                        "critic": critic.state_dict(),
                        "episode": episode,
                        "timesteps": total_timesteps,
                        "best_eval": best_eval,
                    },
                    os.path.join("models", "ppo_improved_best.pt"),
                )
                print(f"  New best model saved! eval={best_eval:.2f}")

        if episode % save_interval == 0:
            os.makedirs("models", exist_ok=True)
            torch.save(
                {
                    "actor": actor.state_dict(),
                    "critic": critic.state_dict(),
                    "episode": episode,
                    "timesteps": total_timesteps,
                },
                os.path.join("models", f"ppo_improved_ep{episode}.pt"),
            )
            print(f"  Saved model at episode {episode}")

    os.makedirs("models", exist_ok=True)
    torch.save(
        {
            "actor": actor.state_dict(),
            "critic": critic.state_dict(),
            "episode": episode,
            "timesteps": total_timesteps,
            "best_eval": best_eval,
        },
        os.path.join("models", "ppo_improved_final.pt"),
    )
    writer.close()
    env.close()
    eval_env.close()
    print(f"Training complete! Total episodes: {episode}, Best eval: {best_eval:.2f}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--steps", type=int, default=2000000, help="Total training steps")
    parser.add_argument("--rollout", type=int, default=2048, help="Rollout buffer size")
    args = parser.parse_args()
    device = get_device()
    train(total_steps=args.steps, rollout_steps=args.rollout, device=device)
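
# Example of reloading the best checkpoint afterwards (illustrative; the path
# matches the torch.save call in train(), the rest is one way you might drive
# the trained policy and is not part of this script):
#
#   ckpt = torch.load(os.path.join("models", "ppo_improved_best.pt"), map_location="cpu")
#   actor = Actor()
#   actor.load_state_dict(ckpt["actor"])
#   actor.eval()  # use running BatchNorm stats for inference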