动手学强化学习reinforcement learning-chapter-nineteen-目标导向的强化学习

动手学强化学习​hrl.boyuai.com/

《动手学强化学习》(张伟楠 沈键 俞勇)【简介_书评_在线阅读】 - 当当图书 (dangdang.com)​product.dangdang.com/29391150.html

ZouJiu1/Hands-on-RL: https://hrl.boyuai.com/ (github.com)​github.com/ZouJiu1/Hands-on-RL/tree/main

简介

在这里插入图片描述
在这里插入图片描述

TPPO algorithm 最大化 update 前后两个策略的 J ( θ ) = E π 0 ( s 0 ) J(\theta)=E^{\pi_0}(s_0) J(θ)=Eπ0(s0) 的差值,也就是最大化残差的累加和,并且两个策略的KL散度要满足一定条件,不能太大的,导致步子太大扯着蛋。但是每个残差又要尽可能地小,来train价值网络的,保证策略收敛的,也就是最大化残差的和,同时要保证单个残差尽可能地小。

PPO algorithm和TPPO是类似的情况,加入了截断条件,也就是重要性采样的比值要在一定区间内,否则就截断,而且还用到了最小值。

SAC algorithm 加强了探索的,也就是最大熵强化学习,探索性越强的,能帮助学习相应的内容,动作价值的估计加入了熵值。

上面的PPO和SAC在面对复杂的复合任务的时候,表现往往不太好。

目标导向的强化学习 (goal-oriented reinforcement learning,GoRL),能解决较复杂的决策问题。

定义

悬崖漫步环境从不同坐标开始,都需要train不同的策略模型;机械臂抓取不同坐标的物体,都要train不同的策略模型,传统的不适用这样的情况。智能体需要做相似但不相同的任务的。

MDP定义: ( S , A , P , r g , τ , ϕ ) (S,A,P,r_g,\tau,\phi) (S,A,P,rg,τ,ϕ) ,S是状态空间,A是动作空间,P是状态转移函数, τ \tau τ 是目标空间, ϕ \phi ϕ 是一个将状态空间映射到目标空间内的一个目标g的函数, r g r_g rg 是奖励函数,和目标g相关。

不同的概念

τ \tau τ 是目标空间, ϕ \phi ϕ是映射函数,任务由目标g定义,目标g本身和状态s相关,状态s映射到目标g,机械臂的状态s包括力矩,物体的位置,任务是抓取物体,所以就是将状态s通过函数 ϕ \phi ϕ映射到坐标g。

