r"""An ROS Panda reach gymnasium environment.

.. image:: /images/panda/panda_reach_env.png
   :alt: Panda reach environment

Observation space:
    As the panda environment inherits from the `gym.GoalEnv`_ class, the observation
    space is a dictionary.

    Type: Dict
        - **observation** (:obj:`numpy.ndarray`): The current end-effector position and
          the gripper finger joint positions and velocities.
        - **desired_goal** (:obj:`numpy.ndarray`): The desired end-effector position.
        - **achieved_goal** (:obj:`numpy.ndarray`): The achieved end-effector position.
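
    A minimal usage sketch that shows this dictionary structure is given below. The
    environment id ``ros_gazebo_gym:PandaReach-v1`` is an assumption; use the id under
    which the environment is registered in your installation.

    .. code-block:: python

        import gymnasium as gym

        # NOTE: The environment id is an assumption; replace it with the id under
        # which the environment is registered in your installation.
        env = gym.make("ros_gazebo_gym:PandaReach-v1")
        obs, _ = env.reset()
        print(obs["observation"])   # end-effector position and finger joint states
        print(obs["desired_goal"])  # desired end-effector position
        print(obs["achieved_goal"])  # achieved end-effector position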

.. _`gym.GoalEnv`: https://robotics.farama.org/content/multi-goal_api/#goalenv

Action space:
    The action space of the Panda environment depends on the control type and on
    whether the gripper is loaded. The following action spaces are available (an
    example action is given at the end of this section):

    **Joint trajectory control**:
        Type: Box(7)
            - **panda_joint1** (:obj:`float`): The position of the first joint.
            - **panda_joint2** (:obj:`float`): The position of the second joint.
            - **panda_joint3** (:obj:`float`): The position of the third joint.
            - **panda_joint4** (:obj:`float`): The position of the fourth joint.
            - **panda_joint5** (:obj:`float`): The position of the fifth joint.
            - **panda_joint6** (:obj:`float`): The position of the sixth joint.
            - **panda_joint7** (:obj:`float`): The position of the seventh joint.

    **Joint position control**:
        Type: Box(7)
            - **panda_joint1** (:obj:`float`): The position of the first joint.
            - **panda_joint2** (:obj:`float`): The position of the second joint.
            - **panda_joint3** (:obj:`float`): The position of the third joint.
            - **panda_joint4** (:obj:`float`): The position of the fourth joint.
            - **panda_joint5** (:obj:`float`): The position of the fifth joint.
            - **panda_joint6** (:obj:`float`): The position of the sixth joint.
            - **panda_joint7** (:obj:`float`): The position of the seventh joint.

    **Joint effort control**:
        Type: Box(7)
            - **panda_joint1** (:obj:`float`): The effort of the first joint.
            - **panda_joint2** (:obj:`float`): The effort of the second joint.
            - **panda_joint3** (:obj:`float`): The effort of the third joint.
            - **panda_joint4** (:obj:`float`): The effort of the fourth joint.
            - **panda_joint5** (:obj:`float`): The effort of the fifth joint.
            - **panda_joint6** (:obj:`float`): The effort of the sixth joint.
            - **panda_joint7** (:obj:`float`): The effort of the seventh joint.

    **End-effector position control**:
        Type: Box(7)
            - **x** (:obj:`float`): The x position of the end-effector.
            - **y** (:obj:`float`): The y position of the end-effector.
            - **z** (:obj:`float`): The z position of the end-effector.
            - **rx** (:obj:`float`): The x component of the quaternion orientation of the
              end-effector.
            - **ry** (:obj:`float`): The y component of the quaternion orientation of the
              end-effector.
            - **rz** (:obj:`float`): The z component of the quaternion orientation of the
              end-effector.
            - **rw** (:obj:`float`): The w component of the quaternion orientation of the
              end-effector.

    If the gripper is loaded, the action space is extended with the following two
    dimensions:

    Type: Box(2)
        - **gripper_width** (:obj:`float`): The width of the gripper.
        - **gripper_max_effort** (:obj:`float`): The maximum effort applied by the
          gripper.

        .. attention::
            The gripper width is ignored when the ``grasping`` parameter is set to ``true`` in the
            :ros-gazebo-gym:`task environment config file <blob/noetic/src/ros_gazebo_gym/task_envs/panda/config/panda_reach.yaml>`
            or when ``gripper_max_effort`` is set to a value greater than zero.
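
    For example, with the default joint effort control and no gripper loaded, an action
    is a 7-element array of joint efforts (the values below are arbitrary placeholders):

    .. code-block:: python

        import numpy as np

        # Efforts for panda_joint1 ... panda_joint7 (arbitrary placeholder values).
        action = np.array([0.0, 5.0, 0.0, -5.0, 0.0, 2.5, 0.0], dtype=np.float64)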

Episode termination:
    The episode terminates when the end-effector is within a certain distance of the
    goal position. The distance is defined by the ``distance_threshold`` parameter in
    the task environment configuration file. If the ``hold_samples`` parameter is
    greater than zero, the episode will terminate after ``hold_samples`` consecutive
    samples are within the ``distance_threshold``. The episode will also terminate if
    the maximum number of samples is reached.

Environment Goal:
    In this environment, the agent has to learn to move the end-effector of the Panda
    robot to a given goal position. It is based on the
    :gymnasium-robotics:`FetchReach-v2 <envs/fetch/reach/>` gymnasium environment.

Reward function:
    The reward function is defined as the negative of the Euclidean distance between the
    end-effector and the goal position. If the ``positive_reward`` parameter is set to
    ``true``, the absolute value of the reward is returned:

    .. math::
        reward = -\sqrt{(x_{ee} - x_{goal})^2 + (y_{ee} - y_{goal})^2 + (z_{ee} - z_{goal})^2}
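
    For reference, the dense reward computation reduces to the following sketch
    (collision penalties and the sparse reward variant are omitted for brevity):

    .. code-block:: python

        import numpy as np

        def dense_reward(achieved_goal, desired_goal, positive_reward=False):
            """Negative Euclidean distance between the end-effector and the goal."""
            d = np.linalg.norm(
                np.asarray(achieved_goal) - np.asarray(desired_goal), axis=-1
            )
            return np.abs(-d) if positive_reward else -d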

Initialization:
    The environment is initialized by loading the Panda robot model and setting its
    initial position and orientation. The environment parameters can be set in the
    configuration file located at ``ros_gazebo_gym/task_envs/panda/config/panda_reach.yaml``.
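
    As a hedged illustration, the environment class can also be constructed directly
    with non-default arguments (direct construction is shown only to highlight the
    available options; the environment is normally created through ``gym.make``):

    .. code-block:: python

        from ros_gazebo_gym.task_envs.panda.panda_reach import PandaReachEnv

        # Use end-effector control and a positive reward; all other settings are
        # read from the panda_reach.yaml configuration file.
        env = PandaReachEnv(control_type="end_effector", positive_reward=True)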

Environment step return:

    In addition to the observation, the reward, and the termination and truncation
    booleans, the environment's step method also returns an info dictionary:

    .. code-block:: python

        (observation, reward, terminated, truncated, info_dict)

    The info dictionary contains the following information:

    - **reference**: The reference position (x,y,z) that the Panda Reach is tracking (i.e. the goal position).
    - **state_of_interest**: The state that should track the reference (SOI) (i.e. the end-effector position).
    - **reference_error**: The error between SOI and the reference (i.e. the error between the end-effector position and the goal position).
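
    Continuing the usage sketch above, the info dictionary can, for example, be used to
    monitor the tracking error during a rollout:

    .. code-block:: python

        obs, info = env.reset()
        for _ in range(100):
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            print("reference error:", info["reference_error"])
            if terminated or truncated:
                obs, info = env.reset()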

.. admonition:: Configuration
    :class: important

    The configuration files for this environment are found in the
    :ros-gazebo-gym:`panda task environment config folder <blob/noetic/src/ros_gazebo_gym/task_envs/panda/config/panda_reach.yaml>`.
"""  # noqa: E501

import os
from datetime import datetime
from pathlib import Path

import numpy as np
import rospy
from geometry_msgs.msg import Pose, Quaternion
from gymnasium import spaces, utils
from ros_gazebo_gym.common.helpers import (
    flatten_list,
    gripper_width_2_finger_joints_positions,
    list_2_human_text,
    pose_msg_2_pose_dict,
    shallow_dict_merge,
    split_bounds_dict,
    split_pose_dict,
)
from ros_gazebo_gym.common.markers.sample_region_marker import SampleRegionMarker
from ros_gazebo_gym.common.markers.target_marker import TargetMarker
from ros_gazebo_gym.core import ROSLauncher
from ros_gazebo_gym.core.helpers import (
    get_log_path,
    load_ros_params_from_yaml,
    ros_exit_gracefully,
)
from ros_gazebo_gym.exceptions import EePoseLookupError
from ros_gazebo_gym.robot_envs.panda_env import PandaEnv
from rospy.exceptions import ROSException, ROSInterruptException
from std_msgs.msg import ColorRGBA

# Specify topics and other script variables.
CONNECTION_TIMEOUT = 5  # Timeout for connecting to services or topics.
MOVEIT_GET_RANDOM_JOINT_POSITIONS_TOPIC = (
    "panda_moveit_planner_server/get_random_joint_positions"
)
MOVEIT_SET_JOINT_POSITIONS_TOPIC = "panda_moveit_planner_server/set_joint_positions"
MOVEIT_GET_RANDOM_EE_POSE_TOPIC = "panda_moveit_planner_server/get_random_ee_pose"
MOVEIT_ADD_PLANE_TOPIC = "panda_moveit_planner_server/planning_scene/add_plane"
SET_FRANKA_MODEL_CONFIGURATION_TOPIC = "set_franka_model_configuration"
VALID_EE_CONTROL_JOINTS = ["x", "y", "z", "rx", "ry", "rz", "rw"]
CONFIG_FILE_PATH = "config/panda_reach.yaml"
PANDA_REST_CONFIGURATION = [
    0,
    0,
    0,
    -1.57079632679,
    0,
    1.57079632679,
    0.785398163397,
    0.001,
    0.001,
]
LOG_STEP_DEBUG_INFO = False
AVAILABLE_HAND_COMMANDS = ["gripper_width", "gripper_max_effort"]

#################################################
# Panda reach environment class #################
#################################################
[docs]class PandaReachEnv(PandaEnv, utils.EzPickle): """Class that provides all the methods used for the algorithm training. Attributes: action_space (:obj:`gym.spaces.box.Box`): Gym action space object. observation_space (:obj:`gym.spaces.dict.Dict`): Gym observation space object. goal (:obj:`geometry_msgs.PoseStamped`): The current goal. """ _instance_count = 0 # Counts the number of instances that were created. def __init__( # noqa: C901 self, control_type="effort", positive_reward=False, config_path=CONFIG_FILE_PATH, gazebo_world_launch_file="start_reach_world.launch", visualize=None, action_space_dtype=np.float64, observation_space_dtype=np.float64, ): """Initializes a Panda Task Environment. Args: control_Type (str, optional): The type of control you want to use for the panda robot (i.e. hand and arm). Options are: ``trajectory``, ``position``, ``effort`` or ``end_effector``. Defaults to ``effort``. positive_reward (bool, optional): Whether you want to use a positive reward instead of a negative reward. Defaults to ``False``. config_path (str, optional): Path where the environment configuration value are found. The path is resolved relative to the :class:`~ros_gazebo_gym.task_envs.panda.panda_reach` class file. gazebo_world_launch_file (str, optional): Name of the launch file that loads the gazebo world. Currently only the launch files inside the `panda_gazebo <https://github.com/rickstaa/panda-gazebo>`_ package are supported. Defaults to ``start_reach_world.launch``. visualize (bool, optional): Whether you want to show the RViz visualization. Defaults to ``None`` meaning the task configuration file values will be used. action_space_dtype (union[numpy.dtype, str], optional): The data type of the action space. Defaults to ``np.float64``. observation_space_dtype (union[numpy.dtype, str], optional): The data type of the observation space. Defaults to ``np.float64``. .. important:: In this environment, the joint trajectory control is not implemented yet for multiple waypoints. This is because the action space only contains one waypoint. The :obj:`~PandaEnv.set_arm_joint_trajectory` method, however, already accepts multiple waypoints. As a result, task environment can be easily extended to work with multiple waypoints by modifying the :obj:`PandaReachEnv~._create_action_space` method. """ rospy.logwarn("Initialize PandaEnv task environment...") self.__class__._instance_count += 1 if self.__class__._instance_count > 1: rospy.logwarn( "You are trying to create multiple instances of the " f"{self.__class__.__name__} class. Unfortunately, this is not yet " "been implemented and will cause unexpected behaviour. As a result, " "the script will be shut down. Feel free to open a pull request if " "you want to implement this functionality." ) self._positive_reward = positive_reward self._task_env = "panda_reach" # Makes sure the env is pickable when it wraps C++ code. utils.EzPickle.__init__(**locals()) # Makes sure roscore is running and ROS is initialized. ROSLauncher.initialize() # This is the path where the simulation files will be downloaded if not present. workspace_path = rospy.get_param("/panda_reach_v0/workspace_path", None) if workspace_path: assert os.path.exists(workspace_path), ( "The Simulation ROS Workspace path " + workspace_path + " DOESN'T exist, execute: mkdir -p " + workspace_path + "/src;cd " + workspace_path + ";catkin_make" ) # Load Params from the desired Yaml file relative to this TaskEnvironment. 
rospy.logdebug("Load Panda Reach parameters.") self._config_file_path = Path(__file__).parent.joinpath(config_path) load_ros_params_from_yaml( self._config_file_path, ros_package_name="ros_gazebo_gym", ) self._get_params() # Thrown warning if gazebo is already running. if any( ["/gazebo" in topic for topic in flatten_list(rospy.get_published_topics())] ): err_msg = ( f"Shutting down '{rospy.get_name()}' since a Gazebo instance is " "already running. Unfortunately, spawning multiple Panda simulations " "is not yet supported. Please shut down this instance and try again." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) # Launch the panda task gazebo environment (Doesn't yet add the robot). # NOTE: This downloads and builds the required ROS packages if not found. launch_log_file = ( str( get_log_path() .joinpath( "{}_{}.log".format( gazebo_world_launch_file.replace(".", "_"), datetime.now().strftime("%d_%m_%Y_%H_%M_%S"), ) ) .resolve() ) if not self._roslaunch_log_to_console else None ) ROSLauncher.launch( package_name="panda_gazebo", launch_file_name=gazebo_world_launch_file, workspace_path=workspace_path, log_file=launch_log_file, critical=True, outdated_warning=True, paused=True, gazebo_gui=self._gazebo_gui, physics=self._physics, ) ######################################## # Initiate Robot environments ########## ######################################## # Initialize the Robot environment. super(PandaReachEnv, self).__init__( robot_EE_link=self._ee_link, ee_frame_offset=self._ee_frame_offset, load_gripper=self._load_gripper, lock_gripper=self._lock_gripper, grasping=self._grasping, control_type=control_type, workspace_path=workspace_path, log_reset=self._log_reset, visualize=visualize, ) # Disable visualize argument so that config can be used during inference. self._ezpickle_kwargs["visualize"] = None # Initialize task environment objects. self._is_done_samples = 0 self._init_model_configuration = {} ######################################## # Connect to required services, ######## # subscribers and publishers. ########## ######################################## # Connect to MoveIt 'get_random_joint_positions' service. try: moveit_get_random_joint_positions_srv_topic = ( f"{self.robot_name_space}/{MOVEIT_GET_RANDOM_JOINT_POSITIONS_TOPIC}" ) rospy.logdebug( "Connecting to '%s' service." % moveit_get_random_joint_positions_srv_topic ) rospy.wait_for_service( moveit_get_random_joint_positions_srv_topic, timeout=CONNECTION_TIMEOUT, ) self._moveit_get_random_joint_positions_client = rospy.ServiceProxy( moveit_get_random_joint_positions_srv_topic, self.panda_gazebo.srv.GetRandomJointPositions, ) rospy.logdebug( "Connected to '%s' service!" % moveit_get_random_joint_positions_srv_topic ) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % moveit_get_random_joint_positions_srv_topic ) # Connect to MoveIt 'get_random_ee_pose' service. try: moveit_get_random_ee_pose_srv_topic = ( f"{self.robot_name_space}/{MOVEIT_GET_RANDOM_EE_POSE_TOPIC}" ) rospy.logdebug( "Connecting to '%s' service." % moveit_get_random_ee_pose_srv_topic ) rospy.wait_for_service( moveit_get_random_ee_pose_srv_topic, timeout=CONNECTION_TIMEOUT, ) self._moveit_get_random_ee_pose_client = rospy.ServiceProxy( moveit_get_random_ee_pose_srv_topic, self.panda_gazebo.srv.GetRandomEePose, ) rospy.logdebug( "Connected to '%s' service!" 
% moveit_get_random_ee_pose_srv_topic ) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % moveit_get_random_ee_pose_srv_topic ) # Connect to MoveIt 'planning_scene/add_plane' service. try: moveit_add_plane_srv_topic = ( f"{self.robot_name_space}/{MOVEIT_ADD_PLANE_TOPIC}" ) rospy.logdebug("Connecting to '%s' service." % moveit_add_plane_srv_topic) rospy.wait_for_service( moveit_add_plane_srv_topic, timeout=CONNECTION_TIMEOUT, ) self._moveit_add_plane_srv = rospy.ServiceProxy( moveit_add_plane_srv_topic, self.panda_gazebo.srv.AddPlane ) rospy.logdebug("Connected to '%s' service!" % moveit_add_plane_srv_topic) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % moveit_add_plane_srv_topic ) if self._moveit_init_pose_control: # Connect to MoveIt 'planning_scene/set_joint_positions' service. try: moveit_set_joint_positions_srv_topic = ( f"{self.robot_name_space}/{MOVEIT_SET_JOINT_POSITIONS_TOPIC}" ) rospy.logdebug( "Connecting to '%s' service." % moveit_set_joint_positions_srv_topic ) rospy.wait_for_service( moveit_set_joint_positions_srv_topic, timeout=CONNECTION_TIMEOUT, ) self._moveit_set_joint_positions_srv = rospy.ServiceProxy( moveit_set_joint_positions_srv_topic, self.panda_gazebo.srv.SetJointPositions, ) rospy.logdebug( "Connected to '%s' service!" % moveit_set_joint_positions_srv_topic ) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % moveit_set_joint_positions_srv_topic ) else: # Connect to franka_gazebo's 'set_franka_model_configuration' service. try: set_franka_model_configuration_srv_topic = ( f"{self.robot_name_space}/{SET_FRANKA_MODEL_CONFIGURATION_TOPIC}" ) rospy.logdebug( "Connecting to '%s' service." % set_franka_model_configuration_srv_topic ) rospy.wait_for_service( set_franka_model_configuration_srv_topic, timeout=CONNECTION_TIMEOUT, ) self._set_franka_model_configuration_srv = rospy.ServiceProxy( set_franka_model_configuration_srv_topic, self.franka_msgs.srv.SetJointConfiguration, ) rospy.logdebug( "Connected to '%s' service!" % set_franka_model_configuration_srv_topic ) except (rospy.ServiceException, ROSException, ROSInterruptException): rospy.logwarn( "Failed to connect to '%s' service!" % set_franka_model_configuration_srv_topic ) # Create current target publisher. rospy.logdebug("Creating target pose publisher.") self._target_pose_marker_pub = rospy.Publisher( "/ros_gazebo_gym/current_target", TargetMarker, queue_size=1, latch=True ) rospy.logdebug("Goal target publisher created.") if self._load_rviz: # Create target bounding region publisher. rospy.logdebug("Creating target bounding region publisher.") self._target_sample_region_marker_pub = rospy.Publisher( "/ros_gazebo_gym/target_sample_region", SampleRegionMarker, queue_size=1, latch=True, ) rospy.logdebug("Target bounding region publisher created.") # Create initial pose bounding region publisher. rospy.logdebug("Creating initial pose sample region publisher.") self._init_pose_sample_region_marker__pub = rospy.Publisher( "/ros_gazebo_gym/init_pose_sample_region", SampleRegionMarker, queue_size=1, latch=True, ) rospy.logdebug("Initial pose sample region publisher created.") ######################################## # Initialize RViz visualizations ####### ######################################## # Add ground to MoveIt planning scene. 
self._add_ground_to_moveit_scene() # Add pose and target sampling bounds to RViz. self._init_rviz_visualizations() ######################################## # Create action and observation space ## ######################################## rospy.logdebug("Setup gymnasium action and observation space.") self._action_space_dtype = action_space_dtype self._observation_space_dtype = observation_space_dtype self._action_dtype_conversion_warning = False self.action_space = self._create_action_space(dtype=self._action_space_dtype) self.goal = self._sample_goal() obs = self._get_obs() self.observation_space = spaces.Dict( dict( observation=spaces.Box( -np.inf, np.inf, shape=obs["observation"].shape, dtype=self._observation_space_dtype, ), desired_goal=spaces.Box( -np.inf, np.inf, shape=obs["desired_goal"].shape, dtype=self._observation_space_dtype, ), achieved_goal=spaces.Box( -np.inf, np.inf, shape=obs["achieved_goal"].shape, dtype=self._observation_space_dtype, ), ) ) # Throw warning if action space joints are locked. locked_action_space_joints = [ joint for joint in self._action_space_joints if joint in self._locked_arm_joints ] if locked_action_space_joints: rospy.logwarn( f"The following joints in the action space are locked: " f"{list_2_human_text(locked_action_space_joints)}. As a result, the " "control applied to these joints will not have any effect." ) rospy.logwarn("PandaEnv task environment initialized.") ################################################ # Task environment internal methods ############ ################################################ # NOTE: Here you can add additional helper methods that are used in the task env. def _init_rviz_visualizations(self): """Add pose and target sampling bounds to RViz.""" # Add goal sampling bounds to RViz. if ( self._target_sampling_strategy != "fixed" and self._load_rviz and self._visualize_target_sampling_bounds ): goal_sample_region_marker_msg = SampleRegionMarker( x_min=self._target_sampling_bounds["x_min"], y_min=self._target_sampling_bounds["y_min"], z_min=self._target_sampling_bounds["z_min"], x_max=self._target_sampling_bounds["x_max"], y_max=self._target_sampling_bounds["y_max"], z_max=self._target_sampling_bounds["z_max"], id=0, ) self._target_sample_region_marker_pub.publish(goal_sample_region_marker_msg) # Add initial pose sampling region to RViz. 
if ( self._load_rviz and self._visualize_init_pose_bounds and self._init_pose_sampling_bounds is not None and self._pose_sampling_type != "joint_positions" ): init_pose_sample_region_marker_msg = SampleRegionMarker( x_min=self._init_pose_sampling_bounds["x_min"], y_min=self._init_pose_sampling_bounds["y_min"], z_min=self._init_pose_sampling_bounds["z_min"], x_max=self._init_pose_sampling_bounds["x_max"], y_max=self._init_pose_sampling_bounds["y_max"], z_max=self._init_pose_sampling_bounds["z_max"], id=1, ) init_pose_region_color = ColorRGBA() init_pose_region_color.a = 0.15 init_pose_region_color.r = 0.0 init_pose_region_color.g = 0.0 init_pose_region_color.b = 1.0 init_pose_sample_region_marker_msg.color = init_pose_region_color self._init_pose_sample_region_marker__pub.publish( init_pose_sample_region_marker_msg ) def _add_ground_to_moveit_scene(self): """Adds the ground to the MoveIt Planning scence.""" if hasattr(self, "_moveit_add_plane_srv"): add_ground_req = self.panda_gazebo.srv.AddPlaneRequest( name="ground", pose=Pose(orientation=Quaternion(0, 0, 0, 1)), normal=[0, 0, 1], ) resp = self._moveit_add_plane_srv.call(add_ground_req) if not resp.success: rospy.logerr( "The ground was not added successfully to the MoveIt planning " "scene. As a result, MoveIt is unaware of the ground's existence " "and may plan states that are in collision." ) else: err_msg = ( f"Failled to connect to '{MOVEIT_ADD_PLANE_TOPIC}' service! " f"Shutting down '{rospy.get_name()}' as this service is required for " "making MoveIt aware of the ground." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) def _get_params(self, ns="panda_reach"): # noqa: C901 """Retrieve task environment configuration parameters from parameter server. Args: ns (str, optional): The namespace on which the parameters are found. Defaults to "panda_reach". 
""" self._task_env = ns try: # == Retrieve control variables == self._direct_control = rospy.get_param( f"/{ns}/control/direct_control", True ) self._load_gripper = rospy.get_param(f"/{ns}/control/load_gripper", True) self._lock_gripper = rospy.get_param(f"/{ns}/control/lock_gripper", False) self._ee_link = rospy.get_param( f"/{ns}/control/ee_link", "panda_hand" if self._load_gripper else "panda_link8", ) self._grasping = rospy.get_param(f"/{ns}/control/grasping", None) self._arm_wait = rospy.get_param(f"/{ns}/control/arm_wait", False) self._hand_wait = rospy.get_param(f"/{ns}/control/hand_wait", True) self._ee_control_coordinates = rospy.get_param( f"/{ns}/control/ee_control_coordinates", None ) self._controlled_joints = rospy.get_param( f"/{ns}/control/controlled_joints", None ) self._locked_arm_joints = rospy.get_param( f"/{ns}/control/locked_arm_joints", [] ) self._ee_frame_offset = rospy.get_param( f"/{ns}/control/ee_frame_offset", None ) if self._ee_frame_offset is not None: self._ee_frame_offset["x"] = self._ee_frame_offset.get("x", 0.0) self._ee_frame_offset["y"] = self._ee_frame_offset.get("y", 0.0) self._ee_frame_offset["z"] = self._ee_frame_offset.get("z", 0.0) self._ee_frame_offset["rx"] = self._ee_frame_offset.get("rx", 0.0) self._ee_frame_offset["ry"] = self._ee_frame_offset.get("ry", 0.0) self._ee_frame_offset["rz"] = self._ee_frame_offset.get("rz", 0.0) self._ee_frame_offset["rw"] = self._ee_frame_offset.get("rw", 1.0) # == Retrieve sampling variables == self._visualize_init_pose_bounds = rospy.get_param( f"/{ns}/pose_sampling/visualize_init_pose_bounds", True ) self._reset_init_pose = rospy.get_param( f"/{ns}/pose_sampling/reset_init_pose", True ) self._random_init_pose = rospy.get_param( f"/{ns}/pose_sampling/random_init_pose", True ) self._randomize_first_episode = rospy.get_param( f"/{ns}/pose_sampling/randomize_first_episode", True ) self._pose_sampling_attempts = rospy.get_param( f"/{ns}/pose_sampling/attempts", 10 ) self._pose_sampling_type = rospy.get_param( f"/{ns}/pose_sampling/pose_sampling_type", "end_effector_pose" ).lower() self._moveit_init_pose_control = rospy.get_param( f"/{ns}/pose_sampling/moveit_control", False ) self._init_pose = rospy.get_param( f"/{ns}/pose_sampling/init_pose", { "x": 0.23, "y": 0.29, "z": 0.35, "rx": 0.78, "ry": 0.62, "rz": -0.0, "rw": 4.42, "panda_joint1": 0.0, "panda_joint2": 0.0, "panda_joint3": 0.0, "panda_joint4": -1.57079632679, "panda_joint5": 0.0, "panda_joint6": 1.57079632679, "panda_joint7": 0.785398163397, "gripper_width": 0.001, }, ) self._init_pose_offset = rospy.get_param( f"/{ns}/pose_sampling/offset", {"x": 0.0, "y": 0.0, "z": 0.0} ) self._init_pose_sampling_bounds = rospy.get_param( f"/{ns}/pose_sampling/bounds", None ) self._target_sampling_strategy = rospy.get_param( f"/{ns}/target_sampling/strategy", "global" ) self._visualize_target = rospy.get_param( f"/{ns}/target_sampling/visualize_target", True ) self._visualize_target_sampling_bounds = rospy.get_param( f"/{ns}/target_sampling/visualize_target_bounds", True ) self._target_offset = rospy.get_param( f"/{ns}/target_sampling/offset", { "x": 0.0, "y": 0.0, "z": 0.0, }, ) self._fixed_target_pose = rospy.get_param( f"/{ns}/target_sampling/fixed_target", { "x": 0.4, "y": 0.0, "z": 0.8, }, ) if self._target_sampling_strategy != "fixed": self._target_sampling_bounds = rospy.get_param( f"/{ns}/target_sampling/bounds/" f"{self._target_sampling_strategy}", { "x_min": -0.7, "x_max": 0.7, "y_min": -0.7, "y_max": 0.7, "z_min": 0.0, "z_max": 1.3, }, ) # == Retrieve reward 
variables == self._reward_type = rospy.get_param(f"/{ns}/training/reward_type", "sparse") self._target_hold = rospy.get_param(f"/{ns}/training/target_hold", True) self._hold_samples = rospy.get_param(f"/{ns}/training/hold_samples", 2) self._distance_threshold = rospy.get_param( f"/{ns}/training/distance_threshold", 0.05 ) self._collision_penalty = rospy.get_param( f"/{ns}/training/collision_penalty", 0.0 ) # == Retrieve environment variables == self._pause_after_step = rospy.get_param( f"/{ns}/environment/pause_after_step", True ) self._action_bounds = rospy.get_param( f"/{ns}/environment/action_space/bounds", { "ee_pose": { "low": { "x": -1.3, "y": -1.3, "z": 0.0, "rx": 0, "ry": 0, "rz": 0, "rw": 0, }, "high": { "x": 1.3, "y": 1.3, "z": 1.3, "rx": 1, "ry": 1, "rz": 1, "rw": 1, }, }, "joint_positions": { "low": { "panda_joint1": -2.8973, "panda_joint2": -1.7628, "panda_joint3": -2.8973, "panda_joint4": -3.0718, "panda_joint5": -2.8973, "panda_joint6": -0.0175, "panda_joint7": -2.8973, "gripper_width": 0.0, }, "high": { "panda_joint1": 2.8973, "panda_joint2": 1.7628, "panda_joint3": 2.8973, "panda_joint4": -0.0698, "panda_joint5": 2.8973, "panda_joint6": 3.7525, "panda_joint7": 2.8973, "gripper_width": 0.08, }, }, "joint_efforts": { "low": { "panda_joint1": -87.0, "panda_joint2": -87.0, "panda_joint3": -87.0, "panda_joint4": -87.0, "panda_joint5": -12.0, "panda_joint6": -12.0, "panda_joint7": -12.0, "gripper_max_effort": 0.0, }, "high": { "panda_joint1": 87.0, "panda_joint2": 87.0, "panda_joint3": 87.0, "panda_joint4": 87.0, "panda_joint5": 12.0, "panda_joint6": 12.0, "panda_joint7": 12.0, "gripper_max_effort": 140, }, }, }, ) # == Retrieve global variables == self._physics = rospy.get_param(f"/{ns}/physics", "ode").lower() self._load_rviz = rospy.get_param(f"/{ns}/load_rviz", True) self._rviz_file = Path(__file__).parent.joinpath( rospy.get_param( f"/{ns}/rviz_file", "config/moveit.rviz", ) ) self._gazebo_gui = rospy.get_param(f"/{ns}/load_gazebo_gui", True) self._log_reset = rospy.get_param(f"/{ns}/log_reset", False) self._log_step_debug_info = rospy.get_param( f"/{ns}/log_step_debug_info", False ) self._roslaunch_log_to_console = rospy.get_param( f"/{ns}/roslaunch_log_to_console", False ) # == Retrieve other variables == self._max_velocity_scaling_factor = rospy.get_param( "/panda_moveit_planner_server/max_velocity_scaling_factor", 1.0 ) self._max_acceleration_scaling_factor = rospy.get_param( "/panda_moveit_planner_server/max_acceleration_scaling_factor", 1.0 ) except KeyError as e: rospy.logerr( f"Parameter '{e.args[0]}' could not be retrieved from the parameter " "server. Please make sure this parameter is present in the Panda task " f"environment configuration file '{self._config_file_path}' and try " "again." ) ros_exit_gracefully( shutdown_msg=f"Shutting down '{rospy.get_name()}'.", exit_code=1 ) def _robot_get_obs(self): """Returns all joint positions and velocities associated with a robot. Returns: :obj:`numpy.array`: Robot Positions, Robot Velocities """ data = self.joint_states if data.position is not None and data.name: return ( np.array(data.position), np.array(data.velocity), ) else: return np.zeros(0), np.zeros(0) def _goal_distance(self, goal_a, goal_b): """Calculates the perpendicular distance to the goal. Args: goal_a (:obj:`numpy.ndarray`): List containing a gripper and object pose. goal_b (:obj:`numpy.ndarray`): List containing a gripper and object pose. Returns: :obj:`numpy.float32`: Perpendicular distance to the goal. 
""" assert goal_a.shape == goal_b.shape return np.linalg.norm(goal_a - goal_b, axis=-1) def _get_random_joint_positions(self): """Get valid joint position commands for the Panda arm and hand. Returns: dict: Dictionary containing a valid joint position for each joint. Returns a empty dictionary if no valid joint positions were found. """ if hasattr(self, "_moveit_get_random_joint_positions_client"): req = self.panda_gazebo.srv.GetRandomJointPositionsRequest() req.attempts = self._pose_sampling_attempts # Apply pose bounding region. if ( hasattr(self, "_init_pose_sampling_bounds") and self._init_pose_sampling_bounds is not None ): _, joint_positions_bound_region = split_bounds_dict( self._init_pose_sampling_bounds ) joint_positions_bound_region = gripper_width_2_finger_joints_positions( joint_positions_bound_region, self.joints["hand"] ) joint_limits = self.panda_gazebo.msg.JointLimits() joint_limits.names = list(joint_positions_bound_region.keys()) joint_limits.values = list(joint_positions_bound_region.values()) req.joint_limits = joint_limits # Retrieve random joint pose and return. resp = self._moveit_get_random_joint_positions_client.call(req) if resp.success: joint_positions_dict = dict(zip(resp.joint_names, resp.joint_positions)) return joint_positions_dict else: if resp.message == "Invalid joint limits were given.": err_msg = ( "The joint limits specified in the pose_sampling bounds " "section of the task environment configuration file were " f"invalid '{str(self._config_file_path)}'. Please make sure " "that the pose_sampling joint limits are within the joint " "limits specified in the panda robot description file." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) return {} else: err_msg = ( f"Failed to connect to '{MOVEIT_GET_RANDOM_JOINT_POSITIONS_TOPIC}' " f"service! Shutting down '{rospy.get_name()}' as this service is " "required for retrieving random joint positions." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) def _get_random_ee_pose(self): """Get a valid random EE pose that considers the boundaries set in the task environment configuration file. Returns: (tuple): tuple containing: - random_ee_pose (dict): Random EE pose. A empty dictionary is returned when no valid random EE pose could be found. - model_configuration (dict): A set of joint positions that result in this EE pose. A empty dictionary is returned when no valid random EE pose could be found. """ if hasattr(self, "_moveit_get_random_ee_pose_client"): req = self.panda_gazebo.srv.GetRandomEePoseRequest() req.attempts = self._pose_sampling_attempts # Apply pose bounding region. if ( hasattr(self, "_init_pose_sampling_bounds") and self._init_pose_sampling_bounds is not None ): ee_pose_bound_region, _ = split_bounds_dict( self._init_pose_sampling_bounds ) req.bounding_region = self.panda_gazebo.msg.BoundingRegion( **ee_pose_bound_region ) # Retrieve random ee pose and return result. resp = self._moveit_get_random_ee_pose_client.call(req) if resp.success: return pose_msg_2_pose_dict(resp.ee_pose), dict( zip(resp.joint_names, resp.joint_positions) ) else: return {}, {} else: err_msg = ( f"Failled to connect to '{MOVEIT_GET_RANDOM_EE_POSE_TOPIC}' service! " f"Shutting down '{rospy.get_name()}' as this service is required for " "retrieving a random end effector pose." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) def _clip_goal_position(self, goal_pose): """Limit the possible goal position x, y and z values to a certian range. 
Args: goal_pose (:obj:`numpy.ndarray`): A numpy array containing the goal x,y and z values. """ min_bounds = [ val for key, val in self._target_sampling_bounds.items() if "min" in key ] max_bounds = [ val for key, val in self._target_sampling_bounds.items() if "max" in key ] clipped_goal_pose = np.clip(goal_pose, a_min=min_bounds, a_max=max_bounds) if any(clipped_goal_pose != goal_pose): rospy.logwarn( "Goal pose clipped since it was not within the set target bounds." ) rospy.logdebug("New goal pose: %s" % clipped_goal_pose) return clipped_goal_pose def _check_config_action_space_joints(self): # noqa: C901 """Validates whether the 'ee_control_coordinates' and 'controlled_joints' fields in the task environment configuration file contain vallid joints/coordinates that can be used as action space joints. Raises: SystemExit: Thrown when the coordinates/joints in the task environment config file are invalid. As a result the ROS script is shutdown. """ # Validate EE coordinates. invalid_ee_coordinates = [] if self.robot_control_type == "end_effector": if self._ee_control_coordinates: for joint in self._ee_control_coordinates: if joint not in VALID_EE_CONTROL_JOINTS: invalid_ee_coordinates.append(joint) # Validate control_joints. invalid_joints = [] if self._controlled_joints: valid_joints = self.joints["arm"] + AVAILABLE_HAND_COMMANDS for joint in self._controlled_joints: if joint not in valid_joints: invalid_joints.append(joint) # Throw error and shutdown node if invalid joint was found. if invalid_ee_coordinates or invalid_joints: error_msg = "Shutting down '%s' since the " % rospy.get_name() if invalid_ee_coordinates and invalid_joints: invalid = flatten_list([invalid_ee_coordinates, invalid_joints]) error_msg += list_2_human_text( ["'" + item + "'" for item in invalid], end_seperator="and", ) error_msg += ( " %s that %s specified in the 'ee_control_coordinates' and " "'controlled_joints' task environment config variables were " "invalid." ) % ( ("coordinate/joint", "was") if len(invalid) == 1 else ("coordinates/joints", "were") ) elif invalid_ee_coordinates: error_msg += list_2_human_text( ["'" + item + "'" for item in invalid_ee_coordinates], end_seperator="and", ) error_msg += ( " %s that %s specified in the 'ee_control_coordinates' task " "environment configuration variable %s invalid." ) % ( ("coordinate", "was", "is") if len(invalid_ee_coordinates) == 1 else ("coordinates", "were", "are") ) else: error_msg += list_2_human_text( ["'" + item + "'" for item in invalid_joints], end_seperator="and", ) error_msg += ( " %s that %s specified in the 'controlled_joints' task " "environment configuration variable %s invalid." ) % ( ("joint", "was", "is") if len(invalid_joints) == 1 else ("joints", "were", "are") ) rospy.logerr(error_msg) ros_exit_gracefully( shutdown_msg=f"Shutting down '{rospy.get_name()}'.", exit_code=1 ) def _get_action_space_joints(self): """Retrieves the joints that are being controlled when we sample from the action space. Returns: list: Joints that are controlled. """ # Validate the action space joints set in the config file. self._check_config_action_space_joints() # Get action space joints if self.robot_control_type == "end_effector": # Get end effector control coordinates. if not self._ee_control_coordinates: action_space_joints = ["x", "y", "z", "rx", "ry", "rz", "rw"] else: action_space_joints = self._ee_control_coordinates # Get gripper control joints. 
if self._load_gripper: action_space_joints.extend(["gripper_width", "gripper_max_effort"]) else: if self._controlled_joints: controlled_joints = self._controlled_joints if not self._load_gripper: controlled_joints = [ item for item in self._controlled_joints if item not in AVAILABLE_HAND_COMMANDS ] if len(controlled_joints) != len(self._controlled_joints): rospy.logdebug( "Hand gripper commands were removed from the controlled " "joints since the gripper is not loaded." ) action_space_joints = controlled_joints else: action_space_joints = self.joints["arm"] if self._load_gripper: action_space_joints = flatten_list( [AVAILABLE_HAND_COMMANDS, action_space_joints] if self.joints["both"][0] in self.joints["hand"] else [action_space_joints, AVAILABLE_HAND_COMMANDS] ) return action_space_joints def _create_action_space(self, dtype=np.float64): """Create the action space based on the action space size and the action bounds. Args: dtype (numpy.dtype, optional): The data type of the action space. Defaults to np.float64. Returns: :obj:`gym.spaces.Box`: The gymnasium action space. """ # Retrieve action space joints if not supplied. self._action_space_joints = self._get_action_space_joints() # Set action space bounds based on control_type. arm_bounds_key = ( "ee_pose" if self.robot_control_type == "end_effector" else ( "joint_efforts" if self.robot_control_type == "effort" else "joint_positions" ) ) arm_action_bounds_low = { joint: val for joint, val in self._action_bounds[arm_bounds_key]["low"].items() if joint != "gripper_max_effort" } gripper_with_action_bound_low = { joint: val for joint, val in self._action_bounds["joint_positions"]["low"].items() if joint == "gripper_width" } gripper_max_effort_action_bound_low = { joint: val for joint, val in self._action_bounds["joint_efforts"]["low"].items() if joint == "gripper_max_effort" } action_bounds_low = shallow_dict_merge( arm_action_bounds_low, gripper_with_action_bound_low, gripper_max_effort_action_bound_low, order=self._action_space_joints, ) arm_action_bounds_high = { joint: val for joint, val in self._action_bounds[arm_bounds_key]["high"].items() if joint != "gripper_max_effort" } gripper_with_action_bound_high = { joint: val for joint, val in self._action_bounds["joint_positions"]["high"].items() if joint == "gripper_width" } gripper_max_effort_action_bound_high = { joint: val for joint, val in self._action_bounds["joint_efforts"]["high"].items() if joint == "gripper_max_effort" } action_bounds_high = shallow_dict_merge( arm_action_bounds_high, gripper_with_action_bound_high, gripper_max_effort_action_bound_high, order=self._action_space_joints, ) action_bound_low_filtered = np.array( [ val for key, val in action_bounds_low.items() if key in self._action_space_joints ], dtype=dtype, ) action_bound_high_filtered = np.array( [ val for key, val in action_bounds_high.items() if key in self._action_space_joints ], dtype=dtype, ) # Create action space. return spaces.Box( action_bound_low_filtered, action_bound_high_filtered, shape=action_bound_low_filtered.shape, dtype=dtype, ) def _set_panda_configuration(self, joint_names, joint_positions): """Set the panda robot to a given configuration. Args: joint_positions (list): The desired joint positions. joint_names (list): The joint names for which you want to set the joint position. Returns: bool: Whether the panda configuration was successfully set. .. note:: Here we use the ``/set_franka_model_configuration`` service instead of the Gazebo's ``set_model_configuration``. 
This was done because the latter causes the reported joint positions outside the joint limits. For more information, see `https://github.com/frankaemika/franka_ros/issues/225 <https://github.com/frankaemika/franka_ros/issues/225>`_. """ # noqa: E501 if hasattr(self, "_set_franka_model_configuration_srv"): # Unlock locked joints. if self._locked_arm_joints: self.unlock_joints(self._locked_arm_joints) # Set joint configuration. resp = self._set_franka_model_configuration_srv.call( self.franka_msgs.srv.SetJointConfigurationRequest( joint_names=joint_names, joint_positions=joint_positions ) ) retval = resp.success # Lock locked joints. if self._locked_arm_joints: self.lock_joints(self._locked_arm_joints) else: rospy.logwarn_once( "Panda configuration not set as 'set_franka_model_configuration' " "service is not available." ) retval = False return retval ################################################ # Overload Robot env virtual methods ########### ################################################ # NOTE: Methods that need to be implemented as they are called by the robot and # gazebo environments. def _compute_reward(self, observations, done): """Compute the reward. Args: observations (dict): Dictionary containing the observations. done (bool): Boolean specifying whether an episode is terminated. Returns: :obj:`numpy.float32`: Reward that is received by the agent. .. attention:: The ``done`` argument is not used for computing the reward in this task environment. """ # Calculate the rewards based on the distance from the goal. d = self._goal_distance(observations["achieved_goal"], self.goal) if self._reward_type == "sparse": if self._collision_penalty != 0.0 and self.in_collision: reward = np.float32(-1.0) else: reward = -(d > self._distance_threshold).astype(np.float32) self._step_debug_logger("=Reward info=") self._step_debug_logger("Reward type: Non sparse") self._step_debug_logger("Goal: %s", self.goal) self._step_debug_logger("Achieved goal: %s", observations["achieved_goal"]) self._step_debug_logger("Perpendicular distance: %s", d) self._step_debug_logger("Threshold: %s", self._distance_threshold) self._step_debug_logger( "Received reward: %s", reward, ) return np.abs(reward) if self._positive_reward else reward else: self._step_debug_logger("=Reward info=") self._step_debug_logger("Reward type: Sparse") self._step_debug_logger("Goal: %s", self.goal) self._step_debug_logger("Achieved goal: %s", observations["achieved_goal"]) self._step_debug_logger("Perpendicular distance: %s", d) self._step_debug_logger("Threshold: %s", self._distance_threshold) self._step_debug_logger("Received reward: %s", -d) reward = -d if self._collision_penalty != 0.0 and self.in_collision: reward -= np.float64(self._collision_penalty) return np.abs(reward) if self._positive_reward else reward def _get_obs(self): """Get robot state observation. Returns: tuple containing: - observation array (:obj:`list`): List of observations: - End effector x position - End effector y position - End effector z position - End effector joints positions - End effector joints velocities - achieved_goal (:obj:`object`): The goal that was achieved during execution. - desired_goal (:obj:`object`): The desired goal that we asked the agent to attempt to achieve. 
""" ee_pose = self.get_ee_pose() ee_position = [ ee_pose.pose.position.x, ee_pose.pose.position.y, ee_pose.pose.position.z, ] robot_qpos, robot_qvel = self._robot_get_obs() ee_state = [ pos for pos, name in zip(robot_qpos, self.joint_states.name) if "panda_finger" in name ] ee_vel = [ vel for vel, name in zip(robot_qvel, self.joint_states.name) if "panda_finger" in name ] obs = np.concatenate( [ ee_position, ee_state, ee_vel, ], dtype=self._observation_space_dtype, ) achieved_goal = np.array(ee_position, dtype=self._observation_space_dtype) desired_goal = self.goal.astype(self._observation_space_dtype) return { "observation": obs, "achieved_goal": achieved_goal, "desired_goal": desired_goal, } def _get_info(self): """Returns a dictionary with additional step information. Returns: dict: Dictionary with additional information. """ ee_pose = self.get_ee_pose() ee_position = [ ee_pose.pose.position.x, ee_pose.pose.position.y, ee_pose.pose.position.z, ] info = { "reference": self.goal, "state_of_interest": ee_position, "reference_error": ee_position - self.goal, } return info def _set_action(self, action): """Take robot action. Args: action (numpy.ndarray): List containing joint or ee action commands. """ # Throw error and shutdown if action space is not the right size. if not action.shape == self.action_space.shape: err_msg = ( f"Shutting down '{rospy.get_name()}' since the shape of the supplied " f"action {action.shape} while the gymnasium action space has shape " f"{self.action_space.shape}." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) # Change action dtype if needed and throw one time warning. if action.dtype != self._action_space_dtype: if not self._action_dtype_conversion_warning: rospy.logwarn( "The data type of the action that is supplied to the " f"'ros_gazebo_gym:{self.spec.id}' environment ({action.dtype}) " "does not match the data type of the action space " f"({self._action_space_dtype.__name__}). The action data type will " "be converted to the action space data type." ) self._action_dtype_conversion_warning = True action = action.astype(self._action_space_dtype) # Send action commands to the controllers based on control type. action_dict = dict(zip(self._action_space_joints, action)) self._step_debug_logger("=Action set info=") self._step_debug_logger("Action that is set:") self._step_debug_logger(list(action_dict.values())) if self.robot_control_type in ["end_effector", "trajectory"]: gripper_with = action_dict.pop("gripper_width", None) gripper_max_effort = action_dict.pop("gripper_max_effort", None) if self.robot_control_type == "end_effector": self.set_ee_pose(action_dict) if self._load_gripper and not self._lock_gripper: self.set_gripper_width( gripper_with, wait=self._hand_wait, max_effort=gripper_max_effort, ) else: self.set_arm_joint_trajectory(action_dict, wait=self._arm_wait) if self._load_gripper and not self._lock_gripper: self.set_gripper_width( gripper_with, wait=self._hand_wait, max_effort=gripper_max_effort, ) else: if self._load_gripper and self._lock_gripper: action_dict.pop("gripper_width", None) action_dict.pop("gripper_max_effort", None) self.set_joint_commands( action_dict, arm_wait=self._arm_wait, hand_wait=self._hand_wait ) def _is_done(self, observations): """Check if task is done. Args: observations (dict): Dictionary containing the observations Returns: bool: Boolean specifying whether the episode is done (e.i. distance to the goal is within the distance threshold, robot has fallen etc.). """ # Check if gripper is within range of the goal. 
d = self._goal_distance(observations["achieved_goal"], self.goal) is_done = d < self._distance_threshold self._step_debug_logger("=Task is done info=") if self._target_hold: if is_done: self._is_done_samples += 1 if self._is_done_samples < self._hold_samples: is_done = False self._step_debug_logger( f"Agent spend '{self._is_done_samples}' within target position." ) else: self._step_debug_logger("Task is done.") self._is_done_samples = 0 else: self._step_debug_logger("Task is not done.") self._is_done_samples = 0 else: if is_done: self._step_debug_logger("Task is done.") else: self._step_debug_logger("Task is not done.") return is_done def _sample_goal(self): """Sample a random goal position. Returns: :obj:`geometry_msgs.PoseStamped`: A goal pose. """ rospy.logdebug("Sampling goal.") if self._target_sampling_strategy == "global": goal = self.np_random.uniform( [ self._target_sampling_bounds["x_min"], self._target_sampling_bounds["y_min"], self._target_sampling_bounds["z_min"], ], [ self._target_sampling_bounds["x_max"], self._target_sampling_bounds["y_max"], self._target_sampling_bounds["z_max"], ], size=3, ) elif self._target_sampling_strategy == "local": # Rel to current EE pose. try: cur_ee_pose = self.get_ee_pose() cur_ee_position = np.array( [ cur_ee_pose.pose.position.x, cur_ee_pose.pose.position.y, cur_ee_pose.pose.position.z, ] ) except EePoseLookupError: err_msg = ( f"Shutting down '{rospy.get_name()}' since the current end " "effector pose which is needed for sampling the goals could " "not be retrieved." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) # Sample goal relative to end effector pose. goal = cur_ee_position + self.np_random.uniform( [ self._target_sampling_bounds["x_min"], self._target_sampling_bounds["y_min"], self._target_sampling_bounds["z_min"], ], [ self._target_sampling_bounds["x_max"], self._target_sampling_bounds["y_max"], self._target_sampling_bounds["z_max"], ], size=cur_ee_position.shape, ) elif self._target_sampling_strategy == "fixed": goal = np.array(list(self._fixed_target_pose.values())) else: # Thrown error if goal could not be sampled. err_msg = ( f"Shutting down '{rospy.get_name()}' since no goal could be sampled " f"as '{self._target_sampling_strategy}' is not a valid goal sampling " "strategy. Options are 'global' and 'local'." ) ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1) # Make sure the goal is always within the sampling region. if self._target_sampling_strategy != "fixed": goal = self._clip_goal_position(goal) rospy.logdebug("Goal: %s" % goal) # Apply offset and return goal. goal += np.array( [ self._target_offset["x"], self._target_offset["y"], self._target_offset["z"], ] ) return goal def _visualize_goal(self, offset=[0.0, 0.0, 0.0]): """Visualize RViz goal marker. Args: offset (list): Position offset for visualizing the goal. """ goal = self.goal + offset goal_marker_msg = TargetMarker(x=goal[0], y=goal[1], z=goal[2], id=3) self._target_pose_marker_pub.publish(goal_marker_msg) def _set_init_pose(self): # noqa: C901 """Sets the Robot in its (random) initial pose. Returns: bool: Boolean specifying whether the initial pose was successfully set. .. note:: The pose is sampled based on the ``pose_sampling_type`` variable set in the task environment configuration file. Options are ``end_effector_pose``, which creates a random pose by sampling EE poses, and ``joint_positions``, which makes a random pose by sampling joint positions. These methods respect the ``bounds`` set in the task environment config. 
""" if self._reset_init_pose: init_type = "random " if self._random_init_pose else "" rospy.loginfo(f"Setting {init_type}init pose...") if self._random_init_pose: # Use random initial model configuration. if ( self._pose_sampling_type == "joint_positions" ): # Sample random joint positions. rospy.logdebug("Retrieve random joint positions.") random_joint_positions = self._get_random_joint_positions() if random_joint_positions: if self._load_gripper: random_joint_positions["gripper_width"] = ( split_pose_dict(self._init_pose)[1]["gripper_width"] if self._lock_gripper else random_joint_positions.pop( self.joints["hand"][0], 0.0 ) * 2 ) random_joint_positions = { joint: position for joint, position in random_joint_positions.items() if joint not in self.joints["hand"] } else: # Sample random EE poses. if ( self._randomize_first_episode or self.episode_num != 0 ): # Retrieve random EE pose. rospy.logdebug("Retrieve random EE pose.") self._set_panda_configuration( joint_names=self.joints["both"], joint_positions=PANDA_REST_CONFIGURATION[ : len(self.joints["both"]) ], ) # NOTE: Done as joint conflicts might prevent planning. ( random_ee_pose, random_joint_positions, ) = self._get_random_ee_pose() else: # Use fixed initial arm/hand initial EE pose. rospy.logdebug("Retrieving initial EE pose.") random_ee_pose = split_pose_dict(self._init_pose)[0] random_joint_positions = self.get_ee_pose_joint_config( random_ee_pose ) # Apply init pose offset. if random_ee_pose and sum(self._init_pose_offset.values()) != 0.0: rospy.logdebug("Applying offset to initial pose.") random_ee_pose["x"] += self._init_pose_offset["x"] random_ee_pose["y"] += self._init_pose_offset["y"] random_ee_pose["z"] += self._init_pose_offset["z"] # Retrieve model configuration for the new EE pose. ee_pose_joint_positions = self.get_ee_pose_joint_config( random_ee_pose ) if not ee_pose_joint_positions: rospy.logwarn( "Could not retrieve a model configuration that relates " "to the EE pose with the offset. As a result the model " "configuration for the EE pose without the offset is " "used." ) else: random_joint_positions = ee_pose_joint_positions # Retrieve random gripper width. if self._load_gripper: random_joint_positions["gripper_width"] = ( split_pose_dict(self._init_pose)[1]["gripper_width"] if self._lock_gripper else self._get_random_joint_positions().pop( self.joints["hand"][0], 0.0 ) * 2 ) else: # Use fixed initial pose. if self._pose_sampling_type == "joint_positions": rospy.logdebug("Retrieving initial joint positions.") _, self._init_model_configuration = split_pose_dict(self._init_pose) if not self._load_gripper: self._init_model_configuration.pop("gripper_width", None) else: rospy.logdebug("Retrieving initial EE pose.") random_joint_positions = self.get_ee_pose_joint_config( split_pose_dict(self._init_pose)[0] ) if random_joint_positions: if self._load_gripper: random_joint_positions["gripper_width"] = split_pose_dict( self._init_pose )[1]["gripper_width"] else: random_joint_positions = dict( zip( self.joints["both"], PANDA_REST_CONFIGURATION[: len(self.joints["both"])], ) ) rospy.logwarn_once( "No valid joint positions could be retrieved for the EE " "pose specified in the 'init_pose' field of the task " "configuration file. As a " "result the fallback model configuration was used " "'{}'.".format(random_joint_positions) ) # Check if a valid model configuration was found. 
if random_joint_positions: self._init_model_configuration = random_joint_positions else: pose_sampling_type_str = ( "joint positions" if self._pose_sampling_type == "joint_positions" else "EE pose" ) if self._init_model_configuration: rospy.logwarn( f"Unable to retrieve a valid random {pose_sampling_type_str}. " "Please ensure that you have set valid pose_sampling bounds in " "the task environment config file " f"'{str(self._config_file_path)}'. The initial model " "configuration from the previous episode was used as a result." ) else: self._init_model_configuration = dict( zip( self.joints["both"], PANDA_REST_CONFIGURATION[: len(self.joints["both"])], ) ) rospy.logwarn( f"Unable to retrieve a valid random {pose_sampling_type_str}. " "Please ensure that you have set valid pose_sampling bounds in " "the task environment config file " f"'{str(self._config_file_path)}'. Because no previous " "initial model configuration was available the fallback model " f"configuration '{self._init_model_configuration}' was used." ) # Convert gripper width to joint positions. if self._load_gripper: self._init_model_configuration = ( gripper_width_2_finger_joints_positions( self._init_model_configuration, self.joints["hand"] ) ) # Set initial model configuration and return result # NOTE: Here two modes can be used: MoveIT or Gazebo's # 'set_model_configuration' service. if not self._moveit_init_pose_control: # Use Gazebo service. rospy.loginfo("Setting initial robot pose.") init_pose_retval = self._set_panda_configuration( joint_names=self._init_model_configuration.keys(), joint_positions=self._init_model_configuration.values(), ) if not init_pose_retval: rospy.logwarn("Setting initial robot pose failed.") return init_pose_retval else: # Use MoveIt. if hasattr(self, "_moveit_set_joint_positions_srv"): prev_control_type = self.robot_control_type self.robot_control_type = "trajectory" # Unlock locked joints. if self._locked_arm_joints: self.unlock_joints(self._locked_arm_joints) # Set initial model configuration. resp = self._moveit_set_joint_positions_srv.call( self.panda_gazebo.srv.SetJointPositionsRequest( joint_names=self._init_model_configuration.keys(), joint_positions=self._init_model_configuration.values(), wait=True, ) ) self.robot_control_type = prev_control_type if not resp.success: rospy.logwarn("Setting initial robot pose failed.") # Lock locked joints. if self._locked_arm_joints: self.lock_joints(self._locked_arm_joints) return resp.success rospy.logwarn( "The initial pose failed since the {} service was not " "available.".format(MOVEIT_SET_JOINT_POSITIONS_TOPIC) ) return False return True def _init_env_variables(self): """Inits variables needed to be initialized each time we reset at the start of an episode. """ # Sample and visualize goal. self.goal = self._sample_goal() if self._visualize_target: self._visualize_goal()