Source code for panda_autograsp.panda_autograsp_server_ros

"""Module that sets up a number of services that enable
the :py:mod:`panda_autograsp_cli` command line interface (CLI) to
communicate with the components of the `panda_autograps` solution.

# Make script both python2 and python3 compatible
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

    input = raw_input
except NameError:

# Main python packages
import sys
import numpy as np
import cv2
import cv2.aruco as aruco
import matplotlib.pyplot as plt
import os
import copy
from pyquaternion import Quaternion
import pickle
from autolab_core import YamlConfig

# ROS python packages
import rospy
from message_filters import ApproximateTimeSynchronizer, Subscriber
from cv_bridge import CvBridge
import tf2_ros

# ROS messages and services
import sensor_msgs
from tf2_geometry_msgs import PoseStamped  # Needed because we use tf2
from geometry_msgs.msg import TransformStamped
from std_msgs.msg import Header
from std_srvs.srv import Empty

from gqcnn.msg import BoundingBox
from gqcnn.srv import GQCNNGraspPlanner, GQCNNGraspPlannerBoundingBox

# Panda_autograsp modules, msgs and srvs
from panda_autograsp.srv import (

# Panda_autograsp modules, msgs and srvs
from panda_autograsp.functions import draw_axis

# Set right matplotlib backend
# Needed in order to show images inside imported modules

# Read main config ##############################

# Open panda_autograsp configuration file
MAIN_CFG = YamlConfig(
            os.path.dirname(os.path.realpath(__file__)), "../../cfg/main_config.yaml"

# Get important parameters
POSE_CALIB_METHOD = MAIN_CFG["calibration"]["pose_estimation_calib_board"]

# Script settings ###############################
CALIB_TRY_DURATION = MAIN_CFG["calibration"]["calib_try_duration"]  # [s]
CABLIB_METHODS = ["chessboard", "aruco_board"]

# Chessboard settings #########

# Board parameters
calib_config = MAIN_CFG["calibration"]
N_FRAMES = calib_config["chessboard_settings"]["N_FRAMES"]
N_ROWS = calib_config["chessboard_settings"]["N_ROWS"]
N_COLMNS = calib_config["chessboard_settings"]["N_COLUMNS"]
SQUARE_SIZE = calib_config["chessboard_settings"]["SQUARE_SIZE"]

# Aruco settings ################################

# Initialize board parameters
load_dir_path = os.path.abspath(
        os.path.dirname(os.path.realpath(__file__)), "../../cfg/_cfg/aruco_config.dict"
with open(load_dir_path, "rb") as config_dict_file:
    ARUCO_CFG = pickle.load(config_dict_file)  # Load the aruco board settings

# Overwrite settings based on measurements
ARUCO_DICT = aruco.Dictionary_get(ARUCO_CFG["ARUCO_DICT_TYPE"])
aruco_board = aruco.GridBoard_create(

# GraspPlannerClient Class ######################
[docs]class PandaAutograspServer: """Class used to create and call all the panda_autograsp components. Attributes ------- rvec : :py:obj:`list` The rotation vector of the camera/world calibration. tvec : :py:obj:`list` The translation vector of the camera/world calibration. """
[docs] def __init__( self, pose_calib_method=POSE_CALIB_METHOD, gazebo=False, bounding_box_enabled=False, ): """ Parameters ---------- pose_calib_method : :py:obj:`str`, optional Calibration pattern type., by default read from main_config.yaml. gazebo : :py:obj:`bool`, optional Specifies whether you want to use a gazebo simulation, by default false. bounding_box_enabled : :py:obj:`bool`, optional Specifies whether you want to use a bounding box for the grasp detection. """ # Get pose calib method from input if pose_calib_method == "": self.pose_calib_method = POSE_CALIB_METHOD if pose_calib_method.lower() not in CABLIB_METHODS: self.pose_calib_method = POSE_CALIB_METHOD else: self.pose_calib_method = pose_calib_method # Get input variables self.gazebo = gazebo self.bounding_box_enabled = bounding_box_enabled # Get bounding box values out of config self.bounding_box = BoundingBox() self.bounding_box.minX = MAIN_CFG["grasp_detection"]["bounding_box"]["minX"] self.bounding_box.minY = MAIN_CFG["grasp_detection"]["bounding_box"]["minY"] self.bounding_box.maxX = MAIN_CFG["grasp_detection"]["bounding_box"]["maxX"] self.bounding_box.maxY = MAIN_CFG["grasp_detection"]["bounding_box"]["maxY"] # Setup cv_bridge self._cv_bridge = CvBridge() # Setup member variables self.rvec = None self.tvec = None # Setup opencv termination criteria self._criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) ############################################### # Initialize grasp computation services ####### ############################################### # Default grasp planner service rospy.loginfo("Connecting to 'gqcnn_grasp_planner' service...") rospy.wait_for_service("gqcnn_grasp_planner") try: self._gqcnn_grasp_planning_srv = rospy.ServiceProxy( "gqcnn_grasp_planner", GQCNNGraspPlanner ) rospy.loginfo("Connected to `gqcnn_grasp_planner service.") except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'gqcnn_grasp_planner' service initialization " "failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._gqcnn_grasp_planning_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) rospy.loginfo("Connecting to 'gqcnn_grasp_planner_bounding_box' service...") rospy.wait_for_service("gqcnn_grasp_planner_bounding_box") # Bounding box grasp planner service try: self._gqcnn_grasp_planning_bounding_box_srv = rospy.ServiceProxy( "gqcnn_grasp_planner_bounding_box", GQCNNGraspPlannerBoundingBox ) rospy.loginfo("Connected to `gqcnn_grasp_planner service.") except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'gqcnn_grasp_planner_bounding_box' " "service initialization " "failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % ( rospy.get_name(), self._gqcnn_grasp_planning_bounding_box_srv.resolved_name, ) ) rospy.logerr(shutdown_msg) sys.exit(0) ############################################### # Initialize moveit_planner server services ### ############################################### # Initialize Plan to point service rospy.loginfo("Connecting to moveit_planner services.") rospy.logdebug("Connecting to 'moveit_planner_server/plan_to_point' service...") rospy.wait_for_service("moveit_planner_server/plan_to_point") try: self._plan_to_pose_srv = rospy.ServiceProxy( "moveit_planner_server/plan_to_point", PlanToPoint ) rospy.logdebug( "Connected to 'moveit_planner_server/plan_to_point' " "service." ) except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'moveit_planner_server/plan_to_point' service " "initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._plan_to_pose_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize random cartesian path service rospy.logdebug("Connecting to 'moveit_planner_server/plan_to_path' service...") rospy.wait_for_service("moveit_planner_server/plan_to_path") try: self._plan_to_path_srv = rospy.ServiceProxy( "moveit_planner_server/plan_to_path", PlanToPath ) rospy.logdebug("Connected to 'moveit_planner_server/plan_to_path' service.") except rospy.ServiceException as e: rospy.logerr( "moveit_planner_server 'plan_to_path' service initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._plan_to_path_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize plan visualization service rospy.logdebug( "Connecting to 'moveit_planner_server/visualize_plan' " "service..." ) rospy.wait_for_service("moveit_planner_server/visualize_plan") try: self._visualize_plan_srv = rospy.ServiceProxy( "moveit_planner_server/visualize_plan", VisualizePlan ) rospy.logdebug( "Connected to 'moveit_planner_server/visualize_plan' " "service." ) except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'moveit_planner_server/visualize_plan' service " "initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._visualize_plan_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize execute plan service rospy.logdebug("Connecting to 'moveit_planner_server/execute_plan' service...") rospy.wait_for_service("moveit_planner_server/execute_plan") try: self._execute_plan_srv = rospy.ServiceProxy( "moveit_planner_server/execute_plan", ExecutePlan ) rospy.logdebug("Connected to 'moveit_planner_server/execute_plan' service.") except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'moveit_planner_server/execute_plan' service " "initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._execute_plan_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize send sensor pose service rospy.logdebug("Connecting to 'tf2_broadcaster/send_sensor_pose' service...") rospy.wait_for_service("tf2_broadcaster/set_sensor_pose") try: self._set_sensor_pose_srv = rospy.ServiceProxy( "tf2_broadcaster/set_sensor_pose", SetSensorPose ) rospy.logdebug("Connected to 'tf2_broadcaster/set_sensor_pose' service.") except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'tf2_broadcaster/set_sensor_pose' service " "initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._set_sensor_pose_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize set gripper_width service rospy.logdebug( "Connecting to 'moveit_planner_server/set_gripper_width' " "service..." ) rospy.wait_for_service("moveit_planner_server/set_gripper_width") try: self._set_gripper_width_srv = rospy.ServiceProxy( "moveit_planner_server/set_gripper_width", SetGripperWidth ) rospy.logdebug( "Connected to 'moveit_planner_server/set_gripper_width' " "service." ) except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'moveit_planner_server/set_gripper_width' service " "initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._set_gripper_width_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize set gripper_width service rospy.logdebug( "Connecting to 'moveit_planner_server/set_gripper_open' " "service..." ) rospy.wait_for_service("moveit_planner_server/set_gripper_open") try: self._set_gripper_open_srv = rospy.ServiceProxy( "moveit_planner_server/set_gripper_open", SetGripperOpen ) rospy.logdebug( "Connected to 'moveit_planner_server/set_gripper_open' " "service." ) except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'moveit_planner_server/set_gripper_open' service " "initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._set_gripper_open_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize set gripper_width service rospy.logdebug( "Connecting to 'moveit_planner_server/set_gripper_closed' " "service..." ) rospy.wait_for_service("moveit_planner_server/set_gripper_closed") try: self._set_gripper_closed_srv = rospy.ServiceProxy( "moveit_planner_server/set_gripper_closed", SetGripperClosed ) rospy.logdebug( "Connected to 'moveit_planner_server/set_gripper_width' " "service." ) except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp 'moveit_planner_server/set_gripper_closed' " "service initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._set_gripper_closed_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Initialize reset octomap service rospy.logdebug("Connecting to '/clear_octomap/' service...") rospy.wait_for_service("/clear_octomap") try: self._clear_octomap_srv = rospy.ServiceProxy("/clear_octomap", Empty) rospy.logdebug("Connected to '/clear_octomap' service.") except rospy.ServiceException as e: rospy.logerr( "Panda_autograsp '/clear_octomap' " "service initialization failed: %s" % e ) shutdown_msg = ( "Shutting down %s node because %s service connection failed." % (rospy.get_name(), self._clear_octomap_srv.resolved_name) ) rospy.logerr(shutdown_msg) sys.exit(0) # Print moveit_planner_server services connection success message rospy.loginfo("Successfully connected to all moveit_planner_server services.") ############################################### # Create panda_autograsp_server services ###### ############################################### # Calibrate sensor rospy.loginfo("Initializing %s services.", rospy.get_name()) rospy.logdebug("Initializing 'calibrate_sensor' service...") self._calibrate_sensor_srv = rospy.Service( "calibrate_sensor", CalibrateSensor, self.calibrate_sensor_service ) # Compute grasp service rospy.logdebug("Initializing 'compute_grasp' service...") self._compute_grasp_srv = rospy.Service( "compute_grasp", ComputeGrasp, self.compute_grasp_service ) # Plan grasp service rospy.logdebug("Initializing 'plan_grasp' service...") self._plan_grasp_srv = rospy.Service( "plan_grasp", PlanGrasp, self.plan_grasp_service ) # Visualize grasp service rospy.logdebug("Initializing 'visualize_grasp' service...") self._visualize_grasp_srv = rospy.Service( "visualize_grasp", VisualizeGrasp, self.visualize_grasp_service ) # Execute grasp service rospy.logdebug("Initializing 'execute_grasp' service...") self._execute_grasp_srv = rospy.Service( "execute_grasp", ExecuteGrasp, self.execute_grasp_service ) # Object place planning service rospy.logdebug("Initializing 'plan_place' service...") self.plan_place_srv = rospy.Service( "plan_place", PlanPlace, self.plan_place_service ) # Service initiation success message rospy.loginfo( "'%s' services initialized successfully. Waiting for requests.", rospy.get_name(), ) ############################################### # Create subscribers and publishers ########### ############################################### # Create msg filter subscribers rospy.logdebug("Creating camera sensor message_filter...") self._color_image_sub = Subscriber("image_color", sensor_msgs.msg.Image) self._color_image_rect_sub = Subscriber( "image_color_rect", sensor_msgs.msg.Image ) self._depth_image_rect_sub = Subscriber( "image_depth_rect_32FC1", sensor_msgs.msg.Image ) self._camera_info_hd_sub = Subscriber( "hd/camera_info", sensor_msgs.msg.CameraInfo ) self._camera_info_qhd_sub = Subscriber( "qhd/camera_info", sensor_msgs.msg.CameraInfo ) self._camera_info_sd_sub = Subscriber( "sd/camera_info", sensor_msgs.msg.CameraInfo ) # Wait till camera is online rospy.loginfo("Waiting for first camera message...") rospy.wait_for_message("image_color", sensor_msgs.msg.Image) rospy.loginfo("Camera is online and publishing messages.") # Create msg filter self._ats = ApproximateTimeSynchronizer( [ self._color_image_sub, self._color_image_rect_sub, self._depth_image_rect_sub, self._camera_info_hd_sub, self._camera_info_qhd_sub, self._camera_info_sd_sub, ], queue_size=5, slop=0.1, ) self._ats.registerCallback(self.msg_filter_callback) rospy.logdebug("Camera sensor message_filter created.") # Create state listener self._tf2_buffer = tf2_ros.Buffer() self._tf2_listener = tf2_ros.TransformListener(self._tf2_buffer) # Create publisher to publish the place pose rospy.logdebug("Creating place pose publisher...") self._place_pose_pub = rospy.Publisher( "moveit/place_pose", PoseStamped, queue_size=10 ) rospy.logdebug("Place pose publisher created.") # Create pose subscriber rospy.logdebug("Creating grasp pose subscriber...") self._pose_sub = rospy.Subscriber( "gqcnn_grasp/pose", PoseStamped, self.get_pose_callback ) rospy.logdebug("Grasp pose subscriber created.")
[docs] def get_pose_callback(self, pose_msg): """Callback function of the 'gqcnn_graps/pose' subscriber. This function updates the self.pose_msg member variable. Parameters ---------- pose_msg : :py:obj:`!geometry_msgs.PosedStamed` Grasp pose msgs. """ # Update pose_msg rospy.loginfo("Received grasp pose.") self.pose_msg = pose_msg
[docs] def msg_filter_callback( self, color_image, color_image_rect, depth_image_rect, camera_info_hd, camera_info_qhd, camera_info_sd, ): """Callback function of the message filter. This message filter subscribed to a number of camera topics which are required by the panda_autograsp solution. Parameters ---------- color_image : :py:obj:`!sensor_msgs.msg.Image` The color image. color_image_rect : :py:obj:`!sensor_msgs.msg.Image` The rectified color image. depth_image_rect : :py:obj:`!sensor_msgs.msg.Image` The depth image. camera_info_hd : :py:obj:`!sensor_msgs.msg.CameraInfo` The HD camera info topic. camera_info_qhd : :py:obj:`!sensor_msgs.msg.CameraInfo` The QHD camera topic. camera_info_sd : :py:obj:`!sensor_msgs.msg.CameraInfo` The SD camera topic. """ # Call the grasp_planner_service self.color_image = color_image self.color_image_rect = color_image_rect self.depth_image_rect = depth_image_rect self.camera_info_hd = camera_info_hd self.camera_info_qhd = camera_info_qhd self.camera_info_sd = camera_info_sd
[docs] def compute_grasp_service(self, req): """This service is used for computing a vallid grasp out of the sensor data. This is done by calling the main grasp computation service of the :py:obj:`` module with the sensor data as its input. Parameters ---------- req : :py:obj:`panda_autograsp.msg.ComputeGrasp` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Call grasp computation service if not self.bounding_box_enabled: self.grasp = self._gqcnn_grasp_planning_srv( self.color_image_rect, self.depth_image_rect, self.camera_info_sd ) else: self.grasp = self._gqcnn_grasp_planning_bounding_box_srv( self.color_image_rect, self.depth_image_rect, self.camera_info_sd, self.bounding_box, ) # Print grasp position = self.grasp.grasp.pose.position orientation = self.grasp.grasp.pose.orientation pose_array = [ position.x, position.y, position.z, orientation.x, orientation.y, orientation.z, orientation.w, ] rospy.logdebug( "Grasp pose result in kinect2_rgb_camera_frame: x={0}, y={1}, z={2}, " "q1={3}, q2={4}, q3={5} and q4={6}".format(*pose_array) ) # Test if successful if self.grasp: return True else: return False
[docs] def plan_grasp_service(self, req): """This service can be used to plan for the by the :py:meth:`compute_grasp_service` computed grasp pose. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanGrasp` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Get pose expressed in the panda_link0 frame # Needed since the panda_link0 is the reference frame # of the move group. try: grasp_pose_msg = copy.copy(self.pose_msg) # Create copy grasp_pose_msg.header.stamp = ( ) # As we use the default tf buffer we will set the time to be now pose_msg = self._tf2_buffer.transform( grasp_pose_msg, "panda_link0", rospy.Duration(1) ) except ( tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException, ): return False # Display pose in panda_link0 frame position = pose_msg.pose.position orientation = pose_msg.pose.orientation pose_array = [ position.x, position.y, position.z, orientation.x, orientation.y, orientation.z, orientation.w, ] rospy.logdebug( "Grasp pose result in panda_link0: x={0}, y={1}, z={2}, q1={3}, q2={4}, " "q3={5} and q4={6}".format(*pose_array) ) # Call grasp plan to pose service result = self._plan_to_pose_srv(pose_msg.pose) # Test if successful if result.success: return True else: return False
[docs] def plan_place_service(self, req): """This service computes the movement plan for placing the object at the desired goal position. The place goal position can be changed by modifying the ``cfg/main_config.yaml`` document. Parameters ---------- req : :py:obj:`panda_autograsp.msg.PlanPlace` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Retrieve the starting grasp pose (as expressed in the panda_link0 frame try: grasp_pose_msg = copy.copy(self.pose_msg) # Create copy grasp_pose_msg.header.stamp = ( ) # As we use the default tf buffer we will set the time to be now grasp_pose_msg = self._tf2_buffer.transform( self.pose_msg, "panda_link0", rospy.Duration(1) ) except ( tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException, ): return False # Generate place pose relative to the panda_link0 frame # The location of this frame can be set in the `main_config.yaml` file # in the cfg folder. self.place_pose_msg = PoseStamped() self.place_pose_msg.pose.position.x = MAIN_CFG["main"]["place"]["pose"]["x_pos"] self.place_pose_msg.pose.position.y = MAIN_CFG["main"]["place"]["pose"]["y_pos"] self.place_pose_msg.pose.position.z = ( grasp_pose_msg.pose.position.z ) # Set equal to start grasp height self.place_pose_msg.pose.orientation.x = ( grasp_pose_msg.pose.orientation.x ) # Set equal to grasp pose orientation self.place_pose_msg.pose.orientation.y = ( grasp_pose_msg.pose.orientation.y ) # Set equal to grasp pose orientation self.place_pose_msg.pose.orientation.z = ( grasp_pose_msg.pose.orientation.z ) # Set equal to grasp pose orientation self.place_pose_msg.pose.orientation.w = ( grasp_pose_msg.pose.orientation.w ) # Set equal to grasp pose orientation header = Header() header.stamp = header.frame_id = "panda_link0" self.place_pose_msg.header = header # Publish place pose self._place_pose_pub.publish(self.place_pose_msg) # Create a intermediate pose # Done to pick up the object instead of sliding it. self.pickup_pose_msg = PoseStamped() self.pickup_pose_msg.pose.position.x = grasp_pose_msg.pose.position.x self.pickup_pose_msg.pose.position.y = grasp_pose_msg.pose.position.y self.pickup_pose_msg.pose.position.z = ( grasp_pose_msg.pose.position.z + MAIN_CFG["main"]["pickup"]["height"] ) # Add pickup height self.pickup_pose_msg.pose.orientation.x = ( grasp_pose_msg.pose.orientation.x ) # Set equal to grasp pose orientation self.pickup_pose_msg.pose.orientation.y = ( grasp_pose_msg.pose.orientation.y ) # Set equal to grasp pose orientation self.pickup_pose_msg.pose.orientation.z = ( grasp_pose_msg.pose.orientation.z ) # Set equal to grasp pose orientation self.pickup_pose_msg.pose.orientation.w = ( grasp_pose_msg.pose.orientation.w ) # Set equal to grasp pose orientation header = Header() header.stamp = header.frame_id = "panda_link0" self.pickup_pose_msg.header = header # Call grasp plan to pose service result = self._plan_to_path_srv( [self.pickup_pose_msg.pose, self.place_pose_msg.pose] ) # Test if successful if result.success: return True else: return False
[docs] def visualize_grasp_service(self, req): """This service can be used to visualize the planned grasp. Parameters ---------- req : :py:obj:`panda_autograsp.msg.VizualizeGrasp` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Call grasp computation service result = self._visualize_plan_srv() # Test if successful if result.success: return True else: return False
[docs] def execute_grasp_service(self, req): """This service is used to execute the computed grasp. Parameters ---------- req : :py:obj:`panda_autograsp.msg.ExecuteGrasp` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Call grasp computation service result = self._execute_plan_srv() # Test if successful if result.success: return True else: return False
[docs] def calibrate_sensor_service(self, req): """This service can be used to perform the sensor/world calibration. To do this you need to place a chessboard/aruco board in the bottom left corner of the robot table. You have to define which calibration pattern you use in the ``cfg/main_config.yaml file``. After a successful camera/world transformation matrix is computed this matrix is send to the :py:mod:`tf2_broadcaster` module and the sensor frame position is updated. Parameters ---------- req : :py:obj:`panda_autograsp.msg.ExecuteGrasp` Empty service request. Returns ------- bool Returns a bool to specify whether the plan was executed successfully. """ # Retrieve camera pose if self.gazebo: # Return True as we don't need a calibration in simulation return True else: # Perform calibration retval, self.rvec, self.tvec = self._camera_world_calibration( calib_type=self.pose_calib_method ) # Test if successful if retval: # Publish the camera frame self.broadcast_camera_frame(calib_type=self.pose_calib_method) # return result return True else: return False
def _camera_world_calibration(self, calib_type=POSE_CALIB_METHOD): """Perform camera world calibration (External camera matrix) using a chessboard or several aruco markers. Parameters ---------- calib_type : :py:obj:`str` Calibration pattern type. Returns ------- retval : :py:obj:`bool` Calibration succes bool. :py:obj: rvec : :py:obj:`list` Rotation vector. tvec : :py:obj:`list` Translation vector. """ # Switch between different calibrations if calib_type == "chessboard": return ( self.chessboard_pose_estimation() ) # Perform calibration using an chessboard else: return ( self.aruco_board_pose_estimation() ) # Perform calibration using an arucoboard
[docs] def aruco_board_pose_estimation(self): """Function that performs the camera/world calibration by using a Aruco board as a reference. Returns ------- retval : :py:obj:`bool` Calibration success bool. rvec : :py:obj:`list` Rotation vector. tvec : :py:obj:`list` Translation vector. """ # Get current time start_time = rospy.get_time() # Try till chessboard is found or till try time is over while rospy.get_time() < start_time + CALIB_TRY_DURATION: # Retrieve color image and convert to opencv format color_image = self.color_image camera_info = self.camera_info_hd color_image_cv = self._cv_bridge.imgmsg_to_cv2( color_image, desired_encoding="passthrough" ) # Get camera information camera_matrix = np.array(camera_info.K).reshape(3, 3) dist_coeffs = camera_info.D # Default distortion parameters are 0 # Get gray image gray = cv2.cvtColor(color_image_cv, cv2.COLOR_BGR2GRAY) # Create screen display image # Needed since opencv uses BGR instead of RGB screen_img = cv2.cvtColor(copy.copy(color_image_cv), cv2.COLOR_RGB2BGR) # Detect aruco markers corners, ids, rejectedImgPoints = aruco.detectMarkers( image=gray, dictionary=ARUCO_DICT, parameters=aruco.DetectorParameters_create(), # cameraMatrix=camera_matrix, # distCoeffs=dist_coeffs, ) # Refine detected markers # Eliminates markers not part of our board and adds missing markers corners, ids, _, _ = aruco.refineDetectedMarkers( image=gray, board=aruco_board, detectedCorners=corners, detectedIds=ids, rejectedCorners=rejectedImgPoints, cameraMatrix=camera_matrix, distCoeffs=dist_coeffs, ) # If at least one marker was found try to estimate the pose if ids is not None and ids.size > 0: # Outline all of the markers detected in our image screen_img = aruco.drawDetectedMarkers( screen_img, corners, ids, borderColor=(0, 255, 0) ) # Estimate pose retval, rvec, tvec = aruco.estimatePoseBoard( corners, ids, aruco_board, camera_matrix, dist_coeffs, None, None ) # If pose estimation was successful draw pose if retval > 0: aruco.drawAxis( screen_img, camera_matrix, dist_coeffs, rvec, tvec, 0.2 ) # Show projection to user if MAIN_CFG["vis"]["calib"]["figs"]["calib_frame"]: plt.figure("Reference frame") plt.imshow(screen_img) return retval, rvec, tvec else: rospy.logwarn( "Pose of arcuboard could not be found please try again." ) return False, None, None else: rospy.logwarn( "Arcuboard could not be detected make sure the arcuboard is " "present." ) # Display timeout message and return rospy.logwarn( "Arcuboard detector times out after %s. seconds. Please reposition the " "arcuboard and try again.", CALIB_TRY_DURATION, ) return False, None, None
[docs] def chessboard_pose_estimation(self): """Function that performs the camera/world calibration by using a chessboard as a reference. Returns ------- retval : :py:obj:`bool` Calibration success bool. rvec : :py:obj:`list` Rotation vector. tvec : :py:obj:`list` Translation vector. """ # Get current time start_time = rospy.get_time() # Try till chessboard is found or till try time is over while rospy.get_time() < start_time + CALIB_TRY_DURATION: # Retrieve color image and convert to opencv format color_image = self.color_image camera_info = self.camera_info_hd color_image_cv = self._cv_bridge.imgmsg_to_cv2( color_image, desired_encoding="passthrough" ) # Prepare object points, like (0,0,0), (1,0,0), (2,0,0) ....,(6,5,0) objp = np.zeros((N_COLMNS * N_ROWS, 3), np.float32) objp[:, :2] = ( np.mgrid[0:N_ROWS, 0:N_COLMNS].T.reshape(-1, 2) * SQUARE_SIZE ) # Multiply by chessboard scale factor to get results in mm axis = ( np.float32([[3, 0, 0], [0, 3, 0], [0, 0, -3]]).reshape(-1, 3) * SQUARE_SIZE ) # Coordinate axis # Get camera information camera_matrix = np.array(camera_info.K).reshape(3, 3) dist_coeffs = camera_info.D # Default distortion parameters are 0 # Get gray image gray = cv2.cvtColor(color_image_cv, cv2.COLOR_BGR2GRAY) # Create screen display image screen_img = cv2.cvtColor(copy.copy(color_image_cv), cv2.COLOR_RGB2BGR) # Find the chess board corners retval, corners = cv2.findChessboardCorners(gray, (N_ROWS, N_COLMNS), None) # Find external matrix if retval: # Find corners corners2 = cv2.cornerSubPix( gray, corners, (11, 11), (-1, -1), self._criteria ) # Find the rotation and translation vectors. retval, rvecs, tvecs, _ = cv2.solvePnPRansac( objp, corners2, camera_matrix, dist_coeffs ) # project 3D points to image plane imgpts, _ = cv2.projectPoints( axis, rvecs, tvecs, camera_matrix, dist_coeffs ) # Show projection to user screen_img = draw_axis(screen_img, corners2, imgpts) plt.figure("Reference frame") plt.imshow(screen_img) if retval: return retval, rvecs, tvecs else: rospy.logwarn( "Pose of chessboard could not be found please try again." ) return False, None, None else: rospy.logwarn( "Chessboard could not be detected make sure the chessboard is" " present." ) # Display timeout message and return rospy.logwarn( "Chessboard detector times out after %s. seconds. Please reposition the " "chessboard and try again.", CALIB_TRY_DURATION, ) return False, None, None
[docs] def broadcast_camera_frame(self, calib_type="aruco_board"): """Send the sensor pose we acquired from the calibration to the tf2_broadcaster so that it can broadcast the sensor camera frame. Parameters ---------- calib_type : :py:obj:`str`, optional The calibration pattern you want to use, by default "aruco_board" """ # Check calibration method if calib_type == "chessboard": # Get rotation matrix R = np.zeros(shape=(3, 3)) J = np.zeros(shape=(3, 3)) cv2.Rodrigues(self.rvec, R, J) # Compute inverse rotation and translation matrix R = R.T tvec =, self.tvec) # Create homogeneous matrix and the flip x and y axis H = np.empty((4, 4)) H[:3, :3] = R H[:3, 3] = tvec.reshape(1, 3) H[3, :] = [0, 0, 0, 1] H = np.array([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]), H ) R = H[0:3, 0:3] quat = Quaternion(matrix=R) # Print Calibration information cal_pose = { "x": float(H[0, 3] / 1000.0), "y": float(H[1, 3] / 1000.0), "z": float(H[2, 3] / 1000.0), "q1": float(quat[1]), "q2": float(quat[2]), "q3": float(quat[3]), "q4": float(quat[0]), } rospy.loginfo( "Calibration result: x={x}, y={y}, z={z}, q1={q1}, q2={q2}," " q3={q3} and q4={q4}".format(**cal_pose) ) # Create geometry_msg sensor_frame_tf_msg = TransformStamped() sensor_frame_tf_msg.header.stamp = sensor_frame_tf_msg.header.frame_id = "calib_frame" sensor_frame_tf_msg.child_frame_id = "kinect2_rgb_optical_frame" sensor_frame_tf_msg.transform.translation.x = float(H[0, 3] / 1000.0) sensor_frame_tf_msg.transform.translation.y = float(H[1, 3] / 1000.0) sensor_frame_tf_msg.transform.translation.z = float(H[2, 3] / 1000.0) sensor_frame_tf_msg.transform.rotation.x = float(quat[1]) sensor_frame_tf_msg.transform.rotation.y = float(quat[2]) sensor_frame_tf_msg.transform.rotation.z = float(quat[3]) sensor_frame_tf_msg.transform.rotation.w = float(quat[0]) # Communicate sensor_frame_pose to the tf2_broadcaster node result = self._set_sensor_pose_srv(sensor_frame_tf_msg) # Reset the octomap if pose was succesfully set if result.success: # Wait some time to give tf2_broadcaster the time to broadcast rospy.sleep(MAIN_CFG["calibration"]["octomap_reset_wait_Time"]) self._clear_octomap_srv() else: # Get rotation matrix R = np.zeros(shape=(3, 3)) J = np.zeros(shape=(3, 3)) cv2.Rodrigues(self.rvec, R, J) # Compute inverse rotation and translation matrix R = R.T tvec =, self.tvec) # Create homogenious matrix and the flip x and y axis H = np.empty((4, 4)) H[:3, :3] = R H[:3, 3] = tvec.reshape(1, 3) H[3, :] = [0, 0, 0, 1] R = H[0:3, 0:3] quat = Quaternion(matrix=R) # Print Calibration information cal_pose = { "x": float(H[0, 3]), "y": float(H[1, 3]), "z": float(H[2, 3]), "q1": float(quat[1]), "q2": float(quat[2]), "q3": float(quat[3]), "q4": float(quat[0]), } rospy.logdebug( "Calibration result: x={x}, y={y}, z={z}, q1={q1}, q2={q2}," " q3={q3} and q4={q4}".format(**cal_pose) ) # Create geometry_msg sensor_frame_tf_msg = TransformStamped() sensor_frame_tf_msg.header.stamp = sensor_frame_tf_msg.header.frame_id = "calib_frame" sensor_frame_tf_msg.child_frame_id = "kinect2_rgb_optical_frame" sensor_frame_tf_msg.transform.translation.x = float(H[0, 3]) sensor_frame_tf_msg.transform.translation.y = float(H[1, 3]) sensor_frame_tf_msg.transform.translation.z = float(H[2, 3]) sensor_frame_tf_msg.transform.rotation.x = float(quat[1]) sensor_frame_tf_msg.transform.rotation.y = float(quat[2]) sensor_frame_tf_msg.transform.rotation.z = float(quat[3]) sensor_frame_tf_msg.transform.rotation.w = float(quat[0]) # Communicate sensor_frame_pose to the tf2_broadcaster node result = self._set_sensor_pose_srv(sensor_frame_tf_msg) # Reset the octomap if pose was succesfully set if result.success: # Wait some time to give tf2_broadcaster the time to broadcast rospy.sleep(MAIN_CFG["calibration"]["octomap_reset_wait_Time"]) self._clear_octomap_srv()