Functions
- nnabla_rl.functions.sample_gaussian(mean: Variable, ln_var: Variable, noise_clip: Tuple[float, float] | None = None) -> Variable
Sample value from a gaussian distribution of given mean and variance.
- Parameters:
mean (nn.Variable) – Mean of the gaussian distribution
ln_var (nn.Variable) – Logarithm of the variance of the gaussian distribution
noise_clip (Optional[Tuple[float, float]]) – Clipping range of the sampled noise. Defaults to None.
- Returns:
Sampled value from gaussian distribution of given mean and variance
- Return type:
nn.Variable
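The sampling described above can be sketched in plain NumPy. This is an illustration, not the library's implementation: the helper name `sample_gaussian_np` and its `rng` argument are hypothetical, and the sketch assumes that `noise_clip` is a (min, max) pair applied to the unit noise before it is scaled by the standard deviation.

```python
import numpy as np


def sample_gaussian_np(mean, ln_var, noise_clip=None, rng=None):
    """Hypothetical NumPy sketch: mean + exp(ln_var / 2) * noise."""
    rng = np.random.default_rng() if rng is None else rng
    noise = rng.standard_normal(mean.shape)
    if noise_clip is not None:
        # Assumption: the clip is a (min, max) pair applied to the unit noise.
        noise = np.clip(noise, noise_clip[0], noise_clip[1])
    # ln_var is the log of the variance, so the std is exp(ln_var / 2).
    stddev = np.exp(0.5 * ln_var)
    return mean + stddev * noise


mean = np.zeros((4, 2))
ln_var = np.log(np.full((4, 2), 0.25))  # variance 0.25, i.e. std 0.5
sample = sample_gaussian_np(mean, ln_var, noise_clip=(-1.0, 1.0),
                            rng=np.random.default_rng(0))
print(sample.shape)  # (4, 2)
```

With the noise clipped to [-1, 1] and a standard deviation of 0.5, every sampled value in this sketch stays within 0.5 of the mean.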
- nnabla_rl.functions.sample_gaussian_multiple(mean: Variable, ln_var: Variable, num_samples: int, noise_clip: Tuple[float, float] | None = None) -> Variable
Sample multiple values from a gaussian distribution of given mean and variance. The returned variable has an additional axis in the middle, giving the shape (batch_size, num_samples, dimension).
- Parameters:
mean (nn.Variable) – Mean of the gaussian distribution
ln_var (nn.Variable) – Logarithm of the variance of the gaussian distribution
num_samples (int) – Number of samples to draw
noise_clip (Optional[Tuple[float, float]]) – Clipping range of the sampled noise. Defaults to None.
- Returns:
Sampled values from gaussian distribution of given mean and variance
- Return type:
nn.Variable
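The output shape (batch_size, num_samples, dimension) comes from broadcasting the mean and standard deviation over a new middle axis. A minimal NumPy sketch of that shape handling follows; the helper name `sample_gaussian_multiple_np` and the `rng` argument are hypothetical, and clipping is omitted for brevity.

```python
import numpy as np


def sample_gaussian_multiple_np(mean, ln_var, num_samples, rng=None):
    """Hypothetical NumPy sketch of drawing num_samples values per batch row."""
    rng = np.random.default_rng() if rng is None else rng
    batch_size, dim = mean.shape
    noise = rng.standard_normal((batch_size, num_samples, dim))
    # Insert the sample axis in the middle so mean and std broadcast over it.
    mean = mean[:, np.newaxis, :]
    stddev = np.exp(0.5 * ln_var)[:, np.newaxis, :]
    return mean + stddev * noise


samples = sample_gaussian_multiple_np(np.zeros((3, 2)), np.zeros((3, 2)),
                                      num_samples=10,
                                      rng=np.random.default_rng(0))
print(samples.shape)  # (3, 10, 2)
```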
- nnabla_rl.functions.expand_dims(x: Variable, axis: int) -> Variable
Add dimension to target axis of given variable.
- Parameters:
x (nn.Variable) – Variable to expand the dimension
axis (int) – The axis at which to insert the new dimension. Must be non-negative.
- Returns:
Variable with additional dimension in the target axis
- Return type:
nn.Variable
- nnabla_rl.functions.repeat(x: Variable, repeats: int, axis: int) -> Variable
Repeat the values along the given axis repeats times.
- Parameters:
x (nn.Variable) – Variable to repeat the values along given axis
repeats (int) – Number of times to repeat
axis (int) – The axis along which to repeat the values. Must be non-negative.
- Returns:
Variable with values repeated along given axis
- Return type:
nn.Variable
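As a rough guide to the shapes involved, the NumPy functions of the same names behave as follows. This assumes that expand_dims and repeat mirror `np.expand_dims` and `np.repeat`; that correspondence is an assumption for illustration, not something stated above.

```python
import numpy as np

x = np.arange(6).reshape(2, 3)                  # shape (2, 3)
expanded = np.expand_dims(x, axis=1)            # shape (2, 1, 3)
tiled = np.repeat(expanded, repeats=4, axis=1)  # shape (2, 4, 3)
print(expanded.shape, tiled.shape)
```

Combining the two is a common way to copy a (batch_size, dim) array across a new sample axis.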
- nnabla_rl.functions.sqrt(x: Variable)
Compute the square root of given variable.
- Parameters:
x (nn.Variable) – Variable to compute the square root of
- Returns:
Square root of given variable
- Return type:
nn.Variable
- nnabla_rl.functions.std(x: Variable, axis: int | None = None, keepdims: bool = False) -> Variable
Compute the standard deviation of given variable along axis.
- Parameters:
x (nn.Variable) – Variable to compute the standard deviation of
axis (Optional[int]) – Axis to compute the standard deviation. Defaults to None. None will reduce all dimensions.
keepdims (bool) – If True, the reduced axes are kept as dimensions with 1 element.
- Returns:
Standard deviation of given variable along axis.
- Return type:
nn.Variable
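The effect of axis and keepdims on the output shape can be seen with `np.std`, used here only as a stand-in. Whether the library computes the population or the sample standard deviation is not stated above; NumPy's default (population, ddof=0) is an assumption of this sketch.

```python
import numpy as np

x = np.array([[1.0, 2.0, 3.0],
              [4.0, 6.0, 8.0]])

total = np.std(x)                         # axis=None reduces all dimensions
row_std = np.std(x, axis=1)               # shape (2,)
kept = np.std(x, axis=1, keepdims=True)   # shape (2, 1)
print(total.shape, row_std.shape, kept.shape)
```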
- nnabla_rl.functions.argmax(x: Variable, axis: int | None = None, keepdims: bool = False) -> Variable
Compute the index at which the given variable has its maximum value along the axis.
- Parameters:
x (nn.Variable) – Variable to compute the argmax
axis (Optional[int]) – Axis to compare the values. Defaults to None. None will reduce all dimensions.
keepdims (bool) – If True, the reduced axes are kept as dimensions with 1 element.
- Returns:
Index of the maximum value of the variable along the axis
- Return type:
nn.Variable
- nnabla_rl.functions.quantile_huber_loss(x0: Variable, x1: Variable, kappa: float, tau: Variable) -> Variable
Compute the quantile huber loss. See the following papers for details.
- Parameters:
x0 (nn.Variable) – Quantile values
x1 (nn.Variable) – Quantile values
kappa (float) – Threshold value of huber loss which switches the loss value between squared loss and linear loss
tau (nn.Variable) – Quantile targets
- Returns:
Quantile huber loss
- Return type:
nn.Variable
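For orientation, a commonly used form of the quantile Huber loss is \(|\tau - \mathbb{1}\{u < 0\}| \, L_\kappa(u) / \kappa\), where \(L_\kappa\) is the Huber loss with threshold \(\kappa\). The NumPy sketch below implements that form under two assumptions that are not confirmed above: the error is \(u = x0 - x1\), and kappa is strictly positive. The helper name `quantile_huber_loss_np` is hypothetical.

```python
import numpy as np


def quantile_huber_loss_np(x0, x1, kappa, tau):
    """Hypothetical NumPy sketch; assumes u = x0 - x1 and kappa > 0."""
    u = x0 - x1
    below = (u < 0.0).astype(u.dtype)  # indicator 1{u < 0}
    abs_u = np.abs(u)
    # Huber loss: squared inside the threshold, linear outside it.
    huber = np.where(abs_u <= kappa,
                     0.5 * u ** 2,
                     kappa * (abs_u - 0.5 * kappa))
    return np.abs(tau - below) * huber / kappa


x0 = np.array([0.5, -2.0])
x1 = np.array([0.0, 0.0])
tau = np.array([0.25, 0.25])
loss = quantile_huber_loss_np(x0, x1, kappa=1.0, tau=tau)
print(loss)  # [0.03125 1.125]
```

The first element falls in the squared region (weight 0.25 times 0.125), the second in the linear region (weight 0.75 times 1.5).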
- nnabla_rl.functions.mean_squared_error(x0: Variable, x1: Variable) -> Variable
Convenient alias for mean squared error operation.
- Parameters:
x0 (nn.Variable) – N-D array
x1 (nn.Variable) – N-D array
- Returns:
Mean squared error between x0 and x1
- Return type:
nn.Variable
- nnabla_rl.functions.minimum_n(variables: Sequence[Variable]) -> Variable
Compute the minimum among the list of variables.
- Parameters:
variables (Sequence[nn.Variable]) – Sequence of variables. All the variables must have same shape.
- Returns:
Minimum value among the list of variables
- Return type:
nn.Variable
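Because all inputs must share one shape, a natural reading is an element-wise minimum whose result has that same shape. The NumPy sketch below shows that reading with `np.minimum.reduce`; treating the operation as element-wise is an assumption, and the array names are illustrative.

```python
import numpy as np

q1 = np.array([[1.0, 5.0], [3.0, 2.0]])
q2 = np.array([[2.0, 4.0], [0.0, 9.0]])
q3 = np.array([[7.0, 1.0], [6.0, 8.0]])

# Element-wise minimum across the three arrays.
minimum = np.minimum.reduce([q1, q2, q3])
print(minimum)  # [[1. 1.] [0. 2.]]
```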
- nnabla_rl.functions.gaussian_cross_entropy_method(objective_function: Callable[[Variable], Variable], init_mean: Variable | ndarray, init_var: Variable | ndarray, sample_size: int = 500, num_elites: int = 10, num_iterations: int = 5, alpha: float = 0.25) -> Tuple[Variable, Variable]
Optimize the objective function with respect to its input using the cross entropy method with a gaussian distribution. Candidates are sampled from a gaussian distribution \(\mathcal{N}(mean,\,variance)\).
Examples
>>> import numpy as np
>>> import nnabla as nn
>>> import nnabla.functions as NF
>>> import nnabla_rl.functions as RF
>>> def objective_function(x):
...     # this function will be called with x which has (batch_size, sample_size, x_dim)
...     return -((x - 3.)**2)
...
>>> batch_size = 1
>>> variable_size = 1
>>> init_mean = nn.Variable.from_numpy_array(np.zeros((batch_size, variable_size)))
>>> init_var = nn.Variable.from_numpy_array(np.ones((batch_size, variable_size)))
>>> optimal_x, _ = RF.gaussian_cross_entropy_method(objective_function, init_mean, init_var, alpha=0)
>>> optimal_x.forward()
>>> optimal_x.shape
(1, 1)  # (batch_size, variable_size)
>>> optimal_x.d
array([[3.]], dtype=float32)
- Parameters:
objective_function (Callable[[nn.Variable], nn.Variable]) – Objective function. During optimization it is called with an nn.Variable of shape (batch_size, sample_size, dim) and should return an nn.Variable, such as costs, of shape (batch_size, sample_size, 1).
init_mean (Union[nn.Variable, np.ndarray]) – initial mean for the gaussian distribution
init_var (Union[nn.Variable, np.ndarray]) – initial variance for the gaussian distribution
sample_size (int) – number of candidates at the sampling step.
num_elites (int) – number of elites for computing the new gaussian distribution.
num_iterations (int) – number of optimization iterations.
alpha (float) – parameter for soft updating the gaussian distribution.
- Returns:
Mean of the elite samples and the top elite sample. Both have shape (batch_size, dim).
- Return type:
Tuple[nn.Variable, nn.Variable]
Note
If you want to optimize a time sequence of actions with shape (time_steps, action_dim), you can use this optimization function by flattening the action to shape (time_steps*action_dim). For example:
def objective_function(time_seq_action):
    # time_seq_action.shape = (batch_size, sample_size, time_steps*action_dim)
    # Implement the way to compute some value such as costs.
    ...

batch_size = 1
time_steps = 2
action_dim = 1
init_mean = nn.Variable.from_numpy_array(np.zeros((batch_size, time_steps*action_dim)))
init_var = nn.Variable.from_numpy_array(np.ones((batch_size, time_steps*action_dim)))
optimal_x, _ = RF.gaussian_cross_entropy_method(objective_function, init_mean, init_var, alpha=0)
optimal_x.forward()
# (1, 2) == (batch_size, time_steps*action_dim)
print(optimal_x.shape)
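To make the sample, select, and refit loop concrete, here is a minimal NumPy sketch of a gaussian cross entropy method. It is not the library's implementation. The helper name `gaussian_cem_np` and its `rng` argument are hypothetical, and the soft update shown (alpha weighting the previous mean and variance) is an assumption about how alpha is used.

```python
import numpy as np


def gaussian_cem_np(objective, init_mean, init_var, sample_size=500,
                    num_elites=10, num_iterations=5, alpha=0.25, rng=None):
    """Hypothetical NumPy sketch of the gaussian cross entropy method."""
    rng = np.random.default_rng() if rng is None else rng
    mean = np.asarray(init_mean, dtype=float)
    var = np.asarray(init_var, dtype=float)
    batch_size, dim = mean.shape
    for _ in range(num_iterations):
        # Sample candidates with shape (batch_size, sample_size, dim).
        noise = rng.standard_normal((batch_size, sample_size, dim))
        candidates = mean[:, None, :] + np.sqrt(var)[:, None, :] * noise
        values = objective(candidates)[..., 0]  # (batch_size, sample_size)
        # Indices of the num_elites highest-valued candidates, best first.
        elite_idx = np.argsort(-values, axis=1)[:, :num_elites]
        elites = np.take_along_axis(candidates, elite_idx[:, :, None], axis=1)
        # Assumption: alpha weights the previous estimate in the soft update.
        mean = alpha * mean + (1.0 - alpha) * elites.mean(axis=1)
        var = alpha * var + (1.0 - alpha) * elites.var(axis=1)
    return mean, elites[:, 0, :]


def objective(x):
    # Maximized at x = 3; returns shape (batch_size, sample_size, 1) for dim 1.
    return -((x - 3.0) ** 2)


cem_mean, cem_top = gaussian_cem_np(objective, np.zeros((1, 1)),
                                    np.ones((1, 1)), alpha=0.0,
                                    rng=np.random.default_rng(0))
print(cem_mean.shape, cem_top.shape)  # (1, 1) (1, 1)
```

Each iteration narrows the distribution around the best candidates, so the returned mean moves toward the maximizer of the objective.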
- nnabla_rl.functions.random_shooting_method(objective_function: Callable[[Variable], Variable], upper_bound: ndarray, lower_bound: ndarray, sample_size: int = 500) -> Variable
Optimize objective function with respect to the variable using random shooting method. Candidates are sampled from a uniform distribution \(x \sim U(lower\:bound, upper\:bound)\).
Examples
>>> import numpy as np
>>> import nnabla as nn
>>> import nnabla.functions as NF
>>> import nnabla_rl.functions as RF
>>> def objective_function(x):
...     # this function will be called with x which has (batch_size, sample_size, x_dim)
...     return -((x - 3.)**2)
...
>>> batch_size = 1
>>> variable_size = 1
>>> upper_bound = np.ones((batch_size, variable_size)) * 3.5
>>> lower_bound = np.ones((batch_size, variable_size)) * 2.5
>>> optimal_x = RF.random_shooting_method(objective_function, upper_bound, lower_bound)
>>> optimal_x.forward()
>>> optimal_x.shape
(1, 1)  # (batch_size, variable_size)
>>> np.allclose(optimal_x.d, np.array([[3.]]), atol=1e-1)
True
- Parameters:
objective_function (Callable[[nn.Variable], nn.Variable]) – Objective function. During optimization it is called with an nn.Variable of shape (batch_size, sample_size, dim) and should return an nn.Variable, such as costs, of shape (batch_size, sample_size, 1).
upper_bound (np.ndarray) – upper bound of a uniform distribution for sampling candidates of the variables.
lower_bound (np.ndarray) – lower bound of a uniform distribution for sampling candidates of the variables.
sample_size (int) – number of candidates at the sampling step.
- Returns:
argmax sample, shape is (batch_size, dim)
- Return type:
nn.Variable
Note
If you want to optimize a time sequence of actions with shape (time_steps, action_dim), you can use this optimization function by flattening the action to shape (time_steps*action_dim). For example:
def objective_function(time_seq_action):
    # time_seq_action.shape = (batch_size, sample_size, time_steps*action_dim)
    # Implement the way to compute some value such as costs.
    ...

batch_size = 1
time_steps = 2
action_dim = 1
upper_bound = np.ones((batch_size, time_steps*action_dim)) * 3.5
lower_bound = np.ones((batch_size, time_steps*action_dim)) * 2.5
optimal_x = RF.random_shooting_method(objective_function, upper_bound, lower_bound)
optimal_x.forward()
# (1, 2) == (batch_size, time_steps*action_dim)
print(optimal_x.shape)
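Random shooting reduces to sampling uniformly within the bounds and keeping the best candidate per batch row. The NumPy sketch below illustrates that idea only; the helper name `random_shooting_np` and its `rng` argument are hypothetical and the library's actual implementation may differ.

```python
import numpy as np


def random_shooting_np(objective, upper_bound, lower_bound,
                       sample_size=500, rng=None):
    """Hypothetical NumPy sketch of the random shooting method."""
    rng = np.random.default_rng() if rng is None else rng
    batch_size, dim = upper_bound.shape
    # Candidates x ~ U(lower_bound, upper_bound), shape (batch, sample, dim).
    candidates = rng.uniform(lower_bound[:, None, :], upper_bound[:, None, :],
                             size=(batch_size, sample_size, dim))
    values = objective(candidates)[..., 0]  # (batch_size, sample_size)
    best = np.argmax(values, axis=1)        # (batch_size,)
    # Pick the best candidate of each batch row: (batch_size, dim).
    return candidates[np.arange(batch_size), best]


def objective(x):
    # Maximized at x = 3; returns shape (batch_size, sample_size, 1) for dim 1.
    return -((x - 3.0) ** 2)


upper_bound = np.ones((1, 1)) * 3.5
lower_bound = np.ones((1, 1)) * 2.5
shot = random_shooting_np(objective, upper_bound, lower_bound,
                          rng=np.random.default_rng(0))
print(shot.shape)  # (1, 1)
```

Unlike the cross entropy method, there is no refinement step, so accuracy depends only on sample_size and the width of the bounds.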
- nnabla_rl.functions.triangular_matrix(diagonal: Variable, non_diagonal: Variable | None = None, upper=False) -> Variable
Compute triangular_matrix from given diagonal and non_diagonal elements. If non_diagonal is None, will create a diagonal matrix.
Example
>>> import numpy as np
>>> import nnabla as nn
>>> import nnabla.functions as NF
>>> import nnabla_rl.functions as RF
>>> diag_size = 3
>>> batch_size = 2
>>> non_diag_size = diag_size * (diag_size - 1) // 2
>>> diagonal = nn.Variable.from_numpy_array(np.ones(6).astype(np.float32).reshape((batch_size, diag_size)))
>>> non_diagonal = nn.Variable.from_numpy_array(np.arange(batch_size*non_diag_size).astype(np.float32).reshape((batch_size, non_diag_size)))
>>> diagonal.d
array([[1., 1., 1.],
       [1., 1., 1.]], dtype=float32)
>>> non_diagonal.d
array([[0., 1., 2.],
       [3., 4., 5.]], dtype=float32)
>>> lower_triangular_matrix = RF.triangular_matrix(diagonal, non_diagonal)
>>> lower_triangular_matrix.forward()
>>> lower_triangular_matrix.d
array([[[1., 0., 0.],
        [0., 1., 0.],
        [1., 2., 1.]],
       [[1., 0., 0.],
        [3., 1., 0.],
        [4., 5., 1.]]], dtype=float32)
- Parameters:
diagonal (nn.Variable) – diagonal elements of the triangular matrix. Its shape must be (batch_size, diagonal_size).
non_diagonal (nn.Variable or None) – non-diagonal elements of the triangular matrix. Its shape must be (batch_size, diagonal_size * (diagonal_size - 1) // 2).
upper (bool) – If true will create an upper triangular matrix. Otherwise will create a lower triangular matrix.
- Returns:
triangular matrix (lower by default, upper if upper is True) constructed from given variables.
- Return type:
nn.Variable
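The element placement in the example above can be reproduced with NumPy index helpers. This is a sketch, not the library's code: the helper name `triangular_matrix_np` is hypothetical, the row-major fill order for the lower triangle is inferred from the example output, and the fill order for the upper triangle is an assumption.

```python
import numpy as np


def triangular_matrix_np(diagonal, non_diagonal=None, upper=False):
    """Hypothetical NumPy sketch of building batched triangular matrices."""
    batch_size, n = diagonal.shape
    matrix = np.zeros((batch_size, n, n), dtype=diagonal.dtype)
    idx = np.arange(n)
    matrix[:, idx, idx] = diagonal
    if non_diagonal is not None:
        # Strictly upper (k=1) or strictly lower (k=-1) positions, row-major.
        # Assumption: the upper case is filled in the same row-major order.
        if upper:
            rows, cols = np.triu_indices(n, k=1)
        else:
            rows, cols = np.tril_indices(n, k=-1)
        matrix[:, rows, cols] = non_diagonal
    return matrix


diagonal = np.ones((2, 3), dtype=np.float32)
non_diagonal = np.arange(6, dtype=np.float32).reshape(2, 3)
lower = triangular_matrix_np(diagonal, non_diagonal)
print(lower[1])
```

Passing only the diagonal yields a batch of diagonal matrices, matching the behavior described for non_diagonal set to None.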
- nnabla_rl.functions.batch_flatten(x: Variable) -> Variable
Collapse the variable shape into (batch_size, rest).
Example
>>> import numpy as np
>>> import nnabla as nn
>>> import nnabla_rl.functions as RF
>>> variable_shape = (3, 4, 5, 6)
>>> x = nn.Variable.from_numpy_array(np.random.normal(size=variable_shape))
>>> x.shape
(3, 4, 5, 6)
>>> flattened_x = RF.batch_flatten(x)
>>> flattened_x.shape
(3, 120)
- Parameters:
x (nn.Variable) – N-D array
- Returns:
Flattened variable.
- Return type:
nn.Variable
- nnabla_rl.functions.pytorch_equivalent_gather(x: Variable, indices: Variable, axis: int) -> Variable
PyTorch-equivalent gather function. Gathers elements from x according to the given indices.
See https://pytorch.org/docs/stable/generated/torch.gather.html for details.
The shape of x and indices should be the same except for the given axis’ dimension.
- Parameters:
x (nn.Variable) – N-D array. The data to gather from.
indices (nn.Variable) – N-D array. The index of elements to gather.
axis (int) – indexing axis.
- Returns:
Variable gathered in PyTorch style.
- Return type:
nn.Variable
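For readers more familiar with NumPy, `np.take_along_axis` follows the same indexing rule as torch.gather: along the chosen axis, each output element is picked by the index stored at the same position. The sketch below uses NumPy only as an analogy and does not call the library.

```python
import numpy as np

x = np.array([[1, 2],
              [3, 4]])
indices = np.array([[0, 0],
                    [1, 0]])

# Along axis=1: gathered[i, j] = x[i, indices[i, j]]
gathered = np.take_along_axis(x, indices, axis=1)
print(gathered)  # [[1 1] [4 3]]
```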
- nnabla_rl.functions.concat_interleave(variables: Sequence[Variable], axis: int) -> Variable
Concatenate the given variables along the given axis while interleaving their elements. For example, given a sequence of 3 variables A, B, and C of the same size, concatenating them along axis = 0 returns a variable that satisfies:
>>> interleaved[0::3, ...] == A
>>> interleaved[1::3, ...] == B
>>> interleaved[2::3, ...] == C
- Parameters:
variables (Sequence[nn.Variable]) – sequence of N-D arrays. The data to concatenate.
axis (int) – concatenating axis.
- Returns:
concatenated variable whose elements are interleaved along the given axis.
- Return type:
nn.Variable
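One way to obtain the interleaving property described above is to stack the inputs on a new axis placed right after the target axis and then merge the two axes with a reshape. The NumPy sketch below shows this approach; the helper name `concat_interleave_np` is hypothetical, it assumes a non-negative axis, and the library may implement the operation differently.

```python
import numpy as np


def concat_interleave_np(arrays, axis):
    """Hypothetical NumPy sketch; assumes axis is non-negative."""
    # The new axis right after `axis` indexes which input an element came from.
    stacked = np.stack(arrays, axis=axis + 1)
    shape = list(arrays[0].shape)
    shape[axis] *= len(arrays)
    # Merging the two axes alternates elements of the inputs along `axis`.
    return stacked.reshape(shape)


a = np.zeros((2, 3))
b = np.ones((2, 3))
c = np.full((2, 3), 2.0)
interleaved = concat_interleave_np([a, b, c], axis=0)
print(interleaved.shape)  # (6, 3)
```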
- nnabla_rl.functions.swapaxes(x: Variable, axis1: int, axis2: int) -> Variable
Interchange two axes of given variable.
- Parameters:
x (nn.Variable) – Target variable whose axes are interchanged.
axis1 (int) – first axis.
axis2 (int) – second axis.
- Returns:
Interchanged variable.
- Return type:
nn.Variable