Source code for ros_gazebo_gym.core.ros_launcher

"""Launches all the ROS nodes that are needed for a given
:ros-gazebo-gym:`ros_gazebo_gym <>` gymnasium environment.
"""

import os
import socket
import time
from pathlib import Path

import rosgraph
import rospy
from ros_gazebo_gym.core.helpers import (
    PopenAutoCleanup,
    get_catkin_workspace_path,
    get_global_pkg_path,
    install_package,
    ros_exit_gracefully,
)


[docs]class ROSLauncher(object): """Class used to launch ROS launch files. Attributes: successful (bool): Whether the launch file was successfully launched. *This only specifies if the launchfile was successfully executed not if it encountered errors*. """
[docs] launched = {} # Stores all processes that were launched.
@classmethod
[docs] def initialize(cls): """Make sure a ros master is running and ROS is initialized.""" try: # Ensure that a ROS master is running. if not rosgraph.is_master_online(): rospy.logwarn("No ROS master was found. Starting one in a subprocess.") p = PopenAutoCleanup("roscore", critical=True) state = p.poll() if state is None: rospy.loginfo("ROS master successfully started.") cls.launched["roscore"] = p # Store a reference to the process. elif state != 0: rospy.logerr( "Something went wrong while trying to launch the ROS master." ) ros_exit_gracefully( shutdown_msg=f"Shutting down {rospy.get_name()}", exit_code=1 ) except socket.error: rospy.logerr( "No ROS master was found and none could be started please check your " "system and try again." ) ros_exit_gracefully( shutdown_msg=f"Shutting down {rospy.get_name()}", exit_code=1 ) # Make sure ROS has been initialized. if not rospy.rostime.is_rostime_initialized(): rospy.init_node("ros_gazebo_gym_launcher_node", anonymous=True)
@classmethod
[docs] def list_processes(cls): """List all launched processes.""" for process_name in cls.launched: print(f"{process_name}: {cls.launched[process_name].pid}")
@classmethod
[docs] def terminate(cls, process_name): """Terminate a launched process. Args: process_name (str): The process name. """ if process_name in cls.launched: cls.launched[process_name].terminate() del cls.launched[process_name]
@classmethod
[docs] def terminate_all(cls): """Terminate all launched processes.""" for process_name in cls.launched: cls.launched[process_name].terminate() cls.launched = {}
@classmethod
[docs] def launch( # noqa: C901 cls, package_name, launch_file_name, workspace_path=None, log_file=None, critical=False, wait_time=2, outdated_warning=False, **kwargs, ): """Launch a given launchfile while also installing the launchfile package and or dependencies. This is done by using the ros_gazebo_gym dependency index. Args: package_name (str): The package that contains the launchfile. launch_file_name (str): The launchfile name. workspace_path (str, optional): The path of the catkin workspace. Defaults to ``None`` meaning the path will be determined. log_file(str, optional): The log file to write the ``stdout`` to. Defaults to ``None`` meaning the ``stdout`` will be written to console. critical (bool, optional): Whether the process is critical and an error message should be shown when the process is no longer running. wait_time (int, optional): The time to wait before checking if the was launched successfully and is still running. Defaults to ``2``. outdated_warning (bool, optional): Whether to show a update warning when the package is outdated. Defaults to ``False``. **kwargs: Keyword arguments you want to pass to the launchfile. Raises: Exception: When something went wrong when launching the launchfile. """ if not workspace_path: workspace_path = get_catkin_workspace_path() if not workspace_path: rospy.logerr( "Workspace path could not be found. Please make sure that you source " "the workspace before calling the ROSLauncher or supply it with a " "workspace_path." ) ros_exit_gracefully( shutdown_msg=f"Shutting down {rospy.get_name()}", exit_code=1 ) # Install ROS package and its dependencies if they are not present. try: package_installed = install_package( package_name, workspace_path=workspace_path, outdated_warning=outdated_warning, ) except Exception: rospy.logwarn( f"Something went wrong while trying to install the '{package_name}' " "ROS package and its dependencies." ) package_installed = False # Launch launch file if package was found. if package_installed: # NOTE: Bash prefix needed since sourcing 'setup.sh' doesn't seem to work. rospy.loginfo( f"Starting '{launch_file_name}' launch file from package " f"'{package_name}." ) pkg_name = get_global_pkg_path(package_name) if pkg_name: launch_dir = Path(get_global_pkg_path(package_name)).joinpath("launch") path_launch_file_name = Path(launch_dir).joinpath(launch_file_name) rospy.logdebug(f"Launch file path: {path_launch_file_name}") source_command = ". ./devel/setup.bash;" roslaunch_command = "roslaunch {} {}".format(package_name, launch_file_name) kwargs_command = ( " " + " ".join([f"{key}:={val}" for key, val in kwargs.items()]) if kwargs else "" ) command = source_command + roslaunch_command + kwargs_command rospy.logwarn("Launching command: " + str(command)) # Launch the launchfile using a subprocess. # NOTE: Subprocess used instead of the roslaunch python api since it does # not provide a way to first source the catkin workspace. if log_file is not None: os.makedirs(Path(log_file).parent, exist_ok=True) log_file = open(log_file, "w") p = PopenAutoCleanup( command, critical=critical, executable="/bin/bash", shell=True, cwd=workspace_path, stdout=log_file, stderr=log_file, ) time.sleep(wait_time) # Wait a bit to make sure the process is running. state = p.poll() if state is None: rospy.logdebug("Launch file successfully launched.") cls.launched[launch_file_name] = p # Store a reference to the process. else: rospy.logerr( "Something went wrong while trying to launch the " f"{path_launch_file_name} launch file." ) ros_exit_gracefully( shutdown_msg=f"Shutting down {rospy.get_name()}", exit_code=1 ) else: raise Exception( f"Package '{package_name}' and its dependencies could not be found." )