Source code for ros_gazebo_gym.core.controllers_connection

"""Contains a python utility class that makes it easier to interact with
`ros_control <https://wiki.ros.org/ros_control>`_ controllers.
"""

import rospy
from controller_manager_msgs.srv import (
    ListControllers,
    ListControllersRequest,
    SwitchController,
    SwitchControllerRequest,
)
from ros_gazebo_gym.common.helpers import flatten_list
from ros_gazebo_gym.core.helpers import ros_exit_gracefully
from rosgraph_msgs.msg import Clock
from rospy.exceptions import ROSException, ROSInterruptException
from std_srvs.srv import Empty

# Script settings.
[docs]CONNECTION_TIMEOUT = 10
[docs]class ControllersConnection: """Class that contains several methods that can be used to interact with the `ros_control <https://wiki.ros.org/ros_control>`_ controllers. Attributes: list_service_name (str): The name of the controller list service. list_service (:obj:`rospy.impl.tcpros_service.ServiceProxy`): The controller list service. switch_service_name (str): The name of the controller switch service. switch_service (:obj:`rospy.impl.tcpros_service.ServiceProxy`): The controller switch service. """ def __init__(self, namespace="", controllers_list=None): # noqa: C901 """Initialize the ControllersConnection instance. Args: namespace (str, optional): The namespace on which the robot controllers can be found. Defaults to ``""``. controllers_list (list, optional): A list with currently available controllers to look for. Defaults to ``None``, which means that the class will try to retrieve all the running controllers. """ rospy.logwarn("Initialize ControllersConnection utility class...") self._controller_list = controllers_list self._paused_controllers = [] self._gazebo_paused_check_timeout = 0.2 self._list_controllers_service_name = ( f"{namespace}/controller_manager/list_controllers" ) try: rospy.logdebug( "Connecting to '%s' service." % self._list_controllers_service_name ) rospy.wait_for_service( self._list_controllers_service_name, timeout=CONNECTION_TIMEOUT, ) self._list_controllers_proxy = rospy.ServiceProxy( self._list_controllers_service_name, ListControllers ) rospy.logdebug( "Connected to '%s' service!" % self._list_controllers_service_name ) except (rospy.ServiceException, ROSException, ROSInterruptException): error_msg = ( f"Shutting down '{rospy.get_name()}' since no connection could be " f"established with the {self._list_controllers_service_name} service!" ) ros_exit_gracefully(shutdown_msg=error_msg, exit_code=1) self._switch_controller_service_name = ( f"{namespace}/controller_manager/switch_controller" ) try: rospy.logdebug( "Connecting to '%s' service." % self._switch_controller_service_name ) rospy.wait_for_service( self._switch_controller_service_name, timeout=CONNECTION_TIMEOUT, ) self._switch_controller_proxy = rospy.ServiceProxy( self._switch_controller_service_name, SwitchController ) rospy.logdebug( "Connected to '%s' service!" % self._switch_controller_service_name ) except (rospy.ServiceException, ROSException, ROSInterruptException): error_msg = ( f"Shutting down '{rospy.get_name()}' since no connection could be " f"established with the {self._list_controllers_service_name} service!" ) ros_exit_gracefully(shutdown_msg=error_msg, exit_code=1) self._gazebo_pause_service_name = "/gazebo/pause_physics" try: rospy.logdebug( "Connecting to '%s' service." % self._gazebo_pause_service_name ) rospy.wait_for_service( self._gazebo_pause_service_name, timeout=CONNECTION_TIMEOUT, ) self._pause_proxy = rospy.ServiceProxy( self._gazebo_pause_service_name, Empty ) rospy.logdebug( "Connected to '%s' service!" % self._gazebo_pause_service_name ) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % self._gazebo_pause_service_name ) self._gazebo_unpause_service_name = "/gazebo/unpause_physics" try: rospy.logdebug( "Connecting to '%s' service." % self._gazebo_unpause_service_name ) rospy.wait_for_service( self._gazebo_unpause_service_name, timeout=CONNECTION_TIMEOUT, ) self._unpause_proxy = rospy.ServiceProxy( self._gazebo_unpause_service_name, Empty ) rospy.logdebug( "Connected to '%s' service!" % self._gazebo_unpause_service_name ) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % self._gazebo_unpause_service_name ) rospy.logwarn("ControllersConnection utility class initialised.")
[docs] def switch_controllers( self, controllers_on, controllers_off, strictness=1, timeout=0.0 ): """Function used to switch controllers on and off. Args: controllers_on (list): The controllers you want to turn on. controllers_off (list): The controllers you want to turn off. strictness (int, optional): Whether the switching will fail if anything goes wrong. Defaults to ``1``. timeout (float): The timeout before the request is cancelled. Defaults to ``0.0`` meaning no timeout. Returns: bool: Boolean specifying whether the switch was successful. """ rospy.wait_for_service(self._switch_controller_service_name) try: switch_request_object = SwitchControllerRequest() switch_request_object.start_controllers = controllers_on switch_request_object.stop_controllers = controllers_off switch_request_object.strictness = strictness switch_request_object.timeout = timeout switch_result = self._switch_controller_proxy(switch_request_object) # NOTE: When a ROS Gazebo simulation is present and it is paused we briefly # unpause it. This is needed for the `controller_manager/switch_controller` # service to work. if not switch_result.ok and self.gazebo and self.gazebo_paused: self._unpause_proxy() switch_result = self._switch_controller_proxy(switch_request_object) self._pause_proxy() rospy.logdebug("Switch Result==>" + str(switch_result.ok)) return switch_result.ok except rospy.ServiceException: print(self._switch_controller_service_name + " service call failed") return None
[docs] def reset_controllers(self, timeout=1.0): """Resets the currently running controllers by turning them off and on. Args: timeout (float): The timeout before the request is cancelled. Defaults to ``0.0`` meaning no timeout. """ # Try to find all running controllers controller_list was not supplied. if self._controller_list is None: list_controllers_msg = self._list_controllers_proxy.call( ListControllersRequest() ) controllers_list = [ controller.name for controller in list_controllers_msg.controller if controller.state == "running" ] else: controllers_list = self._controller_list # Reset the running controllers. reset_result = False rospy.logdebug("Deactivating controllers") result_off_ok = self.switch_controllers( controllers_on=[], controllers_off=controllers_list, timeout=timeout ) if result_off_ok: rospy.logdebug("Re-activating controllers") result_on_ok = self.switch_controllers( controllers_on=controllers_list, controllers_off=[] ) if result_on_ok: rospy.logdebug("Controllers reset==>" + str(controllers_list)) reset_result = True else: rospy.logdebug("result_on_ok==>" + str(result_on_ok)) else: rospy.logdebug("result_off_ok==>" + str(result_off_ok)) return reset_result
[docs] def pause_controllers(self, controller_list=None, filter_list=[]): """Pauses controllers. Args: controller_list (list, optional): The controllers you want to pause. Defaults to ``None``, which means that the class will pause all the running controllers. filter_list (list, optional): The controllers you want to ignore when pausing. Defaults to ``[]``. """ # Retrieve all running controllers if controller_list was not supplied. if controller_list is None: if self._controller_list is None: list_controllers_msg = self._list_controllers_proxy.call( ListControllersRequest() ) controllers_list = [ controller.name for controller in list_controllers_msg.controller if controller.state == "running" ] else: controllers_list = self._controller_list # Filter out the controllers that should be ignored. controllers_list = [ controller for controller in controllers_list if controller not in filter_list ] # Pause the running controllers. pause_result = False rospy.logdebug("Pausing controllers") self._paused_controllers = controllers_list result_off_ok = self.switch_controllers( controllers_on=[], controllers_off=controllers_list ) if result_off_ok: rospy.logdebug("Controllers paused==>" + str(controllers_list)) pause_result = True else: rospy.logdebug("result_off_ok==>" + str(result_off_ok)) return pause_result
[docs] def unpause_controllers(self): """Unpauses all the paused controllers. Returns: bool: Boolean specifying whether the unpause was successful. """ if self._paused_controllers: unpause_result = False rospy.logdebug("Unpausing controllers") result_on_ok = self.switch_controllers( controllers_on=self._paused_controllers, controllers_off=[] ) if result_on_ok: rospy.logdebug( "Controllers unpaused==>" + str(self._paused_controllers) ) unpause_result = True self._paused_controllers = [] else: rospy.logdebug("result_on_ok==>" + str(result_on_ok)) return unpause_result else: rospy.logwarn("No controllers to unpause!") return False
@property
[docs] def controllers_list(self): """Returns the list of available controllers.""" return self._controllers_list
@controllers_list.setter def controllers_list(self, new_controllers_list): """Updates the list of available controllers.""" self._controllers_list = new_controllers_list @property
[docs] def gazebo(self): """Returns whether a ROS Gazebo simulation is running.""" return any( ["/gazebo" in topic for topic in flatten_list(rospy.get_published_topics())] )
@property
[docs] def gazebo_paused(self): """Returns whether the Gazebo simulation is paused.""" if self.gazebo and any( ["/clock" in topic for topic in flatten_list(rospy.get_published_topics())] ): try: rospy.wait_for_message( "/clock", Clock, timeout=self._gazebo_paused_check_timeout ) return False except ROSException: return True return False