动手学强化学习reinforcement learning-chapter-sixteen-模型预测控制

动手学强化学习​hrl.boyuai.com/

《动手学强化学习》(张伟楠 沈键 俞勇)【简介_书评_在线阅读】 - 当当图书 (dangdang.com)​product.dangdang.com/29391150.html

ZouJiu1/Hands-on-RL: https://hrl.boyuai.com/ (github.com)​github.com/ZouJiu1/Hands-on-RL/tree/main

简介

在这里插入图片描述
model predictive control MPC,模型控制预测,MPC使用了集成learning,用多个环境模型来predict,最后用CEM来做模型控制预测的。

值函数方式DQN、策略方式的REINFORCE,以及两者结合的Actor-Critic,都是从环境采样数据来train,并没有相应的环境模型,通常环境模型可以帮助智能体train或者决策的。 不存在显式的策略,使用环境模型来选择当前步的动作。

gym库 内的环境env,会返回【反馈下一个状态、动作的奖励、是否完成、步长太长的,info】,所以模拟的环境模型至少需要返回下一个状态和动作的奖励才可以。

shooting algorithm

MPC每次会产生很多的候选episodes,然后从这些episodes内选择能得到最好结果的episode,选第一个动作执行。也就是下象棋,每走一步都需要考虑后续的局势。1、根据历史数据学习环境模型,2、和真实环境交互时用环境模型选择动作。

所以MPC不需要显式的策略模型,而是从多条episodes内选择累积奖励最大的那条episode的第一个动作来执行,策略模型通常直接输出要采取的动作,但是MPC不会直接输出下一步的动作,而是通过比较多条episodes的累积奖励来选择动作。

从所有动作序列内,选取H步以后累计奖励最大的序列,并执行第一个动作。

Random shooting algorithm

随机的产生N条动作序列,每个动作都是随机的

交叉熵方式 (cross entropy method)

动作episodes使用截断标准正态分布采样,给定参数均值和方差,然后选择奖励较大的那些episodes,最后用样本估计总体,来update分布的均值和方差。所以交叉熵方式需要update两个参数的,也就是均值和方差。这样做就能使得分布中能获得较高奖励的动作episodes的概率比较高。

给定截断标准正态分布,初始化均值和方差,然后随机采样很多很多动作episodes,比实际需要的候选N条还要多的,然后算出每条episode的奖励,并且sort,拿到sort以后的奖励最大的后N条episodes,因从小到大排列,所以选择后面的。最后使用选择的episodes来update截断正态分布的方差和均值。也就是使用样本估计总体。

