|
大家好,今天和各位分享一下深度强化学习中的近端策略优化算法(proximalpolicyoptimization,PPO),并借助OpenAI的gym环境完成一个小案例,完整代码可以从我的GitHub中获得:https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model1.算法原理PPO算法之所以被提出,根本原因在于PolicyGradient在处理连续动作空间时Learningrate取值抉择困难。Learningrate取值过小,就会导致深度强化学习收敛性较差,陷入完不成训练的局面,取值过大则导致新旧策略迭代时数据不一致,造成学习波动较大或局部震荡。除此之外,PolicyGradient因为在线学习的性质,进行迭代策略时原先的采样数据无法被重复利用,每次迭代都需要重新采样;同样地置信域策略梯度算法(TrustRegionPolicyOptimization,TRPO)虽然利用重要性采样(Important-sampling)、共轭梯度法求解提升了样本效率、训练速率等,但在处理函数的二阶近似时会面临计算量过大,以及实现过程复杂、兼容性差等缺陷。 PO算法具备PolicyGradient、TRPO的部分优点,采样数据和使用随机梯度上升方法优化代替目标函数之间交替进行,虽然标准的策略梯度方法对每个数据样本执行一次梯度更新,但PPO提出新目标函数,可以实现小批量更新。鉴于上述问题,该算法在迭代更新时,观察当前策略在t时刻智能体处于状态s所采取的行为概率,与之前策略所采取行为概率 ,计算概率的比值来控制新策略更新幅度,比值 记作:若新旧策略差异明显且优势函数较大,则适当增加更新幅度;若 比值越接近1,表明新旧策略差异越小。优势函数代表,在状态s下,行为a相对于均值的偏差。在论文中,优势函数 使用GAE(generalizedadvantageestimation)来计算:PPO算法可依据Actor网络的更新方式细化为含有自适应KL-散度(KLPenalty)的PPO-Penalty和含有ClipppedSurrogateObjective函数的PPO-Clip。(1)PPO-Penalty基于KL惩罚项优化目标函数,实验证明惩罚项系数 在迭代过程中并非固定值,需要动态调整惩罚权重,其目标函数L可以定义为:惩罚项 的初始值的选择对算法几乎无影响,原因是它能在每次迭代时依据新旧策略的KL散度做适宜调整,首先设置KL散度阈值 ,再通过下面的表达式计算 :如果 时,证明散度较小,需要弱化惩罚力度, 调整为 ;如果 时,证明散度较大,需要增强惩罚力度, 调整为 。(2)PPO-Clip直接对新旧策略比例进行一定程度的Clip操作,以约束变化幅度。其目标函数的计算方式如下:其中, 代表截断超参数,一般设定值为0.2; 表示截断函数,负责限制比例 在 区间之内,以保证收敛性;最终 借助 函数选取未截断与截断目标之间的更小值,形成目标下限。 可以分为优势函数A为正数和负数两种情况,其变化趋势如下图所示:如果优势函数为正数,需要增大新旧策略比值 ,然而当 时,将不提供额外的激励;如果优势函数是负数,需要减少新旧策略比值 ,但在 时,不提供额外的激励,这使得新旧策略的差异被限制在合理范围内。PPO本质上基于Actor-Critic框架,算法流程如下:PPO算法主要由Actor和Critic两部分构成,Critic部分更新方式与其他Actor-Critic类型相似,通常采用计算TD error(时序差分误差)形式。对于Actor的更新方式,PPO可在KLPENL、CLIPL之间选择对于当前实验环境稳定性适用性更强的目标函数,经过OpenAI研究团队实验论证,PPO-Clip比PPO-Penalty有更好的数据效率和可行性。 2.代码实现下面我就采用Clip形式的PPO。模型构建代码如下。下面的模型适用于action是离散的情况,连续情况的代码可以从我的GitHub中获取。#代码用于离散环境的模型importnumpyasnpimporttorchfromtorchimportnnfromtorch.nnimportfunctionalasF#-----------------------------------##构建策略网络--actor#-----------------------------------#classPolicyNet(nn.Module):def__init__(self,n_states,n_hiddens,n_actions):super(PolicyNet,self).__init__()self.fc1=nn.Linear(n_states,n_hiddens)self.fc2=nn.Linear(n_hiddens,n_actions)defforward(self,x):x=self.fc1(x)#[b,n_states]-->[b,n_hiddens]x=F.relu(x)x=self.fc2(x)#[b,n_actions]x=F.softmax(x,dim=1)#[b,n_actions]计算每个动作的概率returnx#-----------------------------------##构建价值网络--critic#-----------------------------------#classValueNet(nn.Module):def__init__(self,n_states,n_hiddens):super(ValueNet,self).__init__()self.fc1=nn.Linear(n_states,n_hiddens)self.fc2=nn.Linear(n_hiddens,1)defforward(self,x):x=self.fc1(x)#[b,n_states]-->[b,n_hiddens]x=F.relu(x)x=self.fc2(x)#[b,n_hiddens]-->[b,1]评价当前的状态价值state_valuereturnx#-----------------------------------##构建模型#-----------------------------------#classPPO:def__init__(self,n_states,n_hiddens,n_actions,actor_lr,critic_lr,lmbda,epochs,eps,gamma,device):#实例化策略网络self.actor=PolicyNet(n_states,n_hiddens,n_actions).to(device)#实例化价值网络self.critic=ValueNet(n_states,n_hiddens).to(device)#策略网络的优化器self.actor_optimizer=torch.optim.Adam(self.actor.parameters(),lr=actor_lr)#价值网络的优化器self.critic_optimizer=torch.optim.Adam(self.critic.parameters(),lr=critic_lr)self.gamma=gamma#折扣因子self.lmbda=lmbda#GAE优势函数的缩放系数self.epochs=epochs#一条序列的数据用来训练轮数self.eps=eps#PPO中截断范围的参数self.device=device#动作选择deftake_action(self,state):#维度变换[n_state]-->tensor[1,n_states]state=torch.tensor(state[np.newaxis,:]).to(self.device)#当前状态下,每个动作的概率分布[1,n_states]probs=self.actor(state)#创建以probs为标准的概率分布action_list=torch.distributions.Categorical(probs)#依据其概率随机挑选一个动作action=action_list.sample().item()returnaction#训练deflearn(self,transition_dict):#提取数据集states=torch.tensor(transition_dict['states'],dtype=torch.float).to(self.device)actions=torch.tensor(transition_dict['actions']).to(self.device).view(-1,1)rewards=torch.tensor(transition_dict['rewards'],dtype=torch.float).to(self.device).view(-1,1)next_states=torch.tensor(transition_dict['next_states'],dtype=torch.float).to(self.device)dones=torch.tensor(transition_dict['dones'],dtype=torch.float).to(self.device).view(-1,1)#目标,下一个状态的state_value[b,1]next_q_target=self.critic(next_states)#目标,当前状态的state_value[b,1]td_target=rewards+self.gamma*next_q_target*(1-dones)#预测,当前状态的state_value[b,1]td_value=self.critic(states)#目标值和预测值state_value之差[b,1]td_delta=td_target-td_value#时序差分值tensor-->numpy[b,1]td_delta=td_delta.cpu().detach().numpy()advantage=0#优势函数初始化advantage_list=[]#计算优势函数fordeltaintd_delta[::-1]:#逆序时序差分值axis=1轴上倒着取[],[],[]#优势函数GAE的公式advantage=self.gamma*self.lmbda*advantage+deltaadvantage_list.append(advantage)#正序advantage_list.reverse()#numpy-->tensor[b,1]advantage=torch.tensor(advantage_list,dtype=torch.float).to(self.device)#策略网络给出每个动作的概率,根据action得到当前时刻下该动作的概率old_log_probs=torch.log(self.actor(states).gather(1,actions)).detach()#一组数据训练epochs轮for_inrange(self.epochs):#每一轮更新一次策略网络预测的状态log_probs=torch.log(self.actor(states).gather(1,actions))#新旧策略之间的比例ratio=torch.exp(log_probs-old_log_probs)#近端策略优化裁剪目标函数公式的左侧项surr1=ratio*advantage#公式的右侧项,ratio小于1-eps就输出1-eps,大于1+eps就输出1+epssurr2=torch.clamp(ratio,1-self.eps,1+self.eps)*advantage#策略网络的损失函数actor_loss=torch.mean(-torch.min(surr1,surr2))#价值网络的损失函数,当前时刻的state_value-下一时刻的state_valuecritic_loss=torch.mean(F.mse_loss(self.critic(states),td_target.detach()))#梯度清0self.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()#反向传播actor_loss.backward()critic_loss.backward()#梯度更新self.actor_optimizer.step()self.critic_optimizer.step()3.案例演示基于OpenAI的gym环境完成一个推车游戏,一个离散的环境,目标是左右移动小车将黄色的杆子保持竖直。动作维度为2,属于离散值;状态维度为4,分别是坐标、速度、角度、角速度。importnumpyasnpimportmatplotlib.pyplotaspltimportgymimporttorchfromRL_brainimportPPOdevice=torch.device('cuda')iftorch.cuda.is_available()\elsetorch.device('cpu')#-----------------------------------------##参数设置#-----------------------------------------#num_episodes=100#总迭代次数gamma=0.9#折扣因子actor_lr=1e-3#策略网络的学习率critic_lr=1e-2#价值网络的学习率n_hiddens=16#隐含层神经元个数env_name='CartPole-v1'return_list=[]#保存每个回合的return#-----------------------------------------##环境加载#-----------------------------------------#env=gym.make(env_name,render_mode="human")n_states=env.observation_space.shape[0]#状态数4n_actions=env.action_space.n#动作数2#-----------------------------------------##模型构建#-----------------------------------------#agent=PPO(n_states=n_states,#状态数n_hiddens=n_hiddens,#隐含层数n_actions=n_actions,#动作数actor_lr=actor_lr,#策略网络学习率critic_lr=critic_lr,#价值网络学习率lmbda=0.95,#优势函数的缩放因子epochs=10,#一组序列训练的轮次eps=0.2,#PPO中截断范围的参数gamma=gamma,#折扣因子device=device)#-----------------------------------------##训练--回合更新on_policy#-----------------------------------------#foriinrange(num_episodes):state=env.reset()[0]#环境重置done=False#任务完成的标记episode_return=0#累计每回合的reward#构造数据集,保存每个回合的状态数据transition_dict={'states':[],'actions':[],'next_states':[],'rewards':[],'dones':[],}whilenotdone:action=agent.take_action(state)#动作选择next_state,reward,done,_,_=env.step(action)#环境更新#保存每个时刻的状态\动作\...transition_dict['states'].append(state)transition_dict['actions'].append(action)transition_dict['next_states'].append(next_state)transition_dict['rewards'].append(reward)transition_dict['dones'].append(done)#更新状态state=next_state#累计回合奖励episode_return+=reward#保存每个回合的returnreturn_list.append(episode_return)#模型训练agent.learn(transition_dict)#打印回合信息print(f'iter:{i},return:{np.mean(return_list[-10:])}')#--------------------------------------##绘图#--------------------------------------#plt.plot(return_list)plt.title('return')plt.show()训练100回合,绘制每回合的return
|
|