策略梯度方法


之前一直在讲 action-value 方法,它们都依赖于对 action-value 的估计,而本章的方法将考虑直接去学习『参数化策略』,这样就能不通过 value function 来选择 action 。
$\pi(a | s, \mathbf{\theta})={Pr}\{A_{t}=a | S_{t}=s, \mathbf{\theta}_{t}=\mathbf{\theta}\}$
本章使用性能度量函数$J ( \theta )$(关于策略参数$\theta$)的梯度来学习,实现最大化性能指标,这些方法更新近似于 $J$的梯度上升,
$\mathbf{\theta}_{t+1}=\mathbf{\theta}_{t}+\alpha \widehat{\nabla J\left(\mathbf{\theta}_{t}\right)}$
其中$\widehat{\nabla J\left(\mathbf{\theta}_{t}\right)} \in \mathbb{R}^{d^{\prime}}$是对梯度的一个估计,其期望值近似于性能度量相对于其参数${\theta}_{t}$的梯度。
而 遵循此一般模式的所有方法,我们都称为 策略梯度方法,无论它们是否也学习近似值函数。
学习策略和值函数的近似值的方法通常称为『actor-critic methods』,其中『actor』指所学的 policy,『critic』指所学的 value function 。

13.1 策略近似及其优势

对于不太大的离散 action 空间,若要将策略参数化,一个很自然的方法是对每个 state-action 来构造一个参数化的数值偏好 $h(s,a,θ)∈R$,进而通过指数 softmax 函数来得到分布。通过这样参数化得到的策略为『soft-max in action preferences』。
动作偏好值可以被任意参数化。既可以用神经网络来计算( θ 作为网络的权重),也可以简单地使用线性模型 $h(s,a,θ)=θ^{⊤}x(s,a)$ (特征向量$\mathbf{x}(s, a) \in \mathbb{R}^{d^{\prime}}$可以由前面章节介绍的方法构建)
Policy Approximation 有几个优点:

  1. 近似策略可以接近于一个确定策略(比如将某个 a 对应的 h(s,a,θ) 设为无穷大即可),而传统的 ε -greedy 策略则不能做到,因为它必须对非最优策略分配 ε 的概率。
  2. 能灵活地任意分配动作的概率,对于一些特殊情况,比如不完全信息下的卡牌游戏,最佳策略对应的选择随机性很强,能够对两种差异很大的动作来分配概率进而做出选择,比如扑克中虚张声势,这对于 action-value 方法而言很难做到,但基于策略近似的方法可以。
  3. 策略可以用更简单的函数近似(相比于动作价值函数,具体情况具体分析)
  4. 策略参数化能较好地将策略形式的先验知识引入强化学习系统,这通常也是选择基于策略的学习方法的重要原因。

13.2 策略梯度定理

除了上节中优点之外,策略梯度的方法使得动作概率可以平滑变化,而不是像$\epsilon-greedy$中动作可能发生突变。这也使得基于策略梯度的方法能够比基于动作函数的方法有更强的收敛保证
针对不同的类型情况,具体的性能指标 $J ( \theta )$会有所不同:

  • 分幕式

将性能指标定义为幕初始状态的价值$J(\boldsymbol{\theta})=v_{\pi_{\theta}}\left(s_{0}\right)$
在函数逼近情况下,想要通过调整策略参数保证性能改善是比较困难的。因为当前的性能指标既依赖于动作的选择,也依赖于动作选择时所处的状态分布,这两者又会受到策略参数的影响。
给定状态下,策略参数对动作选择以及收益的影响可以直观计算出来,但是对状态分布的影响很难确切知道。
策略梯度定理让我们能够绕过这一问题得到梯度:
$\nabla J(\boldsymbol{\theta}) \propto \sum_{s} \mu(s) \sum_{a} q_{\pi}(s, a) \nabla \pi(a \mid s, \boldsymbol{\theta})$
正比于幕的平均长度。

  • 持续式

这种情况下,比例常量就是1,梯度就直接是上面的等式。

策略梯度定理的证明

状态价值函数的梯度可以写为动作价值函数的形式:
image.png
其中$Pr(s \to x,k,\pi)$是在策略$\pi$下,状态s在k步内转移到状态x的概率。

分幕式同轨策略分布(书中197页),其中h(s)表示从状态s开始一幕交互序列的概率,$\eta (s)$表示s在单幕交互中消耗的平均步数

