| from pathlib import Path |
| import sys |
| import gymnasium as gym |
| from gymnasium import spaces |
| import numpy as np |
| from huggingface_hub import snapshot_download |
| import yaml |
| snapshot_download("lerobot/unitree-g1-mujoco") |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent)) |
|
|
| from sim.simulator_factory import SimulatorFactory, init_channel |
|
|
|
|
| def make_env(n_envs=1, use_async_envs=False, **kwargs): |
| """Create a UnitreeG1 simulation environment. |
| |
| Args: |
| n_envs: Number of environments (currently only 1 supported) |
| use_async_envs: Whether to use async envs (not implemented) |
| **kwargs: Additional arguments passed to simulator |
| - publish_images: bool, whether to publish camera images via ZMQ |
| - camera_port: int, ZMQ port for camera images |
| - cameras: list of camera names |
| """ |
| repo_dir = Path(__file__).parent |
| |
| |
| config_path = repo_dir / "config.yaml" |
| with open(config_path) as f: |
| config = yaml.safe_load(f) |
| |
| |
| publish_images = kwargs.get("publish_images", True) |
| camera_port = kwargs.get("camera_port", 5555) |
| cameras = kwargs.get("cameras", ["head_camera"]) |
| |
| enable_offscreen = publish_images or config.get("ENABLE_OFFSCREEN", False) |
| camera_configs = {} |
| if enable_offscreen: |
| for cam_name in cameras: |
| camera_configs[cam_name] = {"height": 480, "width": 640} |
| |
| |
| init_channel(config=config) |
| |
| |
| simulator = SimulatorFactory.create_simulator( |
| config=config, |
| env_name="default", |
| onscreen=config.get("ENABLE_ONSCREEN", True), |
| offscreen=enable_offscreen, |
| camera_configs=camera_configs, |
| ) |
| |
| class UnitreeG1Env(gym.Env): |
| """Gymnasium environment wrapper for Unitree G1 MuJoCo simulation.""" |
| |
| metadata = {"render_modes": ["human"]} |
|
|
| def __init__(self, sim, cam_configs, pub_images, cam_port): |
| super().__init__() |
| self.simulator = sim |
| self.sim_env = sim.sim_env |
| self.step_count = 0 |
| self.camera_configs = cam_configs |
| |
| |
| self.sim_dt = config["SIMULATE_DT"] |
| self.viewer_dt = config.get("VIEWER_DT", 0.02) |
| self.image_dt = config.get("IMAGE_DT", 0.033333) |
| |
| |
| if pub_images and len(cam_configs) > 0: |
| sim.start_image_publish_subprocess( |
| start_method=config.get("MP_START_METHOD", "spawn"), |
| camera_port=cam_port |
| ) |
| print(f"Camera images publishing on tcp://localhost:{cam_port}") |
| |
| |
| num_joints = config.get("NUM_MOTORS", 29) |
| self.action_space = spaces.Box(-np.pi, np.pi, shape=(num_joints,), dtype=np.float32) |
| self.observation_space = spaces.Box(-np.inf, np.inf, shape=(num_joints * 3 + 10,), dtype=np.float32) |
|
|
| def reset(self, seed=None, options=None): |
| """Reset the simulation.""" |
| super().reset(seed=seed, options=options) |
| self.simulator.reset() |
| self.step_count = 0 |
| obs = self._get_obs() |
| return obs, {} |
|
|
| def step(self, action=None): |
| """Execute one simulation step. Caller handles timing.""" |
| |
| self.sim_env.sim_step() |
| |
| |
| if self.step_count % int(self.viewer_dt / self.sim_dt) == 0: |
| self.sim_env.update_viewer() |
| |
| |
| if self.step_count % int(self.image_dt / self.sim_dt) == 0: |
| self.sim_env.update_render_caches() |
| |
| self.step_count += 1 |
| |
| obs = self._get_obs() |
| reward = 0.0 |
| terminated = False |
| truncated = False |
| info = {} |
| return obs, reward, terminated, truncated, info |
| |
| def _get_obs(self): |
| """Get current observation from simulation.""" |
| obs_dict = self.sim_env.prepare_obs() |
| obs = np.concatenate([ |
| obs_dict.get("body_q", np.zeros(29)), |
| obs_dict.get("body_dq", np.zeros(29)), |
| obs_dict.get("body_tau_est", np.zeros(29)), |
| obs_dict.get("floating_base_pose", np.zeros(7))[:4], |
| obs_dict.get("floating_base_vel", np.zeros(6))[:3], |
| obs_dict.get("floating_base_acc", np.zeros(3)), |
| ]).astype(np.float32) |
| return obs |
|
|
| def close(self): |
| """Close the simulation.""" |
| print("++++closing simulator ++++") |
| self.simulator.close() |
|
|
| return UnitreeG1Env(simulator, camera_configs, publish_images, camera_port) |
|
|