PPO code: training with continuous actions

1.  Import the required packages

import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
from torch.distributions import Normal
from collections import deque
import matplotlib.pyplot as plt
import numpy as np
import gym  # the 4-value env.step() return used below assumes the pre-0.26 gym API
import copy

2.  Define the actor and critic networks

class actor_net(nn.Module):

    def __init__(self, state_n, action_n, hidden_n):
        super(actor_net, self).__init__()
        self.fc1 = nn.Linear(state_n, hidden_n)
        self.fc2 = nn.Linear(hidden_n, hidden_n)

        self.mu = nn.Linear(hidden_n, action_n)
        self.sigma = nn.Linear(hidden_n, action_n)

    def forward(self, x):
        x = f.relu(self.fc1(x))
        x = f.relu(self.fc2(x))
        mu = torch.tanh(self.mu(x)) * 2              # Pendulum-v1 actions lie in [-2, 2]
        sigma = f.softplus(self.sigma(x)) + 0.001    # softplus keeps the std positive; softmax over a single action dim would always return 1
        return mu, sigma


class critic_net(nn.Module):

    def __init__(self, state_n, hidden_n):
        super(critic_net, self).__init__()
        self.fc1 = nn.Linear(state_n, hidden_n)
        self.fc2 = nn.Linear(hidden_n, hidden_n)
        self.fc3 = nn.Linear(hidden_n, 1)

    def forward(self, x):
        x = f.relu(self.fc1(x))
        x = f.relu(self.fc2(x))
        value = self.fc3(x)
        return value
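
A quick sanity check of the two networks above (a minimal sketch; the dimensions assume the Pendulum-v1 setup used later: state_n = 3, action_n = 1, hidden_n = 256):

state_batch = torch.randn(4, 3)           # a dummy batch of 4 states
actor = actor_net(3, 1, 256)
critic = critic_net(3, 256)
mu, sigma = actor(state_batch)            # both (4, 1); mu lies in [-2, 2], sigma is positive
value = critic(state_batch)               # (4, 1)
print(mu.shape, sigma.shape, value.shape)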

3.  Define the data buffer, with methods to push, sample, and clear data

class buffer(object):
    def __init__(self, length):
        self.buffer_length = length
        self.buffer = deque(maxlen = self.buffer_length)
    def push(self, trans):
        self.buffer.append(trans)
    def sample(self):
        batch = list(self.buffer)
        return zip(*batch)
    def clear(self):
        self.buffer.clear()
    def length(self):
        return len(self.buffer)
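
A minimal usage sketch of the buffer (the transition layout matches what train() pushes later; the numbers are made up for illustration):

memory = buffer(4)
memory.push((np.array([0.1, 0.2, 0.3]), [0.5], -1.0, np.array([0.2, 0.2, 0.3]), False))
memory.push((np.array([0.2, 0.2, 0.3]), [0.1], -0.8, np.array([0.3, 0.1, 0.3]), True))
states, actions, rewards, next_states, dones = memory.sample()
print(memory.length(), rewards)           # 2 (-1.0, -0.8)
memory.clear()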

4.  Define the config

class config():
    def __init__(self):
        self.env_name = 'Pendulum-v1'
        self.train_eps = 10000
        self.test_eps = 20
        self.max_step = 200
        self.eval_eps = 5
        self.eval_per_ep = 10
        self.gamma = 0.99
        self.actor_lr = 2e-5
        self.critic_lr = 2e-5
        self.buffer_length = 128
        self.eps_clip = 0.2
        self.lam = 0.95
        self.batch_size = 128
        self.update_n = 8
        self.hidden_n = 256
        self.seed = 1
        self.device = 'cpu'

5.  Define the PPO agent, including sampling an action from a state and the PPO update

class PPO():

    def __init__(self, cfg):
        self.cfg = cfg
        self.device = torch.device(cfg.device)
        # old_* networks generate the rollout data; new_* networks are the ones being optimised.
        self.old_actor = actor_net(cfg.state_n, cfg.action_n, cfg.hidden_n).to(self.device)
        self.old_critic = critic_net(cfg.state_n, cfg.hidden_n).to(self.device)
        self.new_actor = actor_net(cfg.state_n, cfg.action_n, cfg.hidden_n).to(self.device)
        self.new_critic = critic_net(cfg.state_n, cfg.hidden_n).to(self.device)

        self.actor_optim = optim.Adam(self.new_actor.parameters(), lr=cfg.actor_lr)
        self.critic_optim = optim.Adam(self.new_critic.parameters(), lr=cfg.critic_lr)
        self.memory = buffer(cfg.buffer_length)


    def sample_action(self, state):
        # Sample from the old (behaviour) policy; no gradients are needed here.
        with torch.no_grad():
            state = torch.tensor(state, dtype=torch.float32, device=self.device).unsqueeze(dim=0)
            mu, sigma = self.old_actor(state)
            dist = Normal(mu, sigma)
            action = dist.sample()
        # Pendulum-v1 has a single continuous action, so return it as a one-element list.
        return [action.item()]


    def update(self):
        # Only update once a full batch of transitions has been collected.
        if self.memory.length() < self.cfg.batch_size:
            return
        states, actions, rewards, states_, dones = self.memory.sample()
        states = torch.tensor(np.array(states), dtype=torch.float32, device=self.device)
        actions = torch.tensor(np.array(actions), dtype=torch.float32, device=self.device)
        rewards = torch.tensor(np.array(rewards), dtype=torch.float32, device=self.device).reshape(-1, 1)
        states_ = torch.tensor(np.array(states_), dtype=torch.float32, device=self.device)
        dones = torch.tensor(np.array(dones), device=self.device).reshape(-1, 1)

        # Quantities that depend only on the old (frozen) networks are computed once,
        # without gradients: the TD target for the critic and the old log-probabilities.
        with torch.no_grad():
            td_target = rewards + self.cfg.gamma * self.old_critic(states_) * ~dones
            mu, sigma = self.old_actor(states)
            old_log_prob = Normal(mu, sigma).log_prob(actions)

        for _ in range(self.cfg.update_n):
            # TD errors under the current critic, turned into advantages via GAE.
            td_error = rewards + self.cfg.gamma * self.new_critic(states_) * ~dones - self.new_critic(states)
            td_error = td_error.detach().cpu().numpy()
            advantage = []
            adv = 0.0
            for td in td_error[::-1]:
                adv = adv * self.cfg.gamma * self.cfg.lam + td[0]
                advantage.append(adv)
            advantage.reverse()
            advantage = torch.tensor(advantage, dtype=torch.float32, device=self.device).reshape(-1, 1)

            # Clipped surrogate objective for the actor, MSE to the TD target for the critic.
            new_mu, new_sigma = self.new_actor(states)
            new_log_prob = Normal(new_mu, new_sigma).log_prob(actions)
            ratio = torch.exp(new_log_prob - old_log_prob)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.cfg.eps_clip, 1 + self.cfg.eps_clip) * advantage
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = f.mse_loss(self.new_critic(states), td_target)

            self.actor_optim.zero_grad()
            self.critic_optim.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optim.step()
            self.critic_optim.step()

        # Sync the old networks to the new ones and start collecting a fresh batch.
        self.old_critic.load_state_dict(self.new_critic.state_dict())
        self.old_actor.load_state_dict(self.new_actor.state_dict())
        self.memory.clear()
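
The inner loop over td_error in update() is generalized advantage estimation (GAE): each advantage is the TD error at that step plus a gamma * lam discounted sum of the later TD errors. A standalone sketch of the same recursion on a made-up TD-error list (gamma and lam as in the config; the helper name gae is only for illustration):

def gae(td_errors, gamma=0.99, lam=0.95):
    advantage, adv = [], 0.0
    for td in reversed(td_errors):        # accumulate from the last step backwards
        adv = adv * gamma * lam + td
        advantage.append(adv)
    advantage.reverse()
    return advantage

print(gae([0.5, -0.2, 0.1]))              # ≈ [0.400, -0.106, 0.100]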

6.  Define the environment and the agent

def get_env_agent(cfg):
    env = gym.make(cfg.env_name)
    state_n = env.observation_space.shape[0]
    action_n = env.action_space.shape[0]
    print('state space dimension:', state_n)
    print('action space dimension:', action_n)
    setattr(cfg, 'state_n', state_n)
    setattr(cfg, 'action_n', action_n)
    agent = PPO(cfg)
    return env, agent

7.  Define the training loop

def train(cfg, env, agent):
    print('train')
    rewards = []
    steps = []
    best_ep_reward = -10000
    output_agent = None
    for ep_i in range(cfg.train_eps):
        ep_reward = 0
        ep_step = 0
        # Reseeding on every reset makes each training episode start from the same initial state (for reproducibility).
        state = env.reset(seed = cfg.seed)
        for _ in range(cfg.max_step):
            ep_step += 1
            action = agent.sample_action(state)
            next_state, reward, done, _ = env.step(action)
            # Scale Pendulum's reward (roughly in [-16.3, 0]) into roughly [-1, 1] before storing it.
            agent.memory.push((state, action, (reward + 8.1) / 8.1, next_state, done))
            state = next_state
            agent.update()
            ep_reward += reward
            if done:
                break
        if (ep_i + 1) % cfg.eval_per_ep == 0:
            sum_eval_reward = 0
            for _ in range(cfg.eval_eps):
                eval_ep_reward = 0
                state = env.reset()
                for _ in range(cfg.max_step):
                    action = agent.sample_action(state)
                    next_state, reward, done, _ = env.step(action)
                    state = next_state
                    eval_ep_reward += reward
                    if done:
                        break
                sum_eval_reward += eval_ep_reward
            mean_eval_reward = sum_eval_reward / cfg.eval_eps
            if mean_eval_reward > best_ep_reward:
                best_ep_reward = mean_eval_reward
                output_agent = copy.deepcopy(agent)
                print('train ep_i:%d/%d, rewards:%f, mean_eval_reward:%f, best_ep_reward:%f, update model'%(ep_i + 1, cfg.train_eps, ep_reward, mean_eval_reward, best_ep_reward))
            else:
                print('train ep_i:%d/%d, rewards:%f, mean_eval_reward:%f, best_ep_reward:%f'%(ep_i + 1, cfg.train_eps, ep_reward, mean_eval_reward, best_ep_reward))
        steps.append(ep_step)
        rewards.append(ep_reward)
    env.close()
    return output_agent, rewards
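
A note on the (reward + 8.1) / 8.1 term used in train(): Pendulum-v1 rewards lie roughly in [-16.3, 0], so this affine scaling maps the stored rewards into roughly [-1, 1], which keeps the TD targets small. A quick check:

for r in (-16.2, -8.1, 0.0):
    print(r, (r + 8.1) / 8.1)             # roughly -1.0, 0.0 and 1.0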

8.  Define the testing loop

def test(cfg, env, agent):
    print('test')
    rewards = []
    steps = []
    for ep_i in range(cfg.test_eps):
        ep_reward = 0
        ep_step = 0
        state = env.reset()
        for _ in range(cfg.max_step):
            ep_step += 1
            action = agent.sample_action(state)
            next_state, reward, done, _ = env.step(action)
            state = next_state
            ep_reward += reward
            if done:
                break
        steps.append(ep_step)
        rewards.append(ep_reward)
        print('test ep_i:%d, reward:%f'%(ep_i + 1, ep_reward))
    env.close()
    return rewards

9.  Define the plotting helper

def smooth(data, weight = 0.9):
    last = data[0]
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point
        smoothed.append(smoothed_val)
        last = smoothed_val
    return smoothed
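
smooth() is an exponential moving average: each output keeps weight of the previous smoothed value and (1 - weight) of the new point, which flattens the noisy episode-reward curve. For example:

print(smooth([0.0, 10.0, 10.0]))          # ≈ [0.0, 1.0, 1.9]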

10.  The main function

if __name__ == '__main__':
    cfg = config()
    env, agent = get_env_agent(cfg)
    better_agent, train_rewards = train(cfg, env, agent)
    plt.figure()
    plt.title('training rewards')
    plt.plot(train_rewards, label='train_rewards')
    plt.plot(smooth(train_rewards), label='train_smooth_rewards')
    plt.legend()

    test_rewards = test(cfg, env, better_agent)
    plt.figure()
    plt.title('testing rewards')
    plt.plot(test_rewards, label='test_rewards')
    plt.plot(smooth(test_rewards), label='test_smooth_rewards')
    plt.legend()
    plt.show()