13.3 REINFORCE: 蒙特卡洛策略梯度

接下来将介绍第一个策略梯度学习算法。要一开始的目标是要得到随机梯度上升的形式,且希望样本梯度的期望恰好为度量函数的梯度,而策略梯度定理给出的公式恰好满足:

其中$\hat{q} \approx q_{\pi}$,这个算法又被称为全部动作算法,因为更新涉及所有可能的动作。

REINFORCE算法

  • 时刻t的更新仅仅涉及了$A_t$,即在时刻t被实际采用的动作
  • 引入概率加权系数


梯度更新公式转换为:

  • 回报与一个向量的乘积
  • 向量:选取动作概率的梯度除以它本身
  • 使得更新朝着更利于产生最大回报的动作方向,使得高价值动作选择概率增大
  • 回报的获得表明了REINFORCE是一个蒙特卡洛算法


REINFORCE具有较好的理论收敛性,但是它也具有MC自身的缺陷:具有较高的方差和较慢的学习速度。

13.4 带有基线的REINFORCE

为了解决存在的高方差问题,在原有的算法中引入基线(可以是函数或变量)
得到:

其中(基线与动作a无关)

以下采用状态加值函数$\hat{v}(S_t,w)$作为基线函数,其中w是权重向量

该算法有两个步长,需要手动设置如何调整。

REINFORCE代码实现

class REINFORCE:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
                 device):
                     self.policy_net = PolicyNet(state_dim, hidden_dim,
                                                 action_dim).to(device)
                     self.optimizer = torch.optim.Adam(self.policy_net.parameters(),
                                                       lr=learning_rate)  # 使用Adam优化器
                     self.gamma = gamma  # 折扣因子
                     self.device = device

    def take_action(self, state):  # 根据动作概率分布随机采样
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.policy_net(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        reward_list = transition_dict['rewards']
        state_list = transition_dict['states']
        action_list = transition_dict['actions']

        G = 0
        self.optimizer.zero_grad()
        for i in reversed(range(len(reward_list))):  # 从最后一步算起
            reward = reward_list[i]
            state = torch.tensor([state_list[i]],
                                 dtype=torch.float).to(self.device)
            action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
            log_prob = torch.log(self.policy_net(state).gather(1, action))
            G = self.gamma * G + reward
            loss = -log_prob * G  # 每一步的损失函数
            loss.backward()  # 反向传播计算梯度
        self.optimizer.step()  # 梯度下降

13.5 “行动器-评判器”方法

带基线的强化学习既学习了一个策略函数也学习了一个价值函数但是状态价值函数仅用作基线,没有自举操作(用后继各个状态的价值估计值来更行当前某个状态的价值估计值),不作为评判器。带基线的REINFORCE方法是无偏差的,但是学习缓慢,不便于在线实现,不便于应用于持续性问题
单步“行动器-评判器”方法

  • 优势在于完全在线与增量式,同时避免了使用资格迹的复杂性
  • 使用单步回报与估计的状态价值函数作为基线来代替REINFORCE算法中的整个回报

v2-f6dad93d083193daa9fc474f33ff39d2_720w.png
v2-8a209f3cf15889e6c0b653a9bb530320_720w.webp
n步方法
微信图片_20221109223119.png

13.6 持续性问题的策略梯度

没有分幕式边界的持续性问题,根据每个时刻的平均收益定义性能

$$\begin{aligned} J(\theta)\dot =r(\pi)&\dot =\lim_{h\rightarrow \infty }\frac{1}{h} \sum_{t=1}^{h} E[R_t|S_0,A_{0:t-1}\sim \pi] \\ &=\lim_{t\rightarrow \infty}E[R_t|S_0,A_{0:t-1}\sim \pi] \\ &=\sum_s \mu(s)\sum_{a} \pi(a|s)\sum_{S’,r}p(s’,r|s,a)r \end{aligned}$$


$\mu=\lim_{t \rightarrow \infty}Pr\{S_t=s|A_{0:t}\sim \pi\}$为策略下$\pi$下的稳定状态分布,并假设一定存在并独立于$S_0$。根据策略$\pi$选择动作则该分布会保持不变
$\sum_s \mu(s)\sum_a\pi(a|s,\theta)p(s’|s,a)=\mu(s’)$
image.png
在持续问题中,用差分回报定义价值函数
$\begin{aligned} &V_{\pi}(s)\dot =E_{\pi}[G_t|S_t=s]\\ &q_{\pi}(s,a)\dot =E_{\pi}[G_t|S_t=s,A_t=a]\\ &G_t\dot =R_{t-1}-r(\pi)+R_{t-2}-r(\pi)+R_{t-3}-r(\pi)+… \end{aligned}$
在分幕式下策略梯度定理对于持续性任务的情况也同样正确
image.png

13.7 针对连续动作的策略参数化方法

基于参数化策略函数的方法还提供了解决动作空间大甚至动作空间连续(动作无限多)的实际途径,不直接计算每个动作的概率,而是学习概率分布的统计量。例如动作为一个实数集可以根据正态分布来选择动作
正态分布概率密度函数可以写为
$p(x)\dot= \frac{1}{\sigma \sqrt{2\pi}}exp(-\frac{(x-\mu)^2}{2\sigma^2})$
将策略定义为实数型的标量动作的正态概率密度,均值和标准差由状态的参数化函数近似给出
$\pi (a|s,\theta)\dot =\frac{1}{\sigma(s,\theta)\sqrt{2\pi}}exp(-\frac{(a-\mu(s,\theta))^2}{2\sigma(s,\theta)^2})$
$\mu,\sigma$为两个参数化的近似函数,将策略的参数向量划分为两个部分$\theta =[\theta _{\mu},\theta_{\sigma}]$,一部分用于近似均值一部分分用于近似标准差。均值可以用一个线性函数来逼近,标准差必须为正数,因而使用线性函数的指数形式比较好
$\mu(s,\theta)\dot=\theta_{\mu}^Tx_{\mu}(s)和\sigma(s,\theta)\dot=exp(\theta_{\sigma}^Tx_{\sigma}(s))$

练习
image.png

image.png
image.pngimage.png

AC

image.png

import numpy as np
import tensorflow as tf
import gym
import pandas as pd

OUTPUT_GRAPH = False
MAX_EPISODE = 500
DISPLAY_REWARD_THRESHOLD = 200  # renders environment if total episode reward is greater then this threshold
MAX_EP_STEPS = 2000  # maximum time step in one episode
RENDER = False  # rendering wastes time
GAMMA = 0.9  # reward discount in TD error
LR_A = 0.001  # learning rate for actor
LR_C = 0.001  # learning rate for critic


class Actor(object):
    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess

        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.a = tf.placeholder(tf.int32, None, "action")
        self.q = tf.placeholder(tf.float32, None, "q")  # TD_error

        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,  # number of hidden units
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )

            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,  # output units
                activation=tf.nn.softmax,  # get action probabilities
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='acts_prob'
            )

        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            self.exp_v = tf.reduce_mean(log_prob * self.q)  # advantage (TD_error) guided loss

        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)

    def learn(self, s, a, q):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a, self.q: q}
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
        return exp_v

    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})  # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())  # return a int


