import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
env = gym.make('CartPole-v1').unwrapped
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display
# If matplotlib.get_backend() returns 'inline' (the ipykernel.pylab.backend_inline backend), is_ipython is True.
plt.ion()  # plt.ion() switches matplotlib into interactive mode.

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
The model is a convolutional neural network whose input is the difference between the current screen patch and the previous one. It has two outputs, representing $Q(s, \mathrm{left})$ and $Q(s, \mathrm{right})$, where $s$ is the network's input; the network tries to predict the expected return of taking each action given the current input.
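A minimal sketch of what such a network could look like, in the spirit of the official PyTorch DQN tutorial; the layer sizes, the conv2d_size_out helper, and the n_actions argument are illustrative assumptions rather than code quoted from this post:

class DQN(nn.Module):
    """Convolutional Q-network: maps a screen-difference image to one Q value per action."""

    def __init__(self, h, w, n_actions=2):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # Spatial size after one conv layer with padding=0 (kernel 5, stride 2 by default).
        def conv2d_size_out(size, kernel_size=5, stride=2):
            return (size - (kernel_size - 1) - 1) // stride + 1

        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        self.head = nn.Linear(convw * convh * 32, n_actions)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        # One row per batch element, one column per action: Q(s, left), Q(s, right).
        return self.head(x.view(x.size(0), -1))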
The code below trains the model. The optimize_model function performs a single optimization step: it samples a batch, concatenates all of its tensors into single tensors, computes $Q(s_t, a_t)$ and $V(s_{t+1}) = \max_a Q(s_{t+1}, a)$, and combines them into the loss. By definition, $V(s) = 0$ if $s$ is a terminal state. The target network is used to compute $V(s_{t+1})$.
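optimize_model relies on several objects defined earlier in the tutorial (the Transition tuple, the replay memory, the two networks, the optimizer, and the hyperparameters). A rough sketch of those pieces, where the numeric values and the screen dimensions are illustrative assumptions, not the post's own choices:

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory:
    """Fixed-size buffer of Transition tuples with uniform random sampling."""
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

BATCH_SIZE = 128   # illustrative value
GAMMA = 0.999      # illustrative discount factor
memory = ReplayMemory(10000)

n_actions = env.action_space.n            # 2 for CartPole
screen_height, screen_width = 40, 90      # assumed to come from the screen-preprocessing step

# policy_net is the network being optimized; target_net is a periodically synced copy
# used only to compute V(s_{t+1}).
policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.RMSprop(policy_net.parameters())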
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    # Randomly sample BATCH_SIZE transitions from the replay memory.
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch: BATCH_SIZE 4-tuples become one Transition of four tuples, each with
    # BATCH_SIZE entries. E.g. samples (1,1,1,1) and (2,2,2,2) give
    # batch = Transition(state=(1,2), action=(1,2), next_state=(1,2), reward=(1,2)).
    batch = Transition(*zip(*transitions))
    # lambda s: s is not None maps each sample's next_state to a bool, marking the
    # transition as non-terminal (True) or terminal (False).
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),
                                  device=device, dtype=torch.bool)
    # non_final_next_state holds only the non-terminal next states.
    non_final_next_state = torch.cat([s for s in batch.next_state if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    # gather(dim, index): pick values along dim according to index, i.e. the Q value of the
    # action actually taken in each state.
    state_action_values = policy_net(state_batch).gather(1, action_batch)
    # For non-terminal next states, take the largest state-action value predicted by the
    # target network; terminal states keep V(s) = 0.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_state).max(1)[0].detach()
    # Expected Q values from the Bellman equation.
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    # TD error with the Huber (smooth L1) loss.
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    # Optimize the model.
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        # Clamp each gradient to [-1, 1] to avoid exploding gradients.
        param.grad.data.clamp_(-1, 1)
    optimizer.step()
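For completeness, a sketch of how the outer training loop could call optimize_model; select_action and get_screen are helpers from the surrounding tutorial that are not shown in this section, and TARGET_UPDATE / num_episodes are illustrative values:

TARGET_UPDATE = 10     # sync target_net with policy_net every 10 episodes (illustrative)
num_episodes = 50      # illustrative

for i_episode in range(num_episodes):
    env.reset()
    last_screen = get_screen()                 # assumed screen-preprocessing helper
    current_screen = get_screen()
    state = current_screen - last_screen       # the network's input is a screen difference
    for t in count():
        action = select_action(state)          # assumed epsilon-greedy helper on policy_net
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        last_screen = current_screen
        current_screen = get_screen()
        next_state = None if done else current_screen - last_screen
        memory.push(state, action, next_state, reward)   # terminal next_state stored as None
        state = next_state
        optimize_model()                       # one optimization step per environment step
        if done:
            break
    # Periodically copy the policy network's weights into the target network.
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())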
# Example: how the batch transpose and the non-final mask work
from collections import namedtuple
Transition11 = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
tran1 = Transition11._make([1, 2, 3, 4])
tran2 = Transition11._make([11, 22, 33, 44])
tran3 = Transition11._make([11, 22, 0, 44])
batch1 = Transition11(*zip(tran1, tran2, tran3))
x = torch.tensor(tuple(map(lambda s: s is not None, batch1.next_state)), dtype=torch.bool)
y = torch.tensor([s for s in batch1.next_state if s is not None])
z = torch.zeros(3)
bs = torch.tensor(batch1.state)
print(x, y, z)
print(z[x])
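Note that in this toy example none of the next_state fields is actually None, so x is all True and z[x] keeps all three entries. A small variant with a genuinely terminal transition (an illustrative addition, not part of the original example) shows the mask filtering it out:

tran4 = Transition11._make([5, 6, None, 7])    # terminal transition: next_state is None
batch2 = Transition11(*zip(tran1, tran2, tran4))
mask2 = torch.tensor(tuple(map(lambda s: s is not None, batch2.next_state)), dtype=torch.bool)
y2 = torch.tensor([s for s in batch2.next_state if s is not None])
z2 = torch.zeros(3)
print(mask2)      # tensor([ True,  True, False])
print(y2)         # tensor([ 3, 33])
print(z2[mask2])  # tensor([0., 0.])  -- only the non-terminal slots would receive target-net values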