奖励函数不仅和状态s和动作a相关,还和目标g相关。对不同的目标,奖励函数不同,状态价值函数基于目标V(s,g),状态动作价值函数也是的Q(s,a,g)。 r g ( s t , a t , s t + 1 ) = { 0 ∣ ∣ ϕ ( s t + 1 ) − g ∣ ∣ 2 ≤ δ g − 1 , o t h e r w i s e r_{g}(s_t,a_t,s_{t+1})=\left\{\begin{matrix} 0&&&&||\phi(s_{t+1})-g||_2\leq \delta_g\\ -1,&&&&otherwise\end{matrix}\right. rg(st,at,st+1)={01,∣∣ϕ(st+1)g2δgotherwise

目标导向的强化学习的优化目标

定义 v 0 v_0 v0 是环境内初始状态 s 0 s_0 s0 和目标g的联合分布,GoRL的目标是优化策略 π ( a ∣ s , g ) \pi(a|s,g) π(as,g) ,使得目标函数最大化 E ( s 0 , g ) − v 0 [ V π ( s 0 , g ) ] E_{(s_0,g)-v_0}[V^{\pi}(s_0,g)] E(s0,g)v0[Vπ(s0,g)]

HER 算法

目标导向的强化学习的奖励函数的0分布很窄很窄的,绝大多数的奖励都是-1,开始train速度慢, 事后经验回放 (hindsight experience replay,HER)利用了这些失败的经验。

因每个 episode 的节点可能很分散,在目标 around区间很少很少,所以使用历史数据来train,会导致模型不能收敛,像给定6个episodes,但是这6个episodes的最后点都是目标,但是中间的节点,很多都不是的。

策略 π \pi π 以目标g在环境内探索了轨迹, s 1 , s 2 , . . , s T s_1,s_2,..,s_T s1,s2,..,sT ,但是每个轨迹都不是目标g,所以奖励都是-1,虽然没有达到目标g,但是达到了目标 ϕ ( s 1 ) , ϕ ( s 2 ) , . . , ϕ ( s T ) \phi(s_1),\phi(s_2),..,\phi(s_T) ϕ(s1),ϕ(s2),..,ϕ(sT) ,可以用这样的方式来算奖励值。

HER algorithm的策略优化算法,可以使用任意传统的DDPG,DQN、PPO,TRPO,SAC算法。

–还是给定了初始状态 s 0 s_0 s0 和目标g,并且使用了策略 π \pi π 在环境采样轨迹,放到历史数据回放池内(s,a,r, s ′ s' s ,g),也就是当前状态是s,智能体策略选择了动作a,拿到了奖励r,环境执行了动作a,状态变到了 s ′ s' s ,到达了目的地g。

–从回放池采样N个数组

–对这些数组,选择一个状态 s ′ ′ s'' s′′ ,将其映射到目标 g ′ = ϕ ( s ′ ′ ) g'=\phi(s'') g=ϕ(s′′) 算奖励值 r ′ = r g ′ ( s , a , s ′ ) r'=r_{g'}(s,a,s') r=rg(s,a,s) ,用这些数据替代之前的数组(s,a, r ′ , s ′ , g ′ r',s',g' r,s,g ),

–用替代以后的数组训练策略。

状态 s ′ ′ s'' s′′ 的选择

共3类方案,future的方案最优秀 :选择和被改写的数组(s,a,r, s ′ s' s ,g)处在同一个轨迹并在时间上处在 s ′ s' s 之后的某个状态做 s ′ ′ s'' s′′。也就是同一个轨迹状态 s ′ s' s 和状态 s ′ ′ s'' s′′ 之间的序列的目标是到达目的地 g ′ g' g

HER program

首先是环境的,给出了一个直角坐标系的方程,目的地是随机产生的,完成了重置环境、进一步step的操作

目标和状态放在了一起的,所以不需要单独考虑目标,只有在从回放池内采样的时候,才需要update目标g’。

状态和目标都是x, y的坐标,所以可以拼在一起的,而且目标在train的时候是不变的,step的时候会
下面的环境,每条episode的目标都不相同,所以智能体需要适应不同的目标,也就是给定初始状态和目标,智能体要能采取策略达到目标,使得累计奖励max,而HER采样,就在采样的时候修改了目标,让网络往能够更好收敛的方向。

给定初始状态和目标,就相当是给了相应的目的地,智能体需要前往当前的目的地才行。每个episode的目标都不相同的,智能体要能够适应不同的目的地。

HER采样将序列后面某个状态当作最后的目标 ,由于当前状态和后面那个状态挨得很近,所以两者的状态也很近的,智能体只需要一小步一小步的靠近目标就可以,而不需要一次跨越大的步长,到达目的地的。

像状态state和next_state,前两个数字是状态,后两个数字是目标,两者的状态改变值很小,所以train起来更加方便,模型只需要一步一步接近目标就可以,也就是模型只需要学着一步一步靠近目标

class WorldEnv:
    def __init__(self):
        self.distance_threshold = 0.15  ## 距离的最小阀值
        self.action_bound = 1  ## 动作的上界的

    def reset(self):  # 重置环境
        # 生成一个目标状态, 坐标范围是[3.5~4.5, 3.5~4.5]
        self.goal = np.array(
            [4 + random.uniform(-0.5, 0.5), 4 + random.uniform(-0.5, 0.5)])  ## 随机给定目标g
        self.state = np.array([0, 0])  # 初始状态
        self.count = 0 ## 重置步长的
        return np.hstack((self.state, self.goal))  ## 叠起来状态和目标

    def step(self, action):
        action = np.clip(action, -self.action_bound, self.action_bound)  ## 截断动作的上下界
        x = max(0, min(5, self.state[0] + action[0])) ## 前进一步以后,截断x坐标
        y = max(0, min(5, self.state[1] + action[1])) ## 前进一步以后,截断y坐标
        self.state = np.array([x, y])                 ## ndarray 化
        self.count += 1  ## 累积步长的呢

        dis = np.sqrt(np.sum(np.square(self.state - self.goal)))  ## 算当前状态和目标 g 的距离
        reward = -1.0 if dis > self.distance_threshold else 0  ## > 给定阀值奖励是 -1,否则 0
        if dis <= self.distance_threshold or self.count == 50: ## 若 < 阀值,或者步长一定,就完成
            done = True ## 完成
        else:
            done = False  ## 没有完成的

        return np.hstack((self.state, self.goal)), reward, done ## 叠起来状态和目标,奖励,是否完成的

策略网络就是 DDPG

##  构造策略网络的
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
        self.action_bound = action_bound  # action_bound是环境可以接受的动作最大值

    def forward(self, x):
        x = F.relu(self.fc2(F.relu(self.fc1(x))))
        return torch.tanh(self.fc3(x)) * self.action_bound  ## 直接返回动作的大小

## 状态和目标 动作价值网络,输出状态和目标 动作对 (状态和目标,动作) 的价值
## 也就是动作价值网络
class QValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(QValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, 1)
    ## 输入 (状态和目标,动作)
    def forward(self, x, a):
        cat = torch.cat([x, a], dim=1)  # 拼接状态和目标, 和动作
        x = F.relu(self.fc2(F.relu(self.fc1(cat))))
        return self.fc3(x)

class DDPG:
    ''' DDPG算法 '''
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound,
                 actor_lr, critic_lr, sigma, tau, gamma, device):
        self.action_dim = action_dim  ## 动作的dim
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim,
                               action_bound).to(device)  ##  策略网络的
        self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device) ## 状态动作价值网络
        self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim,
                                      action_bound).to(device)  ##  目标策略网络,延迟update
        self.target_critic = QValueNet(state_dim, hidden_dim,
                                       action_dim).to(device)  ##  目标状态动作价值网络,延迟update
        # 初始化目标价值网络并使其参数和价值网络一样
        self.target_critic.load_state_dict(self.critic.state_dict())
        # 初始化目标策略网络并使其参数和策略网络一样
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma    ## 折扣因子
        self.sigma = sigma  # 高斯噪声的标准差,均值直接设$0
        self.tau = tau  # 目标网络软更新参数
        self.action_bound = action_bound
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        action = self.actor(state).detach().cpu().numpy()[0]    ## 拿到确定性策略 网络的输出,也就是确定的动作
        # 给动作添加噪声,增加探索的
        action = action + self.sigma * np.random.randn(self.action_dim)
        return action

    def soft_update(self, net, target_net):   ## EMA的方式update目标网络的,每次update很少的内容
        for param_target, param in zip(target_net.parameters(),
                                       net.parameters()):
            param_target.data.copy_(param_target.data * (1.0 - self.tau) +
                                    param.data * self.tau)

    def update(self, transition_dict):
        ## 拿到这条序列内的 状态和目标、动作和奖励,下一个状态和目标、是否完成的
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.float).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        ## 下个状态和目标+下个状态目标的动作,目标状态和目标动作价值网络 输出(状态和目标,动作)对的动作价值 Q 
        next_q_values = self.target_critic(next_states, self.target_actor(next_states))
        ## 用下一个(状态和目标、动作)对的动作价值 Q,然后间接求出当前的(状态和目标、动作)对的动作价值
        q_targets = rewards + self.gamma * next_q_values * (1 - dones)
        # MSE损失函数
        critic_loss = torch.mean( ## 直接求出当前(状态和目标的、动作)的动作价值,和 间接求出的动作价值,使用 MSE 来算损失函数的,q_targets不反向传播求梯度
            F.mse_loss(self.critic(states, actions), q_targets))
        self.critic_optimizer.zero_grad()       ## 价值网络的参数梯度置零的
        critic_loss.backward()                  ## 价值网络的损失loss反向传播梯度
        self.critic_optimizer.step()            ## update 网络

        # 策略网络就是$了使Q值最大化
        ## update以后的动作价值网络,使用当前的(状态和目标、动作)的动作价值,加了负号的,也就是最大化动作价值的
        ## self.actor(states) 是输出当前状态的动作
        '''
         确定性策略网络的 update,稍稍复杂些,嵌套了两个网络,首先用当前的状态和策略网络,输出当前状态和目标的动作predict_A=self.actor(states),
         然后 self.critic(states, self.actor(states))输出了当前(状态、动作)的动作价值Q,并求均值加负号,也就是最大化动作价值,
         反向传播用来update 策略网络actor,因critic动作价值网络上面已经update过了,所以下面就只update策略网络actor,critic网络不变的。
        '''
        actor_loss = -torch.mean(self.critic(states, self.actor(states)))
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        ## 延迟少量的update网络,也就是EMA的方式
        self.soft_update(self.actor, self.target_actor)  # 软更新策略网络
        self.soft_update(self.critic, self.target_critic)  # 软更新价值网络

然后就是用来record历史数据的呢,采样的时候按照一定的比例来使用 HER 的方式,也就是指定当前状态s到后续某个状态s’的目标是g’。

class ReplayBuffer_Trajectory:
    ''' 存储轨迹的经验回放池 '''
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)  ## 初始化缓冲池

    def add_trajectory(self, trajectory):
        self.buffer.append(trajectory)    ## 缓冲池加入一条序列

    def size(self):
        return len(self.buffer)   ## 缓冲池的长度

    def sample(self, batch_size, use_her, dis_threshold=0.15, her_ratio=0.8):
        batch = dict(states=[],               ## 批量
                     actions=[],
                     next_states=[],
                     rewards=[],
                     dones=[])
        for _ in range(batch_size):
            traj = random.sample(self.buffer, 1)[0]      ## 采样一条序列
            step_state = np.random.randint(traj.length)  ## 随机拿序列的某一个record,坐标是 step_state
            state = traj.states[step_state]              ## 该record的状态和目标
            next_state = traj.states[step_state + 1]     ## 下一个状态和目标的
            action = traj.actions[step_state]            ## 该record的动作
            reward = traj.rewards[step_state]            ## 该record的奖励
            done = traj.dones[step_state]                ## 该record是否完成的
            ## use_her是否使用方式 her,概率值小于阀值
            if use_her and np.random.uniform() <= her_ratio:   ## 给定好 使用 her 方式的概率
                step_goal = np.random.randint(step_state + 1, traj.length + 1)  ## 从在同一个轨迹并在时间上处在s'之后的某个状态s'',坐标是step_goal
                ## 也就是目标的(x,y)坐标 g'
                goal = traj.states[step_goal][:2]  # 使用HER算法的future方案设置目标 g',也就是后面的某个状态做目标
                ## 配置修改以后的
                dis = np.sqrt(np.sum(np.square(next_state[:2] - goal)))  ## 下一个状态的目标 g_next(x,y),和配置的目标 g'之间的距离
                reward = -1.0 if dis > dis_threshold else 0  ## 大于阀值则奖励 -1,小于阀值则奖励 0
                done = False if dis > dis_threshold else True  ## 大于阀值则未完成,小于阀值则已完成
                '''
                像状态state和next_state,前两个数字是状态,后两个数字是目标
                两者的状态改变值很小,所以train起来更加方便,模型只需要一步一步接近目标就可以
                也就是模型只需要学着一步一步靠近目标,而不需要一次跨越大的步长,来到达目的地的呢。
                '''
                state = np.hstack((state[:2], goal))  ## 状态和配置的目标 g' 组合到一起的
                ## 因state和next_state两者挨得很近,所以状态也很近,模型训练起来更加方便的此时的目标变得更近了
                next_state = np.hstack((next_state[:2], goal))  ## 下一个状态和配置的目标g' 组合到一起的,最后可以用来做下一个状态
            ## 放入到采样的批量内
            batch['states'].append(state)  
            batch['next_states'].append(next_state)
            batch['actions'].append(action)
            batch['rewards'].append(reward)
            batch['dones'].append(done)
        ## ndarray化
        batch['states'] = np.array(batch['states'])
        batch['next_states'] = np.array(batch['next_states'])
        batch['actions'] = np.array(batch['actions'])
        return batch

return_list = []
allimage = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0   ## 累加的奖励值
            state = env.reset()  ## 重置环境的呢
            traj = Trajectory(state)  ## 实例化''' 用来记录一条完整轨迹 '''
            done = False         ## 是否完成的呢
            while not done:
                action = agent.take_action(state)   ## 智能体根据状态选择 动作
                state, reward, done = env.step(action)  ## 环境执行动作,并返回下一步的状态和目标、奖励、是否完成的
                episode_return += reward  ## 累积奖励的
                traj.store_step(action, state, reward, done)  ## 保存轨迹到序列内,包括(动作、状态和目标、奖励、是否完成的)
            replay_buffer.add_trajectory(traj)  ## 回放池加入这条 episode
            return_list.append(episode_return)
            if replay_buffer.size() >= minimal_episodes:  ## 当回放池内的个数 > 最小值
                for _ in range(n_train):
                    transition_dict = replay_buffer.sample(batch_size, True)   ## 按照常规采样 和 HER 来进行采样的
                    agent.update(transition_dict)  ## 智能体使用采样的序列来 train 策略网络和价值网络
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

https://zhuanlan.zhihu.com/p/659106430

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