class Critic(object):
    def __init__(self, sess, n_features,n_actions, lr=0.01):
        self.sess = sess

        self.s = tf.placeholder(tf.float32, [None, n_features], "state")
        self.a = tf.placeholder(tf.int32,[None, 1],"action")
        self.r = tf.placeholder(tf.float32, None, 'r')
        self.q_ = tf.placeholder(tf.float32,[None,1],'q_next')

        self.a_onehot = tf.one_hot(self.a, n_actions, dtype=tf.float32)
        self.a_onehot = tf.squeeze(self.a_onehot,axis=1)

        self.input = tf.concat([self.s,self.a_onehot],axis=1)

        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.input,
                units=20,  # number of hidden units
                activation=tf.nn.relu,  # None
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )

            self.q = tf.layers.dense(
                inputs=l1,
                units=1,  # output units
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='Q'
            )

        with tf.variable_scope('squared_TD_error'):
            self.td_error = self.r + GAMMA * self.q_ - self.q
            self.loss = tf.square(self.td_error)  # TD_error = (r+gamma*V_next) - V_eval
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

    def learn(self, s, a, r, s_):

        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
        next_a = [[i] for i in range(N_A)]
        s_ = np.tile(s_,[N_A,1])
        q_ = self.sess.run(self.q, {self.s: s_,self.a:next_a})
        q_ = np.max(q_,axis=0,keepdims=True)
        q, _ = self.sess.run([self.q, self.train_op],
                                    {self.s: s, self.q_: q_, self.r: r,self.a:[[a]]})
        return q


