Source code for panda_gazebo.common.helpers

"""Module containing some additional helper functions used in the
:panda-gazebo:`panda_gazebo <>` package.
"""
import copy
import sys

import control_msgs.msg as control_msgs
import numpy as np
import rospy
from actionlib_msgs.msg import GoalStatusArray
from controller_manager_msgs.srv import ListControllersResponse
from geometry_msgs.msg import Quaternion
from moveit_msgs.msg import MoveItErrorCodes
from numpy import linalg, nan
from rospy.exceptions import ROSException
from sensor_msgs.msg import JointState
from trajectory_msgs.msg import JointTrajectoryPoint

from panda_gazebo.msg import FollowJointTrajectoryGoal


#################################################
# Type conversion functions #####################
#################################################
[docs]def joint_state_dict_2_joint_state_msg(joint_state_dict, type="position"): """Converts a joint_state dictionary into a JointState msgs. Args: joint_state_dict (dict): Dictionary specifying joint values for each joint (key). type (str, optional): The state type. Options are ``velocity``, ``effort`` and ``position``. Defaults to "position". Returns: :obj:`sensor_msgs.msg.JointState`: A JoinState message. Raises: ValueError: If the type is not one of the expected values. """ valid_types = ["velocity", "effort", "position"] if type.lower() not in valid_types: raise ValueError(f"Invalid type. Expected one of: {valid_types}") resp = JointState() resp.name = list(joint_state_dict.keys()) if type.lower() == "velocity": resp.velocity = list(joint_state_dict.values()) elif type.lower() == "effort": resp.effort = list(joint_state_dict.values()) else: resp.position = list(joint_state_dict.values()) return resp
[docs]def action_dict_2_joint_trajectory_msg( action_dict, create_time_axis=True, time_axis_step=0.01 ): """Converts an action dictionary into a panda_gazebo ``FollowJointTrajectoryGoal`` msgs. Args: action_dict (dict): Dictionary containing actions and joints. create_time_axis (bool): Whether you want to automatically create a joint trajectory time axis if it is not yet present. time_axis_step (float): The size of the time steps used for generating the time axis. Returns: :obj:`panda_gazebo.msg.FollowJointTrajectoryGoal`: New FollowJointTrajectoryGoal message. Raises: ValuesError: When the action_dict is invalid. """ goal_msg = FollowJointTrajectoryGoal() goal_msg.create_time_axis = create_time_axis goal_msg.time_axis_step = time_axis_step # Handle multiple waypoints. if all([np.isscalar(item) for item in list(action_dict.values())]): waypoints_actions = np.array( [ list(action_dict.values()), ] ) elif ( all([isinstance(item, np.ndarray) for item in action_dict.values()]) and all([item.ndim == 1 for item in action_dict.values()]) and all( [ item.shape == list(action_dict.values())[0].shape for item in action_dict.values() ] ) ): waypoints_actions = np.transpose(list(action_dict.values())) else: raise ValueError( "Joint trajectory message could not be created since the action values " "in the action dict are invalid. Please make sure that each joint contains " "an equal amount of joint commands, one for each waypoint." ) # Create and return joint trajectory message. for waypoint in waypoints_actions: wp = JointTrajectoryPoint() wp.positions = list(waypoint) goal_msg.trajectory.joint_names = list(action_dict.keys()) goal_msg.trajectory.points.append(wp) return goal_msg
[docs]def panda_action_msg_2_control_msgs_action_msg(panda_action_msg): """Converts a :class:``panda_gazebo.msg.FollowJointTrajectoryActionGoal` action message into a :control_msgs:`control_msgs/FollowJointTrajectoryGoal <html/action/FollowJointTrajectory.html>` action message. Args: panda_action_msg :obj:`control_msgs.msg.FollowJointTrajectoryGoal`: Panda_gazebo follow joint trajectory goal message. Returns: :obj:`control_msgs.msg.FollowJointTrajectoryGoal`: Control_msgs follow joint trajectory goal message. Raises: TypeError: If panda_action_msg is not of type :class:`panda_gazebo.msg.FollowJointTrajectoryGoal`. """ if not isinstance(panda_action_msg, FollowJointTrajectoryGoal): raise TypeError( "panda_action_msg must be of type " "panda_gazebo.msg.FollowJointTrajectoryGoal" ) control_msgs_action_msg = control_msgs.FollowJointTrajectoryGoal() control_msgs_action_msg.trajectory = panda_action_msg.trajectory control_msgs_action_msg.goal_time_tolerance = panda_action_msg.goal_time_tolerance control_msgs_action_msg.goal_tolerance = panda_action_msg.goal_tolerance control_msgs_action_msg.path_tolerance = panda_action_msg.path_tolerance return control_msgs_action_msg
[docs]def controller_list_array_2_dict(controller_list_msgs): """Converts a :controller_manager_msgs:`Controller_manager/list_controllers <html/srv/ListControllers.html>` message into a controller information dictionary. Args: controller_list_msgs (:obj:`controller_manager_msgs.srv.ListControllersResponse`): Controller_manager/list_controllers service response message. Returns: dict: Dictionary containing information about all the available controllers. Raises: TypeError: If controller_list_msgs is not of type controller_manager_msgs.srv.ListControllersResponse. """ # noqa: E501 if not isinstance(controller_list_msgs, ListControllersResponse): raise TypeError( "controller_list_msgs must be of type " "controller_manager_msgs.srv.ListControllersResponse" ) controller_list_dict = {} for controller in controller_list_msgs.controller: controller_name = controller.name controller_list_dict[controller_name] = copy.deepcopy(controller) return controller_list_dict
[docs]def translate_actionclient_result_error_code(actionclient_retval): """Translates the error code returned by the SimpleActionClient.get_result() function into a human readable error message. Args: actionclient_retval (:obj:`control_msgs.msg.FollowJointTrajectoryResult`): The result that is returned by the :func:`actionlib.simple_action_client.SimpleActionClient.get_result()` function. Returns: str: Error string that corresponds to the error code. Raises: TypeError: If actionclient_retval is not of type control_msgs.msg.FollowJointTrajectoryResult. """ if actionclient_retval is not None: if not isinstance( actionclient_retval, control_msgs.FollowJointTrajectoryResult ): raise TypeError( "actionclient_retval must be of type " "control_msgs.msg.FollowJointTrajectoryResult" ) error_dict = { value: attr for attr, value in actionclient_retval.__class__.__dict__.items() if attr[0] != "_" and all(map(str.isupper, attr.replace("_", ""))) } return ( error_dict[actionclient_retval.error_code] .lower() .capitalize() .replace("_", " ") + "." if error_dict[actionclient_retval.error_code] != "SUCCESSFUL" else "" ) return "No goal set"
[docs]def translate_moveit_error_code(moveit_error_code): """Translates a MoveIt error code object into a human readable error message. Args: moveit_error_code (:obj:`~moveit_msgs.msg._MoveItErrorCodes.MoveItErrorCodes`): The MoveIt error code object Returns: str: Error string that corresponds to the error code. Raises: TypeError: If moveit_error_code is not of type moveit_msgs.msg.MoveItErrorCodes. """ if not isinstance(moveit_error_code, MoveItErrorCodes): raise TypeError( "moveit_error_code must be of type moveit_msgs.msg.MoveItErrorCodes" ) error_dict = { value: attr for attr, value in moveit_error_code.__class__.__dict__.items() if attr[0] != "_" and all(map(str.isupper, attr.replace("_", ""))) } return ( error_dict[moveit_error_code.val].lower().capitalize().replace("_", " ") if error_dict[moveit_error_code.val] != "SUCCESSFUL" else "" )
################################################# # List dict and text manipulation functions ##### #################################################
[docs]def lower_first_char(string): """De-capitalize the first letter of a string. Args: string (str): The input string. Returns: str: The de-capitalized string. Raises: TypeError: If text is not of type str. .. note:: This function is not the exact opposite of the capitalize function of the standard library. For example, capitalize('abC') returns Abc rather than AbC. """ if not isinstance(string, str): raise TypeError("string must be of type str") if not string: # Added to handle case where s == None return return string[0].lower() + string[1:]
[docs]def wrap_space_around(text): """Wrap one additional space around text if it is not already present. Args: text (str): Text Returns: str: Text with extra spaces around it. Raises: TypeError: If text is not of type str. """ if not isinstance(text, str): raise TypeError("text must be of type str") if text[0] != " " and text[-1] != " ": return " " + text + " " elif text[0] != " ": return " " + text elif text[-1] != " ": return text + " " return text
[docs]def list_2_human_text(input_list, separator=",", end_separator="&"): """Function converts a list of values into human readable sentence. Example: Using this function a list of 4 items ``[item1, item2, item3, item4]`` becomes ``item2, item3 and item4``. Args: input_list (list): A input list. Returns: str: A human readable string that can be printed. Raises: TypeError: If input_list is not of type list or tuple. """ if not isinstance(input_list, (list, tuple)): raise TypeError("input_list must be of type list or tuple") # Add spaces around separators if not present. separator = wrap_space_around(separator)[1:] end_separator = wrap_space_around(end_separator) # Create human readable comma deliminated text. if isinstance(input_list, list): if len(input_list) > 1: return ( separator.join([str(item) for item in input_list[:-1]]) + end_separator + str(input_list[-1]) ) if len(input_list) == 0: return "" return str(input_list[0]) if isinstance(input_list, tuple): input_list = list(input_list) if len(input_list) > 1: return ( separator.join([str(item) for item in input_list[:-1]]) + end_separator + str(input_list[-1]) ) if len(input_list) == 0: return "" return str(input_list[0]) return input_list
[docs]def dict_clean(input_dict): """Removes empty dictionary keys from a dictionary and returns a cleaned up dictionary. Empty meaning an empty list, string or dict or a None value. Args: input_dict (dict):The input dictionary. Returns: dict: The cleaned dictionary. Raises: TypeError: If input_dict is not of type dict. """ if not isinstance(input_dict, dict): raise TypeError("input_dict must be of type dict") stripped_dict = {} for k, v in input_dict.items(): if isinstance(v, dict): v = dict_clean(v) if v not in ("", None, {}, []): stripped_dict[k] = v return stripped_dict
[docs]def get_unique_list(input_list, trim=True): """Removes non-unique items from a list. Args: input_list (list): The input list. trim (list, optional): Trim empty items. Defaults to ``True``. Returns: list: The new list containing only unique items. Raises: TypeError: If input_list is not of type list. """ if not isinstance(input_list, list): raise TypeError("input_list must be of type list") if trim: return list({item for item in input_list if item != ""}) return list({item for item in input_list})
[docs]def get_duplicate_list(input_list): """Returns the duplicates in a list. Args: input_list (list): The input list. Returns: list: The new list containing only the items that had duplicates. Raises: TypeError: If input_list is not of type list. """ if not isinstance(input_list, list): raise TypeError("input_list must be of type list") return list(set([x for x in input_list if input_list.count(x) > 1]))
[docs]def flatten_list(input_list): """Function used to flatten a list containing sublists. It does this by calling itself recursively. Args: input_list (list): A list containing strings or other lists. Returns: list: The flattened list. Raises: TypeError: If input_list is not of type list. """ if not isinstance(input_list, list): raise TypeError("input_list must be of type list") flattened_list = [] for list_item in input_list: if type(list_item) is list: flattened_list.extend( flatten_list(list_item) ) # NOTE: Calls itself recursively else: flattened_list.append(list_item) return flattened_list
################################################# # Other functions ############################### #################################################
[docs]def action_server_exists(topic_name): """Checks whether a topic contains an action server is running. Args: topic_name (str): Action server topic name. Returns: bool: Boolean specifying whether the action service exists. Raises: TypeError: If topic_name is not of type str. """ if not isinstance(topic_name, str): raise TypeError("topic_name must be of type str") # Strip action server specific topics from topic name. if topic_name.split("/")[-1] in ["cancel", "feedback", "goal", "result", "status"]: topic_name = "/".join(topic_name.split("/")[:-1]) if topic_name[-1] == "/": topic_name = topic_name[:-1] # Validate if action server topic exists. try: rospy.wait_for_message("%s/status" % topic_name, GoalStatusArray, timeout=5) except ROSException: return False # Check if topic contains action client. exists = False for item in rospy.get_published_topics(): if "%s/status" % topic_name in item[0]: if "actionlib_msgs" in item[1]: exists = True else: exists = False return exists
[docs]def quaternion_norm(quaternion): """Calculates the norm of a quaternion. Args: Quaternion (:obj:`geometry_msgs.msg.Quaternion`): A quaternion. Returns: float: The norm of the quaternion. Raises: TypeError: If quaternion is not of type Quaternion. """ if not isinstance(quaternion, Quaternion): raise TypeError("quaternion must be of type Quaternion") return linalg.norm([quaternion.x, quaternion.y, quaternion.z, quaternion.w])
[docs]def normalize_quaternion(quaternion): """Normalizes a given quaternion. Args: quaternion (:obj:`geometry_msgs.msg.Quaternion`): A quaternion. Returns: :obj:`geometry_msgs.msg.Quaternion`: The normalized quaternion. Raises: TypeError: If quaternion is not of type Quaternion. """ if not isinstance(quaternion, Quaternion): raise TypeError("quaternion must be of type Quaternion") quaternion = copy.deepcopy( quaternion ) # Make sure the original object is not changed. norm = quaternion_norm(quaternion) # Normalize quaternion. if norm == nan: # test. rospy.logwarn( "Quaternion could not be normalized since the norm could not be " "calculated." ) elif norm == 0.0: # Transform to identity. quaternion.x = 0.0 quaternion.y = 0.0 quaternion.z = 0.0 quaternion.w = 1.0 else: quaternion.x = quaternion.x / norm quaternion.y = quaternion.y / norm quaternion.z = quaternion.z / norm quaternion.w = quaternion.w / norm return quaternion
[docs]def vector_norm(vector): """Calculates the norm of a vector. Args: vector (union[list, tuple, np.array]): A vector. Returns: float: The norm of the vector. Raises: TypeError: If vector is not of type list or tuple. """ if not isinstance(vector, (list, tuple, np.ndarray)): raise TypeError("vector must be of type list, tuple or numpy array") vector = np.array(vector) return np.linalg.norm(vector)
[docs]def normalize_vector(vector, force=True): """Normalizes a given vector. Args: vector (union[list, tuple]): A vector. force (bool): Whether to force the vector to have a unit length if the norm is zero. Defaults to ``True``. Returns: list: The normalized vector. Raises: TypeError: If vector is not of type list or tuple. """ if not isinstance(vector, (list, tuple, np.ndarray)): raise TypeError("vector must be of type list or tuple or numpy array") vector = np.array(vector) norm = vector_norm(vector) if norm == 0: if force: return [0.0, 0.0, 1.0] rospy.logwarn("Vector could not be normalized since the norm is zero.") return list(vector) return list(vector / norm)
[docs]def ros_exit_gracefully(shutdown_msg=None, exit_code=0): """Shuts down the ROS node wait until it is shutdown and exits the script. Args: shutdown_msg (str, optional): The shutdown message. Defaults to ``None``. exit_code (int, optional): The exit code. Defaults to ``0``. """ try: if exit_code == 0: rospy.loginfo(shutdown_msg) else: rospy.logerr(shutdown_msg) rospy.signal_shutdown(shutdown_msg) while not rospy.is_shutdown(): rospy.sleep(0.1) finally: sys.exit(exit_code)
[docs]def load_panda_joint_limits(): """Loads the joint limits of the Panda robot from the parameter server. Returns: dict: Dictionary containing the joint position limits (i.e. min and max) for each joint. Returns an empty dictionary if the joint limits could not be loaded. """ # Load arm joint limits from parameter server. joint_limits = {} for i in range(1, 8): joint_name = f"joint{i}" if rospy.has_param(f"panda_gazebo/panda/{joint_name}"): limits = rospy.get_param(f"panda_gazebo/panda/{joint_name}") joint_limits[f"panda_{joint_name}_min"] = limits["limit"]["lower"] joint_limits[f"panda_{joint_name}_max"] = limits["limit"]["upper"] # Add finger joint limits. joint_limits.update( { "panda_finger_joint1_min": 0.0, "panda_finger_joint1_max": 0.04, "panda_finger_joint2_min": 0.0, "panda_finger_joint2_max": 0.04, } ) return joint_limits