#!/usr/bin/env python
"""This node contains the command line interface (CLI) of the panda_autograsp
autonomous grasping solution.
"""
# Make script both python2 and python3 compatible
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
try:
input = raw_input
except NameError:
pass
# Main python packages
import rospy
import sys
# Panda_autograsp modules, msgs and srvs
from panda_autograsp.srv import (
ComputeGrasp,
PlanGrasp,
VisualizeGrasp,
ExecuteGrasp,
CalibrateSensor,
SetGripperOpen,
SetGripperClosed,
PlanGripper,
ExecuteGripperPlan,
CloseGripper,
SetGripperWidth,
)
from panda_autograsp.functions import yes_or_no
#################################################
# PandaAutograspCLI class #######################
#################################################
[docs]class PandaAutograspCLI:
"""Panda_autograsp command line interface class."""
[docs] def __init__(self):
"""Panda_autograsp command line interface initiation"""
#########################################
# Welcome message #######################
#########################################
print("----- PANDA_AUTOGRASP_PACKAGE_CLI -----")
print("| Welcome to the panda_autograsp cli. |")
print("---------------------------------------")
print("| INFO: Press ctrl+D, or ctrl+c+enter |")
print("| to close the client |")
print("---------------------------------------")
print("")
#########################################
# Initialize needed services ############
#########################################
# Initialize camera calibration service
rospy.loginfo("Connecting to the panda_autograsp services...")
rospy.logdebug("Connecting to 'calibrate_sensor' service...")
rospy.wait_for_service("calibrate_sensor")
try:
self._calibrate_sensor_srv = rospy.ServiceProxy(
"calibrate_sensor", CalibrateSensor
)
rospy.logdebug("Connected to 'calibrate_sensor' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'calibrate_sensor' service initialization failed: %s"
% e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._calibrate_sensor_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize request grasp service
rospy.logdebug("Connecting to 'compute_grasp' service...")
rospy.wait_for_service("compute_grasp")
try:
self._compute_grasp_srv = rospy.ServiceProxy("compute_grasp", ComputeGrasp)
rospy.logdebug("Connected to 'compute_grasp' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'compute_grasp' service initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._compute_grasp_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner server services
rospy.logdebug("Connecting to 'plan_grasp' service...")
rospy.wait_for_service("plan_grasp")
try:
self._plan_grasp_srv = rospy.ServiceProxy("plan_grasp", PlanGrasp)
rospy.logdebug("Connected to 'plan_grasp' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'plan_grasp' service initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._plan_grasp_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize plan visualization service
rospy.logdebug("Connecting to 'visualize_grasp' service...")
rospy.wait_for_service("visualize_grasp")
try:
self._visualize_grasp_srv = rospy.ServiceProxy(
"visualize_grasp", VisualizeGrasp
)
rospy.logdebug("Connected to 'visualize_grasp' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'visualize_grasp' service initialization failed: %s"
% e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._visualize_grasp_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize execute plan service
rospy.logdebug("Connecting to 'execute_grasp' service...")
rospy.wait_for_service("execute_grasp")
try:
self._execute_grasp_srv = rospy.ServiceProxy("execute_grasp", ExecuteGrasp)
rospy.logdebug("Connected to 'execute_grasp' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'execute_grasp' service initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._execute_grasp_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize plan_place service
rospy.logdebug("Connecting to 'plan_place' service...")
rospy.wait_for_service("plan_place")
try:
self._plan_place_srv = rospy.ServiceProxy("plan_place", CloseGripper)
rospy.logdebug("Connected to 'plan_place' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'plan_place' service initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._plan_place_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/set_gripper_open service
rospy.logdebug(
"Connecting to 'moveit_planner_server/set_gripper_open' service..."
)
rospy.wait_for_service("moveit_planner_server/set_gripper_open")
try:
self._set_gripper_open_srv = rospy.ServiceProxy(
"moveit_planner_server/set_gripper_open", SetGripperOpen
)
rospy.logdebug(
"Connected to 'moveit_planner_server/set_gripper_open' service."
)
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'set_gripper_open' service initialization failed: %s"
% e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._set_gripper_open_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/set_gripper_closed service
rospy.logdebug(
"Connecting to 'moveit_planner_server/set_gripper_closed' service..."
)
rospy.wait_for_service("moveit_planner_server/set_gripper_closed")
try:
self._set_gripper_closed_srv = rospy.ServiceProxy(
"moveit_planner_server/set_gripper_closed", SetGripperClosed
)
rospy.logdebug(
"Connected to 'moveit_planner_server/set_gripper_closed' service."
)
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'moveit_planner_server/set_gripper_closed' service "
"initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._set_gripper_closed_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/plan_gripper service
rospy.logdebug("Connecting to 'moveit_planner_server/plan_gripper' service...")
rospy.wait_for_service("moveit_planner_server/plan_gripper")
try:
self._plan_gripper_srv = rospy.ServiceProxy(
"moveit_planner_server/plan_gripper", PlanGripper
)
rospy.logdebug("Connected to 'moveit_planner_server/plan_gripper' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'moveit_planner_server/plan_gripper' service "
"initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._plan_gripper_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/execute_gripper_plan service
rospy.logdebug(
"Connecting to 'moveit_planner_server/execute_gripper_plan' " "service..."
)
rospy.wait_for_service("moveit_planner_server/execute_gripper_plan")
try:
self._execute_gripper_plan_srv = rospy.ServiceProxy(
"moveit_planner_server/execute_gripper_plan", ExecuteGripperPlan
)
rospy.logdebug(
"Connected to 'moveit_planner_server/execute_gripper_plan' " "service."
)
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'moveit_planner_server/execute_gripper_plan' service "
"initialization "
"failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._execute_gripper_plan_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/close_gripper service
rospy.logdebug("Connecting to 'moveit_planner_server/close_gripper' service...")
rospy.wait_for_service("moveit_planner_server/close_gripper")
try:
self._close_gripper_srv = rospy.ServiceProxy(
"moveit_planner_server/close_gripper", CloseGripper
)
rospy.logdebug(
"Connected to 'moveit_planner_server/close_gripper' " "service."
)
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'moveit_planner_server/close_gripper' service "
"initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._close_gripper_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/open_gripper service
rospy.logdebug("Connecting to 'moveit_planner_server/open_gripper' service...")
rospy.wait_for_service("moveit_planner_server/open_gripper")
try:
self._open_gripper_srv = rospy.ServiceProxy(
"moveit_planner_server/open_gripper", CloseGripper
)
rospy.logdebug("Connected to 'moveit_planner_server/open_gripper' service.")
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'moveit_planner_server/open_gripper' service "
"initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._open_gripper_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Initialize moveit_planner_server/set_gripper_width service
rospy.logdebug(
"Connecting to 'moveit_planner_server/set_gripper_width' " "service..."
)
rospy.wait_for_service("moveit_planner_server/set_gripper_width")
try:
self._set_gripper_width_srv = rospy.ServiceProxy(
"moveit_planner_server/set_gripper_width", SetGripperWidth
)
rospy.logdebug(
"Connected to 'moveit_planner_server/set_gripper_width' " "service."
)
except rospy.ServiceException as e:
rospy.logerr(
"Panda_autograsp 'moveit_planner_server/set_gripper_width' service "
"initialization failed: %s" % e
)
shutdown_msg = (
"Shutting down %s node because %s service connection failed."
% (rospy.get_name(), self._set_gripper_width_srv.resolved_name)
)
rospy.logerr(shutdown_msg)
sys.exit(0)
# Log success message
rospy.loginfo("Connected to the panda_autograsp services.")
[docs] def start(self):
"Start command line interface."
# Keep alive as node is alive
print("")
calib_response = yes_or_no(
"For the robot to know where it is relative to the camera a calibration "
"needs to be performed. Do you want to perform the calibration now? "
"(Y=New calibration and n=Use old calibration results)>> ",
add_options=False,
)
try: # Catch user and service exceptions
#####################################
# Calibrate Camera frame ############
#####################################
if calib_response:
while True:
print("")
response = yes_or_no(
"Is the checkerboard/arucoboard positioned on the upper left "
"corner of the table?"
)
print("Computing calibration frame transform...")
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
result = self._calibrate_sensor_srv()
if not result.success:
print("")
response = yes_or_no(
"Calibration failed. Do you want to try again?"
)
if not response:
rospy.loginfo(
"Shutting down %s node." % rospy.get_name()
)
sys.exit(0)
else:
print("")
response = yes_or_no("Was the frame correct?")
if response:
break # Continue to grasp planning
else:
print("Old calibration results used.")
#####################################
# Panda_autograsp_cli loop ##########
#####################################
# Keep cli running until ros is shutdown
while not rospy.is_shutdown():
#################################
# Compute grasp #################
#################################
print("")
input("Click enter to compute a grasp>> ")
while True:
print("Computing grasp...")
result = self._compute_grasp_srv()
# Display Grasp result message
if not result.success:
print("")
response = yes_or_no(
"Grasp computation failed. Do you want to try again?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
print("")
response = yes_or_no(
"A valid grasp was found. Do you want to plan for the "
"computed grasp? (Y=Continue and n=Try again)>> ",
add_options=False,
)
if response:
break # Plan grasp
#################################
# Plan grasp ####################
#################################
print("Planning grasp...")
result1 = (
self._set_gripper_open_srv()
) # Set gripper joint states to open
result2 = (
self._plan_gripper_srv()
) # Plan for the set gripper joint states
result3 = self._plan_grasp_srv()
# Check if grasp planning was successfull
if (
(not result1.success)
or (not result2.success)
or (not result3.success)
):
# Display plan failed message
if not result3.success:
print("")
response = yes_or_no(
"Grasp path planning failed. Because no arm plan "
"was found. Do you want to compute "
"another grasp?"
)
elif not result2.success:
print("")
response = yes_or_no(
"Grasp path planning failed because no gripper plan "
"was found. Do you want to compute "
"another grasp?"
)
else:
print("")
response = yes_or_no(
"Grasp path planning failed because griper open target "
"could not be set. Do you want to compute "
"another grasp?"
)
# Check user response
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
input(
"Grasp path planning successfull. Press enter to "
"visualize the computed path>> "
)
#################################
# visualize grasp ###############
#################################
print("Visualizing grasp path...")
result = self._visualize_grasp_srv()
if not result.success:
# Display visualization failed message
print("")
response = yes_or_no(
"Grasp path visualization failed. Do you want to "
"compute another grasp?"
)
# Check response
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Grasp path visualization successfull. Do you want to "
"execute the computed grasp?"
)
# Check response
if not response:
continue # Go back to start of while loop
#################################
# Execute grasp #################
#################################
print("Executing grasp...")
result1 = self._execute_gripper_plan_srv()
result2 = self._execute_grasp_srv()
# Check if execution was successfull
if (not result1.success) or (not result2.success):
print("")
# Display execution fail message
if not result1.success:
response = yes_or_no(
"Gripper plan execution failed. "
"Do you want to compute another grasp?"
)
else:
response = yes_or_no(
"Arm plan execution failed. "
"Do you want to compute another grasp?"
)
# Check user input
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Grasp path execution successfull. Do you want to close the "
"gripper?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
# Close gripper
result = self._close_gripper_srv()
if not result.success:
print("")
response = yes_or_no(
"Gripper close action failed. Do you want to compute another "
"grasp?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Gripper close action successfull. Do you want to pick the "
"object and place it somewhere else?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
#################################
# Plan place ####################
#################################
result = self._plan_place_srv()
if not result.success:
print("")
response = yes_or_no(
"Object place planning action failed. Do you want to compute "
"another grasp?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Object place planning action successfull. Do you want to "
"visualize the place plan?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
#################################
# Place visualization ###########
#################################
print("Visualizing place path...")
result = self._visualize_grasp_srv()
if not result.success:
print("")
response = yes_or_no(
"Place path visualization failed. Do you want to compute "
"another grasp?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Grasp path visualization successfull. Do you want to execute "
"the place plan?"
)
if not response:
continue # Go back to start of while loop
#################################
# Execute place #################
#################################
print("Executing grasp...")
result = self._execute_grasp_srv()
if not result.success:
print("")
response = yes_or_no(
"Place movement execution failed. Do you want to compute "
"another grasp?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Place movement path execution successfull. Do you want "
"to open the gripper?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
# Close gripper
result = self._open_gripper_srv()
if not result.success:
print("")
response = yes_or_no(
"Gripper open action failed. Do you want to compute "
"another grasp?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
else:
continue # Go back to start of while loop
else:
print("")
response = yes_or_no(
"Gripper open action successfull. Do you want to "
"compute another grasp?"
)
if not response:
rospy.loginfo("Shutting down %s node." % rospy.get_name())
sys.exit(0)
except (
rospy.exceptions.ROSInterruptException,
rospy.service.ServiceException,
) as e:
# Mostly means used pressed ctrl + c
if isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.loginfo("Shutting down panda_autograsp client.")
sys.exit(0)
else: # Could be ctrl + c inside a service thread or something else
rospy.logerr("Panda_autograsp client shut down because: %s" % e)
sys.exit(0)
#################################################
# Main script ###################################
#################################################
if __name__ == "__main__":
# Initialize ros node
rospy.init_node("panda_autograsp_cli", anonymous=True)
# Initialize cli object
panda_autograsp_cli = PandaAutograspCLI()
# Start cli object
panda_autograsp_cli.start()
# Spin till shutdown
rospy.spin()