#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Portable inference example for the Hyundai Uiwang FlowMatch Diffusion Policy. Runs with ONLY the Hugging Face model id — no dataset download, no robot, no local checkpoint needed. The uploaded model bundles its normalization stats (policy_preprocessor / policy_postprocessor), so `make_pre_post_processors` loads everything straight from the Hub. What you need to provide at run time: * front_rgb : np.uint8 (H, W, 3) RGB — scene/zivid camera view * wrist_rgb : np.uint8 (H, W, 3) RGB — wrist camera view * state : np.float32 (26,) — arm joints (6) + hand joints (20) The policy resizes images internally (to 240x320 then center-crops), so the input camera resolution does not need to match training exactly — just pass the raw RGB frames. Output: np.float32 (26,) action = target arm joints (6) + target hand joints (20), at 30 Hz. Usage: # self-contained demo with synthetic frames (verifies the model loads + runs): python examples/hyundai_uiwang/inference_example.py # specify a different model / device: python examples/hyundai_uiwang/inference_example.py --model-id Ngseo/hyundai-uiwang-left-flowmatch --device cuda Install (once): pip install lerobot # or use this repo with PYTHONPATH=src """ from __future__ import annotations import argparse import numpy as np import torch from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy from lerobot.policies.factory import make_pre_post_processors DEFAULT_MODEL_ID = "Ngseo/hyundai-uiwang-left-flowmatch" # Camera feature keys the model was trained with (see the model card / config.json). FRONT_KEY = "observation.images.front_rgb" WRIST_KEY = "observation.images.wrist_rgb" STATE_KEY = "observation.state" def load_policy(model_id: str = DEFAULT_MODEL_ID, device: str = "cuda"): """Load the policy + pre/post processors from the Hugging Face Hub.""" device = device if (device != "cuda" or torch.cuda.is_available()) else "cpu" policy = DiffusionPolicy.from_pretrained(model_id) policy.config.device = device # saved config pins device=cuda; align it to the runtime device policy.to(device) policy.eval() policy.reset() # clears the internal observation/action queues # pretrained_path=model_id -> normalization stats are loaded from the Hub repo. # Override the saved device_processor step so preprocessing targets `device` too. preprocess, postprocess = make_pre_post_processors( policy.config, model_id, preprocessor_overrides={"device_processor": {"device": device}} ) return policy, preprocess, postprocess, device @torch.no_grad() def predict_action( policy, preprocess, postprocess, front_rgb: np.ndarray, wrist_rgb: np.ndarray, state: np.ndarray, device: str = "cuda", ) -> np.ndarray: """Run one inference step and return a 26-d action as np.float32. Note: the policy keeps an internal queue (n_obs_steps / n_action_steps), so call this repeatedly at the control loop rate; `policy.reset()` starts a new episode. """ # Raw frame dict in the format expected by the preprocessor: # images: uint8 (H, W, C); state: float32 (D,) — batching/normalization # are handled by the processor pipeline. obs = { FRONT_KEY: torch.from_numpy(front_rgb).to(torch.float32).div(255).permute(2, 0, 1).unsqueeze(0).to(device), WRIST_KEY: torch.from_numpy(wrist_rgb).to(torch.float32).div(255).permute(2, 0, 1).unsqueeze(0).to(device), STATE_KEY: torch.from_numpy(state).to(torch.float32).unsqueeze(0).to(device), "task": "", "robot_type": "", } obs = preprocess(obs) action = policy.select_action(obs) # (1, 26), normalized action = postprocess(action) # unnormalized return action.squeeze(0).float().cpu().numpy() def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--model-id", default=DEFAULT_MODEL_ID) ap.add_argument("--device", default="cuda", choices=["cuda", "cpu", "mps"]) ap.add_argument("--steps", type=int, default=4, help="number of demo inference steps") args = ap.parse_args() print(f"Loading {args.model_id} ...") policy, preprocess, postprocess, device = load_policy(args.model_id, args.device) # Report what the model expects (handy when porting to a new robot). img_keys = [k for k in policy.config.input_features if "image" in k] state_dim = policy.config.input_features[STATE_KEY].shape[0] action_dim = policy.config.output_features["action"].shape[0] print(f"device={device} | cameras={img_keys} | state_dim={state_dim} | action_dim={action_dim}") # --- demo with synthetic frames (replace these with real camera/robot data) --- rng = np.random.default_rng(0) for t in range(args.steps): front_rgb = rng.integers(0, 256, size=(480, 640, 3), dtype=np.uint8) wrist_rgb = rng.integers(0, 256, size=(480, 640, 3), dtype=np.uint8) state = rng.standard_normal(state_dim).astype(np.float32) action = predict_action(policy, preprocess, postprocess, front_rgb, wrist_rgb, state, device) print(f"step {t}: action[26] = {np.array2string(action, precision=3, max_line_width=120)}") print("\nOK — model runs from the HF id alone. Swap the synthetic frames for your robot's cameras/state.") if __name__ == "__main__": main()