Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ jobs:
run: |
pip install --upgrade pip setuptools wheel
pip install torch --index-url https://download.pytorch.org/whl/cpu
pip install -e '.[dev]' pynput

- name: Get gstaichi version
id: gstaichi_version
Expand Down
268 changes: 138 additions & 130 deletions examples/IPC_Solver/ipc_arm_cloth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,21 @@
; - Roll Right (Rotate around X)
u - Reset Scene
space - Press to close gripper, release to open gripper
esc - Quit

Plus all default viewer controls (press 'i' to see them)
"""

import random
import threading
import argparse
import numpy as np
import csv
import os
from datetime import datetime
from pynput import keyboard
from scipy.spatial.transform import Rotation as R

import numpy as np
from huggingface_hub import snapshot_download
from scipy.spatial.transform import Rotation as R

import genesis as gs


class KeyboardDevice:
def __init__(self):
self.pressed_keys = set()
self.lock = threading.Lock()
self.listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)

def start(self):
self.listener.start()

def stop(self):
self.listener.stop()
self.listener.join()

def on_press(self, key: keyboard.Key):
with self.lock:
self.pressed_keys.add(key)

def on_release(self, key: keyboard.Key):
with self.lock:
self.pressed_keys.discard(key)

def get_cmd(self):
return self.pressed_keys
from genesis.ext.pyrender.interaction.keybindings import KeyAction, Keybind


def build_scene(use_ipc=False, show_viewer=False, enable_ipc_gui=False):
Expand Down Expand Up @@ -172,14 +147,14 @@ def build_scene(use_ipc=False, show_viewer=False, enable_ipc_gui=False):
return scene, entities


def run_sim(scene, entities, clients, mode="interactive", trajectory_file=None):
def run_sim(scene, entities, mode="interactive", trajectory_file=None):
robot = entities["robot"]
target_entity = entities["target"]

robot_init_pos = np.array([0.5, 0, 0.55])
robot_init_R = R.from_euler("y", np.pi)
target_pos = robot_init_pos.copy()
target_R = robot_init_R
target_pos = [robot_init_pos.copy()] # Use list for mutability in closures
target_R = [robot_init_R] # Use list for mutability in closures

n_dofs = robot.n_dofs
motors_dof = np.arange(n_dofs - 2)
Expand All @@ -190,18 +165,97 @@ def run_sim(scene, entities, clients, mode="interactive", trajectory_file=None):
trajectory = []
recording = mode == "record"

# Gripper state (use list for mutability in closures)
gripper_closed = [False]

# Control parameters
dpos = 0.002
drot = 0.01

def reset_scene():
nonlocal target_pos, target_R
target_pos = robot_init_pos.copy()
target_R = robot_init_R
target_quat = target_R.as_quat(scalar_first=True)
target_entity.set_qpos(np.concatenate([target_pos, target_quat]))
q = robot.inverse_kinematics(link=ee_link, pos=target_pos, quat=target_quat)
target_pos[0] = robot_init_pos.copy()
target_R[0] = robot_init_R
target_quat = target_R[0].as_quat(scalar_first=True)
target_entity.set_qpos(np.concatenate([target_pos[0], target_quat]))
q = robot.inverse_kinematics(link=ee_link, pos=target_pos[0], quat=target_quat)
robot.set_qpos(q[:-2], motors_dof)

# entities["cube"].set_pos((random.uniform(0.2, 0.4), random.uniform(-0.2, 0.2), 0.05))
# entities["cube"].set_quat(R.from_euler("z", random.uniform(0, np.pi * 2)).as_quat(scalar_first=True))

# Define movement callbacks
def move_forward():
target_pos[0][0] -= dpos

def move_backward():
target_pos[0][0] += dpos

def move_left():
target_pos[0][1] -= dpos

def move_right():
target_pos[0][1] += dpos

def move_up():
target_pos[0][2] += dpos

def move_down():
target_pos[0][2] -= dpos

def yaw_left():
target_R[0] = R.from_euler("z", drot) * target_R[0]

def yaw_right():
target_R[0] = R.from_euler("z", -drot) * target_R[0]

def pitch_up():
target_R[0] = R.from_euler("y", drot) * target_R[0]

def pitch_down():
target_R[0] = R.from_euler("y", -drot) * target_R[0]

def roll_left():
target_R[0] = R.from_euler("x", drot) * target_R[0]

def roll_right():
target_R[0] = R.from_euler("x", -drot) * target_R[0]

def close_gripper():
gripper_closed[0] = True

def open_gripper():
gripper_closed[0] = False

# Register keybindings (only for interactive and record modes)
if mode in ["interactive", "record"]:
from pyglet.window import key

scene.viewer.register_keybinds(
(
Keybind(key_code=key.UP, key_action=KeyAction.HOLD, name="move_forward", callback_func=move_forward),
Keybind(
key_code=key.DOWN, key_action=KeyAction.HOLD, name="move_backward", callback_func=move_backward
),
Keybind(key_code=key.LEFT, key_action=KeyAction.HOLD, name="move_left", callback_func=move_left),
Keybind(key_code=key.RIGHT, key_action=KeyAction.HOLD, name="move_right", callback_func=move_right),
Keybind(key_code=key.N, key_action=KeyAction.HOLD, name="move_up", callback_func=move_up),
Keybind(key_code=key.M, key_action=KeyAction.HOLD, name="move_down", callback_func=move_down),
Keybind(key_code=key.J, key_action=KeyAction.HOLD, name="yaw_left", callback_func=yaw_left),
Keybind(key_code=key.K, key_action=KeyAction.HOLD, name="yaw_right", callback_func=yaw_right),
Keybind(key_code=key.I, key_action=KeyAction.HOLD, name="pitch_up", callback_func=pitch_up),
Keybind(key_code=key.O, key_action=KeyAction.HOLD, name="pitch_down", callback_func=pitch_down),
Keybind(key_code=key.L, key_action=KeyAction.HOLD, name="roll_left", callback_func=roll_left),
Keybind(key_code=key.SEMICOLON, key_action=KeyAction.HOLD, name="roll_right", callback_func=roll_right),
Keybind(key_code=key.U, key_action=KeyAction.HOLD, name="reset_scene", callback_func=reset_scene),
Keybind(
key_code=key.SPACE, key_action=KeyAction.PRESS, name="close_gripper", callback_func=close_gripper
),
Keybind(
key_code=key.SPACE, key_action=KeyAction.RELEASE, name="open_gripper", callback_func=open_gripper
),
)
)

# Load trajectory if in playback mode
if mode == "playback":
if not trajectory_file or not os.path.exists(trajectory_file):
Expand Down Expand Up @@ -233,7 +287,7 @@ def reset_scene():

print(f"\nMode: {mode.upper()}")
if mode == "record":
print("Recording trajectory... Press ESC to stop and save.")
print("Recording trajectory...")
elif mode == "playback":
print("Playing back trajectory...")

Expand All @@ -249,99 +303,57 @@ def reset_scene():
print("l/;\t- Roll Left/Right (Rotate around X axis)")
print("u\t- Reset Scene")
print("space\t- Press to close gripper, release to open gripper")
print("esc\t- Quit")
if mode in ["interactive", "record"]:
print("\nPlus all default viewer controls (press 'i' to see them)")

# reset scene before starting teleoperation
reset_scene()

# start teleoperation or playback
stop = False
step_count = 0

while not stop:
if mode == "playback":
# Playback mode: replay recorded trajectory
if step_count < len(trajectory):
step_data = trajectory[step_count]
target_pos = step_data["target_pos"]
target_R = R.from_quat(step_data["target_quat"])
is_close_gripper = step_data["gripper_closed"]
step_count += 1
print(f"\rPlayback step: {step_count}/{len(trajectory)}", end="")
# Check if user wants to stop playback
pressed_keys = clients["keyboard"].pressed_keys.copy()
stop = keyboard.Key.esc in pressed_keys
try:
while True:
if mode == "playback":
# Playback mode: replay recorded trajectory
if step_count < len(trajectory):
step_data = trajectory[step_count]
target_pos[0] = step_data["target_pos"]
target_R[0] = R.from_quat(step_data["target_quat"])
gripper_closed[0] = step_data["gripper_closed"]
step_count += 1
print(f"\rPlayback step: {step_count}/{len(trajectory)}", end="")
else:
print("\nPlayback finished!")
break
else:
print("\nPlayback finished!")
break
else:
# Interactive or recording mode
pressed_keys = clients["keyboard"].pressed_keys.copy()

# reset scene:
reset_flag = False
reset_flag |= keyboard.KeyCode.from_char("u") in pressed_keys
if reset_flag:
reset_scene()

# stop teleoperation
stop = keyboard.Key.esc in pressed_keys

# get ee target pose
is_close_gripper = False
dpos = 0.002
drot = 0.01
for key in pressed_keys:
if key == keyboard.Key.up:
target_pos[0] -= dpos
elif key == keyboard.Key.down:
target_pos[0] += dpos
elif key == keyboard.Key.right:
target_pos[1] += dpos
elif key == keyboard.Key.left:
target_pos[1] -= dpos
elif key == keyboard.KeyCode.from_char("n"):
target_pos[2] += dpos
elif key == keyboard.KeyCode.from_char("m"):
target_pos[2] -= dpos
elif key == keyboard.KeyCode.from_char("j"):
target_R = R.from_euler("z", drot) * target_R
elif key == keyboard.KeyCode.from_char("k"):
target_R = R.from_euler("z", -drot) * target_R
elif key == keyboard.KeyCode.from_char("i"):
target_R = R.from_euler("y", drot) * target_R
elif key == keyboard.KeyCode.from_char("o"):
target_R = R.from_euler("y", -drot) * target_R
elif key == keyboard.KeyCode.from_char("l"):
target_R = R.from_euler("x", drot) * target_R
elif key == keyboard.KeyCode.from_char(";"):
target_R = R.from_euler("x", -drot) * target_R
elif key == keyboard.Key.space:
is_close_gripper = True

# Record current state if recording
if recording:
step_data = {
"target_pos": target_pos.copy(),
"target_quat": target_R.as_quat(), # x,y,z,w format
"gripper_closed": is_close_gripper,
"step": step_count,
}
trajectory.append(step_data)

# control arm
target_quat = target_R.as_quat(scalar_first=True)
target_entity.set_qpos(np.concatenate([target_pos, target_quat]))
q, err = robot.inverse_kinematics(link=ee_link, pos=target_pos, quat=target_quat, return_error=True)
robot.control_dofs_position(q[:-2], motors_dof)
# control gripper
if is_close_gripper:
robot.control_dofs_force(np.array([-1.0, -1.0]), fingers_dof)
else:
robot.control_dofs_force(np.array([1.0, 1.0]), fingers_dof)
# Interactive or recording mode
# Movement is handled by keybinding callbacks
# Record current state if recording
if recording:
step_data = {
"target_pos": target_pos[0].copy(),
"target_quat": target_R[0].as_quat(), # x,y,z,w format
"gripper_closed": gripper_closed[0],
"step": step_count,
}
trajectory.append(step_data)

# control arm
target_quat = target_R[0].as_quat(scalar_first=True)
target_entity.set_qpos(np.concatenate([target_pos[0], target_quat]))
q, err = robot.inverse_kinematics(link=ee_link, pos=target_pos[0], quat=target_quat, return_error=True)
robot.control_dofs_position(q[:-2], motors_dof)
# control gripper
if gripper_closed[0]:
robot.control_dofs_force(np.array([-1.0, -1.0]), fingers_dof)
else:
robot.control_dofs_force(np.array([1.0, 1.0]), fingers_dof)

scene.step()
step_count += 1
scene.step()
step_count += 1
except KeyboardInterrupt:
gs.logger.info("Simulation interrupted, exiting.")

# Save trajectory if recording
if recording and len(trajectory) > 0:
Expand Down Expand Up @@ -437,12 +449,8 @@ def main():
elif not os.path.isabs(trajectory_file):
trajectory_file = os.path.join(traj_dir, os.path.basename(trajectory_file))

clients = dict()
clients["keyboard"] = KeyboardDevice()
clients["keyboard"].start()

scene, entities = build_scene(use_ipc=args.ipc, show_viewer=args.vis, enable_ipc_gui=False)
run_sim(scene, entities, clients, mode=args.mode, trajectory_file=trajectory_file)
run_sim(scene, entities, mode=args.mode, trajectory_file=trajectory_file)


if __name__ == "__main__":
Expand Down
Loading