## 交叉熵方式获得当前要采取的动作,不需要显式的策略模型,是选择多条 episodes 内奖励最大的那条 episode 的第一个动作的
class CEM:
    def __init__(self, n_sequence, elite_ratio, fake_env, upper_bound,
                 lower_bound):
        self.n_sequence = n_sequence       ## 很多很多的episodes,>> N,用来sort选择奖励高的episodes
        self.elite_ratio = elite_ratio     ## 实际候选的episodes比例,< 1,最后和 self.n_sequence 相乘
        self.upper_bound = upper_bound     ## 方差的上界
        self.lower_bound = lower_bound     ## 方差的下界
        self.fake_env = fake_env           ## 是虚假环境的

    def optimize(self, state, init_mean, init_var):
        ## 给定初始的 均值 和 方差
        mean, var = init_mean, init_var             ## (26-1,) (26-1,) 共这么多个,向前看26步
        ## 截断标准正态分布的,也就是均值和方差分别是(0, 1)的正态分布,截断的上下界分别是 -2,2
        X = truncnorm(-2, 2, loc=np.zeros_like(mean), scale=np.ones_like(var))
        '''
            >>> a = np.array([0, 1, 2])
            >>> np.tile(a, 2)
            array([0, 1, 2, 0, 1, 2])
        '''
        ## state (3,)----(n_sequence, 3)
        state = np.tile(state, (self.n_sequence, 1)) ## np.tile 是重复 state,也就是重复self._sequence次,方便采样这么多episodes的

        for _ in range(5):
            ##  对均值做上下界处理,用来产生相应的方差
            lb_dist, ub_dist = mean - self.lower_bound, self.upper_bound - mean ## (26-1,) (26-1,) ,向前看26步
            ##  约束以后的方差,使用了上下界均值,也就是最小值  (26-1,) ,向前看26步
            constrained_var = np.minimum(
                np.minimum(np.square(lb_dist / 2), np.square(ub_dist / 2)),
                var)
            # 生成动作序列 (n_sequence, 26-1), 向前看26步, 共n_sequence个episodes
            ## 截断到-2,2之间的标准正态分布,采样以后的话,乘上标准差,最后加上均值,也就是服从均值是mean方差是constrained_var的正态分布
            action_sequences = [X.rvs() for _ in range(self.n_sequence)
                                ] * np.sqrt(constrained_var) + mean
            # 计算每条动作序列的累积奖励
            returns = self.fake_env.propagate(state, action_sequences)[:, 0] ## 状态和对应的动作,拿到累积的奖励
            # 选取累积奖励最高的若干条动作序列
            elites = action_sequences[np.argsort(                     ## 对采样episodes使用奖励来sort,然后选择后面较大的episodes
                returns)][-int(self.elite_ratio * self.n_sequence):]
            new_mean = np.mean(elites, axis=0)  ##  根据样本估计总体动作的均值
            new_var = np.var(elites, axis=0)    ##  根据样本估计总体动作的方差
            # 更新动作序列分布
            mean = 0.1 * mean + 0.9 * new_mean  ## EMA update 动作的均值
            var = 0.1 * var + 0.9 * new_var     ## EMA update 动作的方差

        ## 返回要采取的动作
        return mean  ## 几次迭代以后,返回估计的动作均值,此时截断标准正态分布采样,能使得动作episodes的奖励较高的

PETS algorithm

probabilistic ensembles with trajectory sampling,带有轨迹采样的概率集成learning。产生多个环境模型,train并集成,用多个环境模型来predict,然后用CEM来做预测控制的。并没有显示的策略模型,而是用来拟合环境的。环境模型会返回当前动作的奖励,以及执行动作以后的状态。

环境存在两类不确定性, 偶然不确定性(环境本身的随机性)认知不确定性(数据太少导致欠拟合), 环境模型使用了输出高斯分布来捕捉偶然不确定性,环境模型的输出是高斯分布的均值和协方差矩阵。用集成方式来减少认知不确定性,也就是多个不同参数不同train数据的环境模型集成。

MPC会用环境模型来预测奖励和下一个状态,然后用CEM交叉熵的方式来选择下一个动作的。

gym库 内的环境env,会返回【反馈下一个状态、动作的奖励、是否完成、步长太长的,info】,所以模拟的环境模型至少需要返回下一个状态和动作的奖励才可以。

program

program内部, FakeEnv 类虚拟环境只在 CEM 类交叉熵方式选取动作时需要用到,其他地方用不到,也就是 CEM 会调用 FakeEnv 类来算累积奖励的,而 FakeEnv 类会调用集成环境模型 EnsembleDynamicsModel 类来执行动作并返回奖励和下一个状态。 EnsembleDynamicsModel 类会使用历史数据来训练环境模型。历史数据包括了和真实环境交互的数据 def explore(self):,还包括了 CEM+虚拟的环境 选择动作并和真实环境交互的数据 def mpc(self):,所以历史数据包括了两部分。

EnsembleDynamicsModel 类 train 的时候,label是奖励,以及下一步的状态 减去 当前状态得到的残差。也就是环境模型会输出奖励和残差。环境模型主要用来给出 CEM 交叉熵给定状态和动作对应的奖励和残差。

CEM交叉熵 需要 FakeEnv虚拟环境 来给出下一步对应的奖励和残差(下一步状态),往前多看几步,然后就可以选择累加奖励最大的episode的动作。

FakeEnv虚拟环境 需要使用到 EnsembleDynamicsModel环境模型, 用来输入给定的状态和动作,输出下一步的奖励和残差,然后返回给 虚拟环境的。

训练 EnsembleDynamicsModel环境模型, 需要历史数据的

self.plan_horizon = plan_horizon ## 指定向前看多少步