# action有两个,即向左或向右移动小车
# state是四维

env = gym.make('CartPole-v0')
env.seed(1)  # reproducible
env = env.unwrapped

N_F = env.observation_space.shape[0]
N_A = env.action_space.n

sess = tf.Session()

actor = Actor(sess, n_features=N_F, n_actions=N_A, lr=LR_A)
critic = Critic(sess, n_features=N_F,n_actions=N_A,lr=LR_C)

sess.run(tf.global_variables_initializer())

res = []
for i_episode in range(MAX_EPISODE):
    s = env.reset()
    t = 0
    track_r = []
    while True:
        if RENDER: env.render()

        a = actor.choose_action(s)

        s_, r, done, info = env.step(a)

        if done: r = -20

        track_r.append(r)

        q = critic.learn(s, a,r, s_)  # gradient = grad[r + gamma * V(s_) - V(s)]
        actor.learn(s, a, q)  # true_gradient = grad[logPi(s,a) * td_error]

        s = s_
        t += 1

        if done or t >= MAX_EP_STEPS:
            ep_rs_sum = sum(track_r)

            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.95 + ep_rs_sum * 0.05
            if running_reward > DISPLAY_REWARD_THRESHOLD: RENDER = True  # rendering
            print("episode:", i_episode, "  reward:", int(running_reward))
            res.append([i_episode,running_reward])

            break

pd.DataFrame(res,columns=['episode','ac_reward']).to_csv('../ac_reward.csv')

A2C

A2C全称为优势动作评论算法(Advantage Actor Critic)。A2C使用优势函数代替Critic网络中的原始回报,可以作为衡量选取动作值和所有动作平均值好坏的指标。
$A_{\pi}(s,a)=Q_{\pi}(s,a)-V_{\pi}(s)$
image.png
image.png

"""
Actor-Critic using TD-error as the Advantage, Reinforcement Learning.

The cart pole example. Policy is oscillated.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/

Using:
tensorflow 1.0
gym 0.8.0
"""

import numpy as np
import tensorflow as tf
import gym

np.random.seed(2)
tf.set_random_seed(2)  # reproducible

# Superparameters
OUTPUT_GRAPH = False
MAX_EPISODE = 3000
DISPLAY_REWARD_THRESHOLD = 200  # renders environment if total episode reward is greater then this threshold
MAX_EP_STEPS = 1000   # maximum time step in one episode
RENDER = False  # rendering wastes time
GAMMA = 0.9     # reward discount in TD error
LR_A = 0.001    # learning rate for actor
LR_C = 0.01     # learning rate for critic

env = gym.make('CartPole-v0')
env.seed(1)  # reproducible
env = env.unwrapped

N_F = env.observation_space.shape[0]
N_A = env.action_space.n


class Actor(object):
    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess

        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.a = tf.placeholder(tf.int32, None, "act")
        self.td_error = tf.placeholder(tf.float32, None, "td_error")  # TD_error

        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,    # number of hidden units
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),    # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )

            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,    # output units
                activation=tf.nn.softmax,   # get action probabilities
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='acts_prob'
            )

        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            self.exp_v = tf.reduce_mean(log_prob * self.td_error)  # 更新theta advantage (TD_error) guided loss

        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)

    def learn(self, s, a, td):
        s = s[np.newaxis, :]
        feed_dict = {self.s: s, self.a: a, self.td_error: td}
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
        return exp_v

    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})   # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())   # return a int


class Critic(object):
    def __init__(self, sess, n_features, lr=0.01):
        self.sess = sess

        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        self.r = tf.placeholder(tf.float32, None, 'r')

        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,  # number of hidden units
                activation=tf.nn.relu,  # None
                # have to be linear to make sure the convergence of actor.
                # But linear approximator seems hardly learns the correct Q.
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )

            self.v = tf.layers.dense(
                inputs=l1,
                units=1,  # output units
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='V'
            )

        with tf.variable_scope('squared_TD_error'):
            self.td_error = self.r + GAMMA * self.v_ - self.v   #更新omega
            self.loss = tf.square(self.td_error)    # TD_error = (r+gamma*V_next) - V_eval
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

    def learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]

        v_ = self.sess.run(self.v, {self.s: s_})
        td_error, _ = self.sess.run([self.td_error, self.train_op],
                                          {self.s: s, self.v_: v_, self.r: r})
        return td_error


