r"""An ROS Panda reach gymnasium environment.
.. image:: /images/panda/panda_reach_env.png
   :alt: Panda reach environment
Observation space:
    As the panda environment inherits from the `gym.GoalEnv`_ class, the observation
    space is a dictionary.
    Type: Dict
        - **observation** (:obj:`numpy.ndarray`): The current end-effector pose, joint
          positions and joint velocities.
        - **desired_goal** (:obj:`numpy.ndarray`): The desired end-effector pose.
        - **achieved_goal** (:obj:`numpy.ndarray`): The achieved end-effector pose.
.. _`gym.GoalEnv`: https://robotics.farama.org/content/multi-goal_api/#goalenv
Action space:
    The action space of the panda environment is dependent on the control type and
    whether the gripper is loaded. The following action spaces are available:
    **Joint trajectory control**:
        Type: Box(7)
            - **panda_joint1** (:obj:`float`): The position of the first joint.
            - **panda_joint2** (:obj:`float`): The position of the second joint.
            - **panda_joint3** (:obj:`float`): The position of the third joint.
            - **panda_joint4** (:obj:`float`): The position of the fourth joint.
            - **panda_joint5** (:obj:`float`): The position of the fifth joint.
            - **panda_joint6** (:obj:`float`): The position of the sixth joint.
            - **panda_joint7** (:obj:`float`): The position of the seventh joint.
    **Joint position control**:
        Type: Box(7)
            - **panda_joint1** (:obj:`float`): The position of the first joint.
            - **panda_joint2** (:obj:`float`): The position of the second joint.
            - **panda_joint3** (:obj:`float`): The position of the third joint.
            - **panda_joint4** (:obj:`float`): The position of the fourth joint.
            - **panda_joint5** (:obj:`float`): The position of the fifth joint.
            - **panda_joint6** (:obj:`float`): The position of the sixth joint.
            - **panda_joint7** (:obj:`float`): The position of the seventh joint.
    **Joint effort control**:
        Type: Box(7)
            - **panda_joint1** (:obj:`float`): The effort of the first joint.
            - **panda_joint2** (:obj:`float`): The effort of the second joint.
            - **panda_joint3** (:obj:`float`): The effort of the third joint.
            - **panda_joint4** (:obj:`float`): The effort of the fourth joint.
            - **panda_joint5** (:obj:`float`): The effort of the fifth joint.
            - **panda_joint6** (:obj:`float`): The effort of the sixth joint.
            - **panda_joint7** (:obj:`float`): The effort of the seventh joint.
    **End-effector position control**:
        Type: Box(7)
            - **x** (:obj:`float`): The x position of the end-effector.
            - **y** (:obj:`float`): The y position of the end-effector.
            - **z** (:obj:`float`): The z position of the end-effector.
            - **rx** (:obj:`float`): The x component of the quaternion orientation of the
              end-effector.
            - **ry** (:obj:`float`): The y component of the quaternion orientation of the
              end-effector.
            - **rz** (:obj:`float`): The z component of the quaternion orientation of the
              end-effector.
            - **rw** (:obj:`float`): The w component of the quaternion orientation of the
              end-effector.
    If the gripper is loaded, the action space is extended with the following
    dimensions:
    Type: Box(2)
        - **gripper_width** (:obj:`float`): The width of the gripper - only if the
          gripper is loaded.
        - **gripper_max_effort** (:obj:`float`): The maximum effort of the gripper -
          only if the gripper is loaded.
        .. attention::
            The gripper width is ignored when the ``grasping`` parameter is set to ``true`` in the
            :ros-gazebo-gym:`task environment config file <blob/noetic/src/ros_gazebo_gym/task_envs/panda/config/panda_reach.yaml>`.
            or when the ``gripper_max_effort`` is set to a value greater than zero.
Episode termination:
    The episode terminates when the end-effector is within a certain distance of the
    goal position. The distance is defined by the ``distance_threshold`` parameter in
    the task environment configuration file. If the ``hold_samples`` parameter is
    greater than zero, the episode will terminate after ``hold_samples`` consecutive
    samples are within the ``distance_threshold``. The episode will also terminate if
    the maximum number of samples is reached.
Environment Goal:
    In this environment the agent has to learn to move the panda robot to a given goal
    position. It was based on the :gymnasium-robotics:`FetchReach-v2 <envs/fetch/reach/>`
    gymnasium environment.
Reward function:
    The reward function is defined as the negative of the Euclidean distance between the
    end-effector and the goal position. If the ``positive_reward`` parameter is set to
    ``true``, the absolute value of the reward is returned:
    .. math::
        reward = -\sqrt{(x_{ee} - x_{goal})^2 + (y_{ee} - y_{goal})^2 + (z_{ee} - z_{goal})^2}
Initialization:
    The environment is initialized by loading the Panda robot model and setting its
    initial position and orientation. The environment parameters can be set in the
    configuration file located at `ros_gazebo_gym/task_envs/panda/config/panda_reach.yaml`.
Environment step return:
    In addition to the observations, the reward, and a termination and truncation boolean,
    the environment also returns an info dictionary:
    .. code-block:: python
        [observation, reward, termination, truncation, info_dict]
    The info dictionary contains the following information:
    - **reference**: The reference position (x,y,z) that the Panda Reach is tracking (i.e. the goal position).
    - **state_of_interest**: The state that should track the reference (SOI) (i.e. the end-effector position).
    - **reference_error**: The error between SOI and the reference (i.e. the error between the end-effector position and the goal position).
.. admonition:: Configuration
    :class: important
    The configuration files for this environment are found in the
    :ros-gazebo-gym:`panda task environment config folder <blob/noetic/src/ros_gazebo_gym/task_envs/panda/config/panda_reach.yaml>`.
"""  # noqa: E501
import os
from datetime import datetime
from pathlib import Path
import numpy as np
import rospy
from geometry_msgs.msg import Pose, Quaternion
from gymnasium import spaces, utils
from ros_gazebo_gym.common.helpers import (
    flatten_list,
    gripper_width_2_finger_joints_positions,
    list_2_human_text,
    pose_msg_2_pose_dict,
    shallow_dict_merge,
    split_bounds_dict,
    split_pose_dict,
)
from ros_gazebo_gym.common.markers.sample_region_marker import SampleRegionMarker
from ros_gazebo_gym.common.markers.target_marker import TargetMarker
from ros_gazebo_gym.core import ROSLauncher
from ros_gazebo_gym.core.helpers import (
    get_log_path,
    load_ros_params_from_yaml,
    ros_exit_gracefully,
)
from ros_gazebo_gym.exceptions import EePoseLookupError
from ros_gazebo_gym.robot_envs.panda_env import PandaEnv
from rospy.exceptions import ROSException, ROSInterruptException
from std_msgs.msg import ColorRGBA
# Specify topics and other script variables.
[docs]CONNECTION_TIMEOUT = 5  # Timeout for connecting to services or topics. 
[docs]MOVEIT_GET_RANDOM_JOINT_POSITIONS_TOPIC = (
    "panda_moveit_planner_server/get_random_joint_positions"
) 
[docs]MOVEIT_SET_JOINT_POSITIONS_TOPIC = "panda_moveit_planner_server/set_joint_positions" 
[docs]MOVEIT_GET_RANDOM_EE_POSE_TOPIC = "panda_moveit_planner_server/get_random_ee_pose" 
[docs]MOVEIT_ADD_PLANE_TOPIC = "panda_moveit_planner_server/planning_scene/add_plane" 
[docs]SET_FRANKA_MODEL_CONFIGURATION_TOPIC = "set_franka_model_configuration" 
[docs]VALID_EE_CONTROL_JOINTS = ["x", "y", "z", "rx", "ry", "rz", "rw"] 
[docs]CONFIG_FILE_PATH = "config/panda_reach.yaml" 
[docs]PANDA_REST_CONFIGURATION = [
    0,
    0,
    0,
    -1.57079632679,
    0,
    1.57079632679,
    0.785398163397,
    0.001,
    0.001,
] 
[docs]LOG_STEP_DEBUG_INFO = False 
[docs]AVAILABLE_HAND_COMMANDS = ["gripper_width", "gripper_max_effort"] 
#################################################
# Panda reach environment class #################
#################################################
[docs]class PandaReachEnv(PandaEnv, utils.EzPickle):
    """Class that provides all the methods used for the algorithm training.
    Attributes:
        action_space (:obj:`gym.spaces.box.Box`): Gym action space object.
        observation_space (:obj:`gym.spaces.dict.Dict`): Gym observation space object.
        goal (:obj:`geometry_msgs.PoseStamped`): The current goal.
    """
    _instance_count = 0  # Counts the number of instances that were created.
    def __init__(  # noqa: C901
        self,
        control_type="effort",
        positive_reward=False,
        config_path=CONFIG_FILE_PATH,
        gazebo_world_launch_file="start_reach_world.launch",
        visualize=None,
        action_space_dtype=np.float64,
        observation_space_dtype=np.float64,
    ):
        """Initializes a Panda Task Environment.
        Args:
            control_Type (str, optional): The type of control you want to use for the
                panda robot (i.e. hand and arm). Options are: ``trajectory``,
                ``position``, ``effort`` or ``end_effector``. Defaults to ``effort``.
            positive_reward (bool, optional): Whether you want to use a positive
                reward instead of a negative reward. Defaults to ``False``.
            config_path (str, optional): Path where the environment configuration
                value are found. The path is resolved relative to the
                :class:`~ros_gazebo_gym.task_envs.panda.panda_reach` class file.
            gazebo_world_launch_file (str, optional): Name of the launch file that loads
                the gazebo world. Currently only the launch files inside the
                `panda_gazebo <https://github.com/rickstaa/panda-gazebo>`_ package are
                supported. Defaults to ``start_reach_world.launch``.
            visualize (bool, optional): Whether you want to show the RViz visualization.
                Defaults to ``None`` meaning the task configuration file values will
                be used.
            action_space_dtype (union[numpy.dtype, str], optional): The data type of the
                action space. Defaults to ``np.float64``.
            observation_space_dtype (union[numpy.dtype, str], optional): The data type
                of the observation space. Defaults to ``np.float64``.
        .. important::
            In this environment, the joint trajectory control is not implemented yet for
            multiple waypoints. This is because the action space only contains one
            waypoint. The :obj:`~PandaEnv.set_arm_joint_trajectory` method, however,
            already accepts multiple waypoints. As a result, task environment can be
            easily extended to work with multiple waypoints by modifying the
            :obj:`PandaReachEnv~._create_action_space` method.
        """
        rospy.logwarn("Initialize PandaEnv task environment...")
        self.__class__._instance_count += 1
        if self.__class__._instance_count > 1:
            rospy.logwarn(
                "You are trying to create multiple instances of the "
                f"{self.__class__.__name__} class. Unfortunately, this is not yet "
                "been implemented and will cause unexpected behaviour. As a result, "
                "the script will be shut down. Feel free to open a pull request if "
                "you want to implement this functionality."
            )
        self._positive_reward = positive_reward
        self._task_env = "panda_reach"
        # Makes sure the env is pickable when it wraps C++ code.
        utils.EzPickle.__init__(**locals())
        # Makes sure roscore is running and ROS is initialized.
        ROSLauncher.initialize()
        # This is the path where the simulation files will be downloaded if not present.
        workspace_path = rospy.get_param("/panda_reach_v0/workspace_path", None)
        if workspace_path:
            assert os.path.exists(workspace_path), (
                "The Simulation ROS Workspace path "
                + workspace_path
                + " DOESN'T exist, execute: mkdir -p "
                + workspace_path
                + "/src;cd "
                + workspace_path
                + ";catkin_make"
            )
        # Load Params from the desired Yaml file relative to this TaskEnvironment.
        rospy.logdebug("Load Panda Reach parameters.")
        self._config_file_path = Path(__file__).parent.joinpath(config_path)
        load_ros_params_from_yaml(
            self._config_file_path,
            ros_package_name="ros_gazebo_gym",
        )
        self._get_params()
        # Thrown warning if gazebo is already running.
        if any(
            ["/gazebo" in topic for topic in flatten_list(rospy.get_published_topics())]
        ):
            err_msg = (
                f"Shutting down '{rospy.get_name()}' since a Gazebo instance is "
                "already running. Unfortunately, spawning multiple Panda simulations "
                "is not yet supported. Please shut down this instance and try again."
            )
            ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
        # Launch the panda task gazebo environment (Doesn't yet add the robot).
        # NOTE: This downloads and builds the required ROS packages if not found.
        launch_log_file = (
            str(
                get_log_path()
                .joinpath(
                    "{}_{}.log".format(
                        gazebo_world_launch_file.replace(".", "_"),
                        datetime.now().strftime("%d_%m_%Y_%H_%M_%S"),
                    )
                )
                .resolve()
            )
            if not self._roslaunch_log_to_console
            else None
        )
        ROSLauncher.launch(
            package_name="panda_gazebo",
            launch_file_name=gazebo_world_launch_file,
            workspace_path=workspace_path,
            log_file=launch_log_file,
            critical=True,
            outdated_warning=True,
            paused=True,
            gazebo_gui=self._gazebo_gui,
            physics=self._physics,
        )
        ########################################
        # Initiate Robot environments ##########
        ########################################
        # Initialize the Robot environment.
        super(PandaReachEnv, self).__init__(
            robot_EE_link=self._ee_link,
            ee_frame_offset=self._ee_frame_offset,
            load_gripper=self._load_gripper,
            lock_gripper=self._lock_gripper,
            grasping=self._grasping,
            control_type=control_type,
            workspace_path=workspace_path,
            log_reset=self._log_reset,
            visualize=visualize,
        )
        # Disable visualize argument so that config can be used during inference.
        self._ezpickle_kwargs["visualize"] = None
        # Initialize task environment objects.
        self._is_done_samples = 0
        self._init_model_configuration = {}
        ########################################
        # Connect to required services, ########
        # subscribers and publishers. ##########
        ########################################
        # Connect to MoveIt 'get_random_joint_positions' service.
        try:
            moveit_get_random_joint_positions_srv_topic = (
                f"{self.robot_name_space}/{MOVEIT_GET_RANDOM_JOINT_POSITIONS_TOPIC}"
            )
            rospy.logdebug(
                "Connecting to '%s' service."
                % moveit_get_random_joint_positions_srv_topic
            )
            rospy.wait_for_service(
                moveit_get_random_joint_positions_srv_topic,
                timeout=CONNECTION_TIMEOUT,
            )
            self._moveit_get_random_joint_positions_client = rospy.ServiceProxy(
                moveit_get_random_joint_positions_srv_topic,
                self.panda_gazebo.srv.GetRandomJointPositions,
            )
            rospy.logdebug(
                "Connected to '%s' service!"
                % moveit_get_random_joint_positions_srv_topic
            )
        except (rospy.ServiceException, ROSException, ROSInterruptException):
            rospy.logwarn(
                "Failed to connect to '%s' service!"
                % moveit_get_random_joint_positions_srv_topic
            )
        # Connect to MoveIt 'get_random_ee_pose' service.
        try:
            moveit_get_random_ee_pose_srv_topic = (
                f"{self.robot_name_space}/{MOVEIT_GET_RANDOM_EE_POSE_TOPIC}"
            )
            rospy.logdebug(
                "Connecting to '%s' service." % moveit_get_random_ee_pose_srv_topic
            )
            rospy.wait_for_service(
                moveit_get_random_ee_pose_srv_topic,
                timeout=CONNECTION_TIMEOUT,
            )
            self._moveit_get_random_ee_pose_client = rospy.ServiceProxy(
                moveit_get_random_ee_pose_srv_topic,
                self.panda_gazebo.srv.GetRandomEePose,
            )
            rospy.logdebug(
                "Connected to '%s' service!" % moveit_get_random_ee_pose_srv_topic
            )
        except (rospy.ServiceException, ROSException, ROSInterruptException):
            rospy.logwarn(
                "Failed to connect to '%s' service!"
                % moveit_get_random_ee_pose_srv_topic
            )
        # Connect to MoveIt 'planning_scene/add_plane' service.
        try:
            moveit_add_plane_srv_topic = (
                f"{self.robot_name_space}/{MOVEIT_ADD_PLANE_TOPIC}"
            )
            rospy.logdebug("Connecting to '%s' service." % moveit_add_plane_srv_topic)
            rospy.wait_for_service(
                moveit_add_plane_srv_topic,
                timeout=CONNECTION_TIMEOUT,
            )
            self._moveit_add_plane_srv = rospy.ServiceProxy(
                moveit_add_plane_srv_topic, self.panda_gazebo.srv.AddPlane
            )
            rospy.logdebug("Connected to '%s' service!" % moveit_add_plane_srv_topic)
        except (rospy.ServiceException, ROSException, ROSInterruptException):
            rospy.logwarn(
                "Failed to connect to '%s' service!" % moveit_add_plane_srv_topic
            )
        if self._moveit_init_pose_control:
            # Connect to MoveIt 'planning_scene/set_joint_positions' service.
            try:
                moveit_set_joint_positions_srv_topic = (
                    f"{self.robot_name_space}/{MOVEIT_SET_JOINT_POSITIONS_TOPIC}"
                )
                rospy.logdebug(
                    "Connecting to '%s' service." % moveit_set_joint_positions_srv_topic
                )
                rospy.wait_for_service(
                    moveit_set_joint_positions_srv_topic,
                    timeout=CONNECTION_TIMEOUT,
                )
                self._moveit_set_joint_positions_srv = rospy.ServiceProxy(
                    moveit_set_joint_positions_srv_topic,
                    self.panda_gazebo.srv.SetJointPositions,
                )
                rospy.logdebug(
                    "Connected to '%s' service!" % moveit_set_joint_positions_srv_topic
                )
            except (rospy.ServiceException, ROSException, ROSInterruptException):
                rospy.logwarn(
                    "Failed to connect to '%s' service!"
                    % moveit_set_joint_positions_srv_topic
                )
        else:
            # Connect to franka_gazebo's 'set_franka_model_configuration' service.
            try:
                set_franka_model_configuration_srv_topic = (
                    f"{self.robot_name_space}/{SET_FRANKA_MODEL_CONFIGURATION_TOPIC}"
                )
                rospy.logdebug(
                    "Connecting to '%s' service."
                    % set_franka_model_configuration_srv_topic
                )
                rospy.wait_for_service(
                    set_franka_model_configuration_srv_topic,
                    timeout=CONNECTION_TIMEOUT,
                )
                self._set_franka_model_configuration_srv = rospy.ServiceProxy(
                    set_franka_model_configuration_srv_topic,
                    self.franka_msgs.srv.SetJointConfiguration,
                )
                rospy.logdebug(
                    "Connected to '%s' service!"
                    % set_franka_model_configuration_srv_topic
                )
            except (rospy.ServiceException, ROSException, ROSInterruptException):
                rospy.logwarn(
                    "Failed to connect to '%s' service!"
                    % set_franka_model_configuration_srv_topic
                )
        # Create current target publisher.
        rospy.logdebug("Creating target pose publisher.")
        self._target_pose_marker_pub = rospy.Publisher(
            "/ros_gazebo_gym/current_target", TargetMarker, queue_size=1, latch=True
        )
        rospy.logdebug("Goal target publisher created.")
        if self._load_rviz:
            # Create target bounding region publisher.
            rospy.logdebug("Creating target bounding region publisher.")
            self._target_sample_region_marker_pub = rospy.Publisher(
                "/ros_gazebo_gym/target_sample_region",
                SampleRegionMarker,
                queue_size=1,
                latch=True,
            )
            rospy.logdebug("Target bounding region publisher created.")
            # Create initial pose bounding region publisher.
            rospy.logdebug("Creating initial pose sample region publisher.")
            self._init_pose_sample_region_marker__pub = rospy.Publisher(
                "/ros_gazebo_gym/init_pose_sample_region",
                SampleRegionMarker,
                queue_size=1,
                latch=True,
            )
            rospy.logdebug("Initial pose sample region publisher created.")
        ########################################
        # Initialize RViz visualizations #######
        ########################################
        # Add ground to MoveIt planning scene.
        self._add_ground_to_moveit_scene()
        # Add pose and target sampling bounds to RViz.
        self._init_rviz_visualizations()
        ########################################
        # Create action and observation space ##
        ########################################
        rospy.logdebug("Setup gymnasium action and observation space.")
        self._action_space_dtype = action_space_dtype
        self._observation_space_dtype = observation_space_dtype
        self._action_dtype_conversion_warning = False
        self.action_space = self._create_action_space(dtype=self._action_space_dtype)
        self.goal = self._sample_goal()
        obs = self._get_obs()
        self.observation_space = spaces.Dict(
            dict(
                observation=spaces.Box(
                    -np.inf,
                    np.inf,
                    shape=obs["observation"].shape,
                    dtype=self._observation_space_dtype,
                ),
                desired_goal=spaces.Box(
                    -np.inf,
                    np.inf,
                    shape=obs["desired_goal"].shape,
                    dtype=self._observation_space_dtype,
                ),
                achieved_goal=spaces.Box(
                    -np.inf,
                    np.inf,
                    shape=obs["achieved_goal"].shape,
                    dtype=self._observation_space_dtype,
                ),
            )
        )
        # Throw warning if action space joints are locked.
        locked_action_space_joints = [
            joint
            for joint in self._action_space_joints
            if joint in self._locked_arm_joints
        ]
        if locked_action_space_joints:
            rospy.logwarn(
                f"The following joints in the action space are locked: "
                f"{list_2_human_text(locked_action_space_joints)}. As a result, the "
                "control applied to these joints will not have any effect."
            )
        rospy.logwarn("PandaEnv task environment initialized.")
    ################################################
    # Task environment internal methods ############
    ################################################
    # NOTE: Here you can add additional helper methods that are used in the task env.
    def _init_rviz_visualizations(self):
        """Add pose and target sampling bounds to RViz."""
        # Add goal sampling bounds to RViz.
        if (
            self._target_sampling_strategy != "fixed"
            and self._load_rviz
            and self._visualize_target_sampling_bounds
        ):
            goal_sample_region_marker_msg = SampleRegionMarker(
                x_min=self._target_sampling_bounds["x_min"],
                y_min=self._target_sampling_bounds["y_min"],
                z_min=self._target_sampling_bounds["z_min"],
                x_max=self._target_sampling_bounds["x_max"],
                y_max=self._target_sampling_bounds["y_max"],
                z_max=self._target_sampling_bounds["z_max"],
                id=0,
            )
            self._target_sample_region_marker_pub.publish(goal_sample_region_marker_msg)
        # Add initial pose sampling region to RViz.
        if (
            self._load_rviz
            and self._visualize_init_pose_bounds
            and self._init_pose_sampling_bounds is not None
            and self._pose_sampling_type != "joint_positions"
        ):
            init_pose_sample_region_marker_msg = SampleRegionMarker(
                x_min=self._init_pose_sampling_bounds["x_min"],
                y_min=self._init_pose_sampling_bounds["y_min"],
                z_min=self._init_pose_sampling_bounds["z_min"],
                x_max=self._init_pose_sampling_bounds["x_max"],
                y_max=self._init_pose_sampling_bounds["y_max"],
                z_max=self._init_pose_sampling_bounds["z_max"],
                id=1,
            )
            init_pose_region_color = ColorRGBA()
            init_pose_region_color.a = 0.15
            init_pose_region_color.r = 0.0
            init_pose_region_color.g = 0.0
            init_pose_region_color.b = 1.0
            init_pose_sample_region_marker_msg.color = init_pose_region_color
            self._init_pose_sample_region_marker__pub.publish(
                init_pose_sample_region_marker_msg
            )
    def _add_ground_to_moveit_scene(self):
        """Adds the ground to the MoveIt Planning scence."""
        if hasattr(self, "_moveit_add_plane_srv"):
            add_ground_req = self.panda_gazebo.srv.AddPlaneRequest(
                name="ground",
                pose=Pose(orientation=Quaternion(0, 0, 0, 1)),
                normal=[0, 0, 1],
            )
            resp = self._moveit_add_plane_srv.call(add_ground_req)
            if not resp.success:
                rospy.logerr(
                    "The ground was not added successfully to the MoveIt planning "
                    "scene. As a result, MoveIt is unaware of the ground's existence "
                    "and may plan states that are in collision."
                )
        else:
            err_msg = (
                f"Failled to connect to '{MOVEIT_ADD_PLANE_TOPIC}' service! "
                f"Shutting down '{rospy.get_name()}' as this service is required for "
                "making MoveIt aware of the ground."
            )
            ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
    def _get_params(self, ns="panda_reach"):  # noqa: C901
        """Retrieve task environment configuration parameters from parameter server.
        Args:
            ns (str, optional): The namespace on which the parameters are found.
                Defaults to "panda_reach".
        """
        self._task_env = ns
        try:
            # == Retrieve control variables ==
            self._direct_control = rospy.get_param(
                f"/{ns}/control/direct_control", True
            )
            self._load_gripper = rospy.get_param(f"/{ns}/control/load_gripper", True)
            self._lock_gripper = rospy.get_param(f"/{ns}/control/lock_gripper", False)
            self._ee_link = rospy.get_param(
                f"/{ns}/control/ee_link",
                "panda_hand" if self._load_gripper else "panda_link8",
            )
            self._grasping = rospy.get_param(f"/{ns}/control/grasping", None)
            self._arm_wait = rospy.get_param(f"/{ns}/control/arm_wait", False)
            self._hand_wait = rospy.get_param(f"/{ns}/control/hand_wait", True)
            self._ee_control_coordinates = rospy.get_param(
                f"/{ns}/control/ee_control_coordinates", None
            )
            self._controlled_joints = rospy.get_param(
                f"/{ns}/control/controlled_joints", None
            )
            self._locked_arm_joints = rospy.get_param(
                f"/{ns}/control/locked_arm_joints", []
            )
            self._ee_frame_offset = rospy.get_param(
                f"/{ns}/control/ee_frame_offset", None
            )
            if self._ee_frame_offset is not None:
                self._ee_frame_offset["x"] = self._ee_frame_offset.get("x", 0.0)
                self._ee_frame_offset["y"] = self._ee_frame_offset.get("y", 0.0)
                self._ee_frame_offset["z"] = self._ee_frame_offset.get("z", 0.0)
                self._ee_frame_offset["rx"] = self._ee_frame_offset.get("rx", 0.0)
                self._ee_frame_offset["ry"] = self._ee_frame_offset.get("ry", 0.0)
                self._ee_frame_offset["rz"] = self._ee_frame_offset.get("rz", 0.0)
                self._ee_frame_offset["rw"] = self._ee_frame_offset.get("rw", 1.0)
            # == Retrieve sampling variables ==
            self._visualize_init_pose_bounds = rospy.get_param(
                f"/{ns}/pose_sampling/visualize_init_pose_bounds", True
            )
            self._reset_init_pose = rospy.get_param(
                f"/{ns}/pose_sampling/reset_init_pose", True
            )
            self._random_init_pose = rospy.get_param(
                f"/{ns}/pose_sampling/random_init_pose", True
            )
            self._randomize_first_episode = rospy.get_param(
                f"/{ns}/pose_sampling/randomize_first_episode", True
            )
            self._pose_sampling_attempts = rospy.get_param(
                f"/{ns}/pose_sampling/attempts", 10
            )
            self._pose_sampling_type = rospy.get_param(
                f"/{ns}/pose_sampling/pose_sampling_type", "end_effector_pose"
            ).lower()
            self._moveit_init_pose_control = rospy.get_param(
                f"/{ns}/pose_sampling/moveit_control", False
            )
            self._init_pose = rospy.get_param(
                f"/{ns}/pose_sampling/init_pose",
                {
                    "x": 0.23,
                    "y": 0.29,
                    "z": 0.35,
                    "rx": 0.78,
                    "ry": 0.62,
                    "rz": -0.0,
                    "rw": 4.42,
                    "panda_joint1": 0.0,
                    "panda_joint2": 0.0,
                    "panda_joint3": 0.0,
                    "panda_joint4": -1.57079632679,
                    "panda_joint5": 0.0,
                    "panda_joint6": 1.57079632679,
                    "panda_joint7": 0.785398163397,
                    "gripper_width": 0.001,
                },
            )
            self._init_pose_offset = rospy.get_param(
                f"/{ns}/pose_sampling/offset", {"x": 0.0, "y": 0.0, "z": 0.0}
            )
            self._init_pose_sampling_bounds = rospy.get_param(
                f"/{ns}/pose_sampling/bounds", None
            )
            self._target_sampling_strategy = rospy.get_param(
                f"/{ns}/target_sampling/strategy", "global"
            )
            self._visualize_target = rospy.get_param(
                f"/{ns}/target_sampling/visualize_target", True
            )
            self._visualize_target_sampling_bounds = rospy.get_param(
                f"/{ns}/target_sampling/visualize_target_bounds", True
            )
            self._target_offset = rospy.get_param(
                f"/{ns}/target_sampling/offset",
                {
                    "x": 0.0,
                    "y": 0.0,
                    "z": 0.0,
                },
            )
            self._fixed_target_pose = rospy.get_param(
                f"/{ns}/target_sampling/fixed_target",
                {
                    "x": 0.4,
                    "y": 0.0,
                    "z": 0.8,
                },
            )
            if self._target_sampling_strategy != "fixed":
                self._target_sampling_bounds = rospy.get_param(
                    f"/{ns}/target_sampling/bounds/"
                    f"{self._target_sampling_strategy}",
                    {
                        "x_min": -0.7,
                        "x_max": 0.7,
                        "y_min": -0.7,
                        "y_max": 0.7,
                        "z_min": 0.0,
                        "z_max": 1.3,
                    },
                )
            # == Retrieve reward variables ==
            self._reward_type = rospy.get_param(f"/{ns}/training/reward_type", "sparse")
            self._target_hold = rospy.get_param(f"/{ns}/training/target_hold", True)
            self._hold_samples = rospy.get_param(f"/{ns}/training/hold_samples", 2)
            self._distance_threshold = rospy.get_param(
                f"/{ns}/training/distance_threshold", 0.05
            )
            self._collision_penalty = rospy.get_param(
                f"/{ns}/training/collision_penalty", 0.0
            )
            # == Retrieve environment variables ==
            self._pause_after_step = rospy.get_param(
                f"/{ns}/environment/pause_after_step", True
            )
            self._action_bounds = rospy.get_param(
                f"/{ns}/environment/action_space/bounds",
                {
                    "ee_pose": {
                        "low": {
                            "x": -1.3,
                            "y": -1.3,
                            "z": 0.0,
                            "rx": 0,
                            "ry": 0,
                            "rz": 0,
                            "rw": 0,
                        },
                        "high": {
                            "x": 1.3,
                            "y": 1.3,
                            "z": 1.3,
                            "rx": 1,
                            "ry": 1,
                            "rz": 1,
                            "rw": 1,
                        },
                    },
                    "joint_positions": {
                        "low": {
                            "panda_joint1": -2.8973,
                            "panda_joint2": -1.7628,
                            "panda_joint3": -2.8973,
                            "panda_joint4": -3.0718,
                            "panda_joint5": -2.8973,
                            "panda_joint6": -0.0175,
                            "panda_joint7": -2.8973,
                            "gripper_width": 0.0,
                        },
                        "high": {
                            "panda_joint1": 2.8973,
                            "panda_joint2": 1.7628,
                            "panda_joint3": 2.8973,
                            "panda_joint4": -0.0698,
                            "panda_joint5": 2.8973,
                            "panda_joint6": 3.7525,
                            "panda_joint7": 2.8973,
                            "gripper_width": 0.08,
                        },
                    },
                    "joint_efforts": {
                        "low": {
                            "panda_joint1": -87.0,
                            "panda_joint2": -87.0,
                            "panda_joint3": -87.0,
                            "panda_joint4": -87.0,
                            "panda_joint5": -12.0,
                            "panda_joint6": -12.0,
                            "panda_joint7": -12.0,
                            "gripper_max_effort": 0.0,
                        },
                        "high": {
                            "panda_joint1": 87.0,
                            "panda_joint2": 87.0,
                            "panda_joint3": 87.0,
                            "panda_joint4": 87.0,
                            "panda_joint5": 12.0,
                            "panda_joint6": 12.0,
                            "panda_joint7": 12.0,
                            "gripper_max_effort": 140,
                        },
                    },
                },
            )
            # == Retrieve global variables ==
            self._physics = rospy.get_param(f"/{ns}/physics", "ode").lower()
            self._load_rviz = rospy.get_param(f"/{ns}/load_rviz", True)
            self._rviz_file = Path(__file__).parent.joinpath(
                rospy.get_param(
                    f"/{ns}/rviz_file",
                    "config/moveit.rviz",
                )
            )
            self._gazebo_gui = rospy.get_param(f"/{ns}/load_gazebo_gui", True)
            self._log_reset = rospy.get_param(f"/{ns}/log_reset", False)
            self._log_step_debug_info = rospy.get_param(
                f"/{ns}/log_step_debug_info", False
            )
            self._roslaunch_log_to_console = rospy.get_param(
                f"/{ns}/roslaunch_log_to_console", False
            )
            # == Retrieve other variables ==
            self._max_velocity_scaling_factor = rospy.get_param(
                "/panda_moveit_planner_server/max_velocity_scaling_factor", 1.0
            )
            self._max_acceleration_scaling_factor = rospy.get_param(
                "/panda_moveit_planner_server/max_acceleration_scaling_factor", 1.0
            )
        except KeyError as e:
            rospy.logerr(
                f"Parameter '{e.args[0]}' could not be retrieved from the parameter "
                "server. Please make sure this parameter is present in the Panda task "
                f"environment configuration file '{self._config_file_path}' and try "
                "again."
            )
            ros_exit_gracefully(
                shutdown_msg=f"Shutting down '{rospy.get_name()}'.", exit_code=1
            )
    def _robot_get_obs(self):
        """Returns all joint positions and velocities associated with a robot.
        Returns:
            :obj:`numpy.array`: Robot Positions, Robot Velocities
        """
        data = self.joint_states
        if data.position is not None and data.name:
            return (
                np.array(data.position),
                np.array(data.velocity),
            )
        else:
            return np.zeros(0), np.zeros(0)
    def _goal_distance(self, goal_a, goal_b):
        """Calculates the perpendicular distance to the goal.
        Args:
            goal_a (:obj:`numpy.ndarray`): List containing a gripper and object pose.
            goal_b (:obj:`numpy.ndarray`): List containing a gripper and object pose.
        Returns:
            :obj:`numpy.float32`: Perpendicular distance to the goal.
        """
        assert goal_a.shape == goal_b.shape
        return np.linalg.norm(goal_a - goal_b, axis=-1)
    def _get_random_joint_positions(self):
        """Get valid joint position commands for the Panda arm and hand.
        Returns:
            dict: Dictionary containing a valid joint position for each joint. Returns
                a empty dictionary if no valid joint positions were found.
        """
        if hasattr(self, "_moveit_get_random_joint_positions_client"):
            req = self.panda_gazebo.srv.GetRandomJointPositionsRequest()
            req.attempts = self._pose_sampling_attempts
            # Apply pose bounding region.
            if (
                hasattr(self, "_init_pose_sampling_bounds")
                and self._init_pose_sampling_bounds is not None
            ):
                _, joint_positions_bound_region = split_bounds_dict(
                    self._init_pose_sampling_bounds
                )
                joint_positions_bound_region = gripper_width_2_finger_joints_positions(
                    joint_positions_bound_region, self.joints["hand"]
                )
                joint_limits = self.panda_gazebo.msg.JointLimits()
                joint_limits.names = list(joint_positions_bound_region.keys())
                joint_limits.values = list(joint_positions_bound_region.values())
                req.joint_limits = joint_limits
            # Retrieve random joint pose and return.
            resp = self._moveit_get_random_joint_positions_client.call(req)
            if resp.success:
                joint_positions_dict = dict(zip(resp.joint_names, resp.joint_positions))
                return joint_positions_dict
            else:
                if resp.message == "Invalid joint limits were given.":
                    err_msg = (
                        "The joint limits specified in the pose_sampling bounds "
                        "section of the task environment configuration file were "
                        f"invalid '{str(self._config_file_path)}'. Please make sure "
                        "that the pose_sampling joint limits are within the joint "
                        "limits specified in the panda robot description file."
                    )
                    ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
                return {}
        else:
            err_msg = (
                f"Failed to connect to '{MOVEIT_GET_RANDOM_JOINT_POSITIONS_TOPIC}' "
                f"service! Shutting down '{rospy.get_name()}' as this service is "
                "required for retrieving random joint positions."
            )
            ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
    def _get_random_ee_pose(self):
        """Get a valid random EE pose that considers the boundaries set in the task
        environment configuration file.
        Returns:
            (tuple): tuple containing:
                - random_ee_pose (dict): Random EE pose. A empty dictionary is returned
                  when no valid random EE pose could be found.
                - model_configuration (dict): A set of joint positions that result in
                  this EE pose. A empty dictionary is returned when no valid random
                  EE pose could be found.
        """
        if hasattr(self, "_moveit_get_random_ee_pose_client"):
            req = self.panda_gazebo.srv.GetRandomEePoseRequest()
            req.attempts = self._pose_sampling_attempts
            # Apply pose bounding region.
            if (
                hasattr(self, "_init_pose_sampling_bounds")
                and self._init_pose_sampling_bounds is not None
            ):
                ee_pose_bound_region, _ = split_bounds_dict(
                    self._init_pose_sampling_bounds
                )
                req.bounding_region = self.panda_gazebo.msg.BoundingRegion(
                    **ee_pose_bound_region
                )
            # Retrieve random ee pose and return result.
            resp = self._moveit_get_random_ee_pose_client.call(req)
            if resp.success:
                return pose_msg_2_pose_dict(resp.ee_pose), dict(
                    zip(resp.joint_names, resp.joint_positions)
                )
            else:
                return {}, {}
        else:
            err_msg = (
                f"Failled to connect to '{MOVEIT_GET_RANDOM_EE_POSE_TOPIC}' service! "
                f"Shutting down '{rospy.get_name()}' as this service is required for "
                "retrieving a random end effector pose."
            )
            ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
    def _clip_goal_position(self, goal_pose):
        """Limit the possible goal position x, y and z values to a certian range.
        Args:
            goal_pose (:obj:`numpy.ndarray`): A numpy array containing the goal x,y and
                z values.
        """
        min_bounds = [
            val for key, val in self._target_sampling_bounds.items() if "min" in key
        ]
        max_bounds = [
            val for key, val in self._target_sampling_bounds.items() if "max" in key
        ]
        clipped_goal_pose = np.clip(goal_pose, a_min=min_bounds, a_max=max_bounds)
        if any(clipped_goal_pose != goal_pose):
            rospy.logwarn(
                "Goal pose clipped since it was not within the set target bounds."
            )
            rospy.logdebug("New goal pose: %s" % clipped_goal_pose)
        return clipped_goal_pose
    def _check_config_action_space_joints(self):  # noqa: C901
        """Validates whether the 'ee_control_coordinates' and 'controlled_joints' fields
        in the task environment configuration file contain vallid joints/coordinates
        that can be used as action space joints.
        Raises:
            SystemExit: Thrown when the coordinates/joints in the task environment
                config file are invalid. As a result the ROS script is shutdown.
        """
        # Validate EE coordinates.
        invalid_ee_coordinates = []
        if self.robot_control_type == "end_effector":
            if self._ee_control_coordinates:
                for joint in self._ee_control_coordinates:
                    if joint not in VALID_EE_CONTROL_JOINTS:
                        invalid_ee_coordinates.append(joint)
        # Validate control_joints.
        invalid_joints = []
        if self._controlled_joints:
            valid_joints = self.joints["arm"] + AVAILABLE_HAND_COMMANDS
            for joint in self._controlled_joints:
                if joint not in valid_joints:
                    invalid_joints.append(joint)
        # Throw error and shutdown node if invalid joint was found.
        if invalid_ee_coordinates or invalid_joints:
            error_msg = "Shutting down '%s' since the " % rospy.get_name()
            if invalid_ee_coordinates and invalid_joints:
                invalid = flatten_list([invalid_ee_coordinates, invalid_joints])
                error_msg += list_2_human_text(
                    ["'" + item + "'" for item in invalid],
                    end_seperator="and",
                )
                error_msg += (
                    " %s that %s specified in the 'ee_control_coordinates' and "
                    "'controlled_joints' task environment config variables were "
                    "invalid."
                ) % (
                    ("coordinate/joint", "was")
                    if len(invalid) == 1
                    else ("coordinates/joints", "were")
                )
            elif invalid_ee_coordinates:
                error_msg += list_2_human_text(
                    ["'" + item + "'" for item in invalid_ee_coordinates],
                    end_seperator="and",
                )
                error_msg += (
                    " %s that %s specified in the 'ee_control_coordinates' task "
                    "environment configuration variable %s invalid."
                ) % (
                    ("coordinate", "was", "is")
                    if len(invalid_ee_coordinates) == 1
                    else ("coordinates", "were", "are")
                )
            else:
                error_msg += list_2_human_text(
                    ["'" + item + "'" for item in invalid_joints],
                    end_seperator="and",
                )
                error_msg += (
                    " %s that %s specified in the 'controlled_joints' task "
                    "environment configuration variable %s invalid."
                ) % (
                    ("joint", "was", "is")
                    if len(invalid_joints) == 1
                    else ("joints", "were", "are")
                )
            rospy.logerr(error_msg)
            ros_exit_gracefully(
                shutdown_msg=f"Shutting down '{rospy.get_name()}'.", exit_code=1
            )
    def _get_action_space_joints(self):
        """Retrieves the joints that are being controlled when we sample from the action
        space.
        Returns:
            list: Joints that are controlled.
        """
        # Validate the action space joints set in the config file.
        self._check_config_action_space_joints()
        # Get action space joints
        if self.robot_control_type == "end_effector":
            # Get end effector control coordinates.
            if not self._ee_control_coordinates:
                action_space_joints = ["x", "y", "z", "rx", "ry", "rz", "rw"]
            else:
                action_space_joints = self._ee_control_coordinates
            # Get gripper control joints.
            if self._load_gripper:
                action_space_joints.extend(["gripper_width", "gripper_max_effort"])
        else:
            if self._controlled_joints:
                controlled_joints = self._controlled_joints
                if not self._load_gripper:
                    controlled_joints = [
                        item
                        for item in self._controlled_joints
                        if item not in AVAILABLE_HAND_COMMANDS
                    ]
                    if len(controlled_joints) != len(self._controlled_joints):
                        rospy.logdebug(
                            "Hand gripper commands were removed from the controlled "
                            "joints since the gripper is not loaded."
                        )
                action_space_joints = controlled_joints
            else:
                action_space_joints = self.joints["arm"]
                if self._load_gripper:
                    action_space_joints = flatten_list(
                        [AVAILABLE_HAND_COMMANDS, action_space_joints]
                        if self.joints["both"][0] in self.joints["hand"]
                        else [action_space_joints, AVAILABLE_HAND_COMMANDS]
                    )
        return action_space_joints
    def _create_action_space(self, dtype=np.float64):
        """Create the action space based on the action space size and the action bounds.
        Args:
            dtype (numpy.dtype, optional): The data type of the action space. Defaults
                to np.float64.
        Returns:
            :obj:`gym.spaces.Box`: The gymnasium action space.
        """
        # Retrieve action space joints if not supplied.
        self._action_space_joints = self._get_action_space_joints()
        # Set action space bounds based on control_type.
        arm_bounds_key = (
            "ee_pose"
            if self.robot_control_type == "end_effector"
            else (
                "joint_efforts"
                if self.robot_control_type == "effort"
                else "joint_positions"
            )
        )
        arm_action_bounds_low = {
            joint: val
            for joint, val in self._action_bounds[arm_bounds_key]["low"].items()
            if joint != "gripper_max_effort"
        }
        gripper_with_action_bound_low = {
            joint: val
            for joint, val in self._action_bounds["joint_positions"]["low"].items()
            if joint == "gripper_width"
        }
        gripper_max_effort_action_bound_low = {
            joint: val
            for joint, val in self._action_bounds["joint_efforts"]["low"].items()
            if joint == "gripper_max_effort"
        }
        action_bounds_low = shallow_dict_merge(
            arm_action_bounds_low,
            gripper_with_action_bound_low,
            gripper_max_effort_action_bound_low,
            order=self._action_space_joints,
        )
        arm_action_bounds_high = {
            joint: val
            for joint, val in self._action_bounds[arm_bounds_key]["high"].items()
            if joint != "gripper_max_effort"
        }
        gripper_with_action_bound_high = {
            joint: val
            for joint, val in self._action_bounds["joint_positions"]["high"].items()
            if joint == "gripper_width"
        }
        gripper_max_effort_action_bound_high = {
            joint: val
            for joint, val in self._action_bounds["joint_efforts"]["high"].items()
            if joint == "gripper_max_effort"
        }
        action_bounds_high = shallow_dict_merge(
            arm_action_bounds_high,
            gripper_with_action_bound_high,
            gripper_max_effort_action_bound_high,
            order=self._action_space_joints,
        )
        action_bound_low_filtered = np.array(
            [
                val
                for key, val in action_bounds_low.items()
                if key in self._action_space_joints
            ],
            dtype=dtype,
        )
        action_bound_high_filtered = np.array(
            [
                val
                for key, val in action_bounds_high.items()
                if key in self._action_space_joints
            ],
            dtype=dtype,
        )
        # Create action space.
        return spaces.Box(
            action_bound_low_filtered,
            action_bound_high_filtered,
            shape=action_bound_low_filtered.shape,
            dtype=dtype,
        )
    def _set_panda_configuration(self, joint_names, joint_positions):
        """Set the panda robot to a given configuration.
        Args:
            joint_positions (list): The desired joint positions.
            joint_names (list): The joint names for which you want to set the joint
                position.
        Returns:
            bool: Whether the panda configuration was successfully set.
        .. note::
            Here we use the ``/set_franka_model_configuration`` service instead of the
            Gazebo's ``set_model_configuration``. This was done because the latter
            causes the reported joint positions outside the joint limits. For more
            information, see
            `https://github.com/frankaemika/franka_ros/issues/225 <https://github.com/frankaemika/franka_ros/issues/225>`_.
        """  # noqa: E501
        if hasattr(self, "_set_franka_model_configuration_srv"):
            # Unlock locked joints.
            if self._locked_arm_joints:
                self.unlock_joints(self._locked_arm_joints)
            # Set joint configuration.
            resp = self._set_franka_model_configuration_srv.call(
                self.franka_msgs.srv.SetJointConfigurationRequest(
                    joint_names=joint_names, joint_positions=joint_positions
                )
            )
            retval = resp.success
            # Lock locked joints.
            if self._locked_arm_joints:
                self.lock_joints(self._locked_arm_joints)
        else:
            rospy.logwarn_once(
                "Panda configuration not set as 'set_franka_model_configuration' "
                "service is not available."
            )
            retval = False
        return retval
    ################################################
    # Overload Robot env virtual methods ###########
    ################################################
    # NOTE: Methods that need to be implemented as they are called by the robot and
    # gazebo environments.
    def _compute_reward(self, observations, done):
        """Compute the reward.
        Args:
            observations (dict): Dictionary containing the observations.
            done (bool): Boolean specifying whether an episode is terminated.
        Returns:
            :obj:`numpy.float32`: Reward that is received by the agent.
        .. attention::
            The ``done`` argument is not used for computing the reward in this task
            environment.
        """
        # Calculate the rewards based on the distance from the goal.
        d = self._goal_distance(observations["achieved_goal"], self.goal)
        if self._reward_type == "sparse":
            if self._collision_penalty != 0.0 and self.in_collision:
                reward = np.float32(-1.0)
            else:
                reward = -(d > self._distance_threshold).astype(np.float32)
            self._step_debug_logger("=Reward info=")
            self._step_debug_logger("Reward type: Non sparse")
            self._step_debug_logger("Goal: %s", self.goal)
            self._step_debug_logger("Achieved goal: %s", observations["achieved_goal"])
            self._step_debug_logger("Perpendicular distance: %s", d)
            self._step_debug_logger("Threshold: %s", self._distance_threshold)
            self._step_debug_logger(
                "Received reward: %s",
                reward,
            )
            return np.abs(reward) if self._positive_reward else reward
        else:
            self._step_debug_logger("=Reward info=")
            self._step_debug_logger("Reward type: Sparse")
            self._step_debug_logger("Goal: %s", self.goal)
            self._step_debug_logger("Achieved goal: %s", observations["achieved_goal"])
            self._step_debug_logger("Perpendicular distance: %s", d)
            self._step_debug_logger("Threshold: %s", self._distance_threshold)
            self._step_debug_logger("Received reward: %s", -d)
            reward = -d
            if self._collision_penalty != 0.0 and self.in_collision:
                reward -= np.float64(self._collision_penalty)
            return np.abs(reward) if self._positive_reward else reward
    def _get_obs(self):
        """Get robot state observation.
        Returns:
            tuple containing:
                - observation array (:obj:`list`):
                    List of observations:
                        - End effector x position
                        - End effector y position
                        - End effector z position
                        - End effector joints positions
                        - End effector joints velocities
                - achieved_goal (:obj:`object`): The goal that was achieved during
                  execution.
                - desired_goal (:obj:`object`): The desired goal that we asked the agent
                  to attempt to achieve.
        """
        ee_pose = self.get_ee_pose()
        ee_position = [
            ee_pose.pose.position.x,
            ee_pose.pose.position.y,
            ee_pose.pose.position.z,
        ]
        robot_qpos, robot_qvel = self._robot_get_obs()
        ee_state = [
            pos
            for pos, name in zip(robot_qpos, self.joint_states.name)
            if "panda_finger" in name
        ]
        ee_vel = [
            vel
            for vel, name in zip(robot_qvel, self.joint_states.name)
            if "panda_finger" in name
        ]
        obs = np.concatenate(
            [
                ee_position,
                ee_state,
                ee_vel,
            ],
            dtype=self._observation_space_dtype,
        )
        achieved_goal = np.array(ee_position, dtype=self._observation_space_dtype)
        desired_goal = self.goal.astype(self._observation_space_dtype)
        return {
            "observation": obs,
            "achieved_goal": achieved_goal,
            "desired_goal": desired_goal,
        }
    def _get_info(self):
        """Returns a dictionary with additional step information.
        Returns:
            dict: Dictionary with additional information.
        """
        ee_pose = self.get_ee_pose()
        ee_position = [
            ee_pose.pose.position.x,
            ee_pose.pose.position.y,
            ee_pose.pose.position.z,
        ]
        info = {
            "reference": self.goal,
            "state_of_interest": ee_position,
            "reference_error": ee_position - self.goal,
        }
        return info
    def _set_action(self, action):
        """Take robot action.
        Args:
            action (numpy.ndarray): List containing joint or ee action commands.
        """
        # Throw error and shutdown if action space is not the right size.
        if not action.shape == self.action_space.shape:
            err_msg = (
                f"Shutting down '{rospy.get_name()}' since the shape of the supplied "
                f"action {action.shape} while the gymnasium action space has shape "
                f"{self.action_space.shape}."
            )
            ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
        # Change action dtype if needed and throw one time warning.
        if action.dtype != self._action_space_dtype:
            if not self._action_dtype_conversion_warning:
                rospy.logwarn(
                    "The data type of the action that is supplied to the "
                    f"'ros_gazebo_gym:{self.spec.id}' environment ({action.dtype}) "
                    "does not match the data type of the action space "
                    f"({self._action_space_dtype.__name__}). The action data type will "
                    "be converted to the action space data type."
                )
                self._action_dtype_conversion_warning = True
            action = action.astype(self._action_space_dtype)
        # Send action commands to the controllers based on control type.
        action_dict = dict(zip(self._action_space_joints, action))
        self._step_debug_logger("=Action set info=")
        self._step_debug_logger("Action that is set:")
        self._step_debug_logger(list(action_dict.values()))
        if self.robot_control_type in ["end_effector", "trajectory"]:
            gripper_with = action_dict.pop("gripper_width", None)
            gripper_max_effort = action_dict.pop("gripper_max_effort", None)
            if self.robot_control_type == "end_effector":
                self.set_ee_pose(action_dict)
                if self._load_gripper and not self._lock_gripper:
                    self.set_gripper_width(
                        gripper_with,
                        wait=self._hand_wait,
                        max_effort=gripper_max_effort,
                    )
            else:
                self.set_arm_joint_trajectory(action_dict, wait=self._arm_wait)
                if self._load_gripper and not self._lock_gripper:
                    self.set_gripper_width(
                        gripper_with,
                        wait=self._hand_wait,
                        max_effort=gripper_max_effort,
                    )
        else:
            if self._load_gripper and self._lock_gripper:
                action_dict.pop("gripper_width", None)
                action_dict.pop("gripper_max_effort", None)
            self.set_joint_commands(
                action_dict, arm_wait=self._arm_wait, hand_wait=self._hand_wait
            )
    def _is_done(self, observations):
        """Check if task is done.
        Args:
            observations (dict): Dictionary containing the observations
        Returns:
            bool: Boolean specifying whether the episode is done (e.i. distance to the
                goal is within the distance threshold, robot has fallen etc.).
        """
        # Check if gripper is within range of the goal.
        d = self._goal_distance(observations["achieved_goal"], self.goal)
        is_done = d < self._distance_threshold
        self._step_debug_logger("=Task is done info=")
        if self._target_hold:
            if is_done:
                self._is_done_samples += 1
                if self._is_done_samples < self._hold_samples:
                    is_done = False
                    self._step_debug_logger(
                        f"Agent spend '{self._is_done_samples}' within target position."
                    )
                else:
                    self._step_debug_logger("Task is done.")
                    self._is_done_samples = 0
            else:
                self._step_debug_logger("Task is not done.")
                self._is_done_samples = 0
        else:
            if is_done:
                self._step_debug_logger("Task is done.")
            else:
                self._step_debug_logger("Task is not done.")
        return is_done
    def _sample_goal(self):
        """Sample a random goal position.
        Returns:
            :obj:`geometry_msgs.PoseStamped`: A goal pose.
        """
        rospy.logdebug("Sampling goal.")
        if self._target_sampling_strategy == "global":
            goal = self.np_random.uniform(
                [
                    self._target_sampling_bounds["x_min"],
                    self._target_sampling_bounds["y_min"],
                    self._target_sampling_bounds["z_min"],
                ],
                [
                    self._target_sampling_bounds["x_max"],
                    self._target_sampling_bounds["y_max"],
                    self._target_sampling_bounds["z_max"],
                ],
                size=3,
            )
        elif self._target_sampling_strategy == "local":  # Rel to current EE pose.
            try:
                cur_ee_pose = self.get_ee_pose()
                cur_ee_position = np.array(
                    [
                        cur_ee_pose.pose.position.x,
                        cur_ee_pose.pose.position.y,
                        cur_ee_pose.pose.position.z,
                    ]
                )
            except EePoseLookupError:
                err_msg = (
                    f"Shutting down '{rospy.get_name()}' since the current end "
                    "effector pose which is needed for sampling the goals could "
                    "not be retrieved."
                )
                ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
            # Sample goal relative to end effector pose.
            goal = cur_ee_position + self.np_random.uniform(
                [
                    self._target_sampling_bounds["x_min"],
                    self._target_sampling_bounds["y_min"],
                    self._target_sampling_bounds["z_min"],
                ],
                [
                    self._target_sampling_bounds["x_max"],
                    self._target_sampling_bounds["y_max"],
                    self._target_sampling_bounds["z_max"],
                ],
                size=cur_ee_position.shape,
            )
        elif self._target_sampling_strategy == "fixed":
            goal = np.array(list(self._fixed_target_pose.values()))
        else:  # Thrown error if goal could not be sampled.
            err_msg = (
                f"Shutting down '{rospy.get_name()}' since no goal could be sampled "
                f"as '{self._target_sampling_strategy}' is not a valid goal sampling "
                "strategy. Options are 'global' and 'local'."
            )
            ros_exit_gracefully(shutdown_msg=err_msg, exit_code=1)
        # Make sure the goal is always within the sampling region.
        if self._target_sampling_strategy != "fixed":
            goal = self._clip_goal_position(goal)
        rospy.logdebug("Goal: %s" % goal)
        # Apply offset and return goal.
        goal += np.array(
            [
                self._target_offset["x"],
                self._target_offset["y"],
                self._target_offset["z"],
            ]
        )
        return goal
    def _visualize_goal(self, offset=[0.0, 0.0, 0.0]):
        """Visualize RViz goal marker.
        Args:
            offset (list): Position offset for visualizing the goal.
        """
        goal = self.goal + offset
        goal_marker_msg = TargetMarker(x=goal[0], y=goal[1], z=goal[2], id=3)
        self._target_pose_marker_pub.publish(goal_marker_msg)
    def _set_init_pose(self):  # noqa: C901
        """Sets the Robot in its (random) initial pose.
        Returns:
            bool: Boolean specifying whether the initial pose was successfully set.
        .. note::
           The pose is sampled based on the ``pose_sampling_type`` variable set in the
           task environment configuration file. Options are ``end_effector_pose``,
           which creates a random pose by sampling EE poses, and ``joint_positions``,
           which makes a random pose by sampling joint positions. These methods respect
           the ``bounds`` set in the task environment config.
        """
        if self._reset_init_pose:
            init_type = "random " if self._random_init_pose else ""
            rospy.loginfo(f"Setting {init_type}init pose...")
            if self._random_init_pose:  # Use random initial model configuration.
                if (
                    self._pose_sampling_type == "joint_positions"
                ):  # Sample random joint positions.
                    rospy.logdebug("Retrieve random joint positions.")
                    random_joint_positions = self._get_random_joint_positions()
                    if random_joint_positions:
                        if self._load_gripper:
                            random_joint_positions["gripper_width"] = (
                                split_pose_dict(self._init_pose)[1]["gripper_width"]
                                if self._lock_gripper
                                else random_joint_positions.pop(
                                    self.joints["hand"][0], 0.0
                                )
                                * 2
                            )
                            random_joint_positions = {
                                joint: position
                                for joint, position in random_joint_positions.items()
                                if joint not in self.joints["hand"]
                            }
                else:  # Sample random EE poses.
                    if (
                        self._randomize_first_episode or self.episode_num != 0
                    ):  # Retrieve random EE pose.
                        rospy.logdebug("Retrieve random EE pose.")
                        self._set_panda_configuration(
                            joint_names=self.joints["both"],
                            joint_positions=PANDA_REST_CONFIGURATION[
                                : len(self.joints["both"])
                            ],
                        )  # NOTE: Done as joint conflicts might prevent planning.
                        (
                            random_ee_pose,
                            random_joint_positions,
                        ) = self._get_random_ee_pose()
                    else:  # Use fixed initial arm/hand initial EE pose.
                        rospy.logdebug("Retrieving initial EE pose.")
                        random_ee_pose = split_pose_dict(self._init_pose)[0]
                        random_joint_positions = self.get_ee_pose_joint_config(
                            random_ee_pose
                        )
                    # Apply init pose offset.
                    if random_ee_pose and sum(self._init_pose_offset.values()) != 0.0:
                        rospy.logdebug("Applying offset to initial pose.")
                        random_ee_pose["x"] += self._init_pose_offset["x"]
                        random_ee_pose["y"] += self._init_pose_offset["y"]
                        random_ee_pose["z"] += self._init_pose_offset["z"]
                        # Retrieve model configuration for the new EE pose.
                        ee_pose_joint_positions = self.get_ee_pose_joint_config(
                            random_ee_pose
                        )
                        if not ee_pose_joint_positions:
                            rospy.logwarn(
                                "Could not retrieve a model configuration that relates "
                                "to the EE pose with the offset. As a result the model "
                                "configuration for the EE pose without the offset is "
                                "used."
                            )
                        else:
                            random_joint_positions = ee_pose_joint_positions
                    # Retrieve random gripper width.
                    if self._load_gripper:
                        random_joint_positions["gripper_width"] = (
                            split_pose_dict(self._init_pose)[1]["gripper_width"]
                            if self._lock_gripper
                            else self._get_random_joint_positions().pop(
                                self.joints["hand"][0], 0.0
                            )
                            * 2
                        )
            else:  # Use fixed initial pose.
                if self._pose_sampling_type == "joint_positions":
                    rospy.logdebug("Retrieving initial joint positions.")
                    _, self._init_model_configuration = split_pose_dict(self._init_pose)
                    if not self._load_gripper:
                        self._init_model_configuration.pop("gripper_width", None)
                else:
                    rospy.logdebug("Retrieving initial EE pose.")
                    random_joint_positions = self.get_ee_pose_joint_config(
                        split_pose_dict(self._init_pose)[0]
                    )
                    if random_joint_positions:
                        if self._load_gripper:
                            random_joint_positions["gripper_width"] = split_pose_dict(
                                self._init_pose
                            )[1]["gripper_width"]
                    else:
                        random_joint_positions = dict(
                            zip(
                                self.joints["both"],
                                PANDA_REST_CONFIGURATION[: len(self.joints["both"])],
                            )
                        )
                        rospy.logwarn_once(
                            "No valid joint positions could be retrieved for the EE "
                            "pose specified in the 'init_pose' field of the task "
                            "configuration file. As a "
                            "result the fallback model configuration was used "
                            "'{}'.".format(random_joint_positions)
                        )
            # Check if a valid model configuration was found.
            if random_joint_positions:
                self._init_model_configuration = random_joint_positions
            else:
                pose_sampling_type_str = (
                    "joint positions"
                    if self._pose_sampling_type == "joint_positions"
                    else "EE pose"
                )
                if self._init_model_configuration:
                    rospy.logwarn(
                        f"Unable to retrieve a valid random {pose_sampling_type_str}. "
                        "Please ensure that you have set valid pose_sampling bounds in "
                        "the task environment config file "
                        f"'{str(self._config_file_path)}'. The initial model "
                        "configuration from the previous episode was used as a result."
                    )
                else:
                    self._init_model_configuration = dict(
                        zip(
                            self.joints["both"],
                            PANDA_REST_CONFIGURATION[: len(self.joints["both"])],
                        )
                    )
                    rospy.logwarn(
                        f"Unable to retrieve a valid random {pose_sampling_type_str}. "
                        "Please ensure that you have set valid pose_sampling bounds in "
                        "the task environment config file "
                        f"'{str(self._config_file_path)}'. Because no previous "
                        "initial model configuration was available the fallback model "
                        f"configuration '{self._init_model_configuration}' was used."
                    )
            # Convert gripper width to joint positions.
            if self._load_gripper:
                self._init_model_configuration = (
                    gripper_width_2_finger_joints_positions(
                        self._init_model_configuration, self.joints["hand"]
                    )
                )
            # Set initial model configuration and return result
            # NOTE: Here two modes can be used: MoveIT or Gazebo's
            # 'set_model_configuration' service.
            if not self._moveit_init_pose_control:  # Use Gazebo service.
                rospy.loginfo("Setting initial robot pose.")
                init_pose_retval = self._set_panda_configuration(
                    joint_names=self._init_model_configuration.keys(),
                    joint_positions=self._init_model_configuration.values(),
                )
                if not init_pose_retval:
                    rospy.logwarn("Setting initial robot pose failed.")
                return init_pose_retval
            else:  # Use MoveIt.
                if hasattr(self, "_moveit_set_joint_positions_srv"):
                    prev_control_type = self.robot_control_type
                    self.robot_control_type = "trajectory"
                    # Unlock locked joints.
                    if self._locked_arm_joints:
                        self.unlock_joints(self._locked_arm_joints)
                    # Set initial model configuration.
                    resp = self._moveit_set_joint_positions_srv.call(
                        self.panda_gazebo.srv.SetJointPositionsRequest(
                            joint_names=self._init_model_configuration.keys(),
                            joint_positions=self._init_model_configuration.values(),
                            wait=True,
                        )
                    )
                    self.robot_control_type = prev_control_type
                    if not resp.success:
                        rospy.logwarn("Setting initial robot pose failed.")
                    # Lock locked joints.
                    if self._locked_arm_joints:
                        self.lock_joints(self._locked_arm_joints)
                    return resp.success
                rospy.logwarn(
                    "The initial pose failed since the {} service was not "
                    "available.".format(MOVEIT_SET_JOINT_POSITIONS_TOPIC)
                )
                return False
        return True
    def _init_env_variables(self):
        """Inits variables needed to be initialized each time we reset at the start
        of an episode.
        """
        # Sample and visualize goal.
        self.goal = self._sample_goal()
        if self._visualize_target:
            self._visualize_goal()