MPC会使用CEM来向前看多少步,然后选择累积奖励最大的episode的第一个动作执行。

Train

. 首先智能体和环境随机探索数据,放入到回放池

. 使用回放池内的数据 train 集成的环境模型,也就是CEM选择动作和环境交互的数据,包括一条环境随机探索episode

. MPC方式,使用CEM交叉熵的方式,来获取动作的,并得到相应的episode

----------------------.CEM+虚拟的环境选择动作并和真实环境交互的数据,放入到回放池

----------------------.--------CEM使用初始状态和随机动作,然后使用虚拟环境来给出奖励和下一步状态,算出一定步长的累积奖励,最后选择累积奖励最大的episode的第一个动作执行,最后由真实环境来执行动作,拿到下一个动作和状态

. 最后返回累积奖励的列表

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class Swish(nn.Module):
    ''' Swish激活函数 '''
    def __init__(self):
        super(Swish, self).__init__()

    def forward(self, x):
        return x * torch.sigmoid(x)

def init_weights(m):
    ''' 初始化模型权重 '''
    ##  就是截断正态分布的
    def truncated_normal_init(t, mean=0.0, std=0.01):
        torch.nn.init.normal_(t, mean=mean, std=std)     ## 用均值=0,标准差是std的正态分布,来初始化模型的权重
        while True:   ## 做截断
            ##  截断区间是 [mean - 2 * std, mean + 2 * std],也就是拿到不满足条件的布尔值
            cond = (t < mean - 2 * std) | (t > mean + 2 * std)   ## 截断的条件是 标准差的 -2倍到2倍
            if not torch.sum(cond): ## 权重的所有值是否 都被 截断到 区间内
                break  ## 都被截断到区间内,退出循环返回的
            ##  不满足截断条件的地方,再次初始化
            t = torch.where(cond, torch.nn.init.normal_(torch.ones(t.shape, device=device), mean=mean, std=std), t)
        return t
    ## 只有 full connect layer层才有权重需要被初始化
    if type(m) == nn.Linear or isinstance(m, FCLayer):
        truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(m._input_dim)))
        m.bias.data.fill_(0.0)

class FCLayer(nn.Module):
    ''' 集成之后的全连接层 '''
    def __init__(self, input_dim, output_dim, ensemble_size, activation):
        super(FCLayer, self).__init__()
        self._input_dim, self._output_dim = input_dim, output_dim  ## 输入和输出的 dim
        self.weight = nn.Parameter(torch.Tensor(ensemble_size, input_dim, output_dim).to(device))    ## 配置参数的dim
        self._activation = activation
        self.bias = nn.Parameter(torch.Tensor(ensemble_size, output_dim).to(device))  ## 配置参数的dim

    def forward(self, x):
        '''
        torch.bmm:批量的矩阵乘运算
        若输入是(b×n×m)向量, mat2是(b×m×p)向量, 输出是(b×n×p)向量,第一个dim是batch批量
        >>> input = torch.randn(10, 3, 6)
        >>> mat2 = torch.randn(10, 6, 60)
        >>> res = torch.bmm(input, mat2)
        >>> res.size()
        torch.Size([10, 3, 60])
        '''
        return self._activation(torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))  ## 构造环境模型的 full connect 层