sess = tf.Session()

actor = Actor(sess, n_features=N_F, n_actions=N_A, lr=LR_A)
critic = Critic(sess, n_features=N_F, lr=LR_C)     # we need a good teacher, so the teacher should learn faster than the actor

sess.run(tf.global_variables_initializer())

if OUTPUT_GRAPH:
    tf.summary.FileWriter("logs/", sess.graph)

for i_episode in range(MAX_EPISODE):
    s = env.reset()
    t = 0
    track_r = []
    while True:
        if RENDER: env.render()

        a = actor.choose_action(s)

        s_, r, done, info = env.step(a)

        if done: r = -20

        track_r.append(r)

        td_error = critic.learn(s, r, s_)  # gradient = grad[r + gamma * V(s_) - V(s)]
        actor.learn(s, a, td_error)     # true_gradient = grad[logPi(s,a) * td_error]

        s = s_
        t += 1

        if done or t >= MAX_EP_STEPS:
            ep_rs_sum = sum(track_r)

            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.95 + ep_rs_sum * 0.05
            if running_reward > DISPLAY_REWARD_THRESHOLD: RENDER = True  # rendering
            print("episode:", i_episode, "  reward:", int(running_reward))
            break

image.png

A3C

A3C是Google DeepMind 提出的一种解决Actor-Critic不收敛问题的算法。我们知道DQN中很重要的一点是他具有经验池,可以降低数据之间的相关性,而A3C则提出降低数据之间的相关性的另一种方法:异步
简单来说:A3C会创建多个并行的环境, 让多个拥有副结构的 agent 同时在这些并行环境上更新主结构中的参数. 并行中的 agent 们互不干扰, 而主结构的参数更新受到副结构提交更新的不连续性干扰, 所以更新的相关性被降低, 收敛性提高.
20200321164230831.png
A3C主要包括几部分

  • 将主网络参数复制到n个子网络中
  • 子网络与环境交互产生样本
  • 抽取样本,计算子网络(策略网络与价值网络)的梯度并累积求和(与A2C方法一致)
  • 使用得到的累积梯度更新主网络(策略网络与价值网络),回到步骤1进入下一个循环

20200321161523700.png

class ACNet(object):
    def __init__(self, globalAC=None):
        # 当创建 worker 网络的时候, 我们传入之前创建的 globalAC 给这个 worker
        if 这是 global:   # 判断当下建立的网络是 local 还是 global
            with tf.variable_scope('Global_Net'):
                self._build_net()
        else:
            with tf.variable_scope('worker'):
                self._build_net()

            # 接着计算 critic loss 和 actor loss
            # 用这两个 loss 计算要推送的 gradients

            with tf.name_scope('sync'):  # 同步
                with tf.name_scope('pull'):
                    # 更新去 global
                with tf.name_scope('push'):
                    # 获取 global 参数

    def _build_net(self):
        # 在这里搭建 Actor 和 Critic 的网络
        return 均值, 方差, state_value

    def update_global(self, feed_dict):
        # 进行 push 操作

    def pull_global(self):
        # 进行 pull 操作

    def choose_action(self, s):
        # 根据 s 选动作
class Worker(object):
    def __init__(self, name, globalAC):
        self.env = gym.make(GAME).unwrapped # 创建自己的环境
        self.name = name    # 自己的名字
        self.AC = ACNet(name, globalAC) # 自己的 local net, 并绑定上 globalAC

    def work(self):
        # s, a, r 的缓存, 用于 n_steps 更新
        buffer_s, buffer_a, buffer_r = [], [], []
        while not COORD.should_stop() and GLOBAL_EP < MAX_GLOBAL_EP:
            s = self.env.reset()

            for ep_t in range(MAX_EP_STEP):
                a = self.AC.choose_action(s)
                s_, r, done, info = self.env.step(a)

                buffer_s.append(s)  # 添加各种缓存
                buffer_a.append(a)
                buffer_r.append(r)

                # 每 UPDATE_GLOBAL_ITER 步 或者回合完了, 进行 sync 操作
                if total_step % UPDATE_GLOBAL_ITER == 0 or done:
                    # 获得用于计算 TD error 的 下一 state 的 value
                    if done:
                        v_s_ = 0   # terminal
                    else:
                        v_s_ = SESS.run(self.AC.v, {self.AC.s: s_[np.newaxis, :]})[0, 0]

                    buffer_v_target = []    # 下 state value 的缓存, 用于算 TD
                    for r in buffer_r[::-1]:    # 进行 n_steps forward view
                        v_s_ = r + GAMMA * v_s_
                        buffer_v_target.append(v_s_)
                    buffer_v_target.reverse()

                    buffer_s, buffer_a, buffer_v_target = np.vstack(buffer_s), np.vstack(buffer_a), np.vstack(buffer_v_target)

                    feed_dict = {
                        self.AC.s: buffer_s,
                        self.AC.a_his: buffer_a,
                        self.AC.v_target: buffer_v_target,
                    }

                    self.AC.update_global(feed_dict)    # 推送更新去 globalAC
                    buffer_s, buffer_a, buffer_r = [], [], []   # 清空缓存
                    self.AC.pull_global()   # 获取 globalAC 的最新参数

                s = s_
                if done:
                    GLOBAL_EP += 1  # 加一回合
                    break   # 结束这回合
