# Copyright 2024 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import multiprocessing as mp
import os
import threading as th
from collections import namedtuple
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union, cast
import gym
import numpy as np
import nnabla as nn
import nnabla.functions as NF
import nnabla.solvers as NS
import nnabla_rl.environment_explorers as EE
import nnabla_rl.model_trainers as MT
from nnabla_rl.algorithm import Algorithm, AlgorithmConfig, eval_api
from nnabla_rl.algorithms.common_utils import (
_get_shape,
_StatePreprocessedRewardFunction,
_StatePreprocessedStochasticPolicy,
_StatePreprocessedVFunction,
_StochasticPolicyActionSelector,
)
from nnabla_rl.builders import ExplorerBuilder, ModelBuilder, PreprocessorBuilder, ReplayBufferBuilder, SolverBuilder
from nnabla_rl.environment_explorer import EnvironmentExplorer
from nnabla_rl.environments.amp_env import AMPEnv, AMPGoalEnv, TaskResult
from nnabla_rl.environments.environment_info import EnvironmentInfo
from nnabla_rl.functions import compute_std, unnormalize
from nnabla_rl.model_trainers.model_trainer import ModelTrainer, TrainingBatch
from nnabla_rl.models import (
AMPDiscriminator,
AMPGatedPolicy,
AMPGatedVFunction,
AMPPolicy,
AMPVFunction,
Model,
RewardFunction,
StochasticPolicy,
VFunction,
)
from nnabla_rl.preprocessors import Preprocessor
from nnabla_rl.random import drng
from nnabla_rl.replay_buffer import ReplayBuffer
from nnabla_rl.replay_buffers.buffer_iterator import BufferIterator
from nnabla_rl.typing import Experience
from nnabla_rl.utils import context
from nnabla_rl.utils.data import (
add_batch_dimension,
compute_std_ndarray,
marshal_experiences,
normalize_ndarray,
set_data_to_variable,
unnormalize_ndarray,
)
from nnabla_rl.utils.misc import create_variable
from nnabla_rl.utils.multiprocess import (
copy_mp_arrays_to_params,
copy_params_to_mp_arrays,
mp_array_from_np_array,
mp_to_np_array,
new_mp_arrays_from_params,
np_to_mp_array,
)
from nnabla_rl.utils.reproductions import set_global_seed
@dataclass
class AMPConfig(AlgorithmConfig):
"""List of configurations for Adversarial Motion Priors (AMP) algorithm.
Args:
gamma (float): discount factor of rewards. Defaults to 0.95.
lmb (float): coefficient lambda used in the lambda-return (GAE) computation. Defaults to 0.95.
policy_learning_rate (float): learning rate which is set to policy solver. \
You can customize/override the learning rate for each solver by implementing the \
(:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`) by yourself. \
Defaults to 0.000002.
policy_momentum (float): momentum which is set to the policy solver. \
You can customize/override this value for each solver by implementing the \
(:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`) by yourself. \
Defaults to 0.9.
policy_weight_decay (float): coefficient for weight decay of policy function parameters. \
In AMP, weight decay is only applied to non-bias parameters. Defaults to 0.0005.
action_bound_loss_coefficient (float): coefficient of the action bound loss. Defaults to 10.0.
epsilon (float): probability ratio clipping range of the PPO-style policy update. Defaults to 0.2.
v_function_learning_rate (float): learning rate which is set to the value function solver. \
You can customize/override the learning rate for each solver by implementing the \
(:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`) by yourself. \
Defaults to 0.0005.
v_function_momentum (float): momentum which is set to the value function solver. \
You can customize/override this value for each solver by implementing the \
(:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`) by yourself. \
Defaults to 0.9.
normalized_advantage_clip (Tuple[float, float]): clipping range for estimated advantages.\
This clipping is applied after normalization. Defaults to (-4.0, 4.0).
value_at_task_fail (float): value for a task fail state. We overwrite the value of the state by this value \
when computing the value targets. Defaults to 0.0.
value_at_task_success (float): value for a task success state. We overwrite the value of the state by this \
value when computing the value targets. Defaults to 1.0.
target_value_clip (Tuple[float, float]): clipping value for estimated value targets. Defaults to (0.0, 1.0).
epochs (int): number of epochs to perform in each training iteration for policy and value function. \
Defaults to 1.
actor_num (int): number of parallel actors. Defaults to 16.
batch_size (int): training batch size for policy and value function. Defaults to 256.
actor_timesteps (int): number of timesteps to interact with the environment by the actors. Defaults to 4096.
max_explore_steps (int): number of maximum environment exploring steps. Defaults to 200000000.
final_explore_rate (float): final epsilon (exploration rate) of the linearly decaying explorer. Defaults to 0.2.
timelimit_as_terminal (bool): Treat as done if the environment reaches the \
`timelimit <https://github.com/openai/gym/blob/master/gym/wrappers/time_limit.py>`_.\
Defaults to False.
preprocess_state (bool): enable preprocessing the states in the collected experiences\
before feeding them as training batches. Defaults to False.
state_mean_initializer (Optional[Tuple[Union[float, Tuple[float, ...]], ...]]): \
mean initializer for the state preprocessor. Defaults to None.
state_var_initializer (Optional[Tuple[Union[float, Tuple[float, ...]], ...]]): \
variance initializer for the state preprocessor. Defaults to None.
num_processor_samples (int): number of timesteps during which the state preprocessor is updated. \
Defaults to 1000000.
normalize_action (bool): enable normalizing the actions. Defaults to False.
action_mean (Optional[Tuple[float, ...]]): mean for the action normalization. Defaults to None.
action_var (Optional[Tuple[float, ...]]): variance for the action normalization. Defaults to None.
discriminator_learning_rate (float): learning rate which is set to discriminator solver. \
You can customize/override the learning rate for each solver by implementing the \
(:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`) by yourself. \
Defaults to 0.00001.
discriminator_momentum (float): momentum which is set to the discriminator solver. \
You can customize/override this value for each solver by implementing the \
(:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`) by yourself. \
Defaults to 0.9.
discriminator_weight_decay (float): coefficient for weight decay of discriminator function parameters. \
In AMP, weight decay is only applied to non-bias parameters. Defaults to 0.0005.
discriminator_extra_regularization_coefficient (float): coefficient value of extra regularization \
of discriminator function parameters that are defined in \
`discriminator_extra_regularization_variable_names`. Defaults to 0.05.
discriminator_extra_regularization_variable_names (Tuple[str]): \
variable names for applying extra regularization. Defaults to ("logits/affine/W",).
discriminator_gradient_penelty_coefficient (float): coefficient value of the gradient penalty.\
See equation (8) in the AMP paper. Defaults to 10.0.
discriminator_gradient_penalty_indexes (Optional[Tuple[int, ...]]): state indexes for \
applying the gradient penalty. Defaults to (1,).
discriminator_batch_size (int): training batch size for the discriminator function. Defaults to 256.
discriminator_epochs (int): number of epochs to perform in each training iteration \
for discriminator function. Defaults to 2.
discriminator_reward_scale (float): reward scale.\
The reward output by the discriminator is multiplied by this value. Defaults to 2.0.
discriminator_agent_replay_buffer_size (int): replay buffer size for discriminator training. Defaults to 100000.
use_reward_from_env (bool): enable using the task reward (i.e., the reward from the environment). Defaults to False.
lerp_reward_coefficient (float): coefficient for lerping the reward from the environment and \
the reward from the discriminator. Defaults to 0.5.
act_deterministic_in_eval (bool): act deterministically at evaluation. Defaults to True.
seed (int): base seed of random number generator used by the actors. Defaults to 1.
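Example:
    A minimal, illustrative sketch of overriding a few fields
    (the values below are arbitrary examples, not tuned recommendations)::

        config = AMPConfig(gamma=0.99, actor_num=4, batch_size=128)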
"""
# type declarations to type check with mypy
# NOTE: declared variables are instance variable and NOT class variable, unless it is marked with ClassVar
# See https://mypy.readthedocs.io/en/stable/class_basics.html for details
gamma: float = 0.95
lmb: float = 0.95
policy_learning_rate: float = 0.000002
policy_momentum: float = 0.9
policy_weight_decay: float = 0.0005
action_bound_loss_coefficient: float = 10.0
epsilon: float = 0.2
v_function_learning_rate: float = 0.0005
v_function_momentum: float = 0.9
normalized_advantage_clip: Tuple[float, float] = (-4.0, 4.0)
value_at_task_fail: float = 0.0
value_at_task_success: float = 1.0
target_value_clip: Tuple[float, float] = (0.0, 1.0)
epochs: int = 1
actor_num: int = 16
batch_size: int = 256
actor_timesteps: int = 4096
max_explore_steps: int = 200000000
final_explore_rate: float = 0.2
timelimit_as_terminal: bool = False
preprocess_state: bool = False
state_mean_initializer: Optional[Tuple[Union[float, Tuple[float, ...]], ...]] = None
state_var_initializer: Optional[Tuple[Union[float, Tuple[float, ...]], ...]] = None
num_processor_samples: int = 1000000
normalize_action: bool = False
action_mean: Optional[Tuple[float, ...]] = None
action_var: Optional[Tuple[float, ...]] = None
discriminator_learning_rate: float = 0.00001
discriminator_momentum: float = 0.9
discriminator_weight_decay: float = 0.0005
discriminator_extra_regularization_coefficient: float = 0.05
discriminator_extra_regularization_variable_names: Tuple[str] = ("logits/affine/W",)
discriminator_gradient_penelty_coefficient: float = 10.0
discriminator_gradient_penalty_indexes: Optional[Tuple[int, ...]] = (1,)
discriminator_batch_size: int = 256
discriminator_epochs: int = 2
discriminator_reward_scale: float = 2.0
discriminator_agent_replay_buffer_size: int = 100000
use_reward_from_env: bool = False
lerp_reward_coefficient: float = 0.5
act_deterministic_in_eval: bool = True
seed: int = 1
def __post_init__(self):
"""__post_init__
Check the values are in valid range.
"""
self._assert_between(self.gamma, 0.0, 1.0, "gamma")
self._assert_between(self.lmb, 0.0, 1.0, "lmb")
self._assert_positive(self.policy_learning_rate, "policy_learning_rate")
self._assert_positive(self.policy_momentum, "policy_momentum")
self._assert_positive(self.policy_weight_decay, "policy_weight_decay")
self._assert_positive(self.action_bound_loss_coefficient, "action_bound_loss_coefficient")
self._assert_positive(self.epsilon, "epsilon")
self._assert_positive(self.v_function_learning_rate, "v_function_learning_rate")
self._assert_positive(self.v_function_momentum, "v_function_momentum")
if self.normalized_advantage_clip[0] > self.normalized_advantage_clip[1]:
raise ValueError("min normalized_advantage_clip is larger than normalized_advantage_clip")
if self.target_value_clip[0] > self.target_value_clip[1]:
raise ValueError("min target_value_clip is larger than max target_value_clip")
self._assert_positive(self.epochs, "epochs")
self._assert_positive(self.actor_num, "actor_num")
self._assert_positive(self.batch_size, "batch_size")
self._assert_positive(self.actor_timesteps, "actor_timesteps")
self._assert_positive(self.max_explore_steps, "max_explore_steps")
self._assert_between(self.final_explore_rate, 0.0, 1.0, "final_explore_rate")
self._assert_positive(self.num_processor_samples, "num_processor_samples")
self._assert_positive(self.discriminator_learning_rate, "discriminator_learning_rate")
self._assert_positive(self.discriminator_momentum, "discriminator_momentum")
self._assert_positive(self.discriminator_weight_decay, "discriminator_weight_decay")
self._assert_positive(
self.discriminator_extra_regularization_coefficient, "discriminator_extra_regularization_coefficient"
)
self._assert_positive(
self.discriminator_gradient_penelty_coefficient, "discriminator_gradient_penelty_coefficient"
)
self._assert_positive(self.discriminator_batch_size, "discriminator_batch_size")
self._assert_positive(self.discriminator_epochs, "discriminator_epochs")
self._assert_positive(self.discriminator_reward_scale, "discriminator_reward_scale")
self._assert_positive(self.discriminator_agent_replay_buffer_size, "discriminator_agent_replay_buffer_size")
self._assert_between(self.lerp_reward_coefficient, 0.0, 1.0, "lerp_reward_coefficient")
class DefaultPolicyBuilder(ModelBuilder[StochasticPolicy]):
def build_model( # type: ignore[override]
self,
scope_name: str,
env_info: EnvironmentInfo,
algorithm_config: AMPConfig,
**kwargs,
) -> StochasticPolicy:
if env_info.is_goal_conditioned_env():
return AMPGatedPolicy(scope_name, env_info.action_dim, 0.01)
else:
return AMPPolicy(scope_name, env_info.action_dim, 0.01)
class DefaultVFunctionBuilder(ModelBuilder[VFunction]):
def build_model( # type: ignore[override]
self,
scope_name: str,
env_info: EnvironmentInfo,
algorithm_config: AMPConfig,
**kwargs,
) -> VFunction:
if env_info.is_goal_conditioned_env():
return AMPGatedVFunction(scope_name)
else:
return AMPVFunction(scope_name)
class DefaultRewardFunctionBuilder(ModelBuilder[RewardFunction]):
def build_model( # type: ignore[override]
self,
scope_name: str,
env_info: EnvironmentInfo,
algorithm_config: AMPConfig,
**kwargs,
) -> RewardFunction:
return AMPDiscriminator(scope_name, 1.0)
class DefaultVFunctionSolverBuilder(SolverBuilder):
def build_solver( # type: ignore[override]
self, env_info: EnvironmentInfo, algorithm_config: AMPConfig, **kwargs
) -> nn.solver.Solver:
return NS.Momentum(lr=algorithm_config.v_function_learning_rate, momentum=algorithm_config.v_function_momentum)
class DefaultPolicySolverBuilder(SolverBuilder):
def build_solver( # type: ignore[override]
self, env_info: EnvironmentInfo, algorithm_config: AMPConfig, **kwargs
) -> nn.solver.Solver:
return NS.Momentum(lr=algorithm_config.policy_learning_rate, momentum=algorithm_config.policy_momentum)
class DefaultRewardFunctionSolverBuilder(SolverBuilder):
def build_solver( # type: ignore[override]
self, env_info: EnvironmentInfo, algorithm_config: AMPConfig, **kwargs
) -> nn.solver.Solver:
return NS.Momentum(
lr=algorithm_config.discriminator_learning_rate, momentum=algorithm_config.discriminator_momentum
)
class DefaultExplorerBuilder(ExplorerBuilder):
def build_explorer( # type: ignore[override]
self,
env_info: EnvironmentInfo,
algorithm_config: AMPConfig,
algorithm: "AMP",
**kwargs,
) -> EnvironmentExplorer:
explorer_config = EE.LinearDecayEpsilonGreedyExplorerConfig(
initial_step_num=0,
timelimit_as_terminal=algorithm_config.timelimit_as_terminal,
initial_epsilon=1.0,
final_epsilon=algorithm_config.final_explore_rate,
max_explore_steps=algorithm_config.max_explore_steps,
append_explorer_info=True,
)
explorer = EE.LinearDecayEpsilonGreedyExplorer(
greedy_action_selector=kwargs["greedy_action_selector"],
random_action_selector=kwargs["random_action_selector"],
env_info=env_info,
config=explorer_config,
)
return explorer
class DefaultReplayBufferBuilder(ReplayBufferBuilder):
def build_replay_buffer( # type: ignore[override]
self, env_info: EnvironmentInfo, algorithm_config: AMPConfig, **kwargs
) -> ReplayBuffer:
return ReplayBuffer(
capacity=int(np.ceil(algorithm_config.discriminator_agent_replay_buffer_size / algorithm_config.actor_num))
)
class AMP(Algorithm):
"""Adversarial Motion Prior (AMP) implementation.
This class implements the Adversarial Motion Prior (AMP) algorithm
proposed by Xue Bin Peng, et al. in the paper:
"AMP: Adversarial Motion Priors for Stylized Physics-Based Character Control"
For details, see: https://arxiv.org/abs/2104.02180
This algorithm only supports online training.
Args:
env_or_env_info\
(gym.Env or :py:class:`EnvironmentInfo <nnabla_rl.environments.environment_info.EnvironmentInfo>`): \
the environment to train or environment info.
config (:py:class:`AMPConfig <nnabla_rl.algorithms.amp.AMPConfig>`): configuration of the AMP algorithm.
v_function_builder (:py:class:`ModelBuilder[VFunction] <nnabla_rl.builders.ModelBuilder>`): \
builder of v function models.
v_solver_builder (:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`): builder for v function solvers.
policy_builder (:py:class:`ModelBuilder[StochasticPolicy] <nnabla_rl.builders.ModelBuilder>`): \
builder of policy models.
policy_solver_builder (:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`): builder for policy solvers.
reward_function_builder (:py:class:`ModelBuilder[RewardFunction] <nnabla_rl.builders.ModelBuilder>`): \
builder of reward function models.
reward_solver_builder (:py:class:`SolverBuilder <nnabla_rl.builders.SolverBuilder>`): \
builder for reward function solvers.
state_preprocessor_builder (None or :py:class:`PreprocessorBuilder <nnabla_rl.builders.PreprocessorBuilder>`): \
state preprocessor builder to preprocess the states.
env_explorer_builder (:py:class:`ExplorerBuilder <nnabla_rl.builders.ExplorerBuilder>`): \
builder of environment explorer.
discriminator_replay_buffer_builder \
(:py:class:`ReplayBufferBuilder <nnabla_rl.builders.ReplayBufferBuilder>`): builder of replay_buffer of \
discriminator.
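Example:
    A rough usage sketch, assuming ``train_env`` is a gym environment whose unwrapped
    instance is an AMPEnv or AMPGoalEnv (constructing such an environment is
    application specific and not shown here)::

        config = AMPConfig(gpu_id=0, actor_num=2)
        amp = AMP(train_env, config=config)
        amp.train(train_env, total_iterations=config.actor_timesteps * config.actor_num * 10)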
"""
# type declarations to type check with mypy
# NOTE: declared variables are instance variable and NOT class variable, unless it is marked with ClassVar
# See https://mypy.readthedocs.io/en/stable/class_basics.html for details
_config: AMPConfig
_v_function: VFunction
_v_function_solver: nn.solver.Solver
_policy: StochasticPolicy
_policy_solver: nn.solver.Solver
_discriminator: RewardFunction
_discriminator_solver: nn.solver.Solver
_state_preprocessor: Optional[Preprocessor]
_discriminator_state_preprocessor: Optional[Preprocessor]
_policy_trainer: ModelTrainer
_v_function_trainer: ModelTrainer
_discriminator_trainer: ModelTrainer
_policy_solver_builder: SolverBuilder
_v_solver_builder: SolverBuilder
_actors: List["_AMPActor"]
_actor_processes: List[Union[mp.Process, th.Thread]]
_v_function_trainer_state: Dict[str, Any]
_policy_trainer_state: Dict[str, Any]
_discriminator_trainer_state: Dict[str, Any]
_evaluation_actor: _StochasticPolicyActionSelector
def __init__(
self,
env_or_env_info: Union[gym.Env, EnvironmentInfo],
config: AMPConfig = AMPConfig(),
v_function_builder: ModelBuilder[VFunction] = DefaultVFunctionBuilder(),
v_solver_builder: SolverBuilder = DefaultVFunctionSolverBuilder(),
policy_builder: ModelBuilder[StochasticPolicy] = DefaultPolicyBuilder(),
policy_solver_builder: SolverBuilder = DefaultPolicySolverBuilder(),
reward_function_builder: ModelBuilder[RewardFunction] = DefaultRewardFunctionBuilder(),
reward_solver_builder: SolverBuilder = DefaultRewardFunctionSolverBuilder(),
state_preprocessor_builder: Optional[PreprocessorBuilder] = None,
env_explorer_builder: ExplorerBuilder = DefaultExplorerBuilder(),
discriminator_replay_buffer_builder: ReplayBufferBuilder = DefaultReplayBufferBuilder(),
):
super(AMP, self).__init__(env_or_env_info, config=config)
# Initialize on cpu and change the context later
with nn.context_scope(context.get_nnabla_context(-1)):
policy = policy_builder("pi", self._env_info, self._config)
v_function = v_function_builder("v", self._env_info, self._config)
discriminator = reward_function_builder("discriminator", self._env_info, self._config)
if self._config.preprocess_state:
if state_preprocessor_builder is None:
raise ValueError("State preprocessing is enabled but no preprocessor builder is given")
self._pi_v_state_preprocessor = state_preprocessor_builder(
"pi_v_preprocessor", self._env_info, self._config
)
v_function = _StatePreprocessedVFunction(
v_function=v_function, preprocessor=self._pi_v_state_preprocessor
)
policy = _StatePreprocessedStochasticPolicy(policy=policy, preprocessor=self._pi_v_state_preprocessor)
self._discriminator_state_preprocessor = state_preprocessor_builder(
"r_preprocessor", self._env_info, self._config
)
discriminator = _StatePreprocessedRewardFunction(
reward_function=discriminator, preprocessor=self._discriminator_state_preprocessor
)
self._v_function = v_function
self._policy = policy
self._discriminator = discriminator
self._v_function_solver = v_solver_builder(self._env_info, self._config)
self._v_solver_builder = v_solver_builder # keep for later use
self._policy_solver = policy_solver_builder(self._env_info, self._config)
self._policy_solver_builder = policy_solver_builder # keep for later use
self._discriminator_solver = reward_solver_builder(self._env_info, self._config)
self._discriminator_solver_builder = reward_solver_builder # keep for later use
self._env_explorer_builder = env_explorer_builder # keep for later use
self._evaluation_actor = _StochasticPolicyActionSelector(
self._env_info, self._policy.shallowcopy(), deterministic=self._config.act_deterministic_in_eval
)
self._discriminator_agent_replay_buffers = [
discriminator_replay_buffer_builder(env_info=self._env_info, algorithm_config=self._config)
for _ in range(self._config.actor_num)
]
self._discriminator_expert_replay_buffers = [
discriminator_replay_buffer_builder(env_info=self._env_info, algorithm_config=self._config)
for _ in range(self._config.actor_num)
]
if self._config.normalize_action:
action_mean = add_batch_dimension(np.array(self._config.action_mean, dtype=np.float32))
self._action_mean = nn.Variable.from_numpy_array(action_mean)
action_var = add_batch_dimension(np.array(self._config.action_var, dtype=np.float32))
self._action_std = compute_std(
nn.Variable.from_numpy_array(action_var), epsilon=0.0, mode_for_floating_point_error="max"
)
else:
self._action_mean = None
self._action_std = None
@eval_api
def compute_eval_action(self, state, *, begin_of_episode=False, extra_info={}):
with nn.context_scope(context.get_nnabla_context(self._config.gpu_id)):
action, _ = self._evaluation_action_selector(state, begin_of_episode=begin_of_episode)
if self._config.normalize_action:
std = compute_std_ndarray(
np.array(self._config.action_var, dtype=np.float32),
epsilon=0.0,
mode_for_floating_point_error="max",
)
action = unnormalize_ndarray(action, np.array(self._config.action_mean, dtype=np.float32), std)
return action
def _before_training_start(self, env_or_buffer):
if not self._is_env(env_or_buffer):
raise ValueError("AMP only supports online training")
env = env_or_buffer
if not (isinstance(env.unwrapped, AMPEnv) or isinstance(env.unwrapped, AMPGoalEnv)):
raise ValueError("AMP only support AMPEnv and AMPGoalEnv")
# FIXME: This setup is a workaround to create the underlying model parameters.
# If the parameters are not created here, the multiprocessing arrays (created in _launch_actor_processes)
# will be empty and the agent will not learn anything.
context.set_nnabla_context(-1)
self._setup_policy_training(env)
self._setup_v_function_training(env)
self._setup_reward_function_training(env)
self._actors, self._actor_processes = self._launch_actor_processes(env)
context.set_nnabla_context(self._config.gpu_id)
# Setup again here to use gpu (if it is set)
old_policy_solver = self._policy_solver
self._policy_solver = self._policy_solver_builder(self._env_info, self._config)
self._policy_trainer = self._setup_policy_training(env)
self._policy_solver.set_states(old_policy_solver.get_states())
old_v_function_solver = self._v_function_solver
self._v_function_solver = self._v_solver_builder(self._env_info, self._config)
self._v_function_trainer = self._setup_v_function_training(env)
self._v_function_solver.set_states(old_v_function_solver.get_states())
old_discriminator_solver = self._discriminator_solver
self._discriminator_solver = self._discriminator_solver_builder(self._env_info, self._config)
self._discriminator_trainer = self._setup_reward_function_training(env)
self._discriminator_solver.set_states(old_discriminator_solver.get_states())
def _setup_policy_training(self, env_or_buffer):
policy_trainer_config = MT.policy_trainers.AMPPolicyTrainerConfig(
epsilon=self._config.epsilon,
normalize_action=self._config.normalize_action,
action_bound_loss_coefficient=self._config.action_bound_loss_coefficient,
action_mean=self._config.action_mean,
action_var=self._config.action_var,
regularization_coefficient=self._config.policy_weight_decay,
)
policy_trainer = MT.policy_trainers.AMPPolicyTrainer(
models=self._policy,
solvers={self._policy.scope_name: self._policy_solver},
env_info=self._env_info,
config=policy_trainer_config,
)
return policy_trainer
def _setup_v_function_training(self, env_or_buffer):
# training input/loss variables
v_function_trainer_config = MT.v_value_trainers.MonteCarloVTrainerConfig(
reduction_method="mean", v_loss_scalar=0.5
)
v_function_trainer = MT.v_value_trainers.MonteCarloVTrainer(
train_functions=self._v_function,
solvers={self._v_function.scope_name: self._v_function_solver},
env_info=self._env_info,
config=v_function_trainer_config,
)
return v_function_trainer
def _setup_reward_function_training(self, env_or_buffer):
reward_function_trainer_config = MT.reward_trainiers.AMPRewardFunctionTrainerConfig(
batch_size=self._config.discriminator_batch_size,
regularization_coefficient=self._config.discriminator_weight_decay,
extra_regularization_coefficient=self._config.discriminator_extra_regularization_coefficient,
extra_regularization_variable_names=self._config.discriminator_extra_regularization_variable_names,
gradient_penelty_coefficient=self._config.discriminator_gradient_penelty_coefficient,
gradient_penalty_indexes=self._config.discriminator_gradient_penalty_indexes,
)
model = (
self._discriminator._reward_function
if isinstance(self._discriminator, _StatePreprocessedRewardFunction)
else self._discriminator
)
preprocessor = self._discriminator_state_preprocessor if self._config.preprocess_state else None
reward_function_trainer = MT.reward_trainiers.AMPRewardFunctionTrainer(
models=model,
solvers={self._discriminator.scope_name: self._discriminator_solver},
env_info=self._env_info,
state_preprocessor=preprocessor,
config=reward_function_trainer_config,
)
return reward_function_trainer
def _after_training_finish(self, env_or_buffer):
for actor in self._actors:
actor.dispose()
for process in self._actor_processes:
self._kill_actor_processes(process)
def _launch_actor_processes(self, env):
actors = self._build_amp_actors(
env=env,
policy=self._policy,
v_function=self._v_function,
state_preprocessor=(self._pi_v_state_preprocessor if self._config.preprocess_state else None),
reward_function=self._discriminator,
reward_state_preprocessor=(
self._discriminator_state_preprocessor if self._config.preprocess_state else None
),
env_explorer=self._env_explorer_builder(
self._env_info,
self._config,
self,
greedy_action_selector=self._compute_greedy_action,
random_action_selector=self._compute_explore_action,
),
)
processes = []
for actor in actors:
if self._config.actor_num == 1:
# Run in the same process when there is only 1 actor
p = th.Thread(target=actor, daemon=False)
else:
p = mp.Process(target=actor, daemon=True)
p.start()
processes.append(p)
return actors, processes
def _build_amp_actors(
self, env, policy, v_function, state_preprocessor, reward_function, reward_state_preprocessor, env_explorer
):
actors = []
for i in range(self._config.actor_num):
actor = _AMPActor(
actor_num=i,
env=env,
env_info=self._env_info,
policy=policy,
v_function=v_function,
state_preprocessor=state_preprocessor,
reward_function=reward_function,
reward_state_preprocessor=reward_state_preprocessor,
config=self._config,
env_explorer=env_explorer,
)
actors.append(actor)
return actors
def _run_online_training_iteration(self, env):
update_interval = self._config.actor_timesteps * self._config.actor_num
if self.iteration_num % update_interval != 0:
return
experiences_per_agent = self._collect_experiences(self._actors)
assert len(experiences_per_agent) == self._config.actor_num
self._add_experience_to_reward_buffers(experiences_per_agent)
# s and s_expert are tuples of length tuple_size; each element has shape (batch_size, dim)
s, s_expert = _concatenate_state(experiences_per_agent)
if update_interval < self.iteration_num:
# NOTE: The first update (when update_interval == self.iteration_num) will be skipped
policy_buffers, v_function_buffers = self._create_policy_and_v_function_buffers(experiences_per_agent)
self._amp_training(
self._discriminator_agent_replay_buffers,
self._discriminator_expert_replay_buffers,
policy_buffers,
v_function_buffers,
)
if self._config.preprocess_state and self.iteration_num < self._config.num_processor_samples:
self._pi_v_state_preprocessor.update(s)
self._discriminator_state_preprocessor.update(s)
self._discriminator_state_preprocessor.update(s_expert)
def _collect_experiences(self, actors: List["_AMPActor"]):
def split_result(tuple_val):
return [_tuple_val for _tuple_val in zip(*tuple_val)]
for actor in self._actors:
if self._config.actor_num != 1:
actor.update_policy_params(self._policy.get_parameters())
actor.update_reward_function_params(self._discriminator.get_parameters())
actor.update_v_function_params(self._v_function.get_parameters())
if self._config.preprocess_state:
casted_pi_v_state_preprocessor = cast(Model, self._pi_v_state_preprocessor)
actor.update_state_preprocessor_params(casted_pi_v_state_preprocessor.get_parameters())
casted_discriminator_state_preprocessor = cast(Model, self._discriminator_state_preprocessor)
actor.update_reward_state_preprocessor_params(
casted_discriminator_state_preprocessor.get_parameters()
)
else:
# It's running in the same process. No need to synchronize parameters with multiprocessing arrays.
pass
actor.run_data_collection()
experiences_per_agent = []
for actor in actors:
result = actor.wait_data_collection()
# Copy the result to the main process
result = copy.deepcopy(result)
splitted_result = []
for r in result:
if isinstance(r, tuple):
splitted_result.append(split_result(r))
else:
splitted_result.append(r)
assert len(splitted_result[-1]) == self._config.actor_timesteps
experience = [
(s, a, r, non_terminal, n_s, log_prob, non_greedy, e_s, e_a, e_s_next, v_target, advantage)
for (s, a, r, non_terminal, n_s, log_prob, non_greedy, e_s, e_a, e_s_next, v_target, advantage) in zip(
*splitted_result
)
]
assert len(experience) == self._config.actor_timesteps
experiences_per_agent.append(experience)
assert len(experiences_per_agent) == self._config.actor_num
return experiences_per_agent
def _add_experience_to_reward_buffers(self, experience_per_agent):
assert len(self._discriminator_agent_replay_buffers) == len(experience_per_agent)
assert len(self._discriminator_expert_replay_buffers) == len(experience_per_agent)
for agent_buffer, expert_buffer, experience in zip(
self._discriminator_agent_replay_buffers, self._discriminator_expert_replay_buffers, experience_per_agent
):
agent_buffer.append_all(experience)
expert_buffer.append_all(experience)
def _create_policy_and_v_function_buffers(self, experiences_per_agent):
policy_buffers = []
v_function_buffers = []
for experience in experiences_per_agent:
policy_buffer = ReplayBuffer()
v_function_buffer = ReplayBuffer()
for s, a, r, non_terminal, n_s, log_prob, non_greedy, _, _, _, v, ad in experience:
v_function_buffer.append((s, a, r, non_terminal, n_s, log_prob, v, ad, non_greedy))
if non_greedy: # NOTE: Only use sampled action for policy learning
policy_buffer.append((s, a, r, non_terminal, n_s, log_prob, v, ad, non_greedy))
policy_buffers.append(policy_buffer)
v_function_buffers.append(v_function_buffer)
return policy_buffers, v_function_buffers
def _kill_actor_processes(self, process):
if isinstance(process, mp.Process):
process.terminate()
else:
# This is a thread. Do nothing.
pass
process.join()
def _run_offline_training_iteration(self, buffer):
raise NotImplementedError
def _amp_training(
self,
discriminator_agent_replay_buffers,
discriminator_expert_replay_buffers,
policy_replay_buffers,
v_function_replay_buffers,
):
self._reward_function_training(discriminator_agent_replay_buffers, discriminator_expert_replay_buffers)
total_updates = (self._config.actor_num * self._config.actor_timesteps) // self._config.batch_size
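# total_updates corresponds to one pass over the newly collected on-policy data
# (actor_num * actor_timesteps samples) per epoch, split into mini-batches of batch_size.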
self._v_function_training(total_updates, v_function_replay_buffers)
self._policy_training(total_updates, policy_replay_buffers)
def _reward_function_training(self, agent_buffers: List[ReplayBuffer], expert_buffers: List[ReplayBuffer]):
num_updates = (self._config.actor_num * self._config.actor_timesteps) // self._config.discriminator_batch_size
for _ in range(self._config.discriminator_epochs):
for _ in range(num_updates):
agent_experiences = _sample_experiences_from_buffers(
agent_buffers, self._config.discriminator_batch_size
)
(s_agent, a_agent, _, _, s_next_agent, *_) = marshal_experiences(agent_experiences)
expert_experiences = _sample_experiences_from_buffers(
expert_buffers, self._config.discriminator_batch_size
)
(_, _, _, _, _, _, _, s_expert, a_expert, s_next_expert, _, _) = marshal_experiences(expert_experiences)
extra = {}
extra["s_current_agent"] = s_agent
extra["a_current_agent"] = a_agent
extra["s_next_agent"] = s_next_agent
extra["s_current_expert"] = s_expert
extra["a_current_expert"] = a_expert
extra["s_next_expert"] = s_next_expert
batch = TrainingBatch(batch_size=self._config.discriminator_batch_size, extra=extra)
self._discriminator_trainer_state = self._discriminator_trainer.train(batch)
def _v_function_training(self, total_updates, v_function_replay_buffers):
v_function_buffer_iterator = _EquallySampleBufferIterator(
total_updates, v_function_replay_buffers, self._config.batch_size
)
for _ in range(self._config.epochs):
for experiences in v_function_buffer_iterator:
(s, a, _, _, _, _, v_target, _, _) = marshal_experiences(experiences)
extra = {}
extra["v_target"] = v_target
batch = TrainingBatch(batch_size=len(experiences), s_current=s, a_current=a, extra=extra)
self._v_function_trainer_state = self._v_function_trainer.train(batch)
def _policy_training(self, total_updates, policy_replay_buffers):
policy_buffer_iterator = _EquallySampleBufferIterator(
total_updates, policy_replay_buffers, self._config.batch_size
)
for _ in range(self._config.epochs):
for experiences in policy_buffer_iterator:
(s, a, _, _, _, log_prob, _, advantage, _) = marshal_experiences(experiences)
extra = {}
extra["log_prob"] = log_prob
extra["advantage"] = advantage
batch = TrainingBatch(batch_size=len(experiences), s_current=s, a_current=a, extra=extra)
self._policy_trainer_state = self._policy_trainer.train(batch)
def _evaluation_action_selector(self, s, *, begin_of_episode=False):
return self._evaluation_actor(s, begin_of_episode=begin_of_episode)
def _models(self):
models = {}
models[self._policy.scope_name] = self._policy
models[self._v_function.scope_name] = self._v_function
models[self._discriminator.scope_name] = self._discriminator
if self._config.preprocess_state and isinstance(self._discriminator_state_preprocessor, Model):
models[self._discriminator_state_preprocessor.scope_name] = self._discriminator_state_preprocessor
if self._config.preprocess_state and isinstance(self._pi_v_state_preprocessor, Model):
models[self._pi_v_state_preprocessor.scope_name] = self._pi_v_state_preprocessor
return models
def _solvers(self):
solvers = {}
solvers[self._v_function.scope_name] = self._v_function_solver
solvers[self._policy.scope_name] = self._policy_solver
solvers[self._discriminator.scope_name] = self._discriminator_solver
return solvers
@classmethod
def is_supported_env(cls, env_or_env_info):
env_info = (
EnvironmentInfo.from_env(env_or_env_info) if isinstance(env_or_env_info, gym.Env) else env_or_env_info
)
return not env_info.is_discrete_action_env() and not env_info.is_tuple_action_env()
@eval_api
def _compute_greedy_action(self, s, *, begin_of_episode=False):
s = add_batch_dimension(s)
if not hasattr(self, "_greedy_state_var"):
self._greedy_state_var = create_variable(1, self._env_info.state_shape)
distribution = self._policy.pi(self._greedy_state_var)
self._greedy_action = distribution.choose_probable()
self._greedy_action_log_prob = distribution.log_prob(self._greedy_action)
if self._config.normalize_action:
# NOTE: actions output by the policy are in the normalized space, so unnormalize before returning.
self._greedy_action = unnormalize(self._greedy_action, self._action_mean, self._action_std)
set_data_to_variable(self._greedy_state_var, s)
nn.forward_all([self._greedy_action, self._greedy_action_log_prob])
action = np.squeeze(self._greedy_action.d, axis=0)
log_prob = np.squeeze(self._greedy_action_log_prob.d, axis=0)
info = {}
info["log_prob"] = log_prob
return action, info
@eval_api
def _compute_explore_action(self, s, *, begin_of_episode=False):
s = add_batch_dimension(s)
if not hasattr(self, "_explore_state_var"):
self._explore_state_var = create_variable(1, self._env_info.state_shape)
distribution = self._policy.pi(self._explore_state_var)
self._explore_action, self._explore_action_log_prob = distribution.sample_and_compute_log_prob()
if self._config.normalize_action:
# NOTE: actions output by the policy are in the normalized space, so unnormalize before returning.
self._explore_action = unnormalize(self._explore_action, self._action_mean, self._action_std)
set_data_to_variable(self._explore_state_var, s)
nn.forward_all([self._explore_action, self._explore_action_log_prob])
action = np.squeeze(self._explore_action.d, axis=0)
log_prob = np.squeeze(self._explore_action_log_prob.d, axis=0)
info = {}
info["log_prob"] = log_prob
return action, info
@property
def latest_iteration_state(self):
latest_iteration_state = super(AMP, self).latest_iteration_state
if hasattr(self, "_discriminator_trainer_state"):
discriminator_trainer_state = {}
for k, v in self._discriminator_trainer_state.items():
discriminator_trainer_state[k] = float(v)
latest_iteration_state["scalar"].update(discriminator_trainer_state)
if hasattr(self, "_v_function_trainer_state"):
latest_iteration_state["scalar"].update({"v_loss": float(self._v_function_trainer_state["v_loss"])})
if hasattr(self, "_policy_trainer_state"):
policy_trainer_state = {}
for k, v in self._policy_trainer_state.items():
policy_trainer_state[k] = float(v)
latest_iteration_state["scalar"].update(policy_trainer_state)
return latest_iteration_state
@property
def trainers(self):
return {
"discriminator": self._discriminator_trainer,
"v_function": self._v_function_trainer,
"policy": self._policy_trainer,
}
def _sample_experiences_from_buffers(buffers: List[ReplayBuffer], batch_size: int) -> List[Experience]:
experiences: List[Experience] = []
for buffer in buffers:
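# Draw roughly batch_size / len(buffers) samples from each buffer; the ceil guarantees
# at least batch_size samples in total before truncation below.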
experience, _ = buffer.sample(num_samples=int(np.ceil(batch_size / len(buffers))))
experience = cast(List[Experience], experience)
experiences.extend(experience)
assert len(experiences) >= batch_size
return experiences[:batch_size]
def _concatenate_state(experiences_per_agent) -> Tuple[np.ndarray, np.ndarray]:
all_experience = []
for e in experiences_per_agent:
all_experience.extend(e)
s, _, _, _, _, _, _, e_s, _, _, _, _ = marshal_experiences(all_experience)
return s, e_s
class _AMPActor:
def __init__(
self,
actor_num: int,
env: gym.Env,
env_info: EnvironmentInfo,
policy: StochasticPolicy,
v_function: VFunction,
state_preprocessor: Optional[Preprocessor],
reward_function: RewardFunction,
reward_state_preprocessor: Optional[Preprocessor],
env_explorer: EnvironmentExplorer,
config: AMPConfig,
):
# These variables will be copied when the process is created
self._actor_num = actor_num
self._env = env
self._env_info = env_info
self._policy = policy
self._v_function = v_function
self._reward_function = reward_function
self._reward_state_preprocessor = reward_state_preprocessor
self._state_preprocessor = state_preprocessor
self._timesteps = config.actor_timesteps
self._gamma = config.gamma
self._lambda = config.lmb
self._config = config
self._env_explorer = env_explorer
# IPC communication variables
self._disposed = mp.Value("i", False)
self._task_start_event = mp.Event()
self._task_finish_event = mp.Event()
self._policy_mp_arrays = new_mp_arrays_from_params(policy.get_parameters())
self._v_function_mp_arrays = new_mp_arrays_from_params(v_function.get_parameters())
self._reward_function_mp_arrays = new_mp_arrays_from_params(reward_function.get_parameters())
if self._config.preprocess_state:
assert state_preprocessor is not None
casted_state_preprocessor = cast(Model, state_preprocessor)
self._state_preprocessor_mp_arrays = new_mp_arrays_from_params(casted_state_preprocessor.get_parameters())
assert reward_state_preprocessor is not None
casted_reward_state_preprocessor = cast(Model, reward_state_preprocessor)
self._reward_state_preprocessor_mp_arrays = new_mp_arrays_from_params(
casted_reward_state_preprocessor.get_parameters()
)
MultiProcessingArrays = namedtuple(
"MultiProcessingArrays",
[
"state",
"action",
"reward",
"non_terminal",
"next_state",
"log_prob",
"non_greedy_action",
"expert_state",
"expert_action",
"expert_next_state",
"v_target",
"advantage",
],
)
state_mp_array = self._prepare_state_mp_array(env_info.observation_space, env_info)
action_mp_array = self._prepare_action_mp_array(env_info.action_space, env_info)
scalar_mp_array_shape = (self._timesteps, 1)
reward_mp_array = (
mp_array_from_np_array(np.empty(shape=scalar_mp_array_shape, dtype=np.float32)),
scalar_mp_array_shape,
np.float32,
)
non_terminal_mp_array = (
mp_array_from_np_array(np.empty(shape=scalar_mp_array_shape, dtype=np.float32)),
scalar_mp_array_shape,
np.float32,
)
next_state_mp_array = self._prepare_state_mp_array(env_info.observation_space, env_info)
log_prob_mp_array = (
mp_array_from_np_array(np.empty(shape=scalar_mp_array_shape, dtype=np.float32)),
scalar_mp_array_shape,
np.float32,
)
non_greedy_action_mp_array = (
mp_array_from_np_array(np.empty(shape=scalar_mp_array_shape, dtype=np.float32)),
scalar_mp_array_shape,
np.float32,
)
v_target_mp_array = (
mp_array_from_np_array(np.empty(shape=scalar_mp_array_shape, dtype=np.float32)),
scalar_mp_array_shape,
np.float32,
)
advantage_mp_array = (
mp_array_from_np_array(np.empty(shape=scalar_mp_array_shape, dtype=np.float32)),
scalar_mp_array_shape,
np.float32,
)
expert_state_mp_array = self._prepare_state_mp_array(env_info.observation_space, env_info)
expert_action_mp_array = self._prepare_action_mp_array(env_info.action_space, env_info)
expert_next_state_mp_array = self._prepare_state_mp_array(env_info.observation_space, env_info)
self._mp_arrays = MultiProcessingArrays(
state_mp_array,
action_mp_array,
reward_mp_array,
non_terminal_mp_array,
next_state_mp_array,
log_prob_mp_array,
non_greedy_action_mp_array,
expert_state_mp_array,
expert_action_mp_array,
expert_next_state_mp_array,
v_target_mp_array,
advantage_mp_array,
)
self._reward_min = np.inf
self._reward_max = -np.inf
def __call__(self):
self._run_actor_loop()
def dispose(self):
self._disposed.value = True
self._task_start_event.set()
def run_data_collection(self):
self._task_finish_event.clear()
self._task_start_event.set()
def wait_data_collection(self):
def _mp_to_np_array(mp_array):
if isinstance(mp_array[0], tuple):
# tupled state
return tuple(mp_to_np_array(*array) for array in mp_array)
else:
return mp_to_np_array(*mp_array)
self._task_finish_event.wait()
return tuple(_mp_to_np_array(mp_array) for mp_array in self._mp_arrays)
def update_policy_params(self, params):
self._update_params(src=params, dest=self._policy_mp_arrays)
def update_v_function_params(self, params):
self._update_params(src=params, dest=self._v_function_mp_arrays)
def update_reward_function_params(self, params):
self._update_params(src=params, dest=self._reward_function_mp_arrays)
def update_state_preprocessor_params(self, params):
self._update_params(src=params, dest=self._state_preprocessor_mp_arrays)
def update_reward_state_preprocessor_params(self, params):
self._update_params(src=params, dest=self._reward_state_preprocessor_mp_arrays)
def _update_params(self, src, dest):
copy_params_to_mp_arrays(src, dest)
def _run_actor_loop(self):
context.set_nnabla_context(self._config.gpu_id)
if self._config.seed >= 0:
seed = self._actor_num + self._config.seed
else:
seed = os.getpid()
self._env.seed(seed)
set_global_seed(seed)
while True:
self._task_start_event.wait()
if self._disposed.get_obj():
break
if self._config.actor_num != 1:
# Running on a different process.
# Sync parameters through multiprocessing arrays.
self._synchronize_policy_params(self._policy.get_parameters())
self._synchronize_v_function_params(self._v_function.get_parameters())
self._synchronize_reward_function_params(self._reward_function.get_parameters())
if self._config.preprocess_state:
self._synchronize_preprocessor_params(self._state_preprocessor.get_parameters())
self._synchronize_reward_preprocessor_params(self._reward_state_preprocessor.get_parameters())
experiences = self._run_data_collection()
self._fill_result(experiences)
self._task_start_event.clear()
self._task_finish_event.set()
def _fill_result(self, experiences):
indexes = np.arange(len(experiences))
drng.shuffle(indexes)
experiences = [experiences[i] for i in indexes[: self._config.actor_timesteps]]
(s, a, r, non_terminal, s_next, log_prob, non_greedy_action, e_s, e_a, e_s_next, v_target, advantage) = (
marshal_experiences(experiences)
)
_copy_np_array_to_mp_array(s, self._mp_arrays.state)
_copy_np_array_to_mp_array(a, self._mp_arrays.action)
_copy_np_array_to_mp_array(r, self._mp_arrays.reward)
_copy_np_array_to_mp_array(non_terminal, self._mp_arrays.non_terminal)
_copy_np_array_to_mp_array(s_next, self._mp_arrays.next_state)
_copy_np_array_to_mp_array(log_prob, self._mp_arrays.log_prob)
_copy_np_array_to_mp_array(non_greedy_action, self._mp_arrays.non_greedy_action)
_copy_np_array_to_mp_array(e_s, self._mp_arrays.expert_state)
_copy_np_array_to_mp_array(e_a, self._mp_arrays.expert_action)
_copy_np_array_to_mp_array(e_s_next, self._mp_arrays.expert_next_state)
_copy_np_array_to_mp_array(v_target, self._mp_arrays.v_target)
_copy_np_array_to_mp_array(advantage, self._mp_arrays.advantage)
def _run_data_collection(self):
experiences = self._env_explorer.step(self._env, n=self._timesteps)
rewards = self._compute_rewards(experiences)
self._reward_min = min(np.min(rewards), self._reward_min)
self._reward_max = max(np.max(rewards), self._reward_max)
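# Clip predicted values to the range attainable by an infinite discounted sum of the observed
# rewards, i.e. [reward_min / (1 - gamma), reward_max / (1 - gamma)].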
v_targets, advantages = _compute_v_target_and_advantage_with_clipping_and_overwriting(
v_function=self._v_function,
experiences=experiences,
rewards=rewards,
gamma=self._config.gamma,
lmb=self._config.lmb,
value_clip=(self._reward_min / (1.0 - self._config.gamma), self._reward_max / (1.0 - self._config.gamma)),
value_at_task_fail=self._config.value_at_task_fail,
value_at_task_success=self._config.value_at_task_success,
)
assert self._config.target_value_clip[0] < self._config.target_value_clip[1]
v_targets = np.clip(v_targets, a_min=self._config.target_value_clip[0], a_max=self._config.target_value_clip[1])
advantage_std = compute_std_ndarray(np.var(advantages), epsilon=1e-5, mode_for_floating_point_error="add")
advantages = normalize_ndarray(
advantages, mean=np.mean(advantages), std=advantage_std, value_clip=self._config.normalized_advantage_clip
)
assert len(experiences) == len(v_targets)
assert len(experiences) == len(advantages)
organized_experiences = []
for (s, a, r, non_terminal, s_next, info), v_target, advantage in zip(experiences, v_targets, advantages):
expert_s, expert_a, _, _, expert_n_s, _ = info["expert_experience"]
assert "greedy_action" in info
organized_experiences.append(
(
s,
a,
r,
non_terminal,
s_next,
info["log_prob"],
0.0 if info["greedy_action"] else 1.0,
expert_s,
expert_a,
expert_n_s,
v_target,
advantage,
)
)
return organized_experiences
def _compute_rewards(self, experiences: List[Experience]) -> List[float]:
if not hasattr(self, "_reward_var"):
self._s_var_label = create_variable(1, self._env_info.state_shape)
self._s_next_var_label = create_variable(1, self._env_info.state_shape)
self._a_var_label = create_variable(1, self._env_info.action_shape)
logits = self._reward_function.r(self._s_var_label, self._a_var_label, self._s_next_var_label)
# equation (7) in the paper
self._reward_var = 1.0 - 0.25 * ((logits - 1.0) ** 2)
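# Clamp to be non-negative and scale, so the style reward lies in [0, discriminator_reward_scale].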
self._reward_var = NF.maximum_scalar(self._reward_var, val=0.0) * self._config.discriminator_reward_scale
rewards: List[float] = []
for experience in experiences:
s, a, env_r, _, n_s, *_ = experience
set_data_to_variable(self._s_var_label, s)
set_data_to_variable(self._a_var_label, a)
set_data_to_variable(self._s_next_var_label, n_s)
self._reward_var.forward()
if self._config.use_reward_from_env:
reward = (1.0 - self._config.lerp_reward_coefficient) * float(
self._reward_var.d
) + self._config.lerp_reward_coefficient * float(env_r)
else:
reward = float(self._reward_var.d)
rewards.append(reward)
return rewards
def _synchronize_v_function_params(self, params):
self._synchronize_params(src=self._v_function_mp_arrays, dest=params)
def _synchronize_policy_params(self, params):
self._synchronize_params(src=self._policy_mp_arrays, dest=params)
def _synchronize_reward_function_params(self, params):
self._synchronize_params(src=self._reward_function_mp_arrays, dest=params)
def _synchronize_preprocessor_params(self, params):
self._synchronize_params(src=self._state_preprocessor_mp_arrays, dest=params)
def _synchronize_reward_preprocessor_params(self, params):
self._synchronize_params(src=self._reward_state_preprocessor_mp_arrays, dest=params)
def _synchronize_params(self, src, dest):
copy_mp_arrays_to_params(src, dest)
def _prepare_state_mp_array(self, obs_space, env_info):
if env_info.is_tuple_state_env():
state_mp_arrays = []
state_mp_array_shapes = []
state_mp_array_dtypes = []
for space in obs_space:
state_mp_array_shape = (self._timesteps, *space.shape)
state_mp_array = mp_array_from_np_array(np.empty(shape=state_mp_array_shape, dtype=space.dtype))
state_mp_array_shapes.append(state_mp_array_shape)
state_mp_array_dtypes.append(space.dtype)
state_mp_arrays.append(state_mp_array)
return tuple(x for x in zip(state_mp_arrays, state_mp_array_shapes, state_mp_array_dtypes))
else:
state_mp_array_shape = (self._timesteps, *obs_space.shape)
state_mp_array = mp_array_from_np_array(np.empty(shape=state_mp_array_shape, dtype=obs_space.dtype))
return (state_mp_array, state_mp_array_shape, obs_space.dtype)
def _prepare_action_mp_array(self, action_space, env_info):
action_mp_array_shape = (self._timesteps, action_space.shape[0])
action_mp_array = mp_array_from_np_array(np.empty(shape=action_mp_array_shape, dtype=action_space.dtype))
return (action_mp_array, action_mp_array_shape, action_space.dtype)
def _copy_np_array_to_mp_array(
np_array: Union[np.ndarray, Tuple[np.ndarray]],
mp_array_shape_type: Union[
Tuple[np.ndarray, Tuple[int, ...], np.dtype], Tuple[Tuple[np.ndarray, Tuple[int, ...], np.dtype]]
],
):
"""Copy numpy array to multiprocessing array.
Args:
np_array (Union[np.ndarray, Tuple[np.ndarray]]): source numpy array (or tuple of arrays) to copy from.
mp_array_shape_type
(Union[Tuple[np.ndarray, Tuple[int, ...], np.dtype], Tuple[Tuple[np.ndarray, Tuple[int, ...], np.dtype]]]):
target multiprocessing array together with its shape and dtype.
"""
if isinstance(np_array, tuple) and isinstance(mp_array_shape_type[0], tuple):
assert len(np_array) == len(mp_array_shape_type)
for np_ary, mp_ary_shape_type in zip(np_array, mp_array_shape_type):
np_to_mp_array(np_ary, mp_ary_shape_type[0], mp_ary_shape_type[2])
elif isinstance(np_array, np.ndarray) and isinstance(mp_array_shape_type[0], np.ndarray):
np_to_mp_array(np_array, mp_array_shape_type[0], mp_array_shape_type[2])
else:
raise ValueError("Invalid pair of np_array and mp_array!")
def _compute_v_target_and_advantage_with_clipping_and_overwriting(
v_function: VFunction,
experiences: List[Experience],
rewards: List[float],
gamma: float,
lmb: float,
value_at_task_fail: float,
value_at_task_success: float,
value_clip: Optional[Tuple[float, float]] = None,
) -> Tuple[np.ndarray, np.ndarray]:
assert isinstance(v_function, VFunction), "Invalid v_function"
if value_clip is not None:
assert value_clip[0] < value_clip[1]
assert value_at_task_success > value_at_task_fail
assert len(rewards) == len(experiences)
T = len(experiences)
v_targets: np.ndarray = np.empty(shape=(T, 1), dtype=np.float32)
advantages: np.ndarray = np.empty(shape=(T, 1), dtype=np.float32)
advantage: np.float32 = np.float32(0.0)
v_current = None
v_next = None
s_var = create_variable(1, _get_shape(experiences[0][0]))
v = v_function.v(s_var) # build graph
v_forwards = []
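# GAE, computed backwards in time:
#   delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
#   A_t = delta_t + gamma * lambda * A_{t+1}   (A resets to 0 at episode boundaries)
#   v_target_t = A_t + V(s_t)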
for t in reversed(range(T)):
# Do not use the reward stored in the experience; use the precomputed rewards instead
s_current, _, _, non_terminal, s_next, info, *_ = experiences[t]
r = rewards[t]
if not non_terminal:
v_next = None
advantage = np.float32(0.0)
# predict current v
set_data_to_variable(s_var, s_current)
v.forward()
v_current = np.squeeze(v.d)
v_forwards.append(v_current)
if value_clip is not None:
v_current = np.clip(v_current, a_min=value_clip[0], a_max=value_clip[1])
if v_next is None:
set_data_to_variable(s_var, s_next)
v.forward()
v_next = np.squeeze(v.d)
if value_clip is not None:
v_next = np.clip(v_next, a_min=value_clip[0], a_max=value_clip[1])
if info["task_result"] == TaskResult.SUCCESS:
assert not non_terminal
v_next = value_at_task_success
elif info["task_result"] == TaskResult.FAIL:
assert not non_terminal
v_next = value_at_task_fail
elif info["task_result"] == TaskResult.UNKNOWN:
pass
else:
raise ValueError
delta = r + gamma * v_next - v_current
advantage = np.float32(delta + gamma * lmb * advantage)
# A = Q - V, V = E[Q] -> v_target = A + V
v_target = advantage + v_current
v_targets[t] = v_target
advantages[t] = advantage
v_next = v_current
return np.array(v_targets, dtype=np.float32), np.array(advantages, dtype=np.float32)
class _EndlessBufferIterator(BufferIterator):
"""This buffer iterates endlessly."""
def __init__(self, buffer, batch_size, shuffle=True):
super().__init__(buffer, batch_size, shuffle, repeat=True)
def next(self):
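# When fewer than batch_size indices remain, wrap around: reset the iterator
# and take the remainder from the start.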
indices = self._indices[self._index : self._index + self._batch_size]
if len(indices) < self._batch_size:
rest = self._batch_size - len(indices)
self.reset()
indices = np.append(indices, self._indices[self._index : self._index + rest])
self._index += rest
else:
self._index += self._batch_size
return self._sample(indices)
__next__ = next
class _EquallySampleBufferIterator:
def __init__(self, total_num_iterations: int, replay_buffers: List[ReplayBuffer], batch_size: int):
buffer_batch_size = int(np.ceil(batch_size / len(replay_buffers)))
self._internal_iterators = [
_EndlessBufferIterator(buffer=buffer, shuffle=False, batch_size=buffer_batch_size)
for buffer in replay_buffers
]
self._total_num_iterations = total_num_iterations
self._replay_buffers = replay_buffers
self._batch_size = batch_size
self.reset()
def __iter__(self):
return self
def next(self):
self._num_iterations += 1
if self._num_iterations > self._total_num_iterations:
raise StopIteration
return self._sample()
__next__ = next
def reset(self):
self._num_iterations = 0
for iterator in self._internal_iterators:
iterator.reset()
def _sample(self):
experiences = []
for iterator in self._internal_iterators:
experience, *_ = iterator.next()
experiences.extend(experience)
if len(experiences) > self._batch_size:
drng.shuffle(experiences)
experiences = experiences[: self._batch_size]
return experiences