class EnsembleModel(nn.Module):
    ''' 环境模型集成 '''
    def __init__(self,
                 state_dim,
                 action_dim,
                 ensemble_size=5,
                 learning_rate=1e-3):
        super(EnsembleModel, self).__init__()
        # 输出包括均值和方差,因此是状态与奖励维度之和的两倍
        ## 每个状态都服从高斯分布,然后奖励也是服从高斯分布,用来捕捉 偶然不确定性
        self._output_dim = (state_dim + 1) * 2
        self._max_logvar = nn.Parameter((torch.ones(
            (1, self._output_dim // 2)).float() / 2).to(device),
                                        requires_grad=False)         ## 初始化方差的最大值是 1/2
        self._min_logvar = nn.Parameter((-torch.ones(
            (1, self._output_dim // 2)).float() * 10).to(device),
                                        requires_grad=False)         ## 初始化方差的最小值是 -10
        ## 集成的full connect layer
        self.layer1 = FCLayer(state_dim + action_dim, 200, ensemble_size, Swish())
        self.layer2 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer3 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer4 = FCLayer(200, 200, ensemble_size, Swish())
        self.layer5 = FCLayer(200, self._output_dim, ensemble_size, nn.Identity()) ## 最后一层没有激活函数
        self.apply(init_weights)  # 初始化环境模型中的参数
        self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate) ## 优化器

    ## return_log_var,var是方差,Log_var是方差的log值
    def forward(self, x, return_log_var=False):
        ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))   ##  Sequential顺序执行的模型
        ## 【:self._output_dim // 2】  后半部分是均值的
        mean = ret[:, :, :self._output_dim // 2]
        # 在 PETS 算法中,将方差控制在最小值和最大值之间
        '''
        可导的上截断操作
        F.softplus和relu类似,返回值都是大于0的,【self._output_dim // 2:】前半部分是log方差的
        self._max_logvar - ret[:, :, self._output_dim // 2:],比_max_logvar大的数值会小于0
        F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2:])会将小于0的值置0
        self._max_logvar - F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2:]) 恢复开始的值,但是比_max_logvar大的数值都被截断到_max_logvar
        不同于直接截断操作,这里的 F.softplus 是可导的梯度可以传播
        '''
        logvar = self._max_logvar - F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2:])
        '''
        可导的下截断操作
        logvar - self._min_logvar,比_min_logvar小的数值会小于0
        F.softplus(logvar - self._min_logvar) 会将小于0的值置0
        self._min_logvar + F.softplus(logvar - self._min_logvar) 恢复开始的值,但是比_min_logvar小的数值都被截断到_min_logvar
        不同于直接截断操作,这里的 F.softplus 是可导的梯度可以传播
        '''
        logvar = self._min_logvar + F.softplus(logvar - self._min_logvar)
        ## 返回环境下一步状态和奖励的高斯分布的均值和log方差,拟合偶然不确定性
        ## 也就是下一步状态 用高斯分布采样,奖励 也用高斯分布采样的
        return mean, logvar if return_log_var else torch.exp(logvar)

    def loss(self, mean, logvar, labels, use_var_loss=True):  ## 损失函数的呢
        inverse_var = torch.exp(-logvar)  ## 方差的
        if use_var_loss:
            ## 求均值 和 label的loss,MSE,两者距离越小越好,然后还乘上了方差的逆
            mse_loss = torch.mean(torch.mean(torch.pow(mean - labels, 2) *  ##(6-1, bs, 2*2)---(6-1)
                                             inverse_var,
                                             dim=-1),
                                  dim=-1)
            ## 方差的 Log 值越小越好,所以可以做损失函数
            var_loss = torch.mean(torch.mean(logvar, dim=-1), dim=-1)
            total_loss = torch.sum(mse_loss) + torch.sum(var_loss)
        else:
            ## 求均值 和 label的loss,MSE,两者距离越小越好
            mse_loss = torch.mean(torch.pow(mean - labels, 2), dim=(1, 2))
            total_loss = torch.sum(mse_loss)
        return total_loss, mse_loss

    def train(self, loss):
        self.optimizer.zero_grad()  ## 参数的梯度置 0 
        ##  _max_logvar 和 _min_logvar 都是不反向传播梯度的,requires_grad=False
        loss += 0.01 * torch.sum(self._max_logvar) - 0.01 * torch.sum(self._min_logvar)
        loss.backward()         ## 反向传播求出梯度
        self.optimizer.step()   ## update求出梯度的

class EnsembleDynamicsModel:
    ''' 环境模型集成,加入精细化的训练 '''
    def __init__(self, state_dim, action_dim, num_network=5):
        self._num_network = num_network ## 集成环境模型的个数
        self._state_dim, self._action_dim = state_dim, action_dim  ## 状态的dim,动作的dim
        ## 实例化集成的环境模型
        self.model = EnsembleModel(state_dim,
                                   action_dim,
                                   ensemble_size=num_network)
        self._epoch_since_last_update = 0

    def train(self,
              inputs, ## (200, 2*2)
              labels, ## (200, 2*2)
              batch_size=64,
              holdout_ratio=0.1,
              max_iter=20):
        # 设置训练集与验证集
        '''
        np.random.permutation(10)
        array([1, 7, 4, 3, 0, 9, 2, 5, 8, 6]) # random
        np.random.permutation([1, 4, 9, 12, 15])
        array([15,  1,  9,  4, 12]) # random
        '''
        permutation = np.random.permutation(inputs.shape[0])    ## shuffle 输入
        inputs, labels = inputs[permutation], labels[permutation]    ## shuffle 输入
        num_holdout = int(inputs.shape[0] * holdout_ratio) ## 用来验证的比例,20
        ## (200*(1-0.2), 2*2)
        train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:]   ## 拿到用来训练的输入和label
        ## (20, 2*2)
        holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout]  ## 用来验证网络是否收敛
        holdout_inputs = torch.from_numpy(holdout_inputs).float().to(device)  ##  转torch的tensor
        holdout_labels = torch.from_numpy(holdout_labels).float().to(device)
        ## (6-1, 20, 2*2)
        holdout_inputs = holdout_inputs[None, :, :].repeat([self._num_network, 1, 1])              ## 重复很多次,每个环境网络拿到一份输入的拷贝
        holdout_labels = holdout_labels[None, :, :].repeat([self._num_network, 1, 1])              ## 重复很多次,每个环境网络拿到一份label的拷贝

        # 保留最好的结果
        self._snapshots = {i: (None, 1e10) for i in range(self._num_network)} ## 每个环境网络的结果

        ## 训练的epoch计数,不会自动停止,需要 break 才可以
        for epoch in itertools.count():
            # 定义每一个网络的train数据
            '''
            np.random.permutation(train_inputs.shape[0])  : shuffle 输入数据的 index
            for _ in range(self._num_network)   : 多少个环境网络,就重复多少次的
            因每个网络都做了一次permutation shuffle,所以每个网络的输入数据的sequence都是不相同的,
            虽然总体的训练数据相同,但是因sequence不同导致了每个batch的输入都不相同。
            np.vstack:最后 vstack起来的,也就是最后的输入 index
            下面的输出train_index的dim是:(20, 6)
            train_index = np.vstack([
                            np.random.permutation(6)
                            for _ in range(20)
                        ])
            train_index 的 dim 是( _num_network,train_inputs.shape[0])
            '''
            ## (6-1, 200-20)
            train_index = np.vstack([
                np.random.permutation(train_inputs.shape[0])
                for _ in range(self._num_network)
            ])
            # 所有真实数据都用来train
            for batch_start_pos in range(0, train_inputs.shape[0], batch_size):  ## 每次输入的数量是 batch_size 个
                ## train_index的第一个dim是 环境网络的个数,第二个dim才是 数据的个数 (6-1,bs)
                batch_index = train_index[:, batch_start_pos : batch_start_pos + batch_size]  ## (num_network, train_input.shape[0])
                train_input = torch.from_numpy(train_inputs[batch_index]).float().to(device)  ## 拿到输入的数据,(6-1,bs, 2*2)
                train_label = torch.from_numpy(train_labels[batch_index]).float().to(device)  ## 输入的label,(6-1,bs, 2*2)
                ## train 集成起来的多个环境模型 (6-1,bs, 2*2) (6-1,bs, 2*2)
                mean, logvar = self.model(train_input, return_log_var=True) 
                loss, _ = self.model.loss(mean, logvar, train_label)  ## 算 loss
                self.model.train(loss)  ## train

            with torch.no_grad():  ## 不算梯度的
                mean, logvar = self.model(holdout_inputs, return_log_var=True)  ## 其他的数据前向传播 (6-1,20, 2*2)(6-1,20, 2*2)
                _, holdout_losses = self.model.loss(mean,
                                                    logvar,
                                                    holdout_labels,
                                                    use_var_loss=False)   ## 算出loss的,只算均值损失,不算方差的损失
                holdout_losses = holdout_losses.cpu()
                break_condition = self._save_best(epoch, holdout_losses)  ## 根据损失的下降程度,决定是否保存模型
                if break_condition or epoch > max_iter:  # 结束训练
                    break

    def _save_best(self, epoch, losses, threshold=0.1):
        updated = False                    ## 是否 update 模型的
        for i in range(len(losses)):  
            current = losses[i]            ## 当前的损失
            _, best = self._snapshots[i]   ## 保存的损失
            improvement = (best - current) / best  ## 损失下降的比例
            if improvement > threshold:    ## 下降的比例大于 threshold
                self._snapshots[i] = (epoch, current)  ## 保存当前的epoch和损失
                updated = True ## 损失update过了的
        ## 用来做train的终止条件的,若是距离上次 update 已经过去了6个epoch,那么就可以终止train
        self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1
        return self._epoch_since_last_update > 5

    def predict(self, inputs, batch_size=64): ## (n_sequence, 2*2)
        mean, var = [], []
        ## 使用多个环境模型来 predict 下个状态和奖励分布的 均值和方差
        for i in range(0, inputs.shape[0], batch_size):
            input = torch.from_numpy(
                inputs[i:min(i +
                             batch_size, inputs.shape[0])]).float().to(device)
            cur_mean, cur_var = self.model(input[None, :, :].repeat(  ## input[None, :, :].repeat([self._num_network, 1, 1])  (6-1, n_sequence, 2*2)
                [self._num_network, 1, 1]),
                                           return_log_var=False)
            mean.append(cur_mean.detach().cpu().numpy()) ## (6-1, n_sequence, 2*2)
            var.append(cur_var.detach().cpu().numpy()) ## (6-1, n_sequence, 2*2)
        return np.hstack(mean), np.hstack(var)