with tf.device("/cpu:0"):
    GLOBAL_AC = ACNet(GLOBAL_NET_SCOPE)  # 建立 Global AC
    workers = []
    for i in range(N_WORKERS):  # 创建 worker, 之后在并行
        workers.append(Worker(GLOBAL_AC))   # 每个 worker 都有共享这个 global AC

COORD = tf.train.Coordinator()  # Tensorflow 用于并行的工具

worker_threads = []
for worker in workers:
    job = lambda: worker.work()
    t = threading.Thread(target=job)    # 添加一个工作线程
    t.start()
    worker_threads.append(t)
COORD.join(worker_threads)  # tf 的线程调度
"""
Asynchronous Advantage Actor Critic (A3C) with discrete action space, Reinforcement Learning.

The Cartpole example.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/

Using:
tensorflow 1.8.0
gym 0.10.5
"""

import multiprocessing
import threading
import tensorflow as tf
import numpy as np
import gym
import os
import shutil
import matplotlib.pyplot as plt


GAME = 'CartPole-v0'
OUTPUT_GRAPH = True
LOG_DIR = './log'
N_WORKERS = multiprocessing.cpu_count()
MAX_GLOBAL_EP = 1000
GLOBAL_NET_SCOPE = 'Global_Net'
UPDATE_GLOBAL_ITER = 10
GAMMA = 0.9
ENTROPY_BETA = 0.001
LR_A = 0.001    # learning rate for actor
LR_C = 0.001    # learning rate for critic
GLOBAL_RUNNING_R = []
GLOBAL_EP = 0

env = gym.make(GAME)
N_S = env.observation_space.shape[0]
N_A = env.action_space.n


