动手学强化学习reinforcement_learning-chapter-nineteen-目标导向的强化学习
TPPO algorithm 最大化 update 前后两个策略的JθEπ0s0JθEπ0s0的差值,也就是最大化残差的累加和,并且两个策略的KL散度要满足一定条件,不能太大的,导致步子太大扯着蛋。但是每个残差又要尽可能地小,来train价值网络的,保证策略收敛的,也就是最大化残差的和,同时要保证单个残差尽可能地小。PPO algorithm和TPPO是类似的情况,加入了截断条件,也就是重要性
动手学强化学习reinforcement learning-chapter-nineteen-目标导向的强化学习
《动手学强化学习》(张伟楠 沈键 俞勇)【简介_书评_在线阅读】 - 当当图书 (dangdang.com)product.dangdang.com/29391150.html
ZouJiu1/Hands-on-RL: https://hrl.boyuai.com/ (github.com)github.com/ZouJiu1/Hands-on-RL/tree/main
简介
TPPO algorithm 最大化 update 前后两个策略的 J ( θ ) = E π 0 ( s 0 ) J(\theta)=E^{\pi_0}(s_0) J(θ)=Eπ0(s0) 的差值,也就是最大化残差的累加和,并且两个策略的KL散度要满足一定条件,不能太大的,导致步子太大扯着蛋。但是每个残差又要尽可能地小,来train价值网络的,保证策略收敛的,也就是最大化残差的和,同时要保证单个残差尽可能地小。
PPO algorithm和TPPO是类似的情况,加入了截断条件,也就是重要性采样的比值要在一定区间内,否则就截断,而且还用到了最小值。
SAC algorithm 加强了探索的,也就是最大熵强化学习,探索性越强的,能帮助学习相应的内容,动作价值的估计加入了熵值。
上面的PPO和SAC在面对复杂的复合任务的时候,表现往往不太好。
目标导向的强化学习 (goal-oriented reinforcement learning,GoRL),能解决较复杂的决策问题。
定义
悬崖漫步环境从不同坐标开始,都需要train不同的策略模型;机械臂抓取不同坐标的物体,都要train不同的策略模型,传统的不适用这样的情况。智能体需要做相似但不相同的任务的。
MDP定义: ( S , A , P , r g , τ , ϕ ) (S,A,P,r_g,\tau,\phi) (S,A,P,rg,τ,ϕ) ,S是状态空间,A是动作空间,P是状态转移函数, τ \tau τ 是目标空间, ϕ \phi ϕ 是一个将状态空间映射到目标空间内的一个目标g的函数, r g r_g rg 是奖励函数,和目标g相关。
不同的概念
τ \tau τ 是目标空间, ϕ \phi ϕ是映射函数,任务由目标g定义,目标g本身和状态s相关,状态s映射到目标g,机械臂的状态s包括力矩,物体的位置,任务是抓取物体,所以就是将状态s通过函数 ϕ \phi ϕ映射到坐标g。
奖励函数不仅和状态s和动作a相关,还和目标g相关。对不同的目标,奖励函数不同,状态价值函数基于目标V(s,g),状态动作价值函数也是的Q(s,a,g)。 r g ( s t , a t , s t + 1 ) = { 0 ∣ ∣ ϕ ( s t + 1 ) − g ∣ ∣ 2 ≤ δ g − 1 , o t h e r w i s e r_{g}(s_t,a_t,s_{t+1})=\left\{\begin{matrix} 0&&&&||\phi(s_{t+1})-g||_2\leq \delta_g\\ -1,&&&&otherwise\end{matrix}\right. rg(st,at,st+1)={0−1,∣∣ϕ(st+1)−g∣∣2≤δgotherwise
目标导向的强化学习的优化目标
定义 v 0 v_0 v0 是环境内初始状态 s 0 s_0 s0 和目标g的联合分布,GoRL的目标是优化策略 π ( a ∣ s , g ) \pi(a|s,g) π(a∣s,g) ,使得目标函数最大化 E ( s 0 , g ) − v 0 [ V π ( s 0 , g ) ] E_{(s_0,g)-v_0}[V^{\pi}(s_0,g)] E(s0,g)−v0[Vπ(s0,g)]
HER 算法
目标导向的强化学习的奖励函数的0分布很窄很窄的,绝大多数的奖励都是-1,开始train速度慢, 事后经验回放 (hindsight experience replay,HER)利用了这些失败的经验。
因每个 episode 的节点可能很分散,在目标 around区间很少很少,所以使用历史数据来train,会导致模型不能收敛,像给定6个episodes,但是这6个episodes的最后点都是目标,但是中间的节点,很多都不是的。
策略 π \pi π 以目标g在环境内探索了轨迹, s 1 , s 2 , . . , s T s_1,s_2,..,s_T s1,s2,..,sT ,但是每个轨迹都不是目标g,所以奖励都是-1,虽然没有达到目标g,但是达到了目标 ϕ ( s 1 ) , ϕ ( s 2 ) , . . , ϕ ( s T ) \phi(s_1),\phi(s_2),..,\phi(s_T) ϕ(s1),ϕ(s2),..,ϕ(sT) ,可以用这样的方式来算奖励值。
HER algorithm的策略优化算法,可以使用任意传统的DDPG,DQN、PPO,TRPO,SAC算法。
–还是给定了初始状态 s 0 s_0 s0 和目标g,并且使用了策略 π \pi π 在环境采样轨迹,放到历史数据回放池内(s,a,r, s ′ s' s′ ,g),也就是当前状态是s,智能体策略选择了动作a,拿到了奖励r,环境执行了动作a,状态变到了 s ′ s' s′ ,到达了目的地g。
–从回放池采样N个数组
–对这些数组,选择一个状态 s ′ ′ s'' s′′ ,将其映射到目标 g ′ = ϕ ( s ′ ′ ) g'=\phi(s'') g′=ϕ(s′′) 算奖励值 r ′ = r g ′ ( s , a , s ′ ) r'=r_{g'}(s,a,s') r′=rg′(s,a,s′) ,用这些数据替代之前的数组(s,a, r ′ , s ′ , g ′ r',s',g' r′,s′,g′ ),
–用替代以后的数组训练策略。
状态 s ′ ′ s'' s′′ 的选择
共3类方案,future的方案最优秀 :选择和被改写的数组(s,a,r, s ′ s' s′ ,g)处在同一个轨迹并在时间上处在 s ′ s' s′ 之后的某个状态做 s ′ ′ s'' s′′。也就是同一个轨迹状态 s ′ s' s′ 和状态 s ′ ′ s'' s′′ 之间的序列的目标是到达目的地 g ′ g' g′
HER program
首先是环境的,给出了一个直角坐标系的方程,目的地是随机产生的,完成了重置环境、进一步step的操作
目标和状态放在了一起的,所以不需要单独考虑目标,只有在从回放池内采样的时候,才需要update目标g’。
状态和目标都是x, y的坐标,所以可以拼在一起的,而且目标在train的时候是不变的,step的时候会
下面的环境,每条episode的目标都不相同,所以智能体需要适应不同的目标,也就是给定初始状态和目标,智能体要能采取策略达到目标,使得累计奖励max,而HER采样,就在采样的时候修改了目标,让网络往能够更好收敛的方向。
给定初始状态和目标,就相当是给了相应的目的地,智能体需要前往当前的目的地才行。每个episode的目标都不相同的,智能体要能够适应不同的目的地。
HER采样将序列后面某个状态当作最后的目标 ,由于当前状态和后面那个状态挨得很近,所以两者的状态也很近的,智能体只需要一小步一小步的靠近目标就可以,而不需要一次跨越大的步长,到达目的地的。
像状态state和next_state,前两个数字是状态,后两个数字是目标,两者的状态改变值很小,所以train起来更加方便,模型只需要一步一步接近目标就可以,也就是模型只需要学着一步一步靠近目标
class WorldEnv:
def __init__(self):
self.distance_threshold = 0.15 ## 距离的最小阀值
self.action_bound = 1 ## 动作的上界的
def reset(self): # 重置环境
# 生成一个目标状态, 坐标范围是[3.5~4.5, 3.5~4.5]
self.goal = np.array(
[4 + random.uniform(-0.5, 0.5), 4 + random.uniform(-0.5, 0.5)]) ## 随机给定目标g
self.state = np.array([0, 0]) # 初始状态
self.count = 0 ## 重置步长的
return np.hstack((self.state, self.goal)) ## 叠起来状态和目标
def step(self, action):
action = np.clip(action, -self.action_bound, self.action_bound) ## 截断动作的上下界
x = max(0, min(5, self.state[0] + action[0])) ## 前进一步以后,截断x坐标
y = max(0, min(5, self.state[1] + action[1])) ## 前进一步以后,截断y坐标
self.state = np.array([x, y]) ## ndarray 化
self.count += 1 ## 累积步长的呢
dis = np.sqrt(np.sum(np.square(self.state - self.goal))) ## 算当前状态和目标 g 的距离
reward = -1.0 if dis > self.distance_threshold else 0 ## > 给定阀值奖励是 -1,否则 0
if dis <= self.distance_threshold or self.count == 50: ## 若 < 阀值,或者步长一定,就完成
done = True ## 完成
else:
done = False ## 没有完成的
return np.hstack((self.state, self.goal)), reward, done ## 叠起来状态和目标,奖励,是否完成的
策略网络就是 DDPG
## 构造策略网络的
class PolicyNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
super(PolicyNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
self.action_bound = action_bound # action_bound是环境可以接受的动作最大值
def forward(self, x):
x = F.relu(self.fc2(F.relu(self.fc1(x))))
return torch.tanh(self.fc3(x)) * self.action_bound ## 直接返回动作的大小
## 状态和目标 动作价值网络,输出状态和目标 动作对 (状态和目标,动作) 的价值
## 也就是动作价值网络
class QValueNet(torch.nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
super(QValueNet, self).__init__()
self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
self.fc3 = torch.nn.Linear(hidden_dim, 1)
## 输入 (状态和目标,动作)
def forward(self, x, a):
cat = torch.cat([x, a], dim=1) # 拼接状态和目标, 和动作
x = F.relu(self.fc2(F.relu(self.fc1(cat))))
return self.fc3(x)
class DDPG:
''' DDPG算法 '''
def __init__(self, state_dim, hidden_dim, action_dim, action_bound,
actor_lr, critic_lr, sigma, tau, gamma, device):
self.action_dim = action_dim ## 动作的dim
self.actor = PolicyNet(state_dim, hidden_dim, action_dim,
action_bound).to(device) ## 策略网络的
self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device) ## 状态动作价值网络
self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim,
action_bound).to(device) ## 目标策略网络,延迟update
self.target_critic = QValueNet(state_dim, hidden_dim,
action_dim).to(device) ## 目标状态动作价值网络,延迟update
# 初始化目标价值网络并使其参数和价值网络一样
self.target_critic.load_state_dict(self.critic.state_dict())
# 初始化目标策略网络并使其参数和策略网络一样
self.target_actor.load_state_dict(self.actor.state_dict())
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
lr=actor_lr)
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
lr=critic_lr)
self.gamma = gamma ## 折扣因子
self.sigma = sigma # 高斯噪声的标准差,均值直接设$0
self.tau = tau # 目标网络软更新参数
self.action_bound = action_bound
self.device = device
def take_action(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
action = self.actor(state).detach().cpu().numpy()[0] ## 拿到确定性策略 网络的输出,也就是确定的动作
# 给动作添加噪声,增加探索的
action = action + self.sigma * np.random.randn(self.action_dim)
return action
def soft_update(self, net, target_net): ## EMA的方式update目标网络的,每次update很少的内容
for param_target, param in zip(target_net.parameters(),
net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - self.tau) +
param.data * self.tau)
def update(self, transition_dict):
## 拿到这条序列内的 状态和目标、动作和奖励,下一个状态和目标、是否完成的
states = torch.tensor(transition_dict['states'],
dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions'],
dtype=torch.float).to(self.device)
rewards = torch.tensor(transition_dict['rewards'],
dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'],
dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'],
dtype=torch.float).view(-1, 1).to(self.device)
## 下个状态和目标+下个状态目标的动作,目标状态和目标动作价值网络 输出(状态和目标,动作)对的动作价值 Q
next_q_values = self.target_critic(next_states, self.target_actor(next_states))
## 用下一个(状态和目标、动作)对的动作价值 Q,然后间接求出当前的(状态和目标、动作)对的动作价值
q_targets = rewards + self.gamma * next_q_values * (1 - dones)
# MSE损失函数
critic_loss = torch.mean( ## 直接求出当前(状态和目标的、动作)的动作价值,和 间接求出的动作价值,使用 MSE 来算损失函数的,q_targets不反向传播求梯度
F.mse_loss(self.critic(states, actions), q_targets))
self.critic_optimizer.zero_grad() ## 价值网络的参数梯度置零的
critic_loss.backward() ## 价值网络的损失loss反向传播梯度
self.critic_optimizer.step() ## update 网络
# 策略网络就是$了使Q值最大化
## update以后的动作价值网络,使用当前的(状态和目标、动作)的动作价值,加了负号的,也就是最大化动作价值的
## self.actor(states) 是输出当前状态的动作
'''
确定性策略网络的 update,稍稍复杂些,嵌套了两个网络,首先用当前的状态和策略网络,输出当前状态和目标的动作predict_A=self.actor(states),
然后 self.critic(states, self.actor(states))输出了当前(状态、动作)的动作价值Q,并求均值加负号,也就是最大化动作价值,
反向传播用来update 策略网络actor,因critic动作价值网络上面已经update过了,所以下面就只update策略网络actor,critic网络不变的。
'''
actor_loss = -torch.mean(self.critic(states, self.actor(states)))
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
## 延迟少量的update网络,也就是EMA的方式
self.soft_update(self.actor, self.target_actor) # 软更新策略网络
self.soft_update(self.critic, self.target_critic) # 软更新价值网络
然后就是用来record历史数据的呢,采样的时候按照一定的比例来使用 HER 的方式,也就是指定当前状态s到后续某个状态s’的目标是g’。
class ReplayBuffer_Trajectory:
''' 存储轨迹的经验回放池 '''
def __init__(self, capacity):
self.buffer = collections.deque(maxlen=capacity) ## 初始化缓冲池
def add_trajectory(self, trajectory):
self.buffer.append(trajectory) ## 缓冲池加入一条序列
def size(self):
return len(self.buffer) ## 缓冲池的长度
def sample(self, batch_size, use_her, dis_threshold=0.15, her_ratio=0.8):
batch = dict(states=[], ## 批量
actions=[],
next_states=[],
rewards=[],
dones=[])
for _ in range(batch_size):
traj = random.sample(self.buffer, 1)[0] ## 采样一条序列
step_state = np.random.randint(traj.length) ## 随机拿序列的某一个record,坐标是 step_state
state = traj.states[step_state] ## 该record的状态和目标
next_state = traj.states[step_state + 1] ## 下一个状态和目标的
action = traj.actions[step_state] ## 该record的动作
reward = traj.rewards[step_state] ## 该record的奖励
done = traj.dones[step_state] ## 该record是否完成的
## use_her是否使用方式 her,概率值小于阀值
if use_her and np.random.uniform() <= her_ratio: ## 给定好 使用 her 方式的概率
step_goal = np.random.randint(step_state + 1, traj.length + 1) ## 从在同一个轨迹并在时间上处在s'之后的某个状态s'',坐标是step_goal
## 也就是目标的(x,y)坐标 g'
goal = traj.states[step_goal][:2] # 使用HER算法的future方案设置目标 g',也就是后面的某个状态做目标
## 配置修改以后的
dis = np.sqrt(np.sum(np.square(next_state[:2] - goal))) ## 下一个状态的目标 g_next(x,y),和配置的目标 g'之间的距离
reward = -1.0 if dis > dis_threshold else 0 ## 大于阀值则奖励 -1,小于阀值则奖励 0
done = False if dis > dis_threshold else True ## 大于阀值则未完成,小于阀值则已完成
'''
像状态state和next_state,前两个数字是状态,后两个数字是目标
两者的状态改变值很小,所以train起来更加方便,模型只需要一步一步接近目标就可以
也就是模型只需要学着一步一步靠近目标,而不需要一次跨越大的步长,来到达目的地的呢。
'''
state = np.hstack((state[:2], goal)) ## 状态和配置的目标 g' 组合到一起的
## 因state和next_state两者挨得很近,所以状态也很近,模型训练起来更加方便的此时的目标变得更近了
next_state = np.hstack((next_state[:2], goal)) ## 下一个状态和配置的目标g' 组合到一起的,最后可以用来做下一个状态
## 放入到采样的批量内
batch['states'].append(state)
batch['next_states'].append(next_state)
batch['actions'].append(action)
batch['rewards'].append(reward)
batch['dones'].append(done)
## ndarray化
batch['states'] = np.array(batch['states'])
batch['next_states'] = np.array(batch['next_states'])
batch['actions'] = np.array(batch['actions'])
return batch
return_list = []
allimage = []
for i in range(10):
with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
for i_episode in range(int(num_episodes / 10)):
episode_return = 0 ## 累加的奖励值
state = env.reset() ## 重置环境的呢
traj = Trajectory(state) ## 实例化''' 用来记录一条完整轨迹 '''
done = False ## 是否完成的呢
while not done:
action = agent.take_action(state) ## 智能体根据状态选择 动作
state, reward, done = env.step(action) ## 环境执行动作,并返回下一步的状态和目标、奖励、是否完成的
episode_return += reward ## 累积奖励的
traj.store_step(action, state, reward, done) ## 保存轨迹到序列内,包括(动作、状态和目标、奖励、是否完成的)
replay_buffer.add_trajectory(traj) ## 回放池加入这条 episode
return_list.append(episode_return)
if replay_buffer.size() >= minimal_episodes: ## 当回放池内的个数 > 最小值
for _ in range(n_train):
transition_dict = replay_buffer.sample(batch_size, True) ## 按照常规采样 和 HER 来进行采样的
agent.update(transition_dict) ## 智能体使用采样的序列来 train 策略网络和价值网络
if (i_episode + 1) % 10 == 0:
pbar.set_postfix({
'episode':
'%d' % (num_episodes / 10 * i + i_episode + 1),
'return':
'%.3f' % np.mean(return_list[-10:])
})
pbar.update(1)

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)