## 构造的虚拟环境,不是真实环境的,虚拟环境只在CEM交叉熵方式选取动作时需要用到,其他地方用不到
class FakeEnv:
    def __init__(self, model):
        ## 集成环境模型
        self.model = model

    def step(self, obs, act):
        inputs = np.concatenate((obs, act), axis=-1)  ## 拼接状态和动作 (n_sequence, 2*2)
        ## 虚拟环境来predict 下个状态和奖励的 均值和方差
        ensemble_model_means, ensemble_model_vars = self.model.predict(inputs) ##  (6-1, n_sequence, 2*2)(6-1, n_sequence, 2*2)
        '''
        下个状态的均值 加上 当前的状态,所以集成环境模型,返回的均值其实是当前状态的残差
        需要加上当前状态,才是真正的下一个状态的均值
        '''
        ensemble_model_means[:, :, 1:] += obs.numpy()
        ensemble_model_stds = np.sqrt(ensemble_model_vars) ## 算标准差的
        ## 标准正态分布采样,然后乘标准差加均值,变到常规正态分布 (ensemble_model_means, ensemble_model_stds**2) 采样
        ensemble_samples = ensemble_model_means + np.random.normal(size=ensemble_model_means.shape) * ensemble_model_stds  ## (6-1, n_sequence, 2*2)

        num_models, batch_size, _ = ensemble_model_means.shape ## 均值的shape  (6-1, n_sequence, 2*2)
        models_to_use = np.random.choice(
            [i for i in range(self.model._num_network)], size=batch_size)  ## 随机选择环境模型 ##(batch_size,)
        batch_inds = np.arange(0, batch_size)
        samples = ensemble_samples[models_to_use, batch_inds] ## 拿到实际的采样值  ##(batch_size,2*2)
        rewards, next_obs = samples[:, :1], samples[:, 1:]    ## 拿出 奖励值,以及 下一个状态  (batch_size, 1)  (batch_size, 2*2-1)
        return rewards, next_obs

    ##  使用了 状态 + 动作 episodes,然后不保存梯度,前向算出累积奖励的
    def propagate(self, obs, actions): ## (n_sequence, 3) (n_sequence, 26)  ,向前看26步
        with torch.no_grad():  ##  不保存梯度的
            obs = np.copy(obs) ##  复制状态的 (n_sequence, 3)
            total_reward = np.expand_dims(np.zeros(obs.shape[0]), axis=-1) ## 初始化累积奖励 (n_sequence,1)
            obs, actions = torch.as_tensor(obs), torch.as_tensor(actions)  ## 向量化
            for i in range(actions.shape[1]):  ## 每个episodes的长度 H,26,也就是这个状态后续的节点个数,也就是向前看26步 =actions.shape[1]
                action = torch.unsqueeze(actions[:, i], 1)  ## 加dim,方便后续运算的,拿到第i个时刻的节点动作  (n_sequence, 1)
                rewards, next_obs = self.step(obs, action)  ## 虚拟环境根据 状态+动作 来执行动作,并返回奖励和下一个状态 (n_sequence, 1)  (n_sequence, 2*2 - 1)
                total_reward += rewards  ## 累积奖励的 
                obs = torch.as_tensor(next_obs) ## 向量化
            return total_reward ## 返回累积奖励的