class ACNet(object):
    def __init__(self, scope, globalAC=None):

        if scope == GLOBAL_NET_SCOPE:   # get global network
            with tf.variable_scope(scope):
                self.s = tf.placeholder(tf.float32, [None, N_S], 'S')
                self.a_params, self.c_params = self._build_net(scope)[-2:]
        else:   # local net, calculate losses
            with tf.variable_scope(scope):
                self.s = tf.placeholder(tf.float32, [None, N_S], 'S')
                self.a_his = tf.placeholder(tf.int32, [None, ], 'A')
                self.v_target = tf.placeholder(tf.float32, [None, 1], 'Vtarget')

                self.a_prob, self.v, self.a_params, self.c_params = self._build_net(scope)

                td = tf.subtract(self.v_target, self.v, name='TD_error')
                with tf.name_scope('c_loss'):
                    self.c_loss = tf.reduce_mean(tf.square(td))

                with tf.name_scope('a_loss'):
                    log_prob = tf.reduce_sum(tf.log(self.a_prob + 1e-5) * tf.one_hot(self.a_his, N_A, dtype=tf.float32), axis=1, keep_dims=True)
                    exp_v = log_prob * tf.stop_gradient(td)
                    entropy = -tf.reduce_sum(self.a_prob * tf.log(self.a_prob + 1e-5),
                                             axis=1, keep_dims=True)  # encourage exploration
                    self.exp_v = ENTROPY_BETA * entropy + exp_v
                    self.a_loss = tf.reduce_mean(-self.exp_v)

                with tf.name_scope('local_grad'):
                    self.a_grads = tf.gradients(self.a_loss, self.a_params)
                    self.c_grads = tf.gradients(self.c_loss, self.c_params)

            with tf.name_scope('sync'):
                with tf.name_scope('pull'):
                    self.pull_a_params_op = [l_p.assign(g_p) for l_p, g_p in zip(self.a_params, globalAC.a_params)]
                    self.pull_c_params_op = [l_p.assign(g_p) for l_p, g_p in zip(self.c_params, globalAC.c_params)]
                with tf.name_scope('push'):
                    self.update_a_op = OPT_A.apply_gradients(zip(self.a_grads, globalAC.a_params))
                    self.update_c_op = OPT_C.apply_gradients(zip(self.c_grads, globalAC.c_params))

    def _build_net(self, scope):
        w_init = tf.random_normal_initializer(0., .1)
        with tf.variable_scope('actor'):
            l_a = tf.layers.dense(self.s, 200, tf.nn.relu6, kernel_initializer=w_init, name='la')
            a_prob = tf.layers.dense(l_a, N_A, tf.nn.softmax, kernel_initializer=w_init, name='ap')
        with tf.variable_scope('critic'):
            l_c = tf.layers.dense(self.s, 100, tf.nn.relu6, kernel_initializer=w_init, name='lc')
            v = tf.layers.dense(l_c, 1, kernel_initializer=w_init, name='v')  # state value
        a_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope + '/actor')
        c_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope + '/critic')
        return a_prob, v, a_params, c_params

    def update_global(self, feed_dict):  # run by a local
        SESS.run([self.update_a_op, self.update_c_op], feed_dict)  # local grads applies to global net

    def pull_global(self):  # run by a local
        SESS.run([self.pull_a_params_op, self.pull_c_params_op])

    def choose_action(self, s):  # run by a local
        prob_weights = SESS.run(self.a_prob, feed_dict={self.s: s[np.newaxis, :]})
        action = np.random.choice(range(prob_weights.shape[1]),
                                  p=prob_weights.ravel())  # select action w.r.t the actions prob
        return action


class Worker(object):
    def __init__(self, name, globalAC):
        self.env = gym.make(GAME).unwrapped
        self.name = name
        self.AC = ACNet(name, globalAC)

    def work(self):
        global GLOBAL_RUNNING_R, GLOBAL_EP
        total_step = 1
        buffer_s, buffer_a, buffer_r = [], [], []
        while not COORD.should_stop() and GLOBAL_EP < MAX_GLOBAL_EP:
            s = self.env.reset()
            ep_r = 0
            while True:
                # if self.name == 'W_0':
                #     self.env.render()
                a = self.AC.choose_action(s)
                s_, r, done, info = self.env.step(a)
                if done: r = -5
                ep_r += r
                buffer_s.append(s)
                buffer_a.append(a)
                buffer_r.append(r)

                if total_step % UPDATE_GLOBAL_ITER == 0 or done:   # update global and assign to local net
                    if done:
                        v_s_ = 0   # terminal
                    else:
                        v_s_ = SESS.run(self.AC.v, {self.AC.s: s_[np.newaxis, :]})[0, 0]
                    buffer_v_target = []
                    for r in buffer_r[::-1]:    # reverse buffer r
                        v_s_ = r + GAMMA * v_s_
                        buffer_v_target.append(v_s_)
                    buffer_v_target.reverse()

                    buffer_s, buffer_a, buffer_v_target = np.vstack(buffer_s), np.array(buffer_a), np.vstack(buffer_v_target)
                    feed_dict = {
                        self.AC.s: buffer_s,
                        self.AC.a_his: buffer_a,
                        self.AC.v_target: buffer_v_target,
                    }
                    self.AC.update_global(feed_dict)

                    buffer_s, buffer_a, buffer_r = [], [], []
                    self.AC.pull_global()

                s = s_
                total_step += 1
                if done:
                    if len(GLOBAL_RUNNING_R) == 0:  # record running episode reward
                        GLOBAL_RUNNING_R.append(ep_r)
                    else:
                        GLOBAL_RUNNING_R.append(0.99 * GLOBAL_RUNNING_R[-1] + 0.01 * ep_r)
                    print(
                        self.name,
                        "Ep:", GLOBAL_EP,
                        "| Ep_r: %i" % GLOBAL_RUNNING_R[-1],
                          )
                    GLOBAL_EP += 1
                    break

