Source code for nnabla_rl.algorithms.ddp

# Copyright 2022,2023 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast

import gym
import numpy as np

from nnabla_rl.algorithm import Algorithm, AlgorithmConfig, eval_api
from nnabla_rl.environments.environment_info import EnvironmentInfo
from nnabla_rl.numpy_models.cost_function import CostFunction
from nnabla_rl.numpy_models.dynamics import Dynamics


[docs]@dataclass class DDPConfig(AlgorithmConfig): """List of configurations for DDP (Differential Dynamic Programming) algorithm. Args: T_max (int): Planning time step length. Defaults to 50. num_iterations (int): Number of iterations for the optimization. Defaults to 10. mu_min (float): Minimum value for regularizing the hessian of the value funtion. Defaults to 1e-6. modification_factor (float): Modification factor for the regularizer. Defaults to 2.0. accept_improvement_ratio (float): Threshold value for deciding to accept the update or not. Defaults to 0.0 """ T_max: int = 50 num_iterations: int = 10 mu_min: float = 1e-6 modification_factor: float = 2.0 accept_improvement_ratio: float = 0.0 def __post_init__(self): super().__post_init__() self._assert_positive(self.T_max, 'T_max') self._assert_positive(self.num_iterations, 'num_iterations')
[docs]class DDP(Algorithm): """Differential Dynamic Programming algorithm. This class implements the differential dynamic programming (DDP) algorithm proposed by D. Mayne in the paper: "A Second-order Gradient Method for Determining Optimal Trajectories of Non-linear Discrete-time Systems". We also referred the paper written by Y. Tassa et al.: "Synthesis and Stabilization of Complex Behaviors through Online Trajectory Optimization" for the implementation of this algorithm. Args: env_or_env_info\ (gym.Env or :py:class:`EnvironmentInfo <nnabla_rl.environments.environment_info.EnvironmentInfo>`): the environment to train or environment info dynamics (:py:class:`Dynamics <nnabla_rl.non_nn_models.dynamics.Dynamics>`): dynamics of the system to control cost_function (:py:class:`Dynamics <nnabla_rl.non_nn_models.cost_function.CostFunction>`): cost function to optimize the trajectory config (:py:class:`DDPConfig <nnabla_rl.algorithmss.ilqr.DDPConfig>`): the parameter for DDP controller """ _config: DDPConfig def __init__(self, env_or_env_info, dynamics: Dynamics, cost_function: CostFunction, config=DDPConfig()): super(DDP, self).__init__(env_or_env_info, config=config) self._dynamics = dynamics self._cost_function = cost_function @eval_api def compute_eval_action(self, state, *, begin_of_episode=False, extra_info={}): dynamics = self._dynamics cost_function = self._cost_function x0 = state u0 = [np.zeros((dynamics.action_dim(), 1)) for t in range(self._config.T_max - 1)] initial_trajectory = self._compute_initial_trajectory(x0, dynamics, self._config.T_max, u0) improved_trajectory, _ = self._optimize(initial_trajectory, dynamics, cost_function) return improved_trajectory[0][1] @eval_api def compute_trajectory(self, initial_trajectory: Sequence[Tuple[np.ndarray, Optional[np.ndarray]]]) \ -> Tuple[Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], Sequence[Dict[str, Any]]]: assert len(initial_trajectory) == self._config.T_max dynamics = self._dynamics cost_function = self._cost_function mu = 0.0 delta = 0.0 trajectory = initial_trajectory for _ in range(self._config.num_iterations): trajectory, trajectory_info, mu, delta = \ self._improve_trajectory(trajectory, dynamics, cost_function, mu, delta) return trajectory, trajectory_info def _optimize(self, initial_state: Union[np.ndarray, Sequence[Tuple[np.ndarray, Optional[np.ndarray]]]], dynamics: Dynamics, cost_function: CostFunction, **kwargs) \ -> Tuple[Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], Sequence[Dict[str, Any]]]: assert len(initial_state) == self._config.T_max initial_state = cast(Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], initial_state) mu = 0.0 delta = 0.0 trajectory = initial_state for _ in range(self._config.num_iterations): trajectory, trajectory_info, mu, delta = \ self._improve_trajectory(trajectory, dynamics, cost_function, mu, delta) return trajectory, trajectory_info def _compute_initial_trajectory(self, x0, dynamics, T, u): trajectory = [] x = x0 for t in range(T - 1): trajectory.append((x, u[t])) x, _ = dynamics.next_state(x, u[t], t) trajectory.append((x, None)) return trajectory def _improve_trajectory(self, trajectory: Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], dynamics: Dynamics, cost_function: CostFunction, mu: float, delta: float) -> Tuple[Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], Sequence[Dict[str, Any]], float, float]: while True: ks, Ks, Qus, Quus, Quu_invs, success = self._backward_pass(trajectory, dynamics, cost_function, mu) mu, delta = self._update_regularizer(mu, delta, increase=not success) if success: break # Backtracking linear search alphas = 0.9**(np.arange(10) ** 2) improved_trajectory = trajectory improved_trajectory_info: Sequence[Dict[str, Any]] = [] J_current = self._compute_cost(trajectory, cost_function) for alpha in alphas: new_trajectory, new_trajectory_info = self._forward_pass(trajectory, dynamics, ks, Ks, alpha) J_new = self._compute_cost(new_trajectory, cost_function) delta_J = self._expected_cost_reduction(ks, Qus, Quus, alpha) reduction_ratio = (J_current - J_new) / np.abs(delta_J) if self._config.accept_improvement_ratio < reduction_ratio: improved_trajectory = new_trajectory # append Quu for info, k, K, Quu, Quu_inv in zip(new_trajectory_info, ks, Ks, Quus, Quu_invs): info.update({'k': k, 'K': K, 'Quu': Quu, 'Quu_inv': Quu_inv}) improved_trajectory_info = new_trajectory_info break return improved_trajectory, improved_trajectory_info, mu, delta def _update_regularizer(self, mu, delta, increase): if increase: # increase mu delta0 = self._config.modification_factor delta = max(delta0, delta * delta0) mu = max(self._config.mu_min, mu * delta) else: # decrease mu delta0 = self._config.modification_factor delta = min(1.0 / delta0, delta / delta0) mu = mu * delta if mu < self._config.mu_min: mu = 0.0 return mu, delta def _backward_pass(self, trajectory, dynamics, cost_function, mu): x_last, u_last = trajectory[-1] # Initialize Vx and Vxx to the gradient/hessian of value function of the final state of the trajectory Vx, *_ = cost_function.gradient(x_last, u_last, self._config.T_max, final_state=True) Vxx, *_ = cost_function.hessian(x_last, u_last, self._config.T_max, final_state=True) E = np.identity(n=Vxx.shape[0]) ks: List[np.ndarray] = [] Ks: List[np.ndarray] = [] Qus: List[np.ndarray] = [] Quus: List[np.ndarray] = [] Quu_invs: List[np.ndarray] = [] x_dim = dynamics.state_dim() u_dim = dynamics.action_dim() for t in reversed(range(self._config.T_max - 1)): (x, u) = trajectory[t] Cx, Cu = cost_function.gradient(x, u, self._config.T_max - t - 1) Cxx, Cxu, Cux, Cuu = cost_function.hessian(x, u, self._config.T_max - t - 1) Fx, Fu = dynamics.gradient(x, u, self._config.T_max - t - 1) # Hessians should be a 3d tensor Fxx, Fxu, Fux, Fuu = dynamics.hessian(x, u, self._config.T_max - t - 1) Quu = Cuu + Fu.T.dot(Vxx + mu * E).dot(Fu) + np.tensordot(Vx, Fuu, axes=(0, 0)).reshape((u_dim, u_dim)) if not self._is_positive_definite(Quu): return ks, Ks, Qus, Quus, Quu_invs, False Qx = Cx + Fx.T.dot(Vx) Qu = Cu + Fu.T.dot(Vx) Qxx = Cxx + Fx.T.dot(Vxx).dot(Fx) + np.tensordot(Vx, Fxx, axes=(0, 0)).reshape((x_dim, x_dim)) # NOTE: Qxu and Qux should be symmetric and same matrix. i.e. Qxu = Qux and Qxu.T = Qux Qxu = Cxu + Fu.T.dot(Vxx + mu * E).dot(Fx).T + np.tensordot(Vx, Fxu, axes=(0, 0)).reshape((x_dim, u_dim)) Qux = Cux + Fu.T.dot(Vxx + mu * E).dot(Fx) + np.tensordot(Vx, Fux, axes=(0, 0)).reshape((u_dim, x_dim)) assert np.allclose(Qxu, Qux.T) Quu_inv = np.linalg.inv(Quu) k = -Quu_inv.dot(Qu) K = -Quu_inv.dot(Qux) ks.append(k) Ks.append(K) Qus.append(Qu) Quus.append(Quu) Quu_invs.append(Quu_inv) Vx = Qx + K.T.dot(Quu).dot(k) + K.T.dot(Qu) + Qux.T.dot(k) Vxx = Qxx + K.T.dot(Quu).dot(K) + K.T.dot(Qux) + Qux.T.dot(K) ks = list(reversed(ks)) Ks = list(reversed(Ks)) Qus = list(reversed(Qus)) Quus = list(reversed(Quus)) Quu_invs = list(reversed(Quu_invs)) return ks, Ks, Qus, Quus, Quu_invs, True def _forward_pass( self, trajectory: Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], dynamics: Dynamics, ks: List[np.ndarray], Ks: List[np.ndarray], alpha: float ) -> Tuple[Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], Sequence[Dict[str, Any]]]: x_hat = trajectory[0][0] new_trajectory = [] new_trajectory_info: List[Dict[str, Any]] = [] for t, ((x, u), k, K) in enumerate(zip(trajectory[:-1], ks, Ks)): # not include final step assert u is not None dx = x_hat - x du = alpha * k + K.dot(dx) u_hat = u + du new_trajectory.append((x_hat, u_hat)) new_trajectory_info.append({}) x_hat, _ = dynamics.next_state(x_hat, u_hat, t) new_trajectory.append((x_hat, None)) # final timestep input is None new_trajectory_info.append({}) return new_trajectory, new_trajectory_info def _compute_cost( self, trajectory: Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], cost_function: CostFunction ) -> float: J = 0.0 for t, (x, u) in enumerate(trajectory[:-1]): # not include final step J += float(cost_function.evaluate(x, u, t)) J += float(cost_function.evaluate(trajectory[-1][0], trajectory[-1][1], len(trajectory), final_state=True)) return J def _expected_cost_reduction(self, ks, Qus, Quus, alpha) -> float: delta_J = 0.0 for (k, Qu, Quu) in zip(ks, Qus, Quus): linear_part = alpha * k.T.dot(Qu) squared_part = 0.5 * (alpha ** 2.0) * k.T.dot(Quu).dot(k) delta_J += float(linear_part) + float(squared_part) return delta_J def _is_positive_definite(self, symmetric_matrix: np.ndarray): return np.all(np.linalg.eigvals(symmetric_matrix) > 0.0) def _before_training_start(self, env_or_buffer): raise NotImplementedError('You do not need training to use this algorithm.') def _run_online_training_iteration(self, env): raise NotImplementedError('You do not need training to use this algorithm.') def _run_offline_training_iteration(self, buffer): raise NotImplementedError('You do not need training to use this algorithm.') def _after_training_finish(self, env_or_buffer): raise NotImplementedError('You do not need training to use this algorithm.') def _models(self): return {} def _solvers(self): return {}
[docs] @classmethod def is_supported_env(cls, env_or_env_info): env_info = EnvironmentInfo.from_env(env_or_env_info) if isinstance(env_or_env_info, gym.Env) \ else env_or_env_info return not env_info.is_discrete_action_env() and not env_info.is_tuple_action_env()
@property def trainers(self): return {}