Source code for nnabla_rl.algorithms.lqr

# Copyright 2022,2023,2024 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast

import gym
import numpy as np

from nnabla_rl.algorithm import Algorithm, AlgorithmConfig, eval_api
from nnabla_rl.environments.environment_info import EnvironmentInfo
from nnabla_rl.numpy_models.cost_function import CostFunction
from nnabla_rl.numpy_models.dynamics import Dynamics


[docs]@dataclass class LQRConfig(AlgorithmConfig): """List of configurations for LQR (Linear Quadratic Regulator) algorithm. Args: T_max (int): Planning time step length. Defaults to 50. """ T_max: int = 50 def __post_init__(self): super().__post_init__() self._assert_positive(self.T_max, "T_max")
[docs]class LQR(Algorithm): """LQR (Linear Quadratic Regulator) algorithm. Args: env_or_env_info\ (gym.Env or :py:class:`EnvironmentInfo <nnabla_rl.environments.environment_info.EnvironmentInfo>`): the environment to train or environment info dynamics (:py:class:`Dynamics <nnabla_rl.non_nn_models.dynamics.Dynamics>`): dynamics of the system to control cost_function (:py:class:`Dynamics <nnabla_rl.non_nn_models.cost_function.CostFunction>`): cost function to optimize the trajectory config (:py:class:`LQRConfig <nnabla_rl.algorithmss.lqr.LQRConfig>`): the parameter for LQR controller """ _config: LQRConfig def __init__(self, env_or_env_info, dynamics: Dynamics, cost_function: CostFunction, config=LQRConfig()): super(LQR, self).__init__(env_or_env_info, config=config) self._dynamics = dynamics self._cost_function = cost_function @eval_api def compute_eval_action(self, state, *, begin_of_episode=False, extra_info={}): dynamics = self._dynamics cost_function = self._cost_function x0 = state u0 = [np.zeros((dynamics.action_dim(), 1)) for t in range(self._config.T_max - 1)] initial_trajectory = self._compute_initial_trajectory(x0, dynamics, self._config.T_max, u0) improved_trajectory, _ = self._optimize(initial_trajectory, dynamics, cost_function) return improved_trajectory[0][1] @eval_api def compute_trajectory( self, initial_trajectory: Sequence[Tuple[np.ndarray, Optional[np.ndarray]]] ) -> Tuple[Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], Sequence[Dict[str, Any]]]: assert len(initial_trajectory) == self._config.T_max dynamics = self._dynamics cost_function = self._cost_function return self._optimize(initial_trajectory, dynamics, cost_function) def _compute_initial_trajectory(self, x0, dynamics, T, u): trajectory = [] x = x0 for t in range(T - 1): trajectory.append((x, u[t])) x, _ = dynamics.next_state(x, u[t], t) trajectory.append((x, None)) return trajectory def _optimize( self, initial_state: Union[np.ndarray, Sequence[Tuple[np.ndarray, Optional[np.ndarray]]]], dynamics: Dynamics, cost_function: CostFunction, **kwargs, ) -> Tuple[Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], Sequence[Dict[str, Any]]]: assert len(initial_state) == self._config.T_max initial_state = cast(Sequence[Tuple[np.ndarray, Optional[np.ndarray]]], initial_state) x_last, u_last = initial_state[-1] Sk, *_ = cost_function.hessian(x_last, u_last, self._config.T_max, final_state=True) matrices: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = [] for t in reversed(range(self._config.T_max - 1)): (x, u) = initial_state[t] assert u is not None A, B = dynamics.gradient(x, u, self._config.T_max - t - 1) assert B is not None Q, F, _, R = cost_function.hessian(x, u, self._config.T_max - t - 1) assert F is not None assert R is not None C = np.linalg.inv(R + (B.T.dot(Sk).dot(B))) D = F.T + B.T.dot(Sk).dot(A) Sk = Q + A.T.dot(Sk).dot(A) - D.T.dot(C).dot(D) matrices.append((Sk, A, B, R, F)) trajectory: List[Tuple[np.ndarray, Optional[np.ndarray]]] = [] trajectory_info: List[Dict[str, np.ndarray]] = [] x = initial_state[0][0] for t, (S, A, B, R, F) in enumerate(reversed(matrices)): u = self._compute_optimal_input(x, S, A, B, R, F) trajectory.append((x, u)) # Save quadratic cost coefficient R as Quu and R^-1 as Quu_inv trajectory_info.append({"Quu": R, "Quu_inv": np.linalg.inv(R)}) x, _ = dynamics.next_state(x, u, t) trajectory.append((x, None)) # final timestep input is None trajectory_info.append({}) return trajectory, trajectory_info def _compute_optimal_input(self, x, S, A, B, R, F) -> np.ndarray: C = np.linalg.inv(R + (B.T.dot(S).dot(B))) D = F.T + B.T.dot(S).dot(A) return cast(np.ndarray, -C.dot(D).dot(x)) def _before_training_start(self, env_or_buffer): raise NotImplementedError("You do not need training to use this algorithm.") def _run_online_training_iteration(self, env): raise NotImplementedError("You do not need training to use this algorithm.") def _run_offline_training_iteration(self, buffer): raise NotImplementedError("You do not need training to use this algorithm.") def _after_training_finish(self, env_or_buffer): raise NotImplementedError("You do not need training to use this algorithm.") def _models(self): return {} def _solvers(self): return {}
[docs] @classmethod def is_supported_env(cls, env_or_env_info): env_info = ( EnvironmentInfo.from_env(env_or_env_info) if isinstance(env_or_env_info, gym.Env) else env_or_env_info ) return not env_info.is_discrete_action_env() and not env_info.is_tuple_action_env()
@property def trainers(self): return {}