Source code for panda_autograsp.moveit_planner_server_ros

"""Module which sets up a number of services that can
be used to control the Panda Emika Franka robot.
"""

# Make script both python2 and python3 compatible
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

try:
    input = raw_input
except NameError:
    pass

# Main python packages
import sys
import os
import copy
import time
from autolab_core import YamlConfig

# ROS python packages
import rospy
import moveit_commander
from moveit_commander import MoveItCommanderException

# ROS messages and services
from moveit_msgs.msg import DisplayTrajectory, RobotTrajectory
from moveit_msgs.srv import ApplyPlanningScene

# Panda_autograsp modules, msgs and srvs
from panda_autograsp.functions.moveit import (
    get_trajectory_duration,
    plan_exists,
    at_joint_target,
    add_collision_objects,
)
from panda_autograsp.srv import (
    ExecutePlan,
    PlanToJoint,
    PlanToPoint,
    PlanToPath,
    PlanToRandomPoint,
    PlanToRandomJoint,
    PlanToRandomPath,
    VisualizePlan,
    CloseGripper,
    OpenGripper,
    SetGripperWidth,
    SetGripperOpen,
    SetGripperClosed,
    PlanGripper,
    ExecuteGripperPlan,
)

#################################################
# Script parameters #############################
#################################################

# Get file path
FILE_PATH = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))

# Read panda_autograsp configuration file
MAIN_CFG = YamlConfig(
    os.path.abspath(os.path.join(FILE_PATH, "../../cfg/main_config.yaml"))
)
POINT_N_STEP = MAIN_CFG["planning"]["point"]["point_n_step"]
EEF_STEP = MAIN_CFG["planning"]["cartesian"]["eef_step"]
JUMP_THRESHOLD = MAIN_CFG["planning"]["cartesian"]["jump_threshold"]

# Read collision object configuration file
COLLISION_OBJ_CFG = YamlConfig(
    os.path.abspath(os.path.join(FILE_PATH, "../../cfg/moveit_scene_constraints.yaml"))
)


