一、实验目的
(1)熟悉动态规划算法中策略评估过程;
(2)了解如何对问题进行建模处理,包括环境、状态、动作、奖惩值的初始化;
二、实验内容与要求
(1)掌握动态算法基本思想,以及策略估计和策略改进过程
(2)理解状态值函数及其定义:
(3)掌握策略估计方法,伪代码如下:
(4)实验要求:
输入(根据Gridworld问题初始化):
⑴ 能够动态输入状态个数(例如5*5,6*6,…);
⑵ 动作空间4个(up,down,left,right);
⑶ 一般的立即奖惩值根据状态和下一动作设定为0或者-1。
输出:
根据Bellman公式计算得到的最后值和对应策略,用图形界面显示(自行设计),在每次迭代中所更新的每个状态的下一动作、立即奖惩值及值需记录在log中(使用XML文档保存)。
三、实验过程及代码
(1)引入要用的的库,初始化参数,接收输入的格子行数,定义起点和终点的位置,确定折扣因子,设定动作集,设定动作执行概率。
import numpy as np import matplotlib.pyplot as plt from matplotlib.table import Table from xml.dom.minidom import Document #手动输入格子的大小 WORLD_SIZE = int(input("请输入行数:")) # 起点和终点的位置(下标从0开始,下同) START_POS = [0,0] END_POS = [WORLD_SIZE-1, WORLD_SIZE-1] # 折扣因子 DISCOUNT = 0.9 # 动作集={上,下,左,右} ACTIONS = [np.array([0, -1]), # left np.array([-1, 0]), # up np.array([0, 1]), # right np.array([1, 0])] # down # 策略,每个动作等概率 ACTION_PROB = 0.25 |
(2)写好文件写入函数,方便后续调用。
def write_datato_xml(data,name): # 实例化一个Domcument dom = Document() # 创建根节点 paper = dom.createElement("Paper") # 将根节点添加到domcument中 dom.appendChild(paper) # 循环遍历所有数据,写入domcument中 # 将sortnumber 写入 for x in range(len(data)): # 创建sortnumber标签 sortnumber = dom.createElement(name) # 将sortnumber加入到根节点paper paper.appendChild(sortnumber) # 取出每一个数据 x_data = data[x] # 创建text标签 sortnumber_text = dom.createTextNode(x_data) # 将text标签加入到sortnumber标签中 sortnumber.appendChild(sortnumber_text) # 添加属性 sortnumber.setAttribute("迭代次数",'{}'.format(x)) with open("data.xml",'w',encoding='utf-8') as f: # f:文件对象,indent:每个tag前面填充的字符,addindent:每个子节点的缩进字符,newl:每个tag后填充的字符 dom.writexml(f, indent='\t', newl='\n', addindent='\t') f.close() |
(3)写好绘图函数,方便后续调用
def draw_image(image): fig, ax = plt.subplots() ax.set_axis_off() tb = Table(ax, bbox=[0, 0, 1, 1]) nrows, ncols = image.shape width, height = 1.0/ ncols, 1.0/ nrows # 添加表格 for (i, j), val in np.ndenumerate(image): if [i,j] == START_POS: tb.add_cell(i, j, width, height, text=val, loc='center', facecolor='Blue') elif [i,j] == END_POS: tb.add_cell(i, j, width, height, text=val, loc='center', facecolor='Red') else: tb.add_cell(i, j, width, height, text=val, loc='center', facecolor='white') # 行标签 for i, label in enumerate(range(len(image))): tb.add_cell(i, -1, width, height, text=label + 1, loc='right', edgecolor='none', facecolor='none') # 列标签 for j, label in enumerate(range(len(image))): tb.add_cell(WORLD_SIZE, j, width, height / 2, text=label + 1, loc='center', edgecolor='none', facecolor='none') ax.add_table(tb) |
(4)指定步行策略,当回到起点和终点时奖励0分,并走一步,当撞墙时,奖励-1分,并原地不动,其他步行均奖励-1分并走一步。
def step(state, action): if state == START_POS: return START_POS, 0 if state == END_POS: return END_POS, 0 next_state = (np.array(state) + action).tolist() x, y = next_state # 判断是否出界 if x < 0 or x >= WORLD_SIZE or y < 0 or y >= WORLD_SIZE: reward = -1.0 next_state = state else: reward = -1.0 return next_state, reward |
(5)使用迭代策略评估计算每个单元格的状态价值函数,遍历执行动作,转移到后继状态,并获得立即惩奖值,求解贝尔曼方程,每一轮迭代都会产生一个new_value,直到new_value和value很接近即收敛为止,迭代终止条件设定为delta小于0.0001,求解出的16个最终状态价值如下图所示。
def grid_world_value_function(): # 状态价值函数的初值 value = np.zeros((WORLD_SIZE, WORLD_SIZE)) episode = 0 history = {} status = []; while True: episode = episode + 1 new_value = np.zeros_like(value) for i in range(WORLD_SIZE): for j in range(WORLD_SIZE): for action in ACTIONS: (next_i, next_j), reward = step([i, j], action) # bellman equation # 由于每个方向只有一个reward和s'的组合,这里的p(s',r|s,a)=1 # 贝尔曼期望方程 new_value[i, j] += ACTION_PROB * (reward + DISCOUNT * value[next_i, next_j]) delta = np.sum(np.abs(new_value - value)) history[episode] = delta # 迭代终止条件: delta小于1e-4 if delta < 1e-4: draw_image(np.round(new_value, decimals=2)) plt.title('$v_{\pi}$') plt.show() plt.close() break # 观察每一轮次状态价值函数及其delta的变化情况 value1 = f"第{episode}轮-delta:{np.round(delta ,decimals=5)}:\n{np.round(new_value,decimals=2)}"; status.append(value1); # print(f"{episode}-{np.round(delta ,decimals=5)}:\n{np.round(new_value,decimals=2)}") value = new_value write_datato_xml(status,"grid_world_value_function") return history, value |
(6)通过迭代求解贝尔曼函数,计算每个格子往各个方向的动作值,并取每个格子的最大值作为该格子的最优动作值,通过该数值可知每个格子该往哪个方向前进。求解出的16个最终的动作值如下图所示。
(7)通过本图可以发现,所有包含且仅包含2个-1.0,两个-1.9,一个-2.71的路径都是最优路径。
def grid_world_optimal_policy(): value = np.zeros((WORLD_SIZE, WORLD_SIZE)) # 通过一个数组来表示每一个格子的最优动作,1表示在相应的方向上最优的 optimal_policy = np.zeros((WORLD_SIZE, WORLD_SIZE, len(ACTIONS))) episode = 0 while True: episode = episode + 1 # keep iteration until convergence new_value = np.zeros_like(value) for i in range(WORLD_SIZE): for j in range(WORLD_SIZE): # 保存当前格子所有action下的state value action_values = [] for action in ACTIONS: (next_i, next_j), reward = step([i, j], action) # 缓存动作值函数 q(s,a) = r + γ*v(s') action_values.append(reward + DISCOUNT * value[next_i, next_j]) # 根据贝尔曼最优方程,找出最大的动作值函数 q(s,a) 进行更新 new_value[i, j] = np.max(action_values) # optimal_policy[i, j] = get_optimal_actions(action_values) delta = np.sum(np.abs(new_value - value)) #迭代终止条件: delta小于1e-4 if delta < 1e-4: draw_image(np.round(new_value, decimals=2)) plt.title('$v_{*}$') plt.show() plt.close() break # 观察每一轮次状态价值函数及其delta的变化情况 print(f"第{episode}轮-delta:{np.round(delta ,decimals=5)}:\n{np.round(new_value,decimals=2)}") value = new_value |
绘制迭代策略评估的曲线
def plot_his(history, title): index, delta = history.keys(), history.values() plt.plot(index, delta ) plt.title(title) plt.xlabel("episode") plt.ylabel("delta ") if len(history) != 1: plt.legend(["grid_world_value_function", "grid_world_value_function_in_place"]) plt.show() |
(8)写入xml文件的每次迭代的数据如下:
四、实验分析及总结
强化学习过程可以看做一系列的state、reward、action的组合。在学习中,我们关注的不仅仅是当前的reward,因状态的转移可能对未来的收益都有影响,所以我们关注的是总体reward之和。为了数学上使得reward之和收敛,此处选择了一个折扣因子 discount 。当discount =1时,表明未来以及现在的reward具有相同的权重,可以说此时的agent是一个 far-sighted,当discount =0时,表明完全不考虑未来的reward,只考虑当下,可以说此时的agent是一个 myopic。当我们从当前状态 s 转移到下一时刻状态时,对下一时刻每一个状态s’ 存在一个从状态s 到 s’的状态转移概率 Pss′,那么当前状态的价值函数可以表示为当前状态的reward 加上带discount的下一时刻状态s与状态转移概率 Pss′乘积的累加和。策略表示在状态s下采取动作a的概率,每个动作都伴随着状态的转移,所以找出最优的状态价值函数就意味着, 找出了最优策略。