Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 119 additions & 62 deletions kits/arms/ar_control_sm.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@

from enum import Enum, auto
from dataclasses import dataclass, field
from time import time, sleep
import os
import numpy as np
from scipy.spatial.transform import Rotation as R
import argparse

import hebi
from hebi.util import create_mobile_io

import typing
if typing.TYPE_CHECKING:
from typing import Callable
from typing import Sequence, Callable
import numpy.typing as npt
from hebi._internal.mobile_io import MobileIO
from hebi.arm import Arm, Gripper
Expand All @@ -29,44 +29,41 @@ class ArmControlState(Enum):
class ArmMobileIOInputs:
phone_pos: 'npt.NDArray[np.float32]' = field(default_factory=lambda: np.array([0., 0., 0.]))
phone_rot: 'npt.NDArray[np.float64]' = field(default_factory=lambda: np.eye(3))
ar_scaling: float = 1.0
xyz_scaling: float = 1.0
lock_toggle: bool = False
locked: bool = True
gripper_closed: bool = False
home: bool = False


class ArmMobileIOControl:
def __init__(self, arm: 'Arm', gripper: 'Gripper | None' = None, homing_time=5.0, traj_duration=1.0, xyz_scale: 'npt.NDArray[np.float64]' = np.ones(3)):
def __init__(self, arm: 'Arm', gripper: 'Gripper | None' = None, home_pose: 'Sequence[float] | npt.NDArray[np.float64] | None' = None, ik_seed: 'Sequence[float] | npt.NDArray[np.float64] | None' = None, homing_time: float = 5.0, traj_duration: float = 0.25, xyz_scale: 'npt.NDArray[np.float64]' = np.ones(3)):
self.namespace = ''

self.state = ArmControlState.STARTUP
self.arm = arm
self.gripper = gripper

# 5 DoF home
# self.arm_xyz_home = [0.34, 0.0, 0.23]
# 6 DoF home
self.arm_xyz_home = [0.5, 0.0, 0.0]
self.arm_rot_home: 'npt.NDArray[np.float64]' = (R.from_euler(
'z', np.pi / 2) * R.from_euler('x', np.pi)).as_matrix()
self.arm_xyz_home = [0.5, 0.0, 0.15]
self.arm_rot_home: 'npt.NDArray[np.float64]' = R.from_euler('x', np.pi).as_matrix()
self.homing_time = homing_time
self.traj_duration = traj_duration

