ros_gazebo_gym.core.helpers

Contains helper classes and functions that the ros_gazebo_gym package uses to set up the ROS Gazebo Gym gymnasium environments.

Module Contents

Classes

GitProgressCallback

A pygit2.RemoteCallbacks callback that shows a git clone progress bar.

PopenAutoCleanup

A subprocess.Popen that cleans up after itself and terminates the process when the main Python script exits.

Functions

get_catkin_workspace_path()

Retrieve the path of the currently sourced catkin workspace.

query_yes_no(question[, default])

Ask the user a yes/no question via input() and return the answer.

get_global_pkg_path(package_name[, workspace_path])

Retrieves the global package path, i.e. the path of a package if it is contained in the global ROS workspace.

get_local_pkg_path(package_name, catkin_workspace)

Retrieves the local package path, i.e. the path of a package in the catkin workspace.

clone_repo(path, git_src[, branch, recursive])

Clones the repository of the dependency.

is_repo_up_to_date(repo_path)

Checks if the local repository is up to date with the remote repository.

ros_exit_gracefully([shutdown_msg, exit_code])

Shuts down the ROS node, waits until it has shut down, and exits the script.

build_catkin_ws(workspace_path[, install_ros_deps])

Builds the catkin workspace and installs system dependencies while doing so if requested.

install_package(package_name[, workspace_path, ...])

Install a given ROS package together with its dependencies.

load_ros_params_from_yaml(yaml_file_path[, ...])

Loads ROS parameters from a YAML file.

get_log_path()

Returns the 'ros_gazebo_gym' package log folder path.

Attributes

ROSDEP_INDEX_PATH

ros_gazebo_gym.core.helpers.ROSDEP_INDEX_PATH = '../cfg/rosdep.yaml'

class ros_gazebo_gym.core.helpers.GitProgressCallback

Bases: pygit2.RemoteCallbacks

A pygit2.RemoteCallbacks callback that shows a git clone progress bar.

Initializes the GitProgressCallback class.

transfer_progress(statsTransferProgress)

Displays the current git transfer progress.

Parameters:

statsTransferProgress (pygit2.statsTransferProgress) – The progress up to now.

class ros_gazebo_gym.core.helpers.PopenAutoCleanup(*args, critical=False, **kwargs)

Bases: subprocess.Popen

A subprocess.Popen that cleans up after itself and terminates the process when the main Python script exits. It also allows users to specify whether the process is critical and show an error message when the process is no longer running.

critical

Whether the process is critical and an error message should be shown when the process is no longer running.

Type:

bool

Initializes the PopenAutoCleanup class.

Parameters:
  • *args – The positional arguments that are passed to the subprocess.Popen class.

  • critical (bool, optional) – Whether the process is critical and an error message should be shown when the process is no longer running.

  • **kwargs – The keyword arguments that are passed to the subprocess.Popen class.

terminate()

Terminates the process and stops the critical check thread.

kill()

Kills the process and stops the critical check thread.
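The cleanup-on-exit behaviour described for PopenAutoCleanup can be illustrated with a minimal standard-library sketch. The class name AutoCleanupPopen and its _cleanup method are hypothetical and are not taken from ros_gazebo_gym; the sketch also leaves out the critical check thread and only shows one plausible way to stop a child process when the interpreter exits.

```python
import atexit
import subprocess
import sys


class AutoCleanupPopen(subprocess.Popen):
    """Hypothetical stand-in: a Popen that stops its child at interpreter exit."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Register a hook so the child does not outlive the main script.
        atexit.register(self._cleanup)

    def _cleanup(self):
        # poll() returns None while the child is still running.
        if self.poll() is None:
            self.terminate()
            try:
                self.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Fall back to a hard kill if the child ignores terminate().
                self.kill()
                self.wait()


if __name__ == "__main__":
    proc = AutoCleanupPopen([sys.executable, "-c", "import time; time.sleep(60)"])
    print("child running:", proc.poll() is None)
    # No explicit terminate() call: the atexit hook stops the child on exit.
```

Calling terminate() or kill() manually remains possible; the hook then finds the process already stopped and does nothing.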

ros_gazebo_gym.core.helpers.get_catkin_workspace_path()

Retrieve the path of the currently sourced catkin workspace.

Returns:

The path of the currently sourced catkin workspace. Returns None if no catkin workspace was sourced.

Return type:

str
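As an illustration of what "currently sourced" can mean in practice, the sketch below derives a workspace root from the CMAKE_PREFIX_PATH environment variable, which sourcing a catkin devel or install space extends. The function name workspace_from_prefix_path is hypothetical, and this lookup is an assumption; the library may determine the workspace differently.

```python
import os
from pathlib import Path


def workspace_from_prefix_path(env=None):
    """Return the first workspace root found in CMAKE_PREFIX_PATH, or None.

    Assumption: a sourced workspace contributes a path that ends in
    'devel' or 'install', and the workspace root is its parent folder.
    """
    env = os.environ if env is None else env
    for prefix in env.get("CMAKE_PREFIX_PATH", "").split(os.pathsep):
        if prefix and Path(prefix).name in ("devel", "install"):
            return str(Path(prefix).parent)
    # Only system-wide ROS paths (or nothing) were found.
    return None


if __name__ == "__main__":
    print(workspace_from_prefix_path())
```

Like the documented function, the sketch returns None when no workspace has been sourced, so callers should check the result before using it as a path.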

ros_gazebo_gym.core.helpers.query_yes_no(question, default='yes')

Ask the user a yes/no question via input() and return the answer.

Parameters:
  • question (str) – String presented to the user.

  • default (str, optional) – The presumed answer if the user just hits <Enter>. It must be “yes” (the default), “no” or None (meaning an answer is required of the user). Defaults to “yes”.

Raises:

ValueError – If default is not “yes”, “no” or None.

Returns:

Returns the given answer.

Return type:

str

See also

This function is based on the function given by @fmark in a Stack Overflow question.
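The prompt logic described above can be sketched as follows. The name ask_yes_no and the read parameter are hypothetical additions for illustration (read makes the function testable without a terminal), and returning the strings "yes" or "no" is an assumption based on the documented str return type.

```python
def ask_yes_no(question, default="yes", read=input):
    """Hypothetical yes/no prompt; returns 'yes' or 'no'."""
    prompts = {"yes": " [Y/n] ", "no": " [y/N] ", None: " [y/n] "}
    if default not in prompts:
        raise ValueError(f"invalid default answer: {default!r}")
    answers = {"y": "yes", "yes": "yes", "n": "no", "no": "no"}
    while True:
        choice = read(question + prompts[default]).strip().lower()
        # An empty reply selects the default, if there is one.
        if choice == "" and default is not None:
            return default
        if choice in answers:
            return answers[choice]
        print("Please respond with 'yes' or 'no' (or 'y' or 'n').")


if __name__ == "__main__":
    print(ask_yes_no("Rebuild the workspace?"))
```

With default=None the loop keeps asking until the user types a recognised answer, which matches the "an answer is required of the user" case in the parameter description.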

ros_gazebo_gym.core.helpers.get_global_pkg_path(package_name, workspace_path=None)

Retrieves the global package path, i.e. the path of a package if it is contained in the global ROS workspace. Returns None if the package is not found.

Parameters:

package_name (str) – The name of the package you want to check.

Returns:

The global package path.

Return type:

str

ros_gazebo_gym.core.helpers.get_local_pkg_path(package_name, catkin_workspace)

Retrieves the local package path, i.e. the path of a package in the catkin workspace. Returns None if the package is not found in the catkin workspace.

Parameters:
  • package_name (str) – The name of the package you want to check.

  • catkin_workspace (str) – The catkin_workspace you want to check.

Returns:

The local package path.

Return type:

str

ros_gazebo_gym.core.helpers.clone_repo(path, git_src, branch=None, recursive=False)

Clones the repository of the dependency.

Parameters:
  • path (str) – The path to which you want to clone the repository.

  • git_src (str) – The git repository url.

  • branch (str, optional) – The branch to checkout. Defaults to None.

  • recursive (bool, optional) – Whether to initialize and clone the repository's submodules after the clone is created. Defaults to False.

ros_gazebo_gym.core.helpers.is_repo_up_to_date(repo_path)

Checks if the local repository is up to date with the remote repository.

Parameters:

repo_path (str) – The path of the local repository.

Returns:

Whether the local repository is up to date with the remote repository.

Return type:

bool

ros_gazebo_gym.core.helpers.ros_exit_gracefully(shutdown_msg=None, exit_code=0)

Shuts down the ROS node, waits until it has shut down, and exits the script.

Parameters:
  • shutdown_msg (str, optional) – The shutdown message. Defaults to None.

  • exit_code (int, optional) – The exit code. Defaults to 0.

ros_gazebo_gym.core.helpers.build_catkin_ws(workspace_path, install_ros_deps=False)

Builds the catkin workspace and installs system dependencies while doing so if requested.

Parameters:
  • workspace_path (str) – The path of the catkin workspace.

  • install_ros_deps (bool, optional) – Whether you also want to install the system dependencies using rosdep before building the workspace. Defaults to False.

Raises:

Exception – When something goes wrong while re-building the workspace.

ros_gazebo_gym.core.helpers.install_package(package_name, workspace_path=None, outdated_warning=True)

Install a given ROS package together with its dependencies.

This function checks if a ROS package is installed and installs it if this is not the case. It uses the ros_gazebo_gym package dependency index to clone the package and dependencies in the local catkin workspace and subsequently rebuilds this workspace. It also throws a warning if a package is installed but not up to date.

Parameters:
  • package_name (str) – The package you want to install.

  • workspace_path (str, optional) – The catkin workspace path. Defaults to None (i.e. path will be determined).

  • outdated_warning (bool, optional) – Whether to show an update warning when the package is outdated. Defaults to True.

Returns:

Whether the package and its dependencies are installed.

Return type:

bool
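The check, clone, rebuild and warn flow described for install_package can be sketched with injected callables. Everything in this sketch is hypothetical: ensure_package and its is_installed, is_up_to_date, clone and build parameters are illustrative stand-ins and not part of ros_gazebo_gym, and the real function may order or combine these steps differently.

```python
import warnings


def ensure_package(name, is_installed, is_up_to_date, clone, build,
                   outdated_warning=True):
    """Hypothetical flow: install `name` if missing, warn if it is outdated."""
    if is_installed(name):
        # Already present: optionally warn, but do not reinstall.
        if outdated_warning and not is_up_to_date(name):
            warnings.warn(f"Package '{name}' is installed but not up to date.")
        return True
    clone(name)  # fetch the package sources into the workspace
    build()      # rebuild the workspace so the new package becomes usable
    return is_installed(name)


if __name__ == "__main__":
    installed = set()
    ok = ensure_package(
        "demo_pkg",
        is_installed=lambda n: n in installed,
        is_up_to_date=lambda n: True,
        clone=installed.add,
        build=lambda: None,
    )
    print("installed:", ok)
```

Passing the steps in as callables keeps the sketch runnable without ROS; in the documented function these roles would presumably be filled by helpers such as clone_repo, build_catkin_ws and is_repo_up_to_date.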

ros_gazebo_gym.core.helpers.load_ros_params_from_yaml(yaml_file_path, ros_package_name=None)

Loads ROS parameters from a YAML file.

Parameters:
  • yaml_file_path (str) – The configuration file path. Can be absolute or relative. If relative, the path is relative to the package directory.

  • ros_package_name (str, optional) – The name of the package that contains the configuration file. Defaults to None.

Raises:

rosparam.RosParamException – If the YAML file could not be loaded or the ROS package could not be found.
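The path rule for yaml_file_path (absolute paths are used as given, relative paths are resolved against the package directory) can be sketched as below. The helper resolve_yaml_path and the example paths are hypothetical; how the library looks up the package directory is not shown here.

```python
from pathlib import Path


def resolve_yaml_path(yaml_file_path, package_dir):
    """Return yaml_file_path unchanged if absolute, else relative to package_dir."""
    path = Path(yaml_file_path)
    return path if path.is_absolute() else Path(package_dir) / path


if __name__ == "__main__":
    # Hypothetical package location, used for illustration only.
    print(resolve_yaml_path("cfg/params.yaml", "/ws/src/my_pkg"))
    print(resolve_yaml_path("/etc/params.yaml", "/ws/src/my_pkg"))
```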

ros_gazebo_gym.core.helpers.get_log_path()

Returns the ‘ros_gazebo_gym’ package log folder path.

Returns:

The package log folder path.

Return type:

pathlib.Path