## 保存历史数据,也就是回放池,用来train模型的
class ReplayBuffer:
    def __init__(self, capacity):
        ## 回放池
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        ## 加入到回放池
        self.buffer.append((state, action, reward, next_state, done))

    def size(self):
        ## 返回回放池内个数的
        return len(self.buffer)

    def return_all_samples(self):
        ## 返回保存的所有历史数据,并转置,每列都是不同的
        all_transitions = list(self.buffer)
        state, action, reward, next_state, done = zip(*all_transitions)
        return np.array(state), action, reward, np.array(next_state), done

class PETS:
    ''' PETS算法 '''
    def __init__(self, env, replay_buffer, n_sequence, elite_ratio,
                 plan_horizon, num_episodes):
        self._env = env     ## 环境的
        self._env_pool = ReplayBuffer(buffer_size)   ## 保存历史数据,回放池

        obs_dim = env.observation_space.shape[0]     ## 状态的dim
        self._action_dim = env.action_space.shape[0] ## 动作的dim
        self._model = EnsembleDynamicsModel(obs_dim, self._action_dim) ## 集成环境模型的
        self._fake_env = FakeEnv(self._model)        ## 实例化虚拟环境,虚拟环境只在CEM交叉熵方式选取动作时需要用到,其他地方用不到
        self.upper_bound = env.action_space.high[0]  ## 动作的最大值
        self.lower_bound = env.action_space.low[0]   ## 动作的最小值

        ## 实例化交叉熵方式,向前看几步用来给出下一个动作,取代了策略网络
        self._cem = CEM(n_sequence, elite_ratio, self._fake_env, self.upper_bound, self.lower_bound)
        self.plan_horizon = plan_horizon ## 指定向前看多少步
        self.num_episodes = num_episodes

    def train_model(self):
        env_samples = self._env_pool.return_all_samples()  ## 返回回放池内所有的历史数据
        obs = env_samples[0]  ## 所有历史状态
        actions = np.array(env_samples[1]) ## 所有历史动作
        rewards = np.array(env_samples[2]).reshape(-1, 1) ## 所有历史奖励
        next_obs = env_samples[3] ## 所有下一步的状态
        inputs = np.concatenate((obs, actions), axis=-1) ## 输入
        ## label是奖励,以及下一步的状态 减去 当前状态得到的残差
        labels = np.concatenate((rewards, next_obs - obs), axis=-1) ## 标签
        self._model.train(inputs, labels) ## train

    def mpc(self, index, num_episode):
        ## np.tile重复均值的
        allimage = []
        mean = np.tile((self.upper_bound + self.lower_bound) / 2.0, self.plan_horizon)            ## 指定向前看多少步均值
        ## np.tile重复方差的
        var = np.tile(np.square(self.upper_bound - self.lower_bound) / 16, self.plan_horizon)     ## 指定向前看多少步方差
        ## 环境重置的,完成标志初始化False,返回的episode
        obs, done, episode_return = self._env.reset(), False, 0
        if len(obs)!=2*2-1:
            obs = obs[0]
        while not done:
            if index==num_episode - 1:
                img = self._env.render()
                allimage.append(img)
            ## 交叉熵方式 来选择 奖励较高的 动作episodes,截断标准正态分布,然后采样算奖励,用奖励较高的episodes来update分布的均值和方差
            ## 样本估计总体的,CEM根据当前状态,均值和方差,返回要采取的动作,不使用策略网络返回动作
            ## (3,) (26-1,) (26-1,)
            actions = self._cem.optimize(obs, mean, var) ## (26-1,),也就是选择的累加奖励最大的episode的向前看的动作
            action = actions[:self._action_dim]  # 选取第一个动作
            ##  环境执行动作,并反馈下一个状态、动作的奖励、是否完成、步长太长的,info
            next_obs, reward, terminated, truncated, info = self._env.step(action)
            done = terminated | truncated       ## 终止或者步长太长,都会导致已经结束
            # next_obs, reward, done, _ = self._env.step(action)  ## 根据动作返回下一个状态、动作的奖励
            ## (当前的状态,当前的动作,奖励,下一个状态,是否完成的)加入到回放池内
            self._env_pool.add(obs, action, reward, next_obs, done)  ## CEM+虚拟的环境选择动作并和真实环境交互的数据
            obs = next_obs
            episode_return += reward
            mean = np.concatenate([
                np.copy(actions)[self._action_dim:],
                np.zeros(self._action_dim)
            ])  ## 使用当前动作来做均值
        if index == num_episode - 1:
            # https://github.com/guicalare/Img2gif/blob/master/Code/Img2Gif.py
            pth = r'C:\Users\10696\Desktop\access\Hands-on-RL'
            imageio.mimsave(os.path.join(pth, 'chapter%s.gif'%str("16")), allimage, duration=10)
        return episode_return

    ## 探索环境的,使用真实环境,来获得真实的交互数据
    def explore(self):
        obs, done, episode_return = self._env.reset(), False, 0 ## 重置环境的
        if len(obs)!=2*2-1:
            obs = obs[0]
        while not done:
            action = self._env.action_space.sample() ## 真实环境随机采取动作
            ##  环境执行动作,并反馈下一个状态、动作的奖励、是否完成、步长太长的,info
            next_obs, reward, terminated, truncated, info = self._env.step(action)
            done = terminated | truncated       ## 终止或者步长太长,都会导致已经结束
            # next_obs, reward, done, _ = self._env.step(action)
            self._env_pool.add(obs, action, reward, next_obs, done) ## 加入到回放池  和真实环境交互的数据
            obs = next_obs
            episode_return += reward
        return episode_return ## 返回累积奖励的

    def train(self):
        return_list = []
        ## 直接返回奖励的
        explore_return = self.explore()  # 先进行随机策略的探索来收集一条序列的数据
        print('episode: 1, return: %d' % explore_return)
        return_list.append(explore_return)
        
        for i_episode in range(self.num_episodes - 1):  ## 指定episode的数量
            self.train_model()                          ## 使用回放池内的数据 train 集成的环境模型
            episode_return = self.mpc(i_episode, self.num_episodes - 1)                 ## 使用CEM交叉熵的方式,来获取动作的,并得到相应的episode
            return_list.append(episode_return)          ## 加入MPC方式episode的累积奖励
            print('episode: %d, return: %d' % (i_episode + 2, episode_return))
        return return_list                              ## 返回累积奖励的列表

考虑 Swish激活函数的求导和梯度

y 1 = e − x y 0 = 1 1 + y 1 y = x ∗ y 0 = x 1 + y 1 = x 1 + e − x y ′ = 1 + e − x − x e − x ( 1 + e − x ) 2 = 1 + y 1 − x y 1 ( 1 + y 1 ) 2 = y 0 − x y 0 2 y 1 y_1=e^{-x}\\ y_0=\frac{1}{1+y_1} \\ y = x*y_0=\frac{x}{1+y_1}=\frac{x}{1+e^{-x}} \\ y'=\frac{1+e^{-x}-xe^{-x}}{(1+e^{-x})^2}=\frac{1+y_1-xy_1}{(1+y_1)^2}=y_0-xy_0^2y_1 y1=exy0=1+y11y=xy0=1+y1x=1+exxy=(1+ex)21+exxex=(1+y1)21+y1xy1=y0xy02y1


https://zhuanlan.zhihu.com/p/658777952

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