# 5 DoF seed
# self.arm_seed_ik = np.array([0.25, -1.0, 0, -0.75, 0])
# 6 DoF seed
self.arm_seed_ik = np.array([0.3, 1.2, 2.2, 2.9, -1.57, 0])
self.arm_home = self.arm.ik_target_xyz_so3(
# Set the IK seed for the arm
self.arm_seed_ik = ik_seed if ik_seed is not None else np.array([0.01, 1.4, 1.8, 2.0, -1.57, 0.01])[:arm.size]
assert len(self.arm_seed_ik) == arm.size, \
f"IK seed must have the same size as the arm ({arm.size}), got {self.arm_seed_ik.shape[0]} elements."
# Set the home position for the arm
self.arm_home = home_pose if home_pose is not None else self.arm.ik_target_xyz_so3(
self.arm_seed_ik,
self.arm_xyz_home,
self.arm_rot_home)

self.xyz_scale_init = xyz_scale
self.xyz_scale = xyz_scale

self.last_locked_xyz = self.arm_xyz_home.copy()
self.last_locked_rot = self.arm_rot_home.copy()
self.last_locked_seed = self.arm_home.copy()

self.locked = True
self._transition_handlers: 'list[Callable[[ArmMobileIOControl, ArmControlState], None]]' = [
Expand All @@ -82,7 +79,7 @@ def send(self):
if self.gripper:
self.gripper.send()

def update(self, t_now: float, arm_input: 'ArmMobileIOInputs | None'):
def update(self, t_now: float, arm_input: 'ArmMobileIOInputs | None' = None):
self.arm.update()

if self.state is self.state.EXIT:
Expand All @@ -97,7 +94,6 @@ def update(self, t_now: float, arm_input: 'ArmMobileIOInputs | None'):
# Reset the timeout
self.mobile_last_fbk_t = t_now
# Transition to teleop if mobileIO is reconnected
last_pos = self.arm.last_feedback.position
if self.state is self.state.DISCONNECTED:
self.mobile_last_fbk_t = t_now
print(self.namespace + 'Controller reconnected, demo continued.')
Expand All @@ -112,18 +108,18 @@ def update(self, t_now: float, arm_input: 'ArmMobileIOInputs | None'):
# If homing is complete, transition to teleop
elif self.state is self.state.HOMING:
if self.arm.at_goal:
self.phone_xyz_home = arm_input.phone_pos
self.phone_rot_home = arm_input.phone_rot
self.phone_xyz_init = arm_input.phone_pos.copy()
self.phone_rot_init = arm_input.phone_rot.copy()

self.last_locked_seed = last_pos.copy()
xyz = np.empty(3)
orientation = np.empty((3, 3))
self.arm.FK(last_pos,
self.arm.FK(self.arm.last_feedback.position,
xyz_out=xyz,
orientation_out=orientation)

self.last_locked_xyz = xyz
self.last_locked_rot = orientation
self.last_locked_xyz = xyz.copy()
self.last_locked_rot = orientation.copy()
self.locked = True # Lock the arm after reaching home

self.transition_to(t_now, self.state.TELEOP)

Expand All @@ -135,26 +131,26 @@ def update(self, t_now: float, arm_input: 'ArmMobileIOInputs | None'):

if arm_input.lock_toggle:
self.locked = arm_input.locked
if not self.locked:
self.phone_xyz_init = arm_input.phone_pos.copy()
self.phone_rot_init = arm_input.phone_rot.copy()

xyz = np.zeros(3)
orientation = np.zeros((3, 3))
self.arm.FK(self.arm.last_feedback.position,
xyz_out=xyz,
orientation_out=orientation)

self.last_locked_xyz = xyz.copy()
self.last_locked_rot = orientation.copy()

if not self.locked:
arm_goal = self.compute_arm_goal(arm_input)
if arm_goal is not None:
self.arm.set_goal(arm_goal)
self.arm.set_goal(arm_goal)
else:
self.phone_xyz_home = arm_input.phone_pos
self.phone_rot_home = arm_input.phone_rot
self.xyz_scale = self.xyz_scale_init * arm_input.xyz_scaling

self.last_locked_seed = last_pos.copy()
xyz = np.zeros(3)
orientation = np.zeros((3, 3))
self.arm.FK(last_pos,
xyz_out=xyz,
orientation_out=orientation)

self.last_locked_xyz = xyz
self.last_locked_rot = orientation

if self.gripper is not None:
if self.gripper:
gripper_closed = self.gripper.state == 1.0
if arm_input.gripper_closed and not gripper_closed:
print('Gripper Close')
Expand Down Expand Up @@ -186,20 +182,19 @@ def transition_to(self, t_now: float, state: ArmControlState):
self.state = state

def compute_arm_goal(self, arm_input: ArmMobileIOInputs):
phone_offset = arm_input.phone_pos - self.phone_xyz_home
rot_mat = self.phone_rot_home
arm_xyz_target = self.last_locked_xyz + arm_input.ar_scaling * \
self.xyz_scale * (rot_mat.T @ phone_offset)
arm_rot_target = rot_mat.T @ arm_input.phone_rot @ self.last_locked_rot
phone_offset = arm_input.phone_pos - self.phone_xyz_init
arm_xyz_target = self.last_locked_xyz + self.xyz_scale * \
(self.phone_rot_init.T @ phone_offset)
arm_rot_target = (arm_input.phone_rot @ self.phone_rot_init.T) @ self.last_locked_rot

# if ar scaling is 0, move the home AR pose to current pose
# this keeps the arm from driving to some weird offset when scaling
# is turned back up by the user in the future
if arm_input.ar_scaling == 0.0:
if arm_input.xyz_scaling == 0.0:
self.phone_xyz_home = arm_input.phone_pos

joint_target = self.arm.ik_target_xyz_so3(
self.last_locked_seed,
self.arm.last_feedback.position,
arm_xyz_target,
arm_rot_target)

Expand All @@ -217,15 +212,33 @@ def stop(self):


def setup_mobile_io(m: 'MobileIO'):
m.set_button_label(1, '⟲')
m.set_button_label(5, 'lock')
m.resetUI()
m.set_button_label(1, '⟲', blocking=False)
m.set_button_label(2, '', blocking=False)
m.set_button_label(3, '', blocking=False)
m.set_button_label(4, '', blocking=False)
m.set_button_label(5, 'Lock', blocking=False)
m.set_button_mode(5, 1)
m.set_button_label(7, 'grip')
m.set_button_label(6, '', blocking=False)
m.set_button_label(7, 'Gripper', blocking=False)
m.set_button_mode(7, 1)
m.set_button_label(8, '❌')


def parse_mobile_feedback(m: 'MobileIO'):
m.set_button_label(8, '❌', blocking=False)

m.set_axis_label(1, '', blocking=False)
m.set_axis_label(2, '', blocking=False)
m.set_axis_label(3, '', blocking=False)
m.set_axis_label(4, 'XYZ\nScale', blocking=False)
m.set_axis_label(5, '', blocking=False)
m.set_axis_label(6, '', blocking=False)
m.set_axis_label(7, '', blocking=False)
m.set_axis_label(8, '', blocking=False)

m.set_snap(4, np.nan)
global xyz_scaling
xyz_scaling = 1.0
m.set_axis_value(4, xyz_scaling)

def parse_mobile_io_feedback(m: 'MobileIO', locked: bool):
if not m.update(0.0):
return False, None

Expand All @@ -243,40 +256,82 @@ def parse_mobile_feedback(m: 'MobileIO'):
print(f'Error getting orientation as matrix: {e}\n{m.orientation}')
rotation = np.eye(3)

global xyz_scaling
# Disable xyz scaling if the arm is unlocked
if locked:
xyz_scaling = (m.get_axis_state(4) + 1.0) / 2.0
if not locked:
m.set_axis_value(4, (2 * xyz_scaling) - 1.0)

arm_input = ArmMobileIOInputs(
phone_pos=np.copy(m.position),
phone_rot=rotation,
lock_toggle=m.get_button_state(5),
locked=m.get_button_state(7))
lock_toggle=(m.get_button_diff(5) != 0),
locked=(not m.get_button_state(5)),
gripper_closed=m.get_button_state(7),
xyz_scaling=(m.get_axis_state(4) + 1.0) / 2.0
)

return False, arm_input


if __name__ == "__main__":
# Parse command line arguments
parser = argparse.ArgumentParser(description='Control a HEBI arm with AR input')
parser.add_argument('--enable-logging', action='store_true', help='Enable logging')
args = parser.parse_args()

lookup = hebi.Lookup()
sleep(2)

# Arm setup
cfg = hebi.config.load_config('./config/A-2580-06.cfg.yaml')
root_dir = os.path.abspath(os.path.dirname(__file__))
cfg_file = os.path.join(root_dir, "config", "A-2580-06G.cfg.yaml")
cfg = hebi.config.load_config(cfg_file)
family = cfg.families[0]

# Create Arm object
arm = hebi.arm.create_from_config(cfg, lookup)

arm_control = ArmMobileIOControl(arm)
enable_logging = args.enable_logging
if enable_logging:
arm.group.start_log('dir', 'logs', mkdirs=True)

arm_family = cfg.families[0]
# Check if the gripper is disabled
use_gripper = cfg.user_data.get('has_gripper', False)
gripper = None
if use_gripper:
gripper_family = family
gripper_name = 'gripperSpool'

gripper_group = lookup.get_group_from_names([gripper_family], [gripper_name])
while gripper_group is None:
print(f"Looking for gripper module {gripper_family} / {gripper_name} ...")
sleep(1)
gripper_group = lookup.get_group_from_names([gripper_family], [gripper_name])

print("Gripper module found.")
gripper = hebi.arm.Gripper(gripper_group, -5, 1)
gripper_gains = os.path.join(root_dir, "config/gains/gripper_spool_gains.xml")
gripper.load_gains(gripper_gains)

arm_control = ArmMobileIOControl(arm,
gripper,
ik_seed=cfg.user_data.get('ik_seed_pos', None)
)

# Setup MobileIO
print('Looking for mobileIO device...')
m = create_mobile_io(lookup, arm_family)
m = create_mobile_io(lookup, family)
while m is None:
try:
print('Waiting for mobileIO device to come online...')
sleep(1)
m = create_mobile_io(lookup, arm_family)
m = create_mobile_io(lookup, family)
except KeyboardInterrupt as e:
exit()

print("mobileIO device found.")
m.resetUI()
m.update()
setup_mobile_io(m)

Expand All @@ -287,7 +342,7 @@ def parse_mobile_feedback(m: 'MobileIO'):
while arm_control.running:
t = time()
try:
quit, arm_input = parse_mobile_feedback(m)
quit, arm_input = parse_mobile_io_feedback(m, arm_control.locked)
if quit:
break
arm_control.update(t, arm_input)
Expand All @@ -296,3 +351,5 @@ def parse_mobile_feedback(m: 'MobileIO'):
break

arm_control.stop()
if enable_logging:
hebi_log = arm.group.stop_log()
Loading