"""This class is responsible for switching the control type that is used for
controlling the Panda Robot robot arm and hand. It serves as a wrapper around the
services created by the ROS
`controller_manager <https://wiki.ros.org/controller_manager>`_ class.
Control types:
* `trajectory <https://wiki.ros.org/joint_trajectory_controller/>`_
* `position <https://wiki.ros.org/position_controllers/>`_
* `effort <https://wiki.ros.org/effort_controllers/>`_
"""
import time
from itertools import compress
import rospy
from controller_manager_msgs.srv import (
ListControllers,
ListControllersRequest,
LoadController,
LoadControllerRequest,
SwitchController,
SwitchControllerRequest,
)
from rospy.exceptions import ROSException, ROSInterruptException
from panda_gazebo.common import ControllerInfoDict
from panda_gazebo.common.helpers import (
dict_clean,
flatten_list,
get_unique_list,
ros_exit_gracefully,
)
# Global script vars.
[docs]ARM_CONTROLLERS = {
"end_effector": "panda_arm_trajectory_controller",
"trajectory": "panda_arm_trajectory_controller",
"position": "panda_arm_joint_position_controller",
"effort": "panda_arm_joint_effort_controller",
}
[docs]HAND_CONTROLLERS = {
"position": "franka_gripper",
}
[docs]CONTROLLER_DICT = {"arm": ARM_CONTROLLERS, "hand": HAND_CONTROLLERS}
[docs]class ControllerSwitcherResponse:
"""Class used for returning the result of the :meth:`PandaControlSwitcher.switch`
method.
Attributes:
success (bool): Specifies whether the switch operation was successful.
prev_control_type (str): The previous used control type.
"""
def __init__(self, success=True, prev_control_type=""):
"""Initialise ControllerSwitcherResponse object.
Args:
success (bool, optional): Whether the switch operation was successful.
Defaults to ``True``.
prev_control_type (str, optional): The previous used control type. Defaults
to ``""``.
"""
self.success = success
self.prev_control_type = prev_control_type
[docs]class PandaControlSwitcher(object):
"""Used for switching the Panda robot controllers.
Attributes:
verbose (bool): Boolean specifying whether we want to display log messages
during switching.
arm_control_types (list): List of currently active arm control types.
hand_control_types (list): List of currently active hand control types.
"""
def __init__(self, connection_timeout=10, verbose=True, robot_name_space=""):
"""Initialise PandaControlSwitcher object.
Args:
connection_timeout (str, optional): The timeout for connecting to the
controller_manager services. Defaults to `10` sec.
verbose (bool, optional): Whether to display debug log messages. Defaults to
``True``.
robot_name_space (string, optional): The namespace the robot, and thus the
'controller_manager' is on. Defaults to ``""``.
"""
self.verbose = verbose
self._controller_spawner_wait_timeout = 5
# Connect to controller_manager services.
try:
switch_controller_srv_topic = "%s/controller_manager/switch_controller" % (
robot_name_space
)
rospy.logdebug(
"Connecting to '%s' service." % (switch_controller_srv_topic)
)
rospy.wait_for_service(
switch_controller_srv_topic, timeout=connection_timeout
)
self._switch_controller_client = rospy.ServiceProxy(
switch_controller_srv_topic, SwitchController
)
rospy.logdebug("Connected to '%s' service!" % switch_controller_srv_topic)
list_controllers_srv_topic = "%s/controller_manager/list_controllers" % (
robot_name_space
)
rospy.logdebug("Connecting to '%s' service." % list_controllers_srv_topic)
rospy.wait_for_service(
list_controllers_srv_topic, timeout=connection_timeout
)
self._list_controller_client = rospy.ServiceProxy(
list_controllers_srv_topic, ListControllers
)
rospy.logdebug("Connected to '%s' service!" % list_controllers_srv_topic)
load_controller_srv_topic = "%s/controller_manager/load_controller" % (
robot_name_space
)
rospy.logdebug("Connecting to '%s' service." % load_controller_srv_topic)
rospy.wait_for_service(
load_controller_srv_topic, timeout=connection_timeout
)
self._load_controller_client = rospy.ServiceProxy(
load_controller_srv_topic, LoadController
)
rospy.logdebug("Connected to '%s' service!" % load_controller_srv_topic)
except (rospy.ServiceException, ROSException, ROSInterruptException) as e:
err_msg = (
"Shutting down '%s' because no connection could be established "
"with the '%s' service and this service is needed "
"when using 'position'."
% (
rospy.get_name(),
"/" + e.args[0].split(" /")[1],
)
)
ros_exit_gracefully(shutdown_message=err_msg, exit_code=1)
def _list_controllers_state(self): # noqa: C901
"""Get information about the currently running and loaded controllers.
Returns:
dict: Dictionary containing information about which controllers are
currently running or loaded divided by control group arm/hand and
other).
"""
list_controllers_resp = self._list_controller_client.call(
ListControllersRequest()
)
# Check which Panda controllers are running.
controllers_state = ControllerInfoDict()
for controller in list_controllers_resp.controller:
categorized = False
# Add Panda controllers to controllers_state.
for control_group, control_group_items in CONTROLLER_DICT.items():
for control_type, controller_names in control_group_items.items():
if controller.name in controller_names:
categorized = True
if controller.state == "running":
controllers_state[control_group]["running"][
control_type
].append(controller.name)
elif controller.state == "initialized":
controllers_state[control_group]["loaded"][
control_type
].append(controller.name)
elif controller.state == "stopped":
controllers_state[control_group]["stopped"][
control_type
].append(controller.name)
# Add non Panda controllers to controllers_state.
if not categorized:
if controller.state == "running":
controllers_state["other"]["running"][control_type].append(
controller.name
)
elif controller.state == "initialized":
controllers_state["other"]["loaded"][control_type].append(
controller.name
)
elif controller.state == "stopped":
controllers_state["other"]["stopped"][control_type].append(
controller.name
)
return dict_clean(controllers_state)
def _load(self, controllers):
"""Load controllers. For this to work the parameters of the controllers
have to be loaded onto the ROS parameter server.
Args:
controllers (union[str, list]): The name of the controller you want to load.
Returns:
bool: Success boolean or success boolean list.
"""
# Create load_controller request.
if isinstance(controllers, str):
# Send load controller request.
resp = self._load_controller_client(LoadControllerRequest(name=controllers))
return [resp.ok]
elif isinstance(controllers, list):
# Loop through controllers and request to load them.
resp = []
for controller in controllers:
# Send load controller request.
resp.append(
self._load_controller_client(
LoadControllerRequest(name=controller)
).ok
)
return resp
else:
rospy.logwarn(
"Controllers could not be loaded as the data type of the 'controllers' "
f"variable was {type(controllers)} while the 'PandaControlSwitcher' "
"only takes an argument of type 'str' or 'list'."
)
return [False]
@property
[docs] def arm_control_types(self):
"""Returns the currently active arm control types. Returns empty list when no
control type is enabled.
"""
controllers_state = self._list_controllers_state()
arm_state = controllers_state.get("arm", {})
return list(arm_state.get("running", {}).keys())
@property
[docs] def hand_control_types(self):
"""Returns the currently active hand control types. Returns empty when no
control type is enabled.
"""
controllers_state = self._list_controllers_state()
hand_state = controllers_state.get("hand", {})
return list(hand_state.get("running", {}).keys())
[docs] def wait_for_control_type(self, control_group, control_type, timeout=None, rate=10):
"""Function that can be used to wait till all the controllers used for a given
'control_group' and 'control_type' are running. Useful four when you expect a
launch file to load certain controllers.
Args:
control_group (str): The control group of which you want the switch the
control type. Options are ``hand`` or ``arm``.
control_type (str): The robot control type you want to switch to for the
given 'control_group'. Options are: ``trajectory``, ``position`` and
``effort``.
timeout (float, optional): The function timeout. Defaults to ``None`` meaning
the function will wait for ever.
rate (int, optional): The 'control_type' check rate. Defaults to `10`
hz.
Raises:
TimeoutError: Thrown when the set timeout has passed.
"""
start_time = time.time()
timeout = timeout if timeout else float("inf")
while (
(control_group == "arm" and control_type not in self.arm_control_types)
or (control_group == "hand" and control_type not in self.hand_control_types)
) and time.time() - start_time <= timeout:
time.sleep(1.0 / rate)
if time.time() - start_time > timeout:
raise TimeoutError(
f"Control type '{control_type}' for control_group '{control_group}' "
f"was not spawned within the set timeout (i.e. {timeout})."
)
[docs] def switch( # noqa: C901
self,
control_group,
control_type,
load_controllers=True,
timeout=3,
verbose=None,
):
"""Switch Panda robot control type. This function stops all currently running
controllers and starts the required controllers for a given control type.
Args:
control_group (str): The control group of which you want the switch the
control type. Options are ``hand`` or ``arm``.
control_type (str): The robot control type you want to switch to for the
given 'control_group'. Options are: ``trajectory``, ``position``
and ``effort``.
load_controllers (bool): Try to load the required controllers for a given
control_type if they are not yet loaded.
timeout (int, optional): The timeout for switching to a given controller.
Defaults to ``3`` sec.
verbose (bool, optional): Whether to display debug log messages. Defaults
to verbose value set during the class initiation.
Returns:
:obj:`~panda_gazebo.core.control_switcher.ControllerSwitcherResponse`:
Contains information about whether the switch operation was successful
'success' and the previously used controller 'prev_control_type'.
"""
resp = ControllerSwitcherResponse()
verbose = self.verbose if verbose is None else verbose
# Validate input arguments.
control_type = control_type.lower()
control_group = control_group.lower()
if isinstance(control_group, list):
if len(control_group) > 1:
rospy.logwarn(
"Please specify a single control group you want to switch the "
"control type for."
)
resp.success = False
return resp
else:
control_group = control_group[0]
if control_group not in CONTROLLER_DICT:
rospy.logwarn(
f"The '{control_group}' control group you specified is not valid. "
"Valid control groups for the Panda robot are "
f"{list(CONTROLLER_DICT.keys())}."
)
resp.success = False
return resp
if control_type not in CONTROLLER_DICT[control_group]:
rospy.logwarn(
f"The '{control_type} control type you specified is not valid. Valid "
"control types for the Panda robot are "
f"{list(CONTROLLER_DICT[control_group].keys())}."
)
resp.success = False
return resp
# Retrieve information about the currently running and loaded controllers.
# NOTE: Here we wait a bit till we are sure the controller_spawner is ready.
start_time = time.time()
controllers_state = {}
while (
control_group not in controllers_state
or "running" not in controllers_state[control_group]
) and time.time() - start_time <= self._controller_spawner_wait_timeout:
controllers_state = self._list_controllers_state()
running_controllers = controllers_state.get(control_group, {}).get("running")
prev_control_type = ", ".join(running_controllers.keys())
running_state = controllers_state.get(control_group, {}).get("running", {})
running_control_types = list(running_state.keys())
running_controllers = get_unique_list(
flatten_list(list(running_state.values()))
)
stopped_state = controllers_state.get(control_group, {}).get("stopped", {})
stopped_control_types = list(stopped_state.keys())
loaded_state = controllers_state.get(control_group, {}).get("loaded", {})
loaded_control_types = list(loaded_state.keys())
controllers = CONTROLLER_DICT[control_group][control_type]
# Generate switch controller msg and load controllers if needed.
switch_controller_msg = SwitchControllerRequest()
switch_controller_msg.strictness = SwitchControllerRequest.STRICT
switch_controller_msg.timeout = timeout
switch_controller_msg.start_controllers = (
controllers if isinstance(controllers, list) else [controllers]
)
if (
control_type in running_control_types
): # If control type controllers are already running.
if verbose:
rospy.logdebug(
f"Panda {control_group} control type not switched to "
f"'{control_type}' as the Panda robot was already using "
f"'{prev_control_type}'."
)
resp.success = True
resp.prev_control_type = prev_control_type
return resp
elif control_type in stopped_control_types: # If controller was stopped.
switch_controller_msg.stop_controllers = flatten_list(
list(controllers_state[control_group]["running"].values())
)
elif (
control_type not in loaded_control_types
): # Try to load the controllers if not yet loaded.
# Load the required controllers.
if load_controllers:
retval = self._load(controllers)
failed_controllers = list(
compress(
controllers,
[not val for val in retval],
)
)
else:
rospy.logwarn(
f"The Panda {control_group} control group was not switched to "
f"'{control_type}' because the "
f"{controllers} controllers could "
"not be loaded as 'load_controllers' was set to argument was set "
"to 'False'."
)
resp.success = False
resp.prev_control_type = prev_control_type
return resp
# Check if all controllers were loaded successfully.
if len(failed_controllers) == 0:
switch_controller_msg.stop_controllers = running_controllers
else:
if (
control_group == "arm"
and control_type not in self.arm_control_types
) or (
control_group == "hand"
and control_type not in self.hand_control_types
):
rospy.logwarn(
f"The Panda {control_group} control griyo was not switched to "
f"'{control_type}' as the {failed_controllers} controllers "
"could not be loaded."
)
resp.success = False
else:
resp.success = False
resp.prev_control_type = prev_control_type
return resp
else:
switch_controller_msg.stop_controllers = running_controllers
# Send switch_controller msgs.
rospy.logdebug(
f"Switching Panda {control_group} control group to '{control_type}'."
)
retval = self._switch_controller_client(switch_controller_msg)
if retval.ok:
rospy.logdebug(
f"Switching Panda {control_group} control group to "
f"'{control_type}' successful."
)
else:
if (
control_group == "arm" and control_type not in self.arm_control_types
) or (
control_group == "hand" and control_type not in self.arm_control_types
):
rospy.logwarn(
f"Switching Panda {control_group} control type to "
f"'{control_type}' failed."
)
else:
retval.ok = True
resp.success = retval.ok
resp.prev_control_type = prev_control_type
return resp