Source code for nnabla_rl.replay_buffers.trajectory_replay_buffer

# Copyright 2023,2024 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, MutableSequence, Optional, Sequence, Tuple, Union, cast

import numpy as np

import nnabla_rl as rl
from nnabla_rl.replay_buffer import ReplayBuffer
from nnabla_rl.typing import Experience, Trajectory


[docs]class TrajectoryReplayBuffer(ReplayBuffer): """TrajectoryReplayBuffer. Enables appending/sampling not just an experience from the buffer but also a trajectory. In order to append/sample trajectory, the environment must be finite horizon setting. """ def __init__(self, num_trajectories=None): super(TrajectoryReplayBuffer, self).__init__(num_trajectories) self._samples_per_trajectory = [] self._num_experiences = 0 # this is num_trajectories * experiences_per_trajectory self._cumsum_experiences = [0] # this is cummulative sum of self._num_experiences def __len__(self): return self._num_experiences def __getitem__(self, item: int) -> Experience: return self._get_experience(item) def get_trajectory(self, trajectory_index: int) -> Trajectory: return cast(Trajectory, self._buffer[trajectory_index]) @property def trajectory_num(self): return len(self._samples_per_trajectory)
[docs] def append(self, experience: Experience): raise NotImplementedError
[docs] def append_all(self, experiences: Sequence[Experience]): raise NotImplementedError
def append_trajectory(self, trajectory: Trajectory): self._buffer.append(trajectory) self._samples_per_trajectory.append(len(trajectory)) num_experiences = 0 cumsum_experiences = [] for i in range(self.trajectory_num): num_experiences += self._samples_per_trajectory[i] cumsum_experiences.append(num_experiences) self._num_experiences = num_experiences self._cumsum_experiences = cumsum_experiences
[docs] def sample_indices( self, indices: Sequence[int], num_steps: int = 1 ) -> Tuple[Union[Sequence[Experience], Tuple[Sequence[Experience], ...]], Dict[str, Any]]: if len(indices) == 0: raise ValueError("Indices are empty") if num_steps < 1: raise ValueError(f"num_steps: {num_steps} should be greater than 0!") experiences: Union[Sequence[Experience], Tuple[Sequence[Experience], ...]] if num_steps == 1: experiences = [self._get_experience(index) for index in indices] else: experiences = tuple([self._get_experience(index + i) for index in indices] for i in range(num_steps)) weights = np.ones([len(indices), 1]) return experiences, dict(weights=weights)
[docs] def sample_trajectories( self, num_samples: int = 1 ) -> Tuple[Union[Trajectory, Tuple[Trajectory, ...]], Dict[str, Any]]: """Randomly sample num_samples trajectories from the replay buffer. Args: num_samples (int): Number of samples to sample from the replay buffer. Defaults to 1. Returns: trajectories (Tuple[Trajectory, ...]): Randomly sampled num_samples of trajectories. info (Dict[str, Any]): dictionary of information about trajectories. Raises: ValueError: num_samples exceeds the maximum possible trajectories or num_steps is 0 or negative. """ max_index = self.trajectory_num if num_samples > max_index: raise ValueError( f"num_samples: {num_samples} is greater than the number of trajectories saved in buffer: {max_index}" ) indices = self._random_trajectory_indices(num_samples=num_samples, max_index=max_index) return self.sample_indices_trajectory(indices)
def sample_indices_trajectory( self, indices: Sequence[int] ) -> Tuple[Union[Trajectory, Tuple[Trajectory, ...]], Dict[str, Any]]: if len(indices) == 0: raise ValueError("Indices are empty") trajectories: Union[Trajectory, Tuple[Trajectory, ...]] if len(indices) == 1: trajectories = self.get_trajectory(indices[0]) else: trajectories = tuple(self.get_trajectory(index) for index in indices) weights = np.ones([len(indices), 1]) return trajectories, dict(weights=weights)
[docs] def sample_trajectories_portion( self, num_samples: int = 1, portion_length: int = 1 ) -> Tuple[Tuple[Trajectory, ...], Dict[str, Any]]: """Randomly sample num_samples trajectories with length portion_length from the replay buffer. (i.e. Each trajectory length will be portion_length) Trajectory will be sampled as follows. First, a trajectory will be sampled with probablity proportional to its length. Then, a random initial index between 0 to len(sampled_trajectory) - portion_length will be sampled. Finally, a portion_length size trajectory starting from sampled intial index will be returned. Args: num_samples (int): Number of samples to sample from the replay buffer. Defaults to 1. portion_length (int): Length of each sampled trajectory. Defaults to 1. Returns: trajectories (Tuple[Trajectory, ...]): Randomly sampled num_samples of trajectories with given portion_length. info (Dict[str, Any]): dictionary of information about trajectories. Raises: ValueError: num_samples exceeds the maximum possible trajectories or num_steps is 0 or negative. RuntimeError: Trajectory's length is below portion_length. """ max_index = self.trajectory_num p = np.asarray(self._samples_per_trajectory) p = p / np.sum(p) trajectory_indices = rl.random.drng.choice(max_index, size=num_samples, replace=True, p=p) trajectories: MutableSequence[Trajectory] = [self.get_trajectory(index) for index in trajectory_indices] sliced_trajectories: MutableSequence[Trajectory] = [] for trajectory in trajectories: max_index = len(trajectory) - portion_length if max_index < 0: raise RuntimeError(f"Trajectory length is shorter than portion length: {portion_length}") initial_index = rl.random.drng.choice(max_index + 1, replace=False) sliced_trajectories.append(trajectory[initial_index : initial_index + portion_length]) weights = np.ones([len(trajectories), 1]) return tuple(sliced_trajectories), dict(weights=weights)
[docs] def sample_indices_portion( self, indices: Sequence[int], portion_length: int = 1 ) -> Tuple[Tuple[Trajectory, ...], Dict[str, Any]]: """Sample trajectory portions from the buffer. (i.e. Each trajectory length will be portion_length) Trajectory from given index to index+portion_length-1 will be sampled. Index should be the index of a experience in the buffer and not trajectory's index. For example, if this buffer has 10 trajectories which consist of 5 experiences each, then: index 0: first experience of the first trajectory. index 1: second experience of the first trajectory. index 5: first experience of the second trajectory. index 6: second experience of the second trajectory. If index + portion_length exceeds the length of a trajectory, this will sample. from len(trajectory) - portion_length to len(trajectory) - 1 Args: indices (int): Indices of the experience to sample from the replay buffer. portion_length (int): Length of each sampled trajectory. Defaults to 1. Returns: trajectories (Tuple[Trajectory, ...]): Randomly sampled num_samples of trajectories with given portion_length. info (Dict[str, Any]): dictionary of information about trajectories. Raises: RuntimeError: Trajectory's length is below portion_length. """ if len(indices) == 0: raise ValueError("Indices are empty") if portion_length < 1: raise ValueError(f"portion_length: {portion_length} should be greater than 0!") sliced_trajectories: MutableSequence[Trajectory] = [] for index in indices: trajectory_index = np.argwhere(np.asarray(self._cumsum_experiences) > index)[0][0] trajectory = self.get_trajectory(trajectory_index) if len(trajectory) < portion_length: raise RuntimeError(f"Trajectory length is shorter than portion length: {portion_length}") if 0 < trajectory_index: experience_index = index - self._cumsum_experiences[trajectory_index - 1] else: experience_index = index experience_index = min(experience_index, len(trajectory) - portion_length) sliced_trajectories.append(trajectory[experience_index : experience_index + portion_length]) weights = np.ones([len(indices), 1]) return tuple(sliced_trajectories), dict(weights=weights)
def _random_indices(self, num_samples: int, max_index: Optional[int] = None) -> Sequence[int]: if max_index is None: max_index = self._num_experiences # NOTE: Do NOT replace with np.random.choice(max_index, size=num_samples, replace=False) # np.random.choice is terribly slow when sampling without replacement indices = rl.random.drng.choice(max_index, size=num_samples, replace=False) return cast(Sequence[int], indices) def _random_trajectory_indices(self, num_samples: int, max_index: Optional[int] = None) -> Sequence[int]: if max_index is None: max_index = self.trajectory_num # NOTE: Do NOT replace with np.random.choice(max_index, size=num_samples, replace=False) # np.random.choice is terribly slow when sampling without replacement indices = rl.random.drng.choice(max_index, size=num_samples, replace=False) return cast(Sequence[int], indices) def _get_experience(self, experience_index) -> Experience: prev_cumsum = 0 for trajectory_index, cumsum in enumerate(self._cumsum_experiences): if experience_index < cumsum: trajectory: Trajectory = self.get_trajectory(trajectory_index) experience: Experience = trajectory[experience_index - prev_cumsum] return experience prev_cumsum = cumsum raise ValueError(f"index {experience_index} is out of range")