#################################################
# MoveitPlannerServer class #####################
#################################################
[docs]class MoveitPlannerServer: """Class used for controlling the Panda Emika Franka robot. Attributes ------------ robot : :py:obj:`!moveit_commander.RobotCommander` The moveit robot commander. move_group : :py:obj:`!moveit_commander.MoveGroupCommander` The main robot move group. move_group_gripper : :py:obj:`!moveit_commander.MoveGroupCommander` The gripper move group. scene : :py:obj:`!moveit_commander.PlanningSceneInterface` The moveit scene commander. current_plan : The last computed plan of the main move group. current_plan_gripper : The last computed plan of the gripper. desired_pose : :py:obj:`list` The main move group goal pose. desired_joint_values: :py:obj:`list` The main move group target joint values. desired_gripper_joint_values : :py:obj:`list` The gripper target joint values. """
[docs] def __init__( self, robot_description, args, move_group="panda_arm", move_group_end_effector_link="panda_gripper_center", move_group_gripper="hand", pose_reference_frame="panda_link0", planner="TRRTkConfigDefault", add_scene_collision_objects=True, ): """PandaPathPlannerService class initialization. Parameters ---------- robot_description : :py:obj:`str` Where to find the URDF. args : objects Roscpp args, passed on. move_group : :py:obj:`str` Name of the pose planning reference frame, by default "panda_link0". move_group_end_effector_link : :py:obj:`str` Name of the end effector link. move_group_gripper : :py:obj:`str` Name of the move group, by default "panda_arm_hand". pose_reference_frame : :py:obj:`str` Name of the planner reference frame. planner : :py:obj:`str`, optional The Path planning algorithm, by default 'TRRTkConfigDefault'. add_scene_collision_objects : :py:obj:`bool` If true the collision objects specified in the ``moveit_scene_constraints.yaml`` file will be added to the scene. """ # Initialize class attributes self.current_plan = RobotTrajectory() # Empty plan self.current_plan_gripper = RobotTrajectory() # Empty plan self.desired_pose = [] self.desired_joint_values = [] self.desired_gripper_joint_values = {} # initialize moveit_commander and robot commander moveit_commander.roscpp_initialize(args) # Connect to moveit services rospy.loginfo( "Connecting moveit default moveit 'apply_planning_scene' service." ) rospy.wait_for_service("apply_planning_scene") try: self._moveit_apply_planning_srv = rospy.ServiceProxy( "apply_planning_scene", ApplyPlanningScene ) rospy.loginfo("Moveit 'apply_planning_scene' service found!") except rospy.ServiceException as e: rospy.logerr( "Moveit 'apply_planning_scene' service initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._moveit_apply_planning_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Create robot commander # Used to get information about the robot and control it. self.robot = moveit_commander.RobotCommander( robot_description=robot_description, ns="/" ) rospy.logdebug("Robot Groups: %s", self.robot.get_group_names()) # Initiate main move_group try: # Get main move group self.move_group = self.robot.get_group(move_group) # Set move group settings self.move_group.set_pose_reference_frame(pose_reference_frame) self.move_group.set_planning_time( MAIN_CFG["planning"]["general"]["planning_time"] ) self.move_group.set_planner_id(planner) # Set end effector if ( move_group_end_effector_link in self.robot.get_link_names() ): # Try to set end to user specified link (Keep default if fails).k self.move_group.set_end_effector_link(move_group_end_effector_link) except MoveItCommanderException: self.move_group = None shutdown_msg = ( "Shutting down %s node because main move_group could not be created." "Please check if move group %s exists." % (rospy.get_name(), move_group) ) rospy.logerr(shutdown_msg) sys.exit(0) # Create gripper move_group try: self.move_group_gripper = self.robot.get_group(move_group_gripper) self.move_group_gripper.set_planning_time( MAIN_CFG["planning"]["general"]["planning_time"] ) # self.move_group_gripper.set_planner_id(planner) except MoveItCommanderException: self.move_group_gripper = None rospy.logwarn( "Gripper move_group creation failed. Check if move group %s exists. " % move_group ) # Create scene commanders # Used to get information about the world and update the robot # its understanding of the world. self.scene = moveit_commander.PlanningSceneInterface(ns="/") # Create a `DisplayTrajectory`_ ROS publisher to display the plan in RVIZ self._display_trajectory_publisher = rospy.Publisher( "/move_group/display_planned_path", DisplayTrajectory, queue_size=20 ) # Print some information about the planner configuration rospy.loginfo("Using planner: %s", planner) rospy.logdebug("Reference frame: %s", self.move_group.get_planning_frame()) rospy.logdebug("End effector: %s", self.move_group.get_end_effector_link()) rospy.logdebug("Current robot state: %s", self.robot.get_current_state()) # Add our custom services rospy.loginfo("Initializing %s services.", rospy.get_name()) rospy.Service( "%s/plan_to_point" % rospy.get_name()[1:], PlanToPoint, self.plan_to_point_service, ) rospy.Service( "%s/plan_to_joint" % rospy.get_name()[1:], PlanToJoint, self.plan_to_joint_service, ) rospy.Service( "%s/plan_to_path" % rospy.get_name()[1:], PlanToPath, self.plan_cartesian_path_service, ) rospy.Service( "%s/plan_random_pose" % rospy.get_name()[1:], PlanToRandomPoint, self.plan_random_pose_service, ) rospy.Service( "%s/plan_random_joint" % rospy.get_name()[1:], PlanToRandomJoint, self.plan_random_joint_service, ) rospy.Service( "%s/plan_random_path" % rospy.get_name()[1:], PlanToRandomPath, self.plan_random_cartesian_path_service, ) rospy.Service( "%s/visualize_plan" % rospy.get_name()[1:], VisualizePlan, self.visualize_plan_service, ) rospy.Service( "%s/execute_plan" % rospy.get_name()[1:], ExecutePlan, self.execute_plan_service, ) rospy.Service( "%s/plan_gripper" % rospy.get_name()[1:], PlanGripper, self.plan_gripper_service, ) rospy.Service( "%s/open_gripper" % rospy.get_name()[1:], OpenGripper, self.open_gripper_service, ) rospy.Service( "%s/close_gripper" % rospy.get_name()[1:], CloseGripper, self.close_gripper_service, ) rospy.Service( "%s/set_gripper_width" % rospy.get_name()[1:], SetGripperWidth, self.set_gripper_width_service, ) rospy.Service( "%s/set_gripper_open" % rospy.get_name()[1:], SetGripperOpen, self.set_gripper_open_service, ) rospy.Service( "%s/set_gripper_closed" % rospy.get_name()[1:], SetGripperClosed, self.set_gripper_closed_service, ) rospy.Service( "%s/execute_gripper_plan" % rospy.get_name()[1:], ExecuteGripperPlan, self.execute_gripper_plan_service, ) # Display service initiation success message rospy.loginfo( "'%s' services initialized successfully. Waiting for requests.", rospy.get_name(), ) # Wait some time to give moveit the time to initialize and rviz to load rospy.sleep(2) # Add collision objects to scene if add_scene_collision_objects: self._collision_obj_cfg = COLLISION_OBJ_CFG rospy.loginfo("Adding collision objects to the planning scene...") add_collision_objects(self.scene, self._collision_obj_cfg) rospy.loginfo("Collision objects added to the planning scene.")
[docs] def plan_to_joint_service(self, req): """Plan to a given joint position. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanToJoint` The service request message containing the joint targets you want to plan to. Returns ------- Bool Boolean specifying whether the planning was successful. """ # Set joint targets and plan trajectory rospy.loginfo("Planning to: \n %s", req.target) self.current_plan = self.move_group.plan(joints=list(req.target)) self.desired_joint_values = req.target # Validate whether planning was successful rospy.logdebug( "Plan points: %d" % len(self.current_plan.joint_trajectory.points) ) if plan_exists(self.current_plan): rospy.loginfo("Plan to joint path planning successful.") return True else: rospy.logwarn("Plan to joint path planning failed.") return False
[docs] def plan_to_point_service(self, req): """Plan to a given pose. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanToPoint` The service request message containing the pose you want to plan to. Returns ------- bool Boolean specifying whether the planning was successful. """ # Initialize local variables plans = [] points = [] # Set the target position rospy.loginfo("Planning to: \n %s", req.target) self.move_group.set_pose_target(req.target) self.desired_pose = req.target # Perform planning # Perform multiple time to get the best path. for i in range(POINT_N_STEP): plans.append(self.move_group.plan()) points.append(len(plans[i].joint_trajectory.points)) rospy.logdebug("Found plan %d with %d points" % (i, points[i])) # Find shortest path self.current_plan = plans[points.index(min(points))] rospy.logdebug( "Points of chosen plan: %d" % len(self.current_plan.joint_trajectory.points) ) # Validate whether planning was successful if plan_exists(self.current_plan): rospy.loginfo("Plan to point planning successful.") self.move_group.clear_pose_targets() # Clear pose targets return True else: rospy.logwarn("Plan to point planning failed.") self.move_group.clear_pose_targets() # Clear pose targets return False
[docs] def plan_cartesian_path_service(self, req): """Plan to a given cartesian path. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanToPath` The service request message containing the path you want to plan to. Returns ------- bool Boolean specifying whether the planning was successful. """ # Plan cartesian path (self.current_plan, fraction) = self.move_group.compute_cartesian_path( req.waypoints, EEF_STEP, JUMP_THRESHOLD # waypoints to follow # eef_step ) # jump_threshold self.desired_pose = req.waypoints[-1] # Validate whether planning was successful if plan_exists(self.current_plan): rospy.loginfo("Cartesian path planning successful.") rospy.loginfo("%s %% of the path can be executed.", str(fraction * 100)) return True else: rospy.logwarn("Cartesian path planning failed.") return False
[docs] def plan_random_pose_service(self, req): """Plan to a random pose goal. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanToRandomPoint` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Create random pose rospy.loginfo("Creating random pose.") rand_pose = self.move_group.get_random_pose() # Plan to random pose def req(): None # Create dummy request function object req.target = rand_pose # set random pose as a property result = self.plan_to_point_service(req) # Remove req dummy function del req # Check whether path planning was successful if result: rospy.loginfo("Random pose path planning successful.") return True else: rospy.logwarn("Random pose path planning failed.") return False
[docs] def plan_random_joint_service(self, req): """Plan to a random joint goal. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanToRandomJoint` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Create random pose rospy.loginfo("Creating random joint goal.") rand_joint = self.move_group.get_random_joint_values() # Plan to random pose def req(): None # Create dummy request function object req.target = rand_joint # set random joint goal as a property result = self.plan_to_joint_service(req) # Remove req dummy function del req # Check whether path planning was successful if result: rospy.loginfo("Random cartesian path planning successful.") return True else: rospy.logwarn("Random cartesian path planning failed.") return False
[docs] def plan_random_cartesian_path_service(self, req): """Plan to a random cartesian path. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanToRandomPath` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Create local variables waypoints = [] w_pose = self.move_group.get_current_pose().pose # Create a cartesian path for _ in range(req.n_waypoints): # Generate random pose rand_pose = self.move_group.get_random_pose() w_pose.position.x = rand_pose.pose.position.x w_pose.position.y = rand_pose.pose.position.y w_pose.position.z = rand_pose.pose.position.z waypoints.append(copy.deepcopy(w_pose)) # Plan cartesian path def req(): None # Create dummy request function object req.waypoints = waypoints # set random pose as a property result = self.plan_cartesian_path_service(req) # Remove req dummy function del req # Check whether path planning was successful if result: rospy.loginfo("Random cartesian path planning successful.") return True else: rospy.logwarn("Random cartesian path planning failed.") return False
[docs] def visualize_plan_service(self, req): """Visualize the plan that has been computed by the other plan services. Parameters ---------- req : :py:obj:`panda_autograsp.msg.VisualizePlan` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Generate moveit_msgs if plan_exists(self.current_plan_gripper) or plan_exists(self.current_plan): # Display trajectory display_trajectory = DisplayTrajectory() display_trajectory.trajectory_start = self.robot.get_current_state() display_trajectory.trajectory.append(self.current_plan_gripper) display_trajectory.trajectory.append(self.current_plan) # Get visualization duration duration = get_trajectory_duration(display_trajectory) # Publish plan self._display_trajectory_publisher.publish(display_trajectory) # Sleep till trajectory is visualized time.sleep(duration) # Return success bool rospy.loginfo("Movement plan visualization successful.") return True else: rospy.logwarn("No movement plan available for the visualization.") return False
[docs] def execute_plan_service(self, req): """Execute the plan that has been computed for the main move group by the other plan services. Parameters ---------- req : :py:obj:`panda_autograsp.msg.ExecutePlan` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper plan is set if plan_exists(self.current_plan): # Execute plan result = self.move_group.execute(plan_msg=self.current_plan, wait=True) self.current_plan = ( RobotTrajectory() ) # Empty plan (Needed for visualization). # Check if execution was successful if result: rospy.loginfo("Plan execution was successful.") return True else: rospy.logwarn("Plan execution was unsuccessful.") return False # Return success bool else: # Check if the robot state is already at the desired state # joint_now = self.move_group.get_current_joint_values() # pose_now = self.move_group.get_current_pose() # # if (joint_now =) rospy.logwarn("No movement plan available for the execution.") return False
[docs] def plan_gripper_service(self, req): """Compute plan for the currently set gripper target joint values. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanGripper.` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper can " "not be controlled." ) return False # Plan for gripper command self.current_plan_gripper = self.move_group_gripper.plan( joints=self.desired_gripper_joint_values ) # Validate whether planning was successful if plan_exists(self.current_plan_gripper): rospy.loginfo("Gripper plan found.") return True # Return success bool elif at_joint_target( self.move_group_gripper.get_current_joint_values(), self.desired_gripper_joint_values.values(), self.move_group_gripper.get_goal_joint_tolerance(), ): # Plan empty because already at goal rospy.loginfo("Gripper already at desired location.") return True else: # Plan empty rospy.logwarn("No gripper plan found.") return False
[docs] def execute_gripper_plan_service(self, req): """Execute previously planned gripper plan. Parameters ---------- req : :py:obj:`panda_autograsp.msg.ExecuteGripperPlan.` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper " "can not be controlled." ) return False # Check if gripper is already at desired location if at_joint_target( self.move_group_gripper.get_current_joint_values(), self.desired_gripper_joint_values.values(), self.move_group_gripper.get_goal_joint_tolerance(), ): # Plan empty because already at goal rospy.loginfo("Gripper already at desired location.") self.current_plan_gripper = RobotTrajectory() # Reset plan return True else: # Execute gripper plan result = self.move_group_gripper.execute( self.current_plan_gripper, wait=True ) if not result: rospy.logwarn("Gripper plan could not be executed.") self.current_plan_gripper = RobotTrajectory() # Reset plan return False else: self.current_plan_gripper = RobotTrajectory() # Reset plan return True
[docs] def open_gripper_service(self, req): """Open the gripper. Parameters ---------- req : :py:obj:`panda_autograsp.msg.OpenGripper.` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper " "can not be controlled." ) return False # Set named target try: desired_values = self.move_group_gripper.get_named_target_values("open") except MoveItCommanderException as e: rospy.logwarn(e) return False # Plan and execute # Note: I used execute instead of go since it failed in some cases. plan = self.move_group_gripper.plan(joints=desired_values) result = self.move_group_gripper.execute(plan, wait=True) if not result: return False else: return True # Return success bool
[docs] def close_gripper_service(self, req): """Close the gripper. Parameters ---------- req : :py:obj:`panda_autograsp.msg.CloseGripper.` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper " "can not be controlled." ) return False # Get named target try: desired_values = self.move_group_gripper.get_named_target_values("close") except MoveItCommanderException as e: rospy.logwarn(e) return False # Plan and execute # Note: I used execute instead of go since it failed in some cases. plan = self.move_group_gripper.plan(joints=desired_values) result = self.move_group_gripper.execute(plan, wait=True) if not result: return False else: return True # Return success bool
[docs] def set_gripper_open_service(self, req): """Set gripper joint targets values to open. Parameters ---------- req : :py:obj:`panda_autograsp.msg.SetGripperOpen.` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper " "can not be controlled." ) return False # Get named target try: desired_values = self.move_group_gripper.get_named_target_values("open") except MoveItCommanderException as e: rospy.logwarn(e) return False # Set gripper target value to open try: # Save desired joint targets self.desired_gripper_joint_values = desired_values return True except MoveItCommanderException as e: rospy.logwarn(e) return False
[docs] def set_gripper_closed_service(self, req): """Set gripper joint target values to closed. Parameters ---------- req : :py:obj:`panda_autograsp.msg.SetGripperClosed.` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper " "can not be controlled." ) return False # Get named target try: desired_values = self.move_group_gripper.get_named_target_values("close") except MoveItCommanderException as e: rospy.logwarn(e) return False # Get gripper target value to closed try: # Save desired joint targets self.desired_gripper_joint_values = desired_values return True except MoveItCommanderException as e: rospy.logwarn(e) return False
[docs] def set_gripper_width_service(self, req): """Set gripper joint target values to a given value. Parameters ---------- req : :py:obj:`panda_autograsp.msg.SetGripperWidth.` This message specifies the gripper width. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Check if gripper controller exists if self.move_group_gripper is None: rospy.logwarn( "Gripper move group appears to be missing. As a result the gripper " "can not be controlled." ) return False # Try to set the joint values dict_keys = self.move_group_gripper.get_named_target_values("open").keys() dict_values = [req.gripper_width / 2.0] * 2 try: # Save desired joint targets self.desired_gripper_joint_values = dict(zip(dict_keys, dict_values)) return True except MoveItCommanderException as e: rospy.logwarn(e) return False