if __name__ == "__main__":
    SESS = tf.Session()

    with tf.device("/cpu:0"):
        OPT_A = tf.train.RMSPropOptimizer(LR_A, name='RMSPropA')
        OPT_C = tf.train.RMSPropOptimizer(LR_C, name='RMSPropC')
        GLOBAL_AC = ACNet(GLOBAL_NET_SCOPE)  # we only need its params
        workers = []
        # Create worker
        for i in range(N_WORKERS):
            i_name = 'W_%i' % i   # worker name
            workers.append(Worker(i_name, GLOBAL_AC))

    COORD = tf.train.Coordinator()
    SESS.run(tf.global_variables_initializer())

    if OUTPUT_GRAPH:
        if os.path.exists(LOG_DIR):
            shutil.rmtree(LOG_DIR)
        tf.summary.FileWriter(LOG_DIR, SESS.graph)

    worker_threads = []
    for worker in workers:
        job = lambda: worker.work()
        t = threading.Thread(target=job)
        t.start()
        worker_threads.append(t)
    COORD.join(worker_threads)

    plt.plot(np.arange(len(GLOBAL_RUNNING_R)), GLOBAL_RUNNING_R)
    plt.xlabel('step')
    plt.ylabel('Total moving reward')
    plt.show()

A3C.png

扩展介绍-GAE(Generalized Advantage Estimation)

GAE引入

  • 策略梯度法存在的问题:

样本利用率低需要大量采样;需要让算法在变化的数据分布中稳定提升

  • 策略梯度法与AC法的区别

使用全部奖励估计策略梯度,尽管无偏但是方差大;
AC方法使用值函数估计奖励能够降低方差但是偏差较大

  • 方差与偏差的影响:

高方差需要更多样本训练;偏差会导致训练不收敛或收敛结果较差

GAE介绍

  1. GAE作用
  2. 泛化优势估计-即用来优化优势函数
  3. 权衡方差与偏差问题:
    1. On-policy直接交互并用每一时刻的回报作为长期回报的估计$\sum_{t’=t}^{T} \gamma^{t’-t}r_{t’}$会产生较大的方差
    2. 通过基于优势函数的AC方法进行回报值估计,则会产生方差较小但偏差较大的问题
  4. GAE的具体形式

策略估计中以下采用优势函数方法都能够有效减小方差

$\gamma -just$条件:
引入参数$\gamma$

状态值函数的TD误差:
$TD \space Error=\delta_t=r_t+\gamma v(s_{t+1})-v(s_t)$
k步估计的优势函数:

当k越大方差越大,偏差越小(多个时刻的优势函数加权求和起到平滑作用)
GAE定义$\lambda$指数下降权重调整参数

$\hat{A_t}^{GAE(\gamma,\lambda)}= \sum_{l=0}^\infin(\gamma\lambda)^l \delta_{t+l}^V$
因此:

$\lambda=0$,采用的就是TD(0)估计方法(方差小,偏差大)
$\lambda=1$,则使用MC方法估计t时刻回报与价值之差(方差大,偏差小)
为了快速估计序列中所有时刻的估计值,实际使用GAE采用倒序计算,从t+1时刻估计t时刻:
$\hat{A_t}^{GAE(\gamma,\lambda)}=\sum_{l=0}^{\infin}(\gamma\lambda)^l \delta_{t+l}^V=\delta_t^V+\gamma\lambda\hat{A}_{t+l}^{GAE(\gamma,\lambda)}$

参考文献

https://zhuanlan.zhihu.com/p/61731174
https://zhuanlan.zhihu.com/p/433509362
https://zhuanlan.zhihu.com/p/360411344
https://mofanpy.com/tutorials/machine-learning/reinforcement-learning/A3C
https://www.jianshu.com/p/428b640046aa
https://zhuanlan.zhihu.com/p/61731174
http://hrl.boyuai.com/chapter/2/%E7%AD%96%E7%95%A5%E6%A2%AF%E5%BA%A6%E7%AE%97%E6%B3%95/
https://zhuanlan.zhihu.com/p/433509362
https://zhkmxx9302013.github.io/post/3cc694a0.html
https://codeantenna.com/a/wnJD8aosyP
https://zhuanlan.zhihu.com/p/343943792
https://zhuanlan.zhihu.com/p/45107835
论文代码中需要注意gym的版本,此处采用的是0.10.5
reinforce.ipynbmultiprocessing_env.pygae.pyrl_utils.py