
Utilities API

General-purpose utility functions, configuration, and helper classes.

embodied_gen.utils.config

embodied_gen.utils.log

embodied_gen.utils.enum

AssetType dataclass

AssetType()

Bases: str

Enumeration for asset types.

Supported types

MJCF: MuJoCo XML format.
USD: Universal Scene Description format.
URDF: Unified Robot Description Format.
MESH: Mesh file format.
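
A minimal usage sketch. The member names follow the supported types listed above; the lowercase string values (e.g. 'usd') are an assumption inferred from the SimAssetMapper example further below.

from embodied_gen.utils.enum import AssetType

# AssetType subclasses str, so members can be compared with plain strings.
# Member values such as 'usd' are assumed, based on the SimAssetMapper example below.
print(AssetType.USD)           # expected: usd
print(AssetType.USD == "usd")  # expected: True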

LayoutInfo dataclass

LayoutInfo(tree: dict[str, list], relation: dict[str, str | list[str]], objs_desc: dict[str, str] = dict(), objs_mapping: dict[str, str] = dict(), assets: dict[str, str] = dict(), quality: dict[str, str] = dict(), position: dict[str, list[float]] = dict())

Bases: DataClassJsonMixin

Data structure for layout information in a 3D scene.

Attributes:

Name Type Description
tree dict[str, list]

Hierarchical structure of scene objects.

relation dict[str, str | list[str]]

Spatial relations between objects.

objs_desc dict[str, str]

Descriptions of objects.

objs_mapping dict[str, str]

Mapping from object names to categories.

assets dict[str, str]

Asset file paths for objects.

quality dict[str, str]

Quality information for assets.

position dict[str, list[float]]

Position coordinates for objects.
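
A minimal construction sketch with hypothetical object names. The relation keys shown here ('background', 'context', 'manipulated_objs') are placeholders for illustration; in real layouts they come from Scene3DItemEnum values. DataClassJsonMixin supplies to_dict/from_dict for serialization.

from embodied_gen.utils.enum import LayoutInfo

# Hypothetical layout: a mug on a table in a kitchen.
layout = LayoutInfo(
    tree={"kitchen": [["table", "on"]], "table": [["mug", "on"]]},
    relation={
        "background": "kitchen",
        "context": "table",
        "manipulated_objs": ["mug"],
    },
    objs_desc={"mug": "a ceramic coffee mug"},
)
print(layout.to_dict())                        # serialize via DataClassJsonMixin
print(LayoutInfo.from_dict(layout.to_dict()))  # round-trips back to a LayoutInfo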

RenderItems dataclass

RenderItems()

Bases: str, Enum

Enumeration of render item types for 3D scenes.

Attributes:

Name Type Description
IMAGE

Color image.

ALPHA

Mask image.

VIEW_NORMAL

View-space normal image.

GLOBAL_NORMAL

World-space normal image.

POSITION_MAP

Position map image.

DEPTH

Depth image.

ALBEDO

Albedo image.

DIFFUSE

Diffuse image.

RobotItemEnum dataclass

RobotItemEnum()

Bases: str, Enum

Enumeration of supported robot types.

Attributes:

Name Type Description
FRANKA

Franka robot.

UR5

UR5 robot.

PIPER

Piper robot.

Scene3DItemEnum dataclass

Scene3DItemEnum()

Bases: str, Enum

Enumeration of 3D scene item categories.

Attributes:

Name Type Description
BACKGROUND

Background objects.

CONTEXT

Contextual objects.

ROBOT

Robot entity.

MANIPULATED_OBJS

Objects manipulated by the robot.

DISTRACTOR_OBJS

Distractor objects.

OTHERS

Other objects.

Methods:

Name Description
object_list

Returns a list of objects in the scene.

object_mapping

Returns a mapping from object to category.

object_list classmethod
object_list(layout_relation: dict) -> list

Returns a list of objects in the scene.

Parameters:

Name Type Description Default
layout_relation dict

Dictionary mapping categories to objects.

required

Returns:

Type Description
list

List of objects in the scene.

Source code in embodied_gen/utils/enum.py
@classmethod
def object_list(cls, layout_relation: dict) -> list:
    """Returns a list of objects in the scene.

    Args:
        layout_relation: Dictionary mapping categories to objects.

    Returns:
        List of objects in the scene.
    """
    return (
        [
            layout_relation[cls.BACKGROUND.value],
            layout_relation[cls.CONTEXT.value],
        ]
        + layout_relation[cls.MANIPULATED_OBJS.value]
        + layout_relation[cls.DISTRACTOR_OBJS.value]
    )
object_mapping classmethod
object_mapping(layout_relation)

Returns a mapping from object to category.

Parameters:

Name Type Description Default
layout_relation

Dictionary mapping categories to objects.

required

Returns:

Type Description

Dictionary mapping object names to their category.

Source code in embodied_gen/utils/enum.py
@classmethod
def object_mapping(cls, layout_relation):
    """Returns a mapping from object to category.

    Args:
        layout_relation: Dictionary mapping categories to objects.

    Returns:
        Dictionary mapping object names to their category.
    """
    relation_mapping = {
        # layout_relation[cls.ROBOT.value]: cls.ROBOT.value,
        layout_relation[cls.BACKGROUND.value]: cls.BACKGROUND.value,
        layout_relation[cls.CONTEXT.value]: cls.CONTEXT.value,
    }
    relation_mapping.update(
        {
            item: cls.MANIPULATED_OBJS.value
            for item in layout_relation[cls.MANIPULATED_OBJS.value]
        }
    )
    relation_mapping.update(
        {
            item: cls.DISTRACTOR_OBJS.value
            for item in layout_relation[cls.DISTRACTOR_OBJS.value]
        }
    )

    return relation_mapping
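
An illustrative call of both classmethods. The relation dict is keyed by the enum values themselves, so the exact member strings do not matter; the object names are hypothetical.

from embodied_gen.utils.enum import Scene3DItemEnum

relation = {
    Scene3DItemEnum.BACKGROUND.value: "kitchen",
    Scene3DItemEnum.CONTEXT.value: "table",
    Scene3DItemEnum.ROBOT.value: "franka",
    Scene3DItemEnum.MANIPULATED_OBJS.value: ["mug"],
    Scene3DItemEnum.DISTRACTOR_OBJS.value: ["plate", "fork"],
}
print(Scene3DItemEnum.object_list(relation))
# ['kitchen', 'table', 'mug', 'plate', 'fork']
print(Scene3DItemEnum.object_mapping(relation))
# maps 'kitchen', 'table', 'mug', 'plate', 'fork' to their category values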

SimAssetMapper

Maps simulator names to asset types.

Provides a mapping from simulator names to their corresponding asset type.

Example
from embodied_gen.utils.enum import SimAssetMapper
asset_type = SimAssetMapper["isaacsim"]
print(asset_type)  # Output: 'usd'

Methods:

Name Description
__class_getitem__

Returns the asset type for a given simulator name.

__class_getitem__ classmethod
__class_getitem__(key: str)

Returns the asset type for a given simulator name.

Parameters:

Name Type Description Default
key str

Name of the simulator.

required

Returns:

Type Description

AssetType corresponding to the simulator.

Raises:

Type Description
KeyError

If the simulator name is not recognized.

Source code in embodied_gen/utils/enum.py
@classmethod
def __class_getitem__(cls, key: str):
    """Returns the asset type for a given simulator name.

    Args:
        key: Name of the simulator.

    Returns:
        AssetType corresponding to the simulator.

    Raises:
        KeyError: If the simulator name is not recognized.
    """
    key = key.upper()
    if key.startswith("SAPIEN"):
        key = "SAPIEN"
    return cls._mapping[key]

SpatialRelationEnum dataclass

SpatialRelationEnum()

Bases: str, Enum

Enumeration of spatial relations for objects in a scene.

Attributes:

Name Type Description
ON

Objects on a surface (e.g., table).

IN

Objects in a container or room.

INSIDE

Objects inside a shelf or rack.

FLOOR

Objects on the floor.

embodied_gen.utils.geometry

all_corners_inside

all_corners_inside(hull: Path, box: list, threshold: int = 3) -> bool

Checks if at least threshold corners of a box are inside a hull.

Parameters:

Name Type Description Default
hull Path

Convex hull path.

required
box list

Box coordinates [x1, x2, y1, y2].

required
threshold int

Minimum corners inside.

3

Returns:

Name Type Description
bool bool

True if enough corners are inside.

Source code in embodied_gen/utils/geometry.py
def all_corners_inside(hull: Path, box: list, threshold: int = 3) -> bool:
    """Checks if at least `threshold` corners of a box are inside a hull.

    Args:
        hull (Path): Convex hull path.
        box (list): Box coordinates [x1, x2, y1, y2].
        threshold (int, optional): Minimum corners inside.

    Returns:
        bool: True if enough corners are inside.
    """
    x1, x2, y1, y2 = box
    corners = [[x1, y1], [x2, y1], [x1, y2], [x2, y2]]

    num_inside = sum(hull.contains_point(c) for c in corners)
    return num_inside >= threshold
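
A small sketch using a unit-square hull built directly with matplotlib's Path.

import numpy as np
from matplotlib.path import Path
from embodied_gen.utils.geometry import all_corners_inside

# Unit-square hull; the first vertex is repeated to close the polygon.
hull = Path(np.array([[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]), closed=True)
print(all_corners_inside(hull, [0.1, 0.4, 0.1, 0.4]))  # True: all 4 corners inside
print(all_corners_inside(hull, [0.8, 1.5, 0.8, 1.5]))  # False: only 1 corner inside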

bfs_placement

bfs_placement(layout_file: str, floor_margin: float = 0, beside_margin: float = 0.1, max_attempts: int = 3000, init_rpy: tuple = (1.5708, 0.0, 0.0), rotate_objs: bool = True, rotate_bg: bool = True, rotate_context: bool = True, limit_reach_range: tuple[float, float] | None = (0.2, 0.85), max_orient_diff: float | None = 60, robot_dim: float = 0.12, seed: int = None) -> LayoutInfo

Places objects in a scene layout using BFS traversal.

Parameters:

Name Type Description Default
layout_file str

Path to layout JSON file generated from layout-cli.

required
floor_margin float

Z-offset for objects placed on the floor.

0
beside_margin float

Minimum margin for objects placed 'beside' their parent, used when 'on' placement fails.

0.1
max_attempts int

Max attempts for a non-overlapping placement.

3000
init_rpy tuple

Initial rotation (rpy).

(1.5708, 0.0, 0.0)
rotate_objs bool

Whether to randomly rotate objects.

True
rotate_bg bool

Whether to randomly rotate the background.

True
rotate_context bool

Whether to randomly rotate the context asset.

True
limit_reach_range tuple[float, float] | None

If set, enforce that manipulated objects are within the robot's reach range, in meters.

(0.2, 0.85)
max_orient_diff float | None

If set, enforce that manipulated objects are within the robot's orientation range, in degrees.

60
robot_dim float

The approximate robot size.

0.12
seed int

Random seed for reproducible placement.

None

Returns:

Name Type Description
LayoutInfo LayoutInfo

Layout information with object poses.

Example
from embodied_gen.utils.geometry import bfs_placement
layout = bfs_placement("scene_layout.json", seed=42)
print(layout.position)
Source code in embodied_gen/utils/geometry.py
@with_seed("seed")
def bfs_placement(
    layout_file: str,
    floor_margin: float = 0,
    beside_margin: float = 0.1,
    max_attempts: int = 3000,
    init_rpy: tuple = (1.5708, 0.0, 0.0),
    rotate_objs: bool = True,
    rotate_bg: bool = True,
    rotate_context: bool = True,
    limit_reach_range: tuple[float, float] | None = (0.20, 0.85),
    max_orient_diff: float | None = 60,
    robot_dim: float = 0.12,
    seed: int = None,
) -> LayoutInfo:
    """Places objects in a scene layout using BFS traversal.

    Args:
        layout_file (str): Path to layout JSON file generated from `layout-cli`.
        floor_margin (float, optional): Z-offset for objects placed on the floor.
        beside_margin (float, optional): Minimum margin for objects placed 'beside' their parent, used when 'on' placement fails.
        max_attempts (int, optional): Max attempts for a non-overlapping placement.
        init_rpy (tuple, optional): Initial rotation (rpy).
        rotate_objs (bool, optional): Whether to randomly rotate objects.
        rotate_bg (bool, optional): Whether to randomly rotate the background.
        rotate_context (bool, optional): Whether to randomly rotate the context asset.
        limit_reach_range (tuple[float, float] | None, optional): If set, enforce that manipulated objects are within the robot's reach range, in meters.
        max_orient_diff (float | None, optional): If set, enforce that manipulated objects are within the robot's orientation range, in degrees.
        robot_dim (float, optional): The approximate robot size.
        seed (int, optional): Random seed for reproducible placement.

    Returns:
        LayoutInfo: Layout information with object poses.

    Example:
        ```py
        from embodied_gen.utils.geometry import bfs_placement
        layout = bfs_placement("scene_layout.json", seed=42)
        print(layout.position)
        ```
    """
    layout_info = LayoutInfo.from_dict(json.load(open(layout_file, "r")))
    asset_dir = os.path.dirname(layout_file)
    object_mapping = layout_info.objs_mapping
    position = {}  # node: [x, y, z, qx, qy, qz, qw]
    parent_bbox_xy = {}
    placed_boxes_map = defaultdict(list)
    mesh_info = defaultdict(dict)
    robot_node = layout_info.relation[Scene3DItemEnum.ROBOT.value]
    for node in object_mapping:
        if object_mapping[node] == Scene3DItemEnum.BACKGROUND.value:
            bg_quat = (
                compute_axis_rotation_quat(
                    axis="y",
                    angle_rad=np.random.uniform(0, 2 * np.pi),
                )
                if rotate_bg
                else [0, 0, 0, 1]
            )
            bg_quat = [round(q, 4) for q in bg_quat]
            continue

        mesh_path = (
            f"{layout_info.assets[node]}/mesh/{node.replace(' ', '_')}.obj"
        )
        mesh_path = os.path.join(asset_dir, mesh_path)
        mesh_info[node]["path"] = mesh_path
        mesh = trimesh.load(mesh_path)
        rotation = R.from_euler("xyz", init_rpy, degrees=False)
        vertices = mesh.vertices @ rotation.as_matrix().T
        z1 = np.percentile(vertices[:, 2], 1)
        z2 = np.percentile(vertices[:, 2], 99)

        if object_mapping[node] == Scene3DItemEnum.CONTEXT.value:
            object_quat = [0, 0, 0, 1]
            if rotate_context:
                angle_rad = np.random.uniform(0, 2 * np.pi)
                object_quat = compute_axis_rotation_quat(
                    axis="z", angle_rad=angle_rad
                )
                rotation = R.from_quat(object_quat).as_matrix()
                vertices = vertices @ rotation.T

            mesh_info[node]["surface"] = compute_convex_hull_path(vertices)

            # Put robot in the CONTEXT edge.
            x, y = random.choice(mesh_info[node]["surface"].vertices)
            theta = np.arctan2(y, x)
            quat_initial = Quaternion(axis=[0, 0, 1], angle=theta)
            quat_extra = Quaternion(axis=[0, 0, 1], angle=np.pi)
            quat = quat_extra * quat_initial
            _pose = [x, y, z2 - z1, quat.x, quat.y, quat.z, quat.w]
            position[robot_node] = [round(v, 4) for v in _pose]
            node_box = [
                x - robot_dim / 2,
                x + robot_dim / 2,
                y - robot_dim / 2,
                y + robot_dim / 2,
            ]
            placed_boxes_map[node].append(node_box)
        elif rotate_objs:
            # For manipulated and distractor objects, apply random rotation
            angle_rad = np.random.uniform(0, 2 * np.pi)
            object_quat = compute_axis_rotation_quat(
                axis="z", angle_rad=angle_rad
            )
            rotation = R.from_quat(object_quat).as_matrix()
            vertices = vertices @ rotation.T

        x1, x2, y1, y2 = compute_xy_bbox(vertices)
        mesh_info[node]["pose"] = [x1, x2, y1, y2, z1, z2, *object_quat]
        mesh_info[node]["area"] = max(1e-5, (x2 - x1) * (y2 - y1))

    root = list(layout_info.tree.keys())[0]
    queue = deque([((root, None), layout_info.tree.get(root, []))])
    while queue:
        (node, relation), children = queue.popleft()
        if node not in object_mapping:
            continue

        if object_mapping[node] == Scene3DItemEnum.BACKGROUND.value:
            position[node] = [0, 0, floor_margin, *bg_quat]
        else:
            x1, x2, y1, y2, z1, z2, qx, qy, qz, qw = mesh_info[node]["pose"]
            if object_mapping[node] == Scene3DItemEnum.CONTEXT.value:
                position[node] = [0, 0, -round(z1, 4), qx, qy, qz, qw]
                parent_bbox_xy[node] = [x1, x2, y1, y2, z1, z2]
            elif object_mapping[node] in [
                Scene3DItemEnum.MANIPULATED_OBJS.value,
                Scene3DItemEnum.DISTRACTOR_OBJS.value,
            ]:
                parent_node = find_parent_node(node, layout_info.tree)
                parent_pos = position[parent_node]
                (
                    p_x1,
                    p_x2,
                    p_y1,
                    p_y2,
                    p_z1,
                    p_z2,
                ) = parent_bbox_xy[parent_node]

                obj_dx = x2 - x1
                obj_dy = y2 - y1
                hull_path = mesh_info[parent_node].get("surface")
                for _ in range(max_attempts):
                    node_x1 = random.uniform(p_x1, p_x2 - obj_dx)
                    node_y1 = random.uniform(p_y1, p_y2 - obj_dy)
                    node_box = [
                        node_x1,
                        node_x1 + obj_dx,
                        node_y1,
                        node_y1 + obj_dy,
                    ]
                    if hull_path and not all_corners_inside(
                        hull_path, node_box
                    ):
                        continue
                    # Make sure the manipulated object is reachable by robot.
                    if (
                        limit_reach_range is not None
                        and object_mapping[node]
                        == Scene3DItemEnum.MANIPULATED_OBJS.value
                    ):
                        cx = parent_pos[0] + node_box[0] + obj_dx / 2
                        cy = parent_pos[1] + node_box[2] + obj_dy / 2
                        cz = parent_pos[2] + p_z2 - z1
                        robot_pos = position[robot_node][:3]
                        if not check_reachable(
                            base_xyz=np.array(robot_pos),
                            reach_xyz=np.array([cx, cy, cz]),
                            min_reach=limit_reach_range[0],
                            max_reach=limit_reach_range[1],
                        ):
                            continue

                    # Make sure the manipulated object is inside the robot's orientation.
                    if (
                        max_orient_diff is not None
                        and object_mapping[node]
                        == Scene3DItemEnum.MANIPULATED_OBJS.value
                    ):
                        cx = parent_pos[0] + node_box[0] + obj_dx / 2
                        cy = parent_pos[1] + node_box[2] + obj_dy / 2
                        cx2, cy2 = position[robot_node][:2]
                        v1 = np.array([-cx2, -cy2])
                        v2 = np.array([cx - cx2, cy - cy2])
                        dot = np.dot(v1, v2)
                        norms = np.linalg.norm(v1) * np.linalg.norm(v2)
                        theta = np.arccos(np.clip(dot / norms, -1.0, 1.0))
                        theta = np.rad2deg(theta)
                        if theta > max_orient_diff:
                            continue

                    if not has_iou_conflict(
                        node_box, placed_boxes_map[parent_node]
                    ):
                        z_offset = 0
                        break
                else:
                    logger.warning(
                        f"Cannot place {node} on {parent_node} without overlap"
                        f" after {max_attempts} attempts, place beside {parent_node}."
                    )
                    for _ in range(max_attempts):
                        node_x1 = random.choice(
                            [
                                random.uniform(
                                    p_x1 - obj_dx - beside_margin,
                                    p_x1 - obj_dx,
                                ),
                                random.uniform(p_x2, p_x2 + beside_margin),
                            ]
                        )
                        node_y1 = random.choice(
                            [
                                random.uniform(
                                    p_y1 - obj_dy - beside_margin,
                                    p_y1 - obj_dy,
                                ),
                                random.uniform(p_y2, p_y2 + beside_margin),
                            ]
                        )
                        node_box = [
                            node_x1,
                            node_x1 + obj_dx,
                            node_y1,
                            node_y1 + obj_dy,
                        ]
                        z_offset = -(parent_pos[2] + p_z2)
                        if not has_iou_conflict(
                            node_box, placed_boxes_map[parent_node]
                        ):
                            break

                placed_boxes_map[parent_node].append(node_box)

                abs_cx = parent_pos[0] + node_box[0] + obj_dx / 2
                abs_cy = parent_pos[1] + node_box[2] + obj_dy / 2
                abs_cz = parent_pos[2] + p_z2 - z1 + z_offset
                position[node] = [
                    round(v, 4)
                    for v in [abs_cx, abs_cy, abs_cz, qx, qy, qz, qw]
                ]
                parent_bbox_xy[node] = [x1, x2, y1, y2, z1, z2]

        sorted_children = sorted(
            children, key=lambda x: -mesh_info[x[0]].get("area", 0)
        )
        for child, rel in sorted_children:
            queue.append(((child, rel), layout_info.tree.get(child, [])))

    layout_info.position = position

    return layout_info

check_reachable

check_reachable(base_xyz: ndarray, reach_xyz: ndarray, min_reach: float = 0.25, max_reach: float = 0.85) -> bool

Checks if the target point is within the reachable range.

Parameters:

Name Type Description Default
base_xyz ndarray

Base position.

required
reach_xyz ndarray

Target position.

required
min_reach float

Minimum reach distance.

0.25
max_reach float

Maximum reach distance.

0.85

Returns:

Name Type Description
bool bool

True if reachable, False otherwise.

Source code in embodied_gen/utils/geometry.py
def check_reachable(
    base_xyz: np.ndarray,
    reach_xyz: np.ndarray,
    min_reach: float = 0.25,
    max_reach: float = 0.85,
) -> bool:
    """Checks if the target point is within the reachable range.

    Args:
        base_xyz (np.ndarray): Base position.
        reach_xyz (np.ndarray): Target position.
        min_reach (float, optional): Minimum reach distance.
        max_reach (float, optional): Maximum reach distance.

    Returns:
        bool: True if reachable, False otherwise.
    """
    distance = np.linalg.norm(reach_xyz - base_xyz)

    return min_reach < distance < max_reach
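
A quick check with the default reach limits of 0.25 m and 0.85 m.

import numpy as np
from embodied_gen.utils.geometry import check_reachable

base = np.zeros(3)
print(check_reachable(base, np.array([0.5, 0.0, 0.3])))  # True: distance ~0.58 m
print(check_reachable(base, np.array([1.0, 0.5, 0.0])))  # False: distance ~1.12 m > 0.85 m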

compose_mesh_scene

compose_mesh_scene(layout_info: LayoutInfo, out_scene_path: str, with_bg: bool = False) -> None

Composes a mesh scene from layout information and saves to file.

Parameters:

Name Type Description Default
layout_info LayoutInfo

Layout information.

required
out_scene_path str

Output scene file path.

required
with_bg bool

Include background mesh.

False
Source code in embodied_gen/utils/geometry.py
def compose_mesh_scene(
    layout_info: LayoutInfo, out_scene_path: str, with_bg: bool = False
) -> None:
    """Composes a mesh scene from layout information and saves to file.

    Args:
        layout_info (LayoutInfo): Layout information.
        out_scene_path (str): Output scene file path.
        with_bg (bool, optional): Include background mesh.
    """
    object_mapping = Scene3DItemEnum.object_mapping(layout_info.relation)
    scene = trimesh.Scene()
    for node in layout_info.assets:
        if object_mapping[node] == Scene3DItemEnum.BACKGROUND.value:
            mesh_path = f"{layout_info.assets[node]}/mesh_model.ply"
            if not with_bg:
                continue
        else:
            mesh_path = (
                f"{layout_info.assets[node]}/mesh/{node.replace(' ', '_')}.obj"
            )

        mesh = trimesh.load(mesh_path)
        offset = np.array(layout_info.position[node])[[0, 2, 1]]
        mesh.vertices += offset
        scene.add_geometry(mesh, node_name=node)

    os.makedirs(os.path.dirname(out_scene_path), exist_ok=True)
    scene.export(out_scene_path)
    logger.info(f"Composed interactive 3D layout saved in {out_scene_path}")

    return
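
A usage sketch chaining bfs_placement with compose_mesh_scene; the file paths are hypothetical and assume assets resolvable from the layout file's directory.

from embodied_gen.utils.geometry import bfs_placement, compose_mesh_scene

# Hypothetical paths: place the assets, then export the composed scene mesh.
layout = bfs_placement("outputs/scene_layout.json", seed=0)
compose_mesh_scene(layout, "outputs/scene/composed_scene.glb", with_bg=False)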

compute_axis_rotation_quat

compute_axis_rotation_quat(axis: Literal['x', 'y', 'z'], angle_rad: float) -> list[float]

Computes quaternion for rotation around a given axis.

Parameters:

Name Type Description Default
axis Literal['x', 'y', 'z']

Axis of rotation.

required
angle_rad float

Rotation angle in radians.

required

Returns:

Type Description
list[float]

list[float]: Quaternion [x, y, z, w].

Source code in embodied_gen/utils/geometry.py
def compute_axis_rotation_quat(
    axis: Literal["x", "y", "z"], angle_rad: float
) -> list[float]:
    """Computes quaternion for rotation around a given axis.

    Args:
        axis (Literal["x", "y", "z"]): Axis of rotation.
        angle_rad (float): Rotation angle in radians.

    Returns:
        list[float]: Quaternion [x, y, z, w].
    """
    if axis.lower() == "x":
        q = Quaternion(axis=[1, 0, 0], angle=angle_rad)
    elif axis.lower() == "y":
        q = Quaternion(axis=[0, 1, 0], angle=angle_rad)
    elif axis.lower() == "z":
        q = Quaternion(axis=[0, 0, 1], angle=angle_rad)
    else:
        raise ValueError(f"Unsupported axis '{axis}', must be one of x, y, z")

    return [q.x, q.y, q.z, q.w]
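
A short sketch of a 90-degree rotation about the z axis.

import numpy as np
from embodied_gen.utils.geometry import compute_axis_rotation_quat

# Returns the quaternion as [x, y, z, w].
print(compute_axis_rotation_quat("z", np.pi / 2))  # ~[0, 0, 0.7071, 0.7071]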

compute_convex_hull_path

compute_convex_hull_path(vertices: ndarray, z_threshold: float = 0.05, interp_per_edge: int = 10, margin: float = -0.02, x_axis: int = 0, y_axis: int = 1, z_axis: int = 2) -> Path

Computes a dense convex hull path for the top surface of a mesh.

Parameters:

Name Type Description Default
vertices ndarray

Mesh vertices.

required
z_threshold float

Z threshold for top surface.

0.05
interp_per_edge int

Interpolation points per edge.

10
margin float

Margin for polygon buffer.

-0.02
x_axis int

X axis index.

0
y_axis int

Y axis index.

1
z_axis int

Z axis index.

2

Returns:

Name Type Description
Path Path

Matplotlib path object for the convex hull.

Source code in embodied_gen/utils/geometry.py
def compute_convex_hull_path(
    vertices: np.ndarray,
    z_threshold: float = 0.05,
    interp_per_edge: int = 10,
    margin: float = -0.02,
    x_axis: int = 0,
    y_axis: int = 1,
    z_axis: int = 2,
) -> Path:
    """Computes a dense convex hull path for the top surface of a mesh.

    Args:
        vertices (np.ndarray): Mesh vertices.
        z_threshold (float, optional): Z threshold for top surface.
        interp_per_edge (int, optional): Interpolation points per edge.
        margin (float, optional): Margin for polygon buffer.
        x_axis (int, optional): X axis index.
        y_axis (int, optional): Y axis index.
        z_axis (int, optional): Z axis index.

    Returns:
        Path: Matplotlib path object for the convex hull.
    """
    top_vertices = vertices[
        vertices[:, z_axis] > vertices[:, z_axis].max() - z_threshold
    ]
    top_xy = top_vertices[:, [x_axis, y_axis]]

    if len(top_xy) < 3:
        raise ValueError("Not enough points to form a convex hull")

    hull = ConvexHull(top_xy)
    hull_points = top_xy[hull.vertices]

    polygon = Polygon(hull_points)
    polygon = polygon.buffer(margin)
    hull_points = np.array(polygon.exterior.coords)

    dense_points = []
    for i in range(len(hull_points)):
        p1 = hull_points[i]
        p2 = hull_points[(i + 1) % len(hull_points)]
        for t in np.linspace(0, 1, interp_per_edge, endpoint=False):
            pt = (1 - t) * p1 + t * p2
            dense_points.append(pt)

    return Path(np.array(dense_points), closed=True)
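
A toy sketch where a flat grid of points stands in for a table-top surface.

import numpy as np
from embodied_gen.utils.geometry import compute_convex_hull_path

# Toy "table top": a 1 m x 1 m grid of points at z = 0.5.
xs, ys = np.meshgrid(np.linspace(0, 1, 5), np.linspace(0, 1, 5))
verts = np.stack([xs.ravel(), ys.ravel(), np.full(xs.size, 0.5)], axis=1)
hull = compute_convex_hull_path(verts)
print(hull.contains_point((0.5, 0.5)))  # True: center of the top surface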

compute_pinhole_intrinsics

compute_pinhole_intrinsics(image_w: int, image_h: int, fov_deg: float) -> np.ndarray

Computes pinhole camera intrinsic matrix from image size and FOV.

Parameters:

Name Type Description Default
image_w int

Image width.

required
image_h int

Image height.

required
fov_deg float

Field of view in degrees.

required

Returns:

Type Description
ndarray

np.ndarray: Intrinsic matrix K.

Source code in embodied_gen/utils/geometry.py
def compute_pinhole_intrinsics(
    image_w: int, image_h: int, fov_deg: float
) -> np.ndarray:
    """Computes pinhole camera intrinsic matrix from image size and FOV.

    Args:
        image_w (int): Image width.
        image_h (int): Image height.
        fov_deg (float): Field of view in degrees.

    Returns:
        np.ndarray: Intrinsic matrix K.
    """
    fov_rad = np.deg2rad(fov_deg)
    fx = image_w / (2 * np.tan(fov_rad / 2))
    fy = fx  # assuming square pixels
    cx = image_w / 2
    cy = image_h / 2
    K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])

    return K
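
A quick numeric check of the intrinsics for a 640x480 image with a 60-degree horizontal FOV.

from embodied_gen.utils.geometry import compute_pinhole_intrinsics

K = compute_pinhole_intrinsics(image_w=640, image_h=480, fov_deg=60)
# fx = fy = 640 / (2 * tan(30 deg)) ~ 554.3, cx = 320, cy = 240
print(K)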

compute_xy_bbox

compute_xy_bbox(vertices: ndarray, col_x: int = 0, col_y: int = 1) -> list[float]

Computes the bounding box in XY plane for given vertices.

Parameters:

Name Type Description Default
vertices ndarray

Vertex coordinates.

required
col_x int

Column index for X.

0
col_y int

Column index for Y.

1

Returns:

Type Description
list[float]

list[float]: [min_x, max_x, min_y, max_y]

Source code in embodied_gen/utils/geometry.py
def compute_xy_bbox(
    vertices: np.ndarray, col_x: int = 0, col_y: int = 1
) -> list[float]:
    """Computes the bounding box in XY plane for given vertices.

    Args:
        vertices (np.ndarray): Vertex coordinates.
        col_x (int, optional): Column index for X.
        col_y (int, optional): Column index for Y.

    Returns:
        list[float]: [min_x, max_x, min_y, max_y]
    """
    x_vals = vertices[:, col_x]
    y_vals = vertices[:, col_y]
    return x_vals.min(), x_vals.max(), y_vals.min(), y_vals.max()
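
A minimal example on three toy vertices.

import numpy as np
from embodied_gen.utils.geometry import compute_xy_bbox

verts = np.array([[0.0, 1.0, 0.2], [2.0, -1.0, 0.5], [1.0, 3.0, 0.1]])
print(compute_xy_bbox(verts))  # (0.0, 2.0, -1.0, 3.0)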

find_parent_node

find_parent_node(node: str, tree: dict) -> str | None

Finds the parent node of a given node in a tree.

Parameters:

Name Type Description Default
node str

Node name.

required
tree dict

Tree structure.

required

Returns:

Type Description
str | None

str | None: Parent node name or None.

Source code in embodied_gen/utils/geometry.py
def find_parent_node(node: str, tree: dict) -> str | None:
    """Finds the parent node of a given node in a tree.

    Args:
        node (str): Node name.
        tree (dict): Tree structure.

    Returns:
        str | None: Parent node name or None.
    """
    for parent, children in tree.items():
        if any(child[0] == node for child in children):
            return parent
    return None
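
A short sketch with a hypothetical tree; children are (name, relation) pairs, matching the layout tree format used by bfs_placement.

from embodied_gen.utils.geometry import find_parent_node

tree = {"kitchen": [("table", "on")], "table": [("mug", "on"), ("plate", "on")]}
print(find_parent_node("mug", tree))   # 'table'
print(find_parent_node("sofa", tree))  # None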

has_iou_conflict

has_iou_conflict(new_box: list[float], placed_boxes: list[list[float]], iou_threshold: float = 0.0) -> bool

Checks for intersection-over-union conflict between boxes.

Parameters:

Name Type Description Default
new_box list[float]

New box coordinates.

required
placed_boxes list[list[float]]

List of placed box coordinates.

required
iou_threshold float

IOU threshold.

0.0

Returns:

Name Type Description
bool bool

True if conflict exists, False otherwise.

Source code in embodied_gen/utils/geometry.py
def has_iou_conflict(
    new_box: list[float],
    placed_boxes: list[list[float]],
    iou_threshold: float = 0.0,
) -> bool:
    """Checks for intersection-over-union conflict between boxes.

    Args:
        new_box (list[float]): New box coordinates.
        placed_boxes (list[list[float]]): List of placed box coordinates.
        iou_threshold (float, optional): IOU threshold.

    Returns:
        bool: True if conflict exists, False otherwise.
    """
    new_min_x, new_max_x, new_min_y, new_max_y = new_box
    for min_x, max_x, min_y, max_y in placed_boxes:
        ix1 = max(new_min_x, min_x)
        iy1 = max(new_min_y, min_y)
        ix2 = min(new_max_x, max_x)
        iy2 = min(new_max_y, max_y)
        inter_area = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        if inter_area > iou_threshold:
            return True
    return False
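
A quick check with one already-placed box.

from embodied_gen.utils.geometry import has_iou_conflict

placed = [[0.0, 1.0, 0.0, 1.0]]  # boxes as [min_x, max_x, min_y, max_y]
print(has_iou_conflict([0.5, 1.5, 0.5, 1.5], placed))  # True: overlap area 0.25 > 0
print(has_iou_conflict([1.5, 2.5, 0.0, 1.0], placed))  # False: no overlap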

matrix_to_pose

matrix_to_pose(matrix: ndarray) -> list[float]

Converts a 4x4 transformation matrix to a pose (x, y, z, qx, qy, qz, qw).

Parameters:

Name Type Description Default
matrix ndarray

4x4 transformation matrix.

required

Returns:

Type Description
list[float]

list[float]: Pose as [x, y, z, qx, qy, qz, qw].

Source code in embodied_gen/utils/geometry.py
def matrix_to_pose(matrix: np.ndarray) -> list[float]:
    """Converts a 4x4 transformation matrix to a pose (x, y, z, qx, qy, qz, qw).

    Args:
        matrix (np.ndarray): 4x4 transformation matrix.

    Returns:
        list[float]: Pose as [x, y, z, qx, qy, qz, qw].
    """
    x, y, z = matrix[:3, 3]
    rot_mat = matrix[:3, :3]
    quat = R.from_matrix(rot_mat).as_quat()
    qx, qy, qz, qw = quat

    return [x, y, z, qx, qy, qz, qw]

pose_to_matrix

pose_to_matrix(pose: list[float]) -> np.ndarray

Converts pose (x, y, z, qx, qy, qz, qw) to a 4x4 transformation matrix.

Parameters:

Name Type Description Default
pose list[float]

Pose as [x, y, z, qx, qy, qz, qw].

required

Returns:

Type Description
ndarray

np.ndarray: 4x4 transformation matrix.

Source code in embodied_gen/utils/geometry.py
def pose_to_matrix(pose: list[float]) -> np.ndarray:
    """Converts pose (x, y, z, qx, qy, qz, qw) to a 4x4 transformation matrix.

    Args:
        pose (list[float]): Pose as [x, y, z, qx, qy, qz, qw].

    Returns:
        np.ndarray: 4x4 transformation matrix.
    """
    x, y, z, qx, qy, qz, qw = pose
    r = R.from_quat([qx, qy, qz, qw])
    matrix = np.eye(4)
    matrix[:3, :3] = r.as_matrix()
    matrix[:3, 3] = [x, y, z]

    return matrix
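
A round-trip sketch showing that pose_to_matrix and matrix_to_pose are inverses of each other.

import numpy as np
from embodied_gen.utils.geometry import matrix_to_pose, pose_to_matrix

pose = [0.1, 0.2, 0.3, 0.0, 0.0, 0.7071, 0.7071]  # ~90 deg about z
T = pose_to_matrix(pose)
print(np.allclose(matrix_to_pose(T), pose, atol=1e-4))  # True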

quaternion_multiply

quaternion_multiply(init_quat: list[float], rotate_quat: list[float]) -> list[float]

Multiplies two quaternions.

Parameters:

Name Type Description Default
init_quat list[float]

Initial quaternion [x, y, z, w].

required
rotate_quat list[float]

Rotation quaternion [x, y, z, w].

required

Returns:

Type Description
list[float]

list[float]: Resulting quaternion [x, y, z, w].

Source code in embodied_gen/utils/geometry.py
def quaternion_multiply(
    init_quat: list[float], rotate_quat: list[float]
) -> list[float]:
    """Multiplies two quaternions.

    Args:
        init_quat (list[float]): Initial quaternion [x, y, z, w].
        rotate_quat (list[float]): Rotation quaternion [x, y, z, w].

    Returns:
        list[float]: Resulting quaternion [x, y, z, w].
    """
    qx, qy, qz, qw = init_quat
    q1 = Quaternion(w=qw, x=qx, y=qy, z=qz)
    qx, qy, qz, qw = rotate_quat
    q2 = Quaternion(w=qw, x=qx, y=qy, z=qz)
    quat = q2 * q1

    return [quat.x, quat.y, quat.z, quat.w]
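
A short sketch composing two 90-degree z rotations into a 180-degree rotation.

import numpy as np
from embodied_gen.utils.geometry import compute_axis_rotation_quat, quaternion_multiply

q90 = compute_axis_rotation_quat("z", np.pi / 2)
print(quaternion_multiply(q90, q90))  # ~[0, 0, 1, 0]: 180 deg about z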

with_seed

with_seed(seed_attr_name: str = 'seed')

Decorator to temporarily set the random seed for reproducibility.

Parameters:

Name Type Description Default
seed_attr_name str

Name of the seed argument.

'seed'

Returns:

Name Type Description
function

Decorator function.

Source code in embodied_gen/utils/geometry.py
def with_seed(seed_attr_name: str = "seed"):
    """Decorator to temporarily set the random seed for reproducibility.

    Args:
        seed_attr_name (str, optional): Name of the seed argument.

    Returns:
        function: Decorator function.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            seed = kwargs.get(seed_attr_name, None)
            if seed is not None:
                py_state = random.getstate()
                np_state = np.random.get_state()
                torch_state = torch.get_rng_state()

                random.seed(seed)
                np.random.seed(seed)
                torch.manual_seed(seed)
                try:
                    result = func(*args, **kwargs)
                finally:
                    random.setstate(py_state)
                    np.random.set_state(np_state)
                    torch.set_rng_state(torch_state)
                return result
            else:
                return func(*args, **kwargs)

        return wrapper

    return decorator
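
A minimal sketch decorating a hypothetical sampling function. The seed must be passed as a keyword argument so the wrapper can find it by name.

import random
from embodied_gen.utils.geometry import with_seed

@with_seed("seed")
def sample_offset(scale: float = 1.0, seed: int = None) -> float:
    # Hypothetical helper: draws one random offset.
    return scale * random.random()

print(sample_offset(seed=42) == sample_offset(seed=42))  # True: same seed, same draw
print(sample_offset() == sample_offset())                # almost surely False: unseeded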

embodied_gen.utils.gaussian

export_splats

export_splats(means: Tensor, scales: Tensor, quats: Tensor, opacities: Tensor, sh0: Tensor, shN: Tensor, format: Literal['ply'] = 'ply', save_to: Optional[str] = None) -> bytes

Export a Gaussian Splats model to bytes in PLY file format.

Source code in embodied_gen/utils/gaussian.py
def export_splats(
    means: torch.Tensor,
    scales: torch.Tensor,
    quats: torch.Tensor,
    opacities: torch.Tensor,
    sh0: torch.Tensor,
    shN: torch.Tensor,
    format: Literal["ply"] = "ply",
    save_to: Optional[str] = None,
) -> bytes:
    """Export a Gaussian Splats model to bytes in PLY file format."""
    total_splats = means.shape[0]
    assert means.shape == (total_splats, 3), "Means must be of shape (N, 3)"
    assert scales.shape == (total_splats, 3), "Scales must be of shape (N, 3)"
    assert quats.shape == (
        total_splats,
        4,
    ), "Quaternions must be of shape (N, 4)"
    assert opacities.shape == (
        total_splats,
    ), "Opacities must be of shape (N,)"
    assert sh0.shape == (total_splats, 1, 3), "sh0 must be of shape (N, 1, 3)"
    assert (
        shN.ndim == 3 and shN.shape[0] == total_splats and shN.shape[2] == 3
    ), f"shN must be of shape (N, K, 3), got {shN.shape}"

    # Reshape spherical harmonics
    sh0 = sh0.squeeze(1)  # Shape (N, 3)
    shN = shN.permute(0, 2, 1).reshape(means.shape[0], -1)  # Shape (N, K * 3)

    # Check for NaN or Inf values
    invalid_mask = (
        torch.isnan(means).any(dim=1)
        | torch.isinf(means).any(dim=1)
        | torch.isnan(scales).any(dim=1)
        | torch.isinf(scales).any(dim=1)
        | torch.isnan(quats).any(dim=1)
        | torch.isinf(quats).any(dim=1)
        | torch.isnan(opacities).any(dim=0)
        | torch.isinf(opacities).any(dim=0)
        | torch.isnan(sh0).any(dim=1)
        | torch.isinf(sh0).any(dim=1)
        | torch.isnan(shN).any(dim=1)
        | torch.isinf(shN).any(dim=1)
    )

    # Filter out invalid entries
    valid_mask = ~invalid_mask
    means = means[valid_mask]
    scales = scales[valid_mask]
    quats = quats[valid_mask]
    opacities = opacities[valid_mask]
    sh0 = sh0[valid_mask]
    shN = shN[valid_mask]

    if format == "ply":
        data = splat2ply_bytes(means, scales, quats, opacities, sh0, shN)
    else:
        raise ValueError(f"Unsupported format: {format}")

    if save_to:
        with open(save_to, "wb") as binary_file:
            binary_file.write(data)

    return data
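
A toy sketch exporting a handful of dummy splats; the tensor shapes match the assertions above and the output path is hypothetical.

import torch
from embodied_gen.utils.gaussian import export_splats

n, k = 4, 3  # toy splat count and number of higher-order SH coefficients
ply_bytes = export_splats(
    means=torch.zeros(n, 3),
    scales=torch.ones(n, 3) * 0.01,
    quats=torch.tensor([[0.0, 0.0, 0.0, 1.0]] * n),
    opacities=torch.full((n,), 0.5),
    sh0=torch.zeros(n, 1, 3),
    shN=torch.zeros(n, k, 3),
    save_to="toy_splats.ply",  # hypothetical output path
)
print(f"{len(ply_bytes)} bytes of PLY data")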

restore_scene_scale_and_position

restore_scene_scale_and_position(real_height: float, mesh_path: str, gs_path: str) -> None

Scales a mesh and corresponding GS model to match a given real-world height.

Uses the 1st and 99th percentile of mesh Z-axis to estimate height, applies scaling and vertical alignment, and updates both the mesh and GS model.

Parameters:

Name Type Description Default
real_height float

Target real-world height among Z axis.

required
mesh_path str

Path to the input mesh file.

required
gs_path str

Path to the Gaussian Splatting model file.

required
Source code in embodied_gen/utils/gaussian.py
def restore_scene_scale_and_position(
    real_height: float, mesh_path: str, gs_path: str
) -> None:
    """Scales a mesh and corresponding GS model to match a given real-world height.

    Uses the 1st and 99th percentile of mesh Z-axis to estimate height,
    applies scaling and vertical alignment, and updates both the mesh and GS model.

    Args:
        real_height (float): Target real-world height among Z axis.
        mesh_path (str): Path to the input mesh file.
        gs_path (str): Path to the Gaussian Splatting model file.
    """
    mesh = trimesh.load(mesh_path)
    z_min = np.percentile(mesh.vertices[:, 1], 1)
    z_max = np.percentile(mesh.vertices[:, 1], 99)
    height = z_max - z_min
    scale = real_height / height

    rot = Rotation.from_quat([0, 1, 0, 0])
    mesh.vertices = rot.apply(mesh.vertices)
    mesh.vertices[:, 1] -= z_min
    mesh.vertices *= scale
    mesh.export(mesh_path)

    gs_model: GaussianOperator = GaussianOperator.load_from_ply(gs_path)
    gs_model = gs_model.get_gaussians(
        instance_pose=torch.tensor([0.0, -z_min, 0, 0, 1, 0, 0])
    )
    gs_model.rescale(scale)
    gs_model.save_to_ply(gs_path)
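
A usage sketch with hypothetical asset paths; both files are rescaled in place so the asset stands 0.75 m tall along the real-world Z axis.

from embodied_gen.utils.gaussian import restore_scene_scale_and_position

restore_scene_scale_and_position(
    real_height=0.75,
    mesh_path="outputs/asset/mesh/mug.obj",   # hypothetical mesh path
    gs_path="outputs/asset/gs_model.ply",     # hypothetical GS model path
)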

embodied_gen.utils.gpt_clients

GPTclient

GPTclient(endpoint: str, api_key: str, model_name: str = 'yfb-gpt-4o', api_version: str = None, check_connection: bool = True, verbose: bool = False)

A client to interact with GPT models via OpenAI or Azure API.

Supports text and image prompts, connection checking, and configurable parameters.

Parameters:

Name Type Description Default
endpoint str

API endpoint URL.

required
api_key str

API key for authentication.

required
model_name str

Model name to use.

'yfb-gpt-4o'
api_version str

API version (for Azure).

None
check_connection bool

Whether to check API connection.

True
verbose bool

Enable verbose logging.

False
Example

export ENDPOINT="https://yfb-openai-sweden.openai.azure.com"
export API_KEY="xxxxxx"
export API_VERSION="2025-03-01-preview"
export MODEL_NAME="yfb-gpt-4o-sweden"
from embodied_gen.utils.gpt_clients import GPT_CLIENT

response = GPT_CLIENT.query("Describe the physics of a falling apple.")
response = GPT_CLIENT.query(
    text_prompt="Describe the content in each image.",
    image_base64=["path/to/image1.png", "path/to/image2.jpg"],
)

Source code in embodied_gen/utils/gpt_clients.py
def __init__(
    self,
    endpoint: str,
    api_key: str,
    model_name: str = "yfb-gpt-4o",
    api_version: str = None,
    check_connection: bool = True,
    verbose: bool = False,
):
    if api_version is not None:
        self.client = AzureOpenAI(
            azure_endpoint=endpoint,
            api_key=api_key,
            api_version=api_version,
        )
    else:
        self.client = OpenAI(
            base_url=endpoint,
            api_key=api_key,
        )

    self.endpoint = endpoint
    self.model_name = model_name
    self.image_formats = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"}
    self.verbose = verbose
    if check_connection:
        self.check_connection()

    logger.info(f"Using GPT model: {self.model_name}.")
check_connection
check_connection() -> None

Checks whether the GPT API connection is working.

Raises:

Type Description
ConnectionError

If connection fails.

Source code in embodied_gen/utils/gpt_clients.py
def check_connection(self) -> None:
    """Checks whether the GPT API connection is working.

    Raises:
        ConnectionError: If connection fails.
    """
    try:
        response = self.completion_with_backoff(
            messages=[
                {"role": "system", "content": "You are a test system."},
                {"role": "user", "content": "Hello"},
            ],
            model=self.model_name,
            temperature=0,
            max_tokens=100,
        )
        content = response.choices[0].message.content
        logger.info(f"Connection check success.")
    except Exception as e:
        raise ConnectionError(
            f"Failed to connect to GPT API at {self.endpoint}, "
            f"please check setting in `{CONFIG_FILE}` and `README`."
        )
completion_with_backoff
completion_with_backoff(**kwargs)

Performs a chat completion request with retry/backoff.

Source code in embodied_gen/utils/gpt_clients.py
@retry(
    wait=wait_random_exponential(min=1, max=20),
    stop=(stop_after_attempt(10) | stop_after_delay(30)),
)
def completion_with_backoff(self, **kwargs):
    """Performs a chat completion request with retry/backoff."""
    return self.client.chat.completions.create(**kwargs)
query
query(text_prompt: str, image_base64: Optional[list[str | Image]] = None, system_role: Optional[str] = None, params: Optional[dict] = None) -> Optional[str]

Queries the GPT model with text and optional image prompts.

Parameters:

Name Type Description Default
text_prompt str

Main text input.

required
image_base64 Optional[list[str | Image]]

List of image base64 strings, file paths, or PIL Images.

None
system_role Optional[str]

System-level instructions.

None
params Optional[dict]

Additional GPT parameters.

None

Returns:

Type Description
Optional[str]

Optional[str]: Model response content, or None if error.

Source code in embodied_gen/utils/gpt_clients.py
def query(
    self,
    text_prompt: str,
    image_base64: Optional[list[str | Image.Image]] = None,
    system_role: Optional[str] = None,
    params: Optional[dict] = None,
) -> Optional[str]:
    """Queries the GPT model with text and optional image prompts.

    Args:
        text_prompt (str): Main text input.
        image_base64 (Optional[list[str | Image.Image]], optional): List of image base64 strings, file paths, or PIL Images.
        system_role (Optional[str], optional): System-level instructions.
        params (Optional[dict], optional): Additional GPT parameters.

    Returns:
        Optional[str]: Model response content, or None if error.
    """
    if system_role is None:
        system_role = "You are a highly knowledgeable assistant specializing in physics, engineering, and object properties."  # noqa

    content_user = [
        {
            "type": "text",
            "text": text_prompt,
        },
    ]

    # Process images if provided
    if image_base64 is not None:
        if not isinstance(image_base64, list):
            image_base64 = [image_base64]
        # Temporary workaround: OpenRouter cannot accept multiple images, so tile them into one grid.
        if "openrouter" in self.endpoint:
            image_base64 = combine_images_to_grid(image_base64)
        for img in image_base64:
            if isinstance(img, Image.Image):
                buffer = BytesIO()
                img.save(buffer, format=img.format or "PNG")
                buffer.seek(0)
                image_binary = buffer.read()
                img = base64.b64encode(image_binary).decode("utf-8")
            elif (
                len(os.path.splitext(img)) > 1
                and os.path.splitext(img)[-1].lower() in self.image_formats
            ):
                if not os.path.exists(img):
                    raise FileNotFoundError(f"Image file not found: {img}")
                with open(img, "rb") as f:
                    img = base64.b64encode(f.read()).decode("utf-8")

            content_user.append(
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{img}"},
                }
            )

    payload = {
        "messages": [
            {"role": "system", "content": system_role},
            {"role": "user", "content": content_user},
        ],
        "temperature": 0.1,
        "max_tokens": 500,
        "top_p": 0.1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
        "stop": None,
        "model": self.model_name,
    }

    if params:
        payload.update(params)

    response = None
    try:
        response = self.completion_with_backoff(**payload)
        response = response.choices[0].message.content
    except Exception as e:
        logger.error(f"Error GPTclint {self.endpoint} API call: {e}")
        response = None

    if self.verbose:
        logger.info(f"Prompt: {text_prompt}")
        logger.info(f"Response: {response}")

    return response

embodied_gen.utils.process_media

SceneTreeVisualizer

SceneTreeVisualizer(layout_info: LayoutInfo)

Visualizes a scene tree layout using networkx and matplotlib.

Parameters:

Name Type Description Default
layout_info LayoutInfo

Layout information for the scene.

required
Example
from embodied_gen.utils.process_media import SceneTreeVisualizer
visualizer = SceneTreeVisualizer(layout_info)
visualizer.render(save_path="tree.png")
Source code in embodied_gen/utils/process_media.py
def __init__(self, layout_info: LayoutInfo) -> None:
    self.tree = layout_info.tree
    self.relation = layout_info.relation
    self.objs_desc = layout_info.objs_desc
    self.G = nx.DiGraph()
    self.root = self._find_root()
    self._build_graph()

    self.role_colors = {
        Scene3DItemEnum.BACKGROUND.value: "plum",
        Scene3DItemEnum.CONTEXT.value: "lightblue",
        Scene3DItemEnum.ROBOT.value: "lightcoral",
        Scene3DItemEnum.MANIPULATED_OBJS.value: "lightgreen",
        Scene3DItemEnum.DISTRACTOR_OBJS.value: "lightgray",
        Scene3DItemEnum.OTHERS.value: "orange",
    }
render
render(save_path: str, figsize=(8, 6), dpi=300, title: str = 'Scene 3D Hierarchy Tree')

Renders the scene tree and saves to file.

Parameters:

Name Type Description Default
save_path str

Path to save the rendered image.

required
figsize tuple

Figure size.

(8, 6)
dpi int

Image DPI.

300
title str

Plot image title.

'Scene 3D Hierarchy Tree'
Source code in embodied_gen/utils/process_media.py
def render(
    self,
    save_path: str,
    figsize=(8, 6),
    dpi=300,
    title: str = "Scene 3D Hierarchy Tree",
):
    """Renders the scene tree and saves to file.

    Args:
        save_path (str): Path to save the rendered image.
        figsize (tuple, optional): Figure size.
        dpi (int, optional): Image DPI.
        title (str, optional): Plot image title.
    """
    node_colors = [
        self.role_colors[self._get_node_role(n)] for n in self.G.nodes
    ]
    pos = self._get_positions(self.root)

    plt.figure(figsize=figsize)
    nx.draw(
        self.G,
        pos,
        with_labels=True,
        arrows=False,
        node_size=2000,
        node_color=node_colors,
        font_size=10,
        font_weight="bold",
    )

    # Draw edge labels
    edge_labels = nx.get_edge_attributes(self.G, "relation")
    nx.draw_networkx_edge_labels(
        self.G,
        pos,
        edge_labels=edge_labels,
        font_size=9,
        font_color="black",
    )

    # Draw small description text under each node (if available)
    for node, (x, y) in pos.items():
        desc = self.objs_desc.get(node)
        if desc:
            wrapped = "\n".join(textwrap.wrap(desc, width=30))
            plt.text(
                x,
                y - 0.006,
                wrapped,
                fontsize=6,
                ha="center",
                va="top",
                wrap=True,
                color="black",
                bbox=dict(
                    facecolor="dimgray",
                    edgecolor="darkgray",
                    alpha=0.1,
                    boxstyle="round,pad=0.2",
                ),
            )

    plt.title(title, fontsize=12)
    task_desc = self.relation.get("task_desc", "")
    if task_desc:
        plt.suptitle(
            f"Task Description: {task_desc}", fontsize=10, y=0.999
        )

    plt.axis("off")

    legend_handles = [
        Patch(facecolor=color, edgecolor='black', label=role)
        for role, color in self.role_colors.items()
    ]
    plt.legend(
        handles=legend_handles,
        loc="lower center",
        ncol=3,
        bbox_to_anchor=(0.5, -0.1),
        fontsize=9,
    )

    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    plt.savefig(save_path, dpi=dpi, bbox_inches="tight")
    plt.close()

alpha_blend_rgba

alpha_blend_rgba(fg_image: Union[str, Image, ndarray], bg_image: Union[str, Image, ndarray]) -> Image.Image

Alpha blends a foreground RGBA image over a background RGBA image.

Parameters:

Name Type Description Default
fg_image Union[str, Image, ndarray]

Foreground image (str, PIL Image, or ndarray).

required
bg_image Union[str, Image, ndarray]

Background image (str, PIL Image, or ndarray).

required

Returns:

Type Description
Image

Image.Image: Alpha-blended RGBA image.

Example
from embodied_gen.utils.process_media import alpha_blend_rgba
result = alpha_blend_rgba("fg.png", "bg.png")
result.save("blended.png")
Source code in embodied_gen/utils/process_media.py
def alpha_blend_rgba(
    fg_image: Union[str, Image.Image, np.ndarray],
    bg_image: Union[str, Image.Image, np.ndarray],
) -> Image.Image:
    """Alpha blends a foreground RGBA image over a background RGBA image.

    Args:
        fg_image: Foreground image (str, PIL Image, or ndarray).
        bg_image: Background image (str, PIL Image, or ndarray).

    Returns:
        Image.Image: Alpha-blended RGBA image.

    Example:
        ```py
        from embodied_gen.utils.process_media import alpha_blend_rgba
        result = alpha_blend_rgba("fg.png", "bg.png")
        result.save("blended.png")
        ```
    """
    if isinstance(fg_image, str):
        fg_image = Image.open(fg_image)
    elif isinstance(fg_image, np.ndarray):
        fg_image = Image.fromarray(fg_image)

    if isinstance(bg_image, str):
        bg_image = Image.open(bg_image)
    elif isinstance(bg_image, np.ndarray):
        bg_image = Image.fromarray(bg_image)

    if fg_image.size != bg_image.size:
        raise ValueError(
            f"Image sizes not match {fg_image.size} v.s. {bg_image.size}."
        )

    fg = fg_image.convert("RGBA")
    bg = bg_image.convert("RGBA")

    return Image.alpha_composite(bg, fg)

check_object_edge_truncated

check_object_edge_truncated(mask: ndarray, edge_threshold: int = 5) -> bool

Checks if a binary object mask is truncated at the image edges.

Parameters:

Name Type Description Default
mask ndarray

2D binary mask.

required
edge_threshold int

Edge pixel threshold.

5

Returns:

Name Type Description
bool bool

True if object is fully enclosed, False if truncated.

Source code in embodied_gen/utils/process_media.py
def check_object_edge_truncated(
    mask: np.ndarray, edge_threshold: int = 5
) -> bool:
    """Checks if a binary object mask is truncated at the image edges.

    Args:
        mask (np.ndarray): 2D binary mask.
        edge_threshold (int, optional): Edge pixel threshold.

    Returns:
        bool: True if object is fully enclosed, False if truncated.
    """
    top = mask[:edge_threshold, :].any()
    bottom = mask[-edge_threshold:, :].any()
    left = mask[:, :edge_threshold].any()
    right = mask[:, -edge_threshold:].any()

    return not (top or bottom or left or right)
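
A small sketch on a synthetic mask, using the default edge threshold of 5 pixels.

import numpy as np
from embodied_gen.utils.process_media import check_object_edge_truncated

mask = np.zeros((64, 64), dtype=bool)
mask[20:40, 20:40] = True
print(check_object_edge_truncated(mask))  # True: object is well away from all edges
mask[:3, 30:40] = True
print(check_object_edge_truncated(mask))  # False: object now touches the top edge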

combine_images_to_grid

combine_images_to_grid(images: list[str | Image], cat_row_col: tuple[int, int] = None, target_wh: tuple[int, int] = (512, 512), image_mode: str = 'RGB') -> list[Image.Image]

Combines multiple images into a grid.

Parameters:

Name Type Description Default
images list[str | Image]

List of image paths or PIL Images.

required
cat_row_col tuple[int, int]

Grid rows and columns.

None
target_wh tuple[int, int]

Target image size.

(512, 512)
image_mode str

Image mode.

'RGB'

Returns:

Type Description
list[Image]

list[Image.Image]: List containing the grid image.

Example
from embodied_gen.utils.process_media import combine_images_to_grid
grid = combine_images_to_grid(["img1.png", "img2.png"])
grid[0].save("grid.png")
Source code in embodied_gen/utils/process_media.py
def combine_images_to_grid(
    images: list[str | Image.Image],
    cat_row_col: tuple[int, int] = None,
    target_wh: tuple[int, int] = (512, 512),
    image_mode: str = "RGB",
) -> list[Image.Image]:
    """Combines multiple images into a grid.

    Args:
        images (list[str | Image.Image]): List of image paths or PIL Images.
        cat_row_col (tuple[int, int], optional): Grid rows and columns.
        target_wh (tuple[int, int], optional): Target image size.
        image_mode (str, optional): Image mode.

    Returns:
        list[Image.Image]: List containing the grid image.

    Example:
        ```py
        from embodied_gen.utils.process_media import combine_images_to_grid
        grid = combine_images_to_grid(["img1.png", "img2.png"])
        grid[0].save("grid.png")
        ```
    """
    n_images = len(images)
    if n_images == 1:
        return images

    if cat_row_col is None:
        n_col = math.ceil(math.sqrt(n_images))
        n_row = math.ceil(n_images / n_col)
    else:
        n_row, n_col = cat_row_col

    images = [
        Image.open(p).convert(image_mode) if isinstance(p, str) else p
        for p in images
    ]
    images = [img.resize(target_wh) for img in images]

    grid_w, grid_h = n_col * target_wh[0], n_row * target_wh[1]
    grid = Image.new(image_mode, (grid_w, grid_h), (0, 0, 0))

    for idx, img in enumerate(images):
        row, col = divmod(idx, n_col)
        grid.paste(img, (col * target_wh[0], row * target_wh[1]))

    return [grid]

filter_image_small_connected_components

filter_image_small_connected_components(image: Union[Image, ndarray], area_ratio: float = 10, connectivity: int = 8) -> np.ndarray

Removes small connected components from the alpha channel of an image.

Parameters:

Name Type Description Default
image Union[Image, ndarray]

Input image.

required
area_ratio float

Minimum area ratio.

10
connectivity int

Connectivity for labeling.

8

Returns:

Type Description
ndarray

np.ndarray: Image with filtered alpha channel.
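
A minimal usage sketch; the file name is illustrative and should point to an RGBA image with a noisy alpha channel.

Example
from PIL import Image
from embodied_gen.utils.process_media import filter_image_small_connected_components

image = Image.open("object_rgba.png")
cleaned = filter_image_small_connected_components(image, area_ratio=10)
Image.fromarray(cleaned).save("object_rgba_clean.png")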

Source code in embodied_gen/utils/process_media.py
def filter_image_small_connected_components(
    image: Union[Image.Image, np.ndarray],
    area_ratio: float = 10,
    connectivity: int = 8,
) -> np.ndarray:
    """Removes small connected components from the alpha channel of an image.

    Args:
        image (Union[Image.Image, np.ndarray]): Input image.
        area_ratio (float, optional): Minimum area ratio.
        connectivity (int, optional): Connectivity for labeling.

    Returns:
        np.ndarray: Image with filtered alpha channel.
    """
    if isinstance(image, Image.Image):
        image = image.convert("RGBA")
        image = np.array(image)

    mask = image[..., 3]
    mask = filter_small_connected_components(mask, area_ratio, connectivity)
    image[..., 3] = mask

    return image

filter_small_connected_components

filter_small_connected_components(mask: Union[Image, ndarray], area_ratio: float, connectivity: int = 8) -> np.ndarray

Removes small connected components from a binary mask.

Parameters:

Name Type Description Default
mask Union[Image, ndarray]

Input mask.

required
area_ratio float

Minimum area ratio for components.

required
connectivity int

Connectivity for labeling.

8

Returns:

Type Description
ndarray

np.ndarray: Mask with small components removed.
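
A minimal usage sketch assuming a 0/255 uint8 mask loaded with OpenCV; the file names are illustrative.

Example
import cv2
from embodied_gen.utils.process_media import filter_small_connected_components

mask = cv2.imread("mask.png", cv2.IMREAD_GRAYSCALE)
cleaned = filter_small_connected_components(mask, area_ratio=10)
cv2.imwrite("mask_clean.png", cleaned)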

Source code in embodied_gen/utils/process_media.py
def filter_small_connected_components(
    mask: Union[Image.Image, np.ndarray],
    area_ratio: float,
    connectivity: int = 8,
) -> np.ndarray:
    """Removes small connected components from a binary mask.

    Args:
        mask (Union[Image.Image, np.ndarray]): Input mask.
        area_ratio (float): Minimum area ratio for components.
        connectivity (int, optional): Connectivity for labeling.

    Returns:
        np.ndarray: Mask with small components removed.
    """
    if isinstance(mask, Image.Image):
        mask = np.array(mask)
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
        mask,
        connectivity=connectivity,
    )

    small_components = np.zeros_like(mask, dtype=np.uint8)
    mask_area = (mask != 0).sum()
    min_area = mask_area // area_ratio
    for label in range(1, num_labels):
        area = stats[label, cv2.CC_STAT_AREA]
        if area < min_area:
            small_components[labels == label] = 255

    mask = cv2.bitwise_and(mask, cv2.bitwise_not(small_components))

    return mask

is_image_file

is_image_file(filename: str) -> bool

Checks if a filename is an image file.

Parameters:

Name Type Description Default
filename str

Filename to check.

required

Returns:

Name Type Description
bool bool

True if image file, False otherwise.
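
A minimal usage sketch; the filenames are illustrative.

Example
from embodied_gen.utils.process_media import is_image_file

print(is_image_file("photo.png"))  # True
print(is_image_file("notes.txt"))  # False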

Source code in embodied_gen/utils/process_media.py
def is_image_file(filename: str) -> bool:
    """Checks if a filename is an image file.

    Args:
        filename (str): Filename to check.

    Returns:
        bool: True if image file, False otherwise.
    """
    mime_type, _ = mimetypes.guess_type(filename)

    return mime_type is not None and mime_type.startswith('image')

load_scene_dict

load_scene_dict(file_path: str) -> dict

Loads a scene description dictionary from a file.

Parameters:

Name Type Description Default
file_path str

Path to the scene description file.

required

Returns:

Name Type Description
dict dict

Mapping from scene ID to description.
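
A minimal usage sketch; the file path is illustrative, and each line is expected to follow the `scene_id: description` format parsed by the function.

Example
from embodied_gen.utils.process_media import load_scene_dict

# scenes.txt contains lines such as: scene_0001: a wooden desk in a bright office
scene_dict = load_scene_dict("scenes.txt")
print(scene_dict["scene_0001"])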

Source code in embodied_gen/utils/process_media.py
def load_scene_dict(file_path: str) -> dict:
    """Loads a scene description dictionary from a file.

    Args:
        file_path (str): Path to the scene description file.

    Returns:
        dict: Mapping from scene ID to description.
    """
    scene_dict = {}
    with open(file_path, "r", encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or ":" not in line:
                continue
            scene_id, desc = line.split(":", 1)
            scene_dict[scene_id.strip()] = desc.strip()

    return scene_dict

merge_images_video

merge_images_video(color_images, normal_images, output_path) -> None

Merges color and normal images into a video.

Parameters:

Name Type Description Default
color_images list[ndarray]

List of color images.

required
normal_images list[ndarray]

List of normal images.

required
output_path str

Path to save the output video.

required
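
A minimal usage sketch with synthetic frames; the output path is illustrative (the video is written at 50 fps).

Example
import numpy as np
from embodied_gen.utils.process_media import merge_images_video

color_images = [np.zeros((512, 512, 3), dtype=np.uint8) for _ in range(50)]
normal_images = [np.full((512, 512, 3), 128, dtype=np.uint8) for _ in range(50)]
merge_images_video(color_images, normal_images, "merged.mp4")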
Source code in embodied_gen/utils/process_media.py
def merge_images_video(color_images, normal_images, output_path) -> None:
    """Merges color and normal images into a video.

    Args:
        color_images (list[np.ndarray]): List of color images.
        normal_images (list[np.ndarray]): List of normal images.
        output_path (str): Path to save the output video.
    """
    width = color_images[0].shape[1]
    combined_video = [
        np.hstack([rgb_img[:, : width // 2], normal_img[:, width // 2 :]])
        for rgb_img, normal_img in zip(color_images, normal_images)
    ]
    imageio.mimsave(output_path, combined_video, fps=50)

    return

merge_video_video

merge_video_video(video_path1: str, video_path2: str, output_path: str) -> None

Merges two videos by combining their left and right halves.

Parameters:

Name Type Description Default
video_path1 str

Path to first video.

required
video_path2 str

Path to second video.

required
output_path str

Path to save the merged video.

required
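
A minimal usage sketch; the video paths are illustrative, and both inputs must share the same resolution.

Example
from embodied_gen.utils.process_media import merge_video_video

merge_video_video("color.mp4", "normal.mp4", "side_by_side.mp4")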
Source code in embodied_gen/utils/process_media.py
def merge_video_video(
    video_path1: str, video_path2: str, output_path: str
) -> None:
    """Merges two videos by combining their left and right halves.

    Args:
        video_path1 (str): Path to first video.
        video_path2 (str): Path to second video.
        output_path (str): Path to save the merged video.
    """
    clip1 = VideoFileClip(video_path1)
    clip2 = VideoFileClip(video_path2)

    if clip1.size != clip2.size:
        raise ValueError("The resolutions of the two videos do not match.")

    width, height = clip1.size
    clip1_half = clip1.crop(x1=0, y1=0, x2=width // 2, y2=height)
    clip2_half = clip2.crop(x1=width // 2, y1=0, x2=width, y2=height)
    final_clip = clips_array([[clip1_half, clip2_half]])
    final_clip.write_videofile(output_path, codec="libx264")

parse_text_prompts

parse_text_prompts(prompts: list[str]) -> list[str]

Parses text prompts from a list or file.

Parameters:

Name Type Description Default
prompts list[str]

List of prompts or a file path.

required

Returns:

Type Description
list[str]

list[str]: List of parsed prompts.
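
A minimal usage sketch; prompts may be given inline or as a single path to a .txt file with one prompt per line (lines starting with # are skipped).

Example
from embodied_gen.utils.process_media import parse_text_prompts

print(parse_text_prompts(["a red mug", "a wooden chair"]))
print(parse_text_prompts(["prompts.txt"]))  # reads prompts from the file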

Source code in embodied_gen/utils/process_media.py
def parse_text_prompts(prompts: list[str]) -> list[str]:
    """Parses text prompts from a list or file.

    Args:
        prompts (list[str]): List of prompts or a file path.

    Returns:
        list[str]: List of parsed prompts.
    """
    if len(prompts) == 1 and prompts[0].endswith(".txt"):
        with open(prompts[0], "r") as f:
            prompts = [
                line.strip()
                for line in f
                if line.strip() and not line.strip().startswith("#")
            ]
    return prompts

render_asset3d

render_asset3d(mesh_path: str, output_root: str, distance: float = 5.0, num_images: int = 1, elevation: list[float] = (0.0,), pbr_light_factor: float = 1.2, return_key: str = 'image_color/*', output_subdir: str = 'renders', gen_color_mp4: bool = False, gen_viewnormal_mp4: bool = False, gen_glonormal_mp4: bool = False, no_index_file: bool = False, with_mtl: bool = True) -> list[str]

Renders a 3D mesh asset and returns output image paths.

Parameters:

Name Type Description Default
mesh_path str

Path to the mesh file.

required
output_root str

Directory to save outputs.

required
distance float

Camera distance.

5.0
num_images int

Number of views to render.

1
elevation list[float]

Camera elevation angles.

(0.0,)
pbr_light_factor float

PBR lighting factor.

1.2
return_key str

Glob pattern for output images.

'image_color/*'
output_subdir str

Subdirectory for outputs.

'renders'
gen_color_mp4 bool

Generate color MP4 video.

False
gen_viewnormal_mp4 bool

Generate view normal MP4.

False
gen_glonormal_mp4 bool

Generate global normal MP4.

False
no_index_file bool

Skip index file saving.

False
with_mtl bool

Use mesh material.

True

Returns:

Type Description
list[str]

list[str]: List of output image file paths.

Example
from embodied_gen.utils.process_media import render_asset3d

image_paths = render_asset3d(
    mesh_path="path_to_mesh.obj",
    output_root="path_to_save_dir",
    num_images=6,
    elevation=(30, -30),
    output_subdir="renders",
    no_index_file=True,
)
Source code in embodied_gen/utils/process_media.py
@spaces.GPU
def render_asset3d(
    mesh_path: str,
    output_root: str,
    distance: float = 5.0,
    num_images: int = 1,
    elevation: list[float] = (0.0,),
    pbr_light_factor: float = 1.2,
    return_key: str = "image_color/*",
    output_subdir: str = "renders",
    gen_color_mp4: bool = False,
    gen_viewnormal_mp4: bool = False,
    gen_glonormal_mp4: bool = False,
    no_index_file: bool = False,
    with_mtl: bool = True,
) -> list[str]:
    """Renders a 3D mesh asset and returns output image paths.

    Args:
        mesh_path (str): Path to the mesh file.
        output_root (str): Directory to save outputs.
        distance (float, optional): Camera distance.
        num_images (int, optional): Number of views to render.
        elevation (list[float], optional): Camera elevation angles.
        pbr_light_factor (float, optional): PBR lighting factor.
        return_key (str, optional): Glob pattern for output images.
        output_subdir (str, optional): Subdirectory for outputs.
        gen_color_mp4 (bool, optional): Generate color MP4 video.
        gen_viewnormal_mp4 (bool, optional): Generate view normal MP4.
        gen_glonormal_mp4 (bool, optional): Generate global normal MP4.
        no_index_file (bool, optional): Skip index file saving.
        with_mtl (bool, optional): Use mesh material.

    Returns:
        list[str]: List of output image file paths.

    Example:
        ```py
        from embodied_gen.utils.process_media import render_asset3d

        image_paths = render_asset3d(
            mesh_path="path_to_mesh.obj",
            output_root="path_to_save_dir",
            num_images=6,
            elevation=(30, -30),
            output_subdir="renders",
            no_index_file=True,
        )
        ```
    """
    input_args = dict(
        mesh_path=mesh_path,
        output_root=output_root,
        uuid=output_subdir,
        distance=distance,
        num_images=num_images,
        elevation=elevation,
        pbr_light_factor=pbr_light_factor,
        with_mtl=with_mtl,
        gen_color_mp4=gen_color_mp4,
        gen_viewnormal_mp4=gen_viewnormal_mp4,
        gen_glonormal_mp4=gen_glonormal_mp4,
        no_index_file=no_index_file,
    )

    try:
        _ = render_api(**input_args)
    except Exception as e:
        logger.error(f"Error occurred during rendering: {e}.")

    dst_paths = glob(os.path.join(output_root, output_subdir, return_key))

    return dst_paths

vcat_pil_images

vcat_pil_images(images: list[Image], image_mode: str = 'RGB') -> Image.Image

Vertically concatenates a list of PIL images.

Parameters:

Name Type Description Default
images list[Image]

List of images.

required
image_mode str

Image mode.

'RGB'

Returns:

Type Description
Image

Image.Image: Vertically concatenated image.

Example
from embodied_gen.utils.process_media import vcat_pil_images
img = vcat_pil_images([Image.open("a.png"), Image.open("b.png")])
img.save("vcat.png")
Source code in embodied_gen/utils/process_media.py
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def vcat_pil_images(
    images: list[Image.Image], image_mode: str = "RGB"
) -> Image.Image:
    """Vertically concatenates a list of PIL images.

    Args:
        images (list[Image.Image]): List of images.
        image_mode (str, optional): Image mode.

    Returns:
        Image.Image: Vertically concatenated image.

    Example:
        ```py
        from embodied_gen.utils.process_media import vcat_pil_images
        img = vcat_pil_images([Image.open("a.png"), Image.open("b.png")])
        img.save("vcat.png")
        ```
    """
    widths, heights = zip(*(img.size for img in images))
    total_height = sum(heights)
    max_width = max(widths)
    new_image = Image.new(image_mode, (max_width, total_height))
    y_offset = 0
    for image in images:
        new_image.paste(image, (0, y_offset))
        y_offset += image.size[1]

    return new_image

embodied_gen.utils.simulation

FrankaPandaGrasper

FrankaPandaGrasper(agent: BaseAgent, control_freq: float, joint_vel_limits: float = 2.0, joint_acc_limits: float = 1.0, finger_length: float = 0.025)

Bases: object

Provides grasp planning and control for the Franka Panda robot.

Attributes:

Name Type Description
agent BaseAgent

The robot agent.

robot

The robot instance.

control_freq float

Control frequency.

control_timestep float

Control timestep.

joint_vel_limits float

Joint velocity limits.

joint_acc_limits float

Joint acceleration limits.

finger_length float

Length of gripper fingers.

planners

Motion planners for each environment.

Initialize the grasper.
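
A minimal sketch of driving the grasper; `agent` is a loaded ManiSkill Franka agent and `actor` a scene entity (for example, obtained via load_mani_skill_robot and load_assets_from_layout_file), both assumed to exist already.

Example
from embodied_gen.utils.simulation import FrankaPandaGrasper

grasper = FrankaPandaGrasper(agent, control_freq=20)
actions = grasper.compute_grasp_action(actor, reach_target_only=False)
if actions is not None:
    for action in actions:
        ...  # feed each action to the simulation control loop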

Source code in embodied_gen/utils/simulation.py
def __init__(
    self,
    agent: BaseAgent,
    control_freq: float,
    joint_vel_limits: float = 2.0,
    joint_acc_limits: float = 1.0,
    finger_length: float = 0.025,
) -> None:
    """Initialize the grasper."""
    self.agent = agent
    self.robot = agent.robot
    self.control_freq = control_freq
    self.control_timestep = 1 / control_freq
    self.joint_vel_limits = joint_vel_limits
    self.joint_acc_limits = joint_acc_limits
    self.finger_length = finger_length
    self.planners = self._setup_planner()
compute_grasp_action
compute_grasp_action(actor: Entity, reach_target_only: bool = True, offset: tuple[float, float, float] = [0, 0, -0.05], env_idx: int = 0) -> np.ndarray

Compute grasp actions for a target actor.

Parameters:

Name Type Description Default
actor Entity

Target actor to grasp.

required
reach_target_only bool

Only reach the target pose if True.

True
offset tuple[float, float, float]

Offset for reach pose.

[0, 0, -0.05]
env_idx int

Environment index.

0

Returns:

Type Description
ndarray

np.ndarray: Array of grasp actions.

Source code in embodied_gen/utils/simulation.py
def compute_grasp_action(
    self,
    actor: sapien.pysapien.Entity,
    reach_target_only: bool = True,
    offset: tuple[float, float, float] = [0, 0, -0.05],
    env_idx: int = 0,
) -> np.ndarray:
    """Compute grasp actions for a target actor.

    Args:
        actor (sapien.pysapien.Entity): Target actor to grasp.
        reach_target_only (bool): Only reach the target pose if True.
        offset (tuple[float, float, float]): Offset for reach pose.
        env_idx (int): Environment index.

    Returns:
        np.ndarray: Array of grasp actions.
    """
    physx_rigid = actor.components[1]
    mesh = get_component_mesh(physx_rigid, to_world_frame=True)
    obb = mesh.bounding_box_oriented
    approaching = np.array([0, 0, -1])
    tcp_pose = self.agent.tcp.pose[env_idx]
    target_closing = (
        tcp_pose.to_transformation_matrix()[0, :3, 1].cpu().numpy()
    )
    grasp_info = compute_grasp_info_by_obb(
        obb,
        approaching=approaching,
        target_closing=target_closing,
        depth=self.finger_length,
    )

    closing, center = grasp_info["closing"], grasp_info["center"]
    raw_tcp_pose = tcp_pose.sp
    grasp_pose = self.agent.build_grasp_pose(approaching, closing, center)
    reach_pose = grasp_pose * sapien.Pose(p=offset)
    grasp_pose = grasp_pose * sapien.Pose(p=[0, 0, 0.01])
    actions = []
    reach_actions = self.move_to_pose(
        reach_pose,
        self.control_timestep,
        gripper_state=1,
        env_idx=env_idx,
    )
    actions.append(reach_actions)

    if reach_actions is None:
        logger.warning(
            f"Failed to reach the grasp pose for node `{actor.name}`, skipping grasping."
        )
        return None

    if not reach_target_only:
        grasp_actions = self.move_to_pose(
            grasp_pose,
            self.control_timestep,
            gripper_state=1,
            env_idx=env_idx,
        )
        actions.append(grasp_actions)
        close_actions = self.control_gripper(
            gripper_state=-1,
            env_idx=env_idx,
        )
        actions.append(close_actions)
        back_actions = self.move_to_pose(
            raw_tcp_pose,
            self.control_timestep,
            gripper_state=-1,
            env_idx=env_idx,
        )
        actions.append(back_actions)

    return np.concatenate(actions, axis=0)
control_gripper
control_gripper(gripper_state: Literal[-1, 1], n_step: int = 10) -> np.ndarray

Generate gripper control actions.

Parameters:

Name Type Description Default
gripper_state Literal[-1, 1]

Desired gripper state.

required
n_step int

Number of steps.

10

Returns:

Type Description
ndarray

np.ndarray: Array of gripper actions.

Source code in embodied_gen/utils/simulation.py
def control_gripper(
    self,
    gripper_state: Literal[-1, 1],
    n_step: int = 10,
) -> np.ndarray:
    """Generate gripper control actions.

    Args:
        gripper_state (Literal[-1, 1]): Desired gripper state.
        n_step (int): Number of steps.

    Returns:
        np.ndarray: Array of gripper actions.
    """
    qpos = self.robot.get_qpos()[0, :-2].cpu().numpy()
    actions = []
    for _ in range(n_step):
        action = np.hstack([qpos, gripper_state])[None, ...]
        actions.append(action)

    return np.concatenate(actions, axis=0)
move_to_pose
move_to_pose(pose: Pose, control_timestep: float, gripper_state: Literal[-1, 1], use_point_cloud: bool = False, n_max_step: int = 100, action_key: str = 'position', env_idx: int = 0) -> np.ndarray

Plan and execute motion to a target pose.

Parameters:

Name Type Description Default
pose Pose

Target pose.

required
control_timestep float

Control timestep.

required
gripper_state Literal[-1, 1]

Desired gripper state.

required
use_point_cloud bool

Use point cloud for planning.

False
n_max_step int

Max number of steps.

100
action_key str

Key for action in result.

'position'
env_idx int

Environment index.

0

Returns:

Type Description
ndarray

np.ndarray: Array of actions to reach the pose.

Source code in embodied_gen/utils/simulation.py
def move_to_pose(
    self,
    pose: sapien.Pose,
    control_timestep: float,
    gripper_state: Literal[-1, 1],
    use_point_cloud: bool = False,
    n_max_step: int = 100,
    action_key: str = "position",
    env_idx: int = 0,
) -> np.ndarray:
    """Plan and execute motion to a target pose.

    Args:
        pose (sapien.Pose): Target pose.
        control_timestep (float): Control timestep.
        gripper_state (Literal[-1, 1]): Desired gripper state.
        use_point_cloud (bool): Use point cloud for planning.
        n_max_step (int): Max number of steps.
        action_key (str): Key for action in result.
        env_idx (int): Environment index.

    Returns:
        np.ndarray: Array of actions to reach the pose.
    """
    result = self.planners[env_idx].plan_qpos_to_pose(
        np.concatenate([pose.p, pose.q]),
        self.robot.get_qpos().cpu().numpy()[0],
        time_step=control_timestep,
        use_point_cloud=use_point_cloud,
    )

    if result["status"] != "Success":
        result = self.planners[env_idx].plan_screw(
            np.concatenate([pose.p, pose.q]),
            self.robot.get_qpos().cpu().numpy()[0],
            time_step=control_timestep,
            use_point_cloud=use_point_cloud,
        )

    if result["status"] != "Success":
        return

    sample_ratio = (len(result[action_key]) // n_max_step) + 1
    result[action_key] = result[action_key][::sample_ratio]

    n_step = len(result[action_key])
    actions = []
    for i in range(n_step):
        qpos = result[action_key][i]
        action = np.hstack([qpos, gripper_state])[None, ...]
        actions.append(action)

    return np.concatenate(actions, axis=0)

SapienSceneManager

SapienSceneManager(sim_freq: int, ray_tracing: bool, device: str = 'cuda')

Manages SAPIEN simulation scenes, cameras, and rendering.

This class provides utilities for setting up scenes, adding cameras, stepping simulation, and rendering images.

Attributes:

Name Type Description
sim_freq int

Simulation frequency.

ray_tracing bool

Whether to use ray tracing.

device str

Device for simulation.

renderer SapienRenderer

SAPIEN renderer.

scene Scene

Simulation scene.

cameras list

List of camera components.

actors dict

Mapping of actor names to entities.

Example: see embodied_gen/scripts/simulate_sapien.py.

Initialize the scene manager.

Parameters:

Name Type Description Default
sim_freq int

Simulation frequency.

required
ray_tracing bool

Enable ray tracing.

required
device str

Device for simulation.

'cuda'
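
A minimal sketch of setting up the manager with a ring of cameras; the numeric values are illustrative.

Example
from embodied_gen.utils.simulation import SapienSceneManager

manager = SapienSceneManager(sim_freq=100, ray_tracing=False)
cameras = manager.initialize_circular_cameras(
    num_cameras=4,
    radius=1.5,
    height=1.0,
    target_pt=[0, 0, 0.5],
    image_hw=(480, 640),
    fovy_deg=60,
)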
Source code in embodied_gen/utils/simulation.py
def __init__(
    self, sim_freq: int, ray_tracing: bool, device: str = "cuda"
) -> None:
    """Initialize the scene manager.

    Args:
        sim_freq (int): Simulation frequency.
        ray_tracing (bool): Enable ray tracing.
        device (str): Device for simulation.
    """
    self.sim_freq = sim_freq
    self.ray_tracing = ray_tracing
    self.device = device
    self.renderer = sapien.SapienRenderer()
    self.scene = self._setup_scene()
    self.cameras: list[sapien.render.RenderCameraComponent] = []
    self.actors: dict[str, sapien.pysapien.Entity] = {}
create_camera
create_camera(cam_name: str, pose: Pose, image_hw: tuple[int, int], fovy_deg: float) -> sapien.render.RenderCameraComponent

Create a camera in the scene.

Parameters:

Name Type Description Default
cam_name str

Camera name.

required
pose Pose

Camera pose.

required
image_hw tuple[int, int]

Image resolution (height, width).

required
fovy_deg float

Field of view in degrees.

required

Returns:

Type Description
RenderCameraComponent

sapien.render.RenderCameraComponent: The created camera.

Source code in embodied_gen/utils/simulation.py
def create_camera(
    self,
    cam_name: str,
    pose: sapien.Pose,
    image_hw: tuple[int, int],
    fovy_deg: float,
) -> sapien.render.RenderCameraComponent:
    """Create a camera in the scene.

    Args:
        cam_name (str): Camera name.
        pose (sapien.Pose): Camera pose.
        image_hw (tuple[int, int]): Image resolution (height, width).
        fovy_deg (float): Field of view in degrees.

    Returns:
        sapien.render.RenderCameraComponent: The created camera.
    """
    cam_actor = self.scene.create_actor_builder().build_kinematic()
    cam_actor.set_pose(pose)
    camera = self.scene.add_mounted_camera(
        name=cam_name,
        mount=cam_actor,
        pose=sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0]),
        width=image_hw[1],
        height=image_hw[0],
        fovy=np.deg2rad(fovy_deg),
        near=0.01,
        far=100,
    )
    self.cameras.append(camera)

    return camera
initialize_circular_cameras
initialize_circular_cameras(num_cameras: int, radius: float, height: float, target_pt: list[float], image_hw: tuple[int, int], fovy_deg: float) -> list[sapien.render.RenderCameraComponent]

Initialize multiple cameras arranged in a circle.

Parameters:

Name Type Description Default
num_cameras int

Number of cameras.

required
radius float

Circle radius.

required
height float

Camera height.

required
target_pt list[float]

Target point to look at.

required
image_hw tuple[int, int]

Image resolution.

required
fovy_deg float

Field of view in degrees.

required

Returns:

Type Description
list[RenderCameraComponent]

list[sapien.render.RenderCameraComponent]: List of cameras.

Source code in embodied_gen/utils/simulation.py
def initialize_circular_cameras(
    self,
    num_cameras: int,
    radius: float,
    height: float,
    target_pt: list[float],
    image_hw: tuple[int, int],
    fovy_deg: float,
) -> list[sapien.render.RenderCameraComponent]:
    """Initialize multiple cameras arranged in a circle.

    Args:
        num_cameras (int): Number of cameras.
        radius (float): Circle radius.
        height (float): Camera height.
        target_pt (list[float]): Target point to look at.
        image_hw (tuple[int, int]): Image resolution.
        fovy_deg (float): Field of view in degrees.

    Returns:
        list[sapien.render.RenderCameraComponent]: List of cameras.
    """
    angle_step = 2 * np.pi / num_cameras
    world_up_vec = np.array([0.0, 0.0, 1.0])
    target_pt = np.array(target_pt)

    for i in range(num_cameras):
        angle = i * angle_step
        cam_x = radius * np.cos(angle)
        cam_y = radius * np.sin(angle)
        cam_z = height
        eye_pos = [cam_x, cam_y, cam_z]

        forward_vec = target_pt - eye_pos
        forward_vec = forward_vec / np.linalg.norm(forward_vec)
        temp_right_vec = np.cross(forward_vec, world_up_vec)

        if np.linalg.norm(temp_right_vec) < 1e-6:
            temp_right_vec = np.array([1.0, 0.0, 0.0])
            if np.abs(np.dot(temp_right_vec, forward_vec)) > 0.99:
                temp_right_vec = np.array([0.0, 1.0, 0.0])

        right_vec = temp_right_vec / np.linalg.norm(temp_right_vec)
        up_vec = np.cross(right_vec, forward_vec)
        rotation_matrix = np.array([forward_vec, -right_vec, up_vec]).T

        rot = R.from_matrix(rotation_matrix)
        scipy_quat = rot.as_quat()  # (x, y, z, w)
        quat = [
            scipy_quat[3],
            scipy_quat[0],
            scipy_quat[1],
            scipy_quat[2],
        ]  # (w, x, y, z)

        self.create_camera(
            f"camera_{i}",
            sapien.Pose(p=eye_pos, q=quat),
            image_hw,
            fovy_deg,
        )

    return self.cameras
step_action
step_action(agent: BaseAgent, action: Tensor, cameras: list[RenderCameraComponent], render_keys: list[str], sim_steps_per_control: int = 1) -> dict

Step the simulation and render images from cameras.

Parameters:

Name Type Description Default
agent BaseAgent

The robot agent.

required
action Tensor

Action to apply.

required
cameras list

List of camera components.

required
render_keys list[str]

Types of images to render.

required
sim_steps_per_control int

Simulation steps per control.

1

Returns:

Name Type Description
dict dict

Dictionary of rendered frames per camera.

Source code in embodied_gen/utils/simulation.py
def step_action(
    self,
    agent: BaseAgent,
    action: torch.Tensor,
    cameras: list[sapien.render.RenderCameraComponent],
    render_keys: list[str],
    sim_steps_per_control: int = 1,
) -> dict:
    """Step the simulation and render images from cameras.

    Args:
        agent (BaseAgent): The robot agent.
        action (torch.Tensor): Action to apply.
        cameras (list): List of camera components.
        render_keys (list[str]): Types of images to render.
        sim_steps_per_control (int): Simulation steps per control.

    Returns:
        dict: Dictionary of rendered frames per camera.
    """
    agent.set_action(action)
    frames = defaultdict(list)
    for _ in range(sim_steps_per_control):
        self.scene.step()

    self.scene.update_render()
    for camera in cameras:
        camera.take_picture()
        images = render_images(camera, render_keys=render_keys)
        frames[camera.name].append(images)

    return frames

load_actor_from_urdf

load_actor_from_urdf(scene: Scene | ManiSkillScene, file_path: str, pose: Pose | None = None, env_idx: int = None, use_static: bool = False, update_mass: bool = False, scale: float | ndarray = 1.0) -> sapien.pysapien.Entity

Load a SAPIEN actor from a URDF file and add it to the scene.

Parameters:

Name Type Description Default
scene Scene | ManiSkillScene

The simulation scene.

required
file_path str

Path to the URDF file.

required
pose Pose | None

Initial pose of the actor.

None
env_idx int

Environment index for multi-env setup.

None
use_static bool

Whether the actor is static.

False
update_mass bool

Whether to update the actor's mass from URDF.

False
scale float | ndarray

Scale factor for the actor.

1.0

Returns:

Type Description
Entity

sapien.pysapien.Entity: The created actor entity.
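
A minimal sketch; `scene` is an existing sapien.Scene and the URDF path is illustrative (the loader expects an EmbodiedGen-style URDF with visual/collision meshes and gazebo friction tags).

Example
import sapien
from embodied_gen.utils.simulation import load_actor_from_urdf

actor = load_actor_from_urdf(
    scene,
    "assets/mug/mug.urdf",
    pose=sapien.Pose(p=[0.2, 0.0, 0.1]),
)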

Source code in embodied_gen/utils/simulation.py
def load_actor_from_urdf(
    scene: sapien.Scene | ManiSkillScene,
    file_path: str,
    pose: sapien.Pose | None = None,
    env_idx: int = None,
    use_static: bool = False,
    update_mass: bool = False,
    scale: float | np.ndarray = 1.0,
) -> sapien.pysapien.Entity:
    """Load an sapien actor from a URDF file and add it to the scene.

    Args:
        scene (sapien.Scene | ManiSkillScene): The simulation scene.
        file_path (str): Path to the URDF file.
        pose (sapien.Pose | None): Initial pose of the actor.
        env_idx (int): Environment index for multi-env setup.
        use_static (bool): Whether the actor is static.
        update_mass (bool): Whether to update the actor's mass from URDF.
        scale (float | np.ndarray): Scale factor for the actor.

    Returns:
        sapien.pysapien.Entity: The created actor entity.
    """

    def _get_local_pose(origin_tag: ET.Element | None) -> sapien.Pose:
        local_pose = sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0])
        if origin_tag is not None:
            xyz = list(map(float, origin_tag.get("xyz", "0 0 0").split()))
            rpy = list(map(float, origin_tag.get("rpy", "0 0 0").split()))
            qx, qy, qz, qw = R.from_euler("xyz", rpy, degrees=False).as_quat()
            local_pose = sapien.Pose(p=xyz, q=[qw, qx, qy, qz])

        return local_pose

    tree = ET.parse(file_path)
    root = tree.getroot()
    node_name = root.get("name")
    file_dir = os.path.dirname(file_path)

    visual_mesh = root.find(".//visual/geometry/mesh")
    visual_file = visual_mesh.get("filename")
    visual_scale = visual_mesh.get("scale", "1.0 1.0 1.0")
    visual_scale = np.array([float(x) for x in visual_scale.split()])
    visual_scale *= np.array(scale)

    collision_mesh = root.find(".//collision/geometry/mesh")
    collision_file = collision_mesh.get("filename")
    collision_scale = collision_mesh.get("scale", "1.0 1.0 1.0")
    collision_scale = np.array([float(x) for x in collision_scale.split()])
    collision_scale *= np.array(scale)

    visual_pose = _get_local_pose(root.find(".//visual/origin"))
    collision_pose = _get_local_pose(root.find(".//collision/origin"))

    visual_file = os.path.join(file_dir, visual_file)
    collision_file = os.path.join(file_dir, collision_file)
    static_fric = root.find(".//collision/gazebo/mu1").text
    dynamic_fric = root.find(".//collision/gazebo/mu2").text

    material = physx.PhysxMaterial(
        static_friction=np.clip(float(static_fric), 0.1, 0.7),
        dynamic_friction=np.clip(float(dynamic_fric), 0.1, 0.6),
        restitution=0.05,
    )
    builder = scene.create_actor_builder()

    body_type = "static" if use_static else "dynamic"
    builder.set_physx_body_type(body_type)
    builder.add_multiple_convex_collisions_from_file(
        collision_file,
        material=material,
        scale=collision_scale,
        # decomposition="coacd",
        # decomposition_params=dict(
        #     threshold=0.05, max_convex_hull=64, verbose=False
        # ),
        pose=collision_pose,
    )

    builder.add_visual_from_file(
        visual_file,
        scale=visual_scale,
        pose=visual_pose,
    )
    if pose is None:
        pose = sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0])

    builder.set_initial_pose(pose)
    if isinstance(scene, ManiSkillScene) and env_idx is not None:
        builder.set_scene_idxs([env_idx])

    actor = builder.build(
        name=node_name if env_idx is None else f"{node_name}-{env_idx}"
    )

    if update_mass and hasattr(actor.components[1], "mass"):
        node_mass = float(root.find(".//inertial/mass").get("value"))
        actor.components[1].set_mass(node_mass)

    return actor

load_assets_from_layout_file

load_assets_from_layout_file(scene: ManiSkillScene | Scene, layout: str, z_offset: float = 0.0, init_quat: list[float] = [0, 0, 0, 1], env_idx: int = None) -> dict[str, sapien.pysapien.Entity]

Load assets from an EmbodiedGen layout file and create sapien actors in the scene.

Parameters:

Name Type Description Default
scene ManiSkillScene | Scene

The sapien simulation scene.

required
layout str

Path to the embodiedgen layout file.

required
z_offset float

Z offset for non-context objects.

0.0
init_quat list[float]

Initial quaternion for orientation.

[0, 0, 0, 1]
env_idx int

Environment index.

None

Returns:

Type Description
dict[str, Entity]

dict[str, sapien.pysapien.Entity]: Mapping from object names to actor entities.
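
A minimal sketch; `scene` is an existing sapien.Scene or ManiSkillScene and the layout path is illustrative.

Example
from embodied_gen.utils.simulation import load_assets_from_layout_file

actors = load_assets_from_layout_file(scene, "outputs/layout.json", z_offset=0.01)
print(list(actors.keys()))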

Source code in embodied_gen/utils/simulation.py
def load_assets_from_layout_file(
    scene: ManiSkillScene | sapien.Scene,
    layout: str,
    z_offset: float = 0.0,
    init_quat: list[float] = [0, 0, 0, 1],
    env_idx: int = None,
) -> dict[str, sapien.pysapien.Entity]:
    """Load assets from an EmbodiedGen layout file and create sapien actors in the scene.

    Args:
        scene (ManiSkillScene | sapien.Scene): The sapien simulation scene.
        layout (str): Path to the embodiedgen layout file.
        z_offset (float): Z offset for non-context objects.
        init_quat (list[float]): Initial quaternion for orientation.
        env_idx (int): Environment index.

    Returns:
        dict[str, sapien.pysapien.Entity]: Mapping from object names to actor entities.
    """
    asset_root = os.path.dirname(layout)
    layout = LayoutInfo.from_dict(json.load(open(layout, "r")))
    actors = dict()
    for node in layout.assets:
        file_dir = layout.assets[node]
        file_name = f"{node.replace(' ', '_')}.urdf"
        urdf_file = os.path.join(asset_root, file_dir, file_name)

        if layout.objs_mapping[node] == Scene3DItemEnum.BACKGROUND.value:
            continue

        position = layout.position[node].copy()
        if layout.objs_mapping[node] != Scene3DItemEnum.CONTEXT.value:
            position[2] += z_offset

        use_static = (
            layout.relation.get(Scene3DItemEnum.CONTEXT.value, None) == node
        )

        # Combine initial quaternion with object quaternion
        x, y, z, qx, qy, qz, qw = position
        qx, qy, qz, qw = quaternion_multiply([qx, qy, qz, qw], init_quat)
        actor = load_actor_from_urdf(
            scene,
            urdf_file,
            sapien.Pose(p=[x, y, z], q=[qw, qx, qy, qz]),
            env_idx,
            use_static=use_static,
            update_mass=False,
        )
        actors[node] = actor

    return actors

load_mani_skill_robot

load_mani_skill_robot(scene: Scene | ManiSkillScene, layout: LayoutInfo | str, control_freq: int = 20, robot_init_qpos_noise: float = 0.0, control_mode: str = 'pd_joint_pos', backend_str: tuple[str, str] = ('cpu', 'gpu')) -> BaseAgent

Load a ManiSkill robot agent into the scene.

Parameters:

Name Type Description Default
scene Scene | ManiSkillScene

The simulation scene.

required
layout LayoutInfo | str

Layout info or path to layout file.

required
control_freq int

Control frequency.

20
robot_init_qpos_noise float

Noise for initial joint positions.

0.0
control_mode str

Robot control mode.

'pd_joint_pos'
backend_str tuple[str, str]

Simulation/render backend.

('cpu', 'gpu')

Returns:

Name Type Description
BaseAgent BaseAgent

The loaded robot agent.
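
A minimal sketch; `scene` is an existing sapien.Scene or ManiSkillScene and the layout path is illustrative.

Example
from embodied_gen.utils.simulation import load_mani_skill_robot

agent = load_mani_skill_robot(scene, "outputs/layout.json", control_freq=20)
print(agent.robot.get_qpos())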

Source code in embodied_gen/utils/simulation.py
def load_mani_skill_robot(
    scene: sapien.Scene | ManiSkillScene,
    layout: LayoutInfo | str,
    control_freq: int = 20,
    robot_init_qpos_noise: float = 0.0,
    control_mode: str = "pd_joint_pos",
    backend_str: tuple[str, str] = ("cpu", "gpu"),
) -> BaseAgent:
    """Load a ManiSkill robot agent into the scene.

    Args:
        scene (sapien.Scene | ManiSkillScene): The simulation scene.
        layout (LayoutInfo | str): Layout info or path to layout file.
        control_freq (int): Control frequency.
        robot_init_qpos_noise (float): Noise for initial joint positions.
        control_mode (str): Robot control mode.
        backend_str (tuple[str, str]): Simulation/render backend.

    Returns:
        BaseAgent: The loaded robot agent.
    """
    from mani_skill.agents import REGISTERED_AGENTS
    from mani_skill.envs.scene import ManiSkillScene
    from mani_skill.envs.utils.system.backend import (
        parse_sim_and_render_backend,
    )

    if isinstance(layout, str) and layout.endswith(".json"):
        layout = LayoutInfo.from_dict(json.load(open(layout, "r")))

    robot_name = layout.relation[Scene3DItemEnum.ROBOT.value]
    x, y, z, qx, qy, qz, qw = layout.position[robot_name]
    delta_z = 0.002  # Add small offset to avoid collision.
    pose = sapien.Pose([x, y, z + delta_z], [qw, qx, qy, qz])

    if robot_name not in REGISTERED_AGENTS:
        logger.warning(
            f"Robot `{robot_name}` not registered, chosen from {REGISTERED_AGENTS.keys()}, use `panda` instead."
        )
        robot_name = "panda"

    ROBOT_CLS = REGISTERED_AGENTS[robot_name].agent_cls
    backend = parse_sim_and_render_backend(*backend_str)
    if isinstance(scene, sapien.Scene):
        scene = ManiSkillScene([scene], device=backend_str[0], backend=backend)
    robot = ROBOT_CLS(
        scene=scene,
        control_freq=control_freq,
        control_mode=control_mode,
        initial_pose=pose,
    )

    # Set robot initial joint angles in radians (joint0 to joint6 plus two gripper fingers).
    qpos = np.array(
        [
            0.0,
            np.pi / 8,
            0,
            -np.pi * 3 / 8,
            0,
            np.pi * 3 / 4,
            np.pi / 4,
            0.04,
            0.04,
        ]
    )
    qpos = (
        np.random.normal(
            0, robot_init_qpos_noise, (len(scene.sub_scenes), len(qpos))
        )
        + qpos
    )
    qpos[:, -2:] = 0.04
    robot.reset(qpos)
    robot.init_qpos = robot.robot.qpos
    robot.controller.controllers["gripper"].reset()

    return robot

render_images

render_images(camera: RenderCameraComponent, render_keys: list[Literal['Color', 'Segmentation', 'Normal', 'Mask', 'Depth', 'Foreground']] = None) -> dict[str, Image.Image]

Render images from a given SAPIEN camera.

Parameters:

Name Type Description Default
camera RenderCameraComponent

Camera to render from.

required
render_keys list[str]

Types of images to render.

None

Returns:

Type Description
dict[str, Image]

dict[str, Image.Image]: Dictionary of rendered images.
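
A minimal sketch; `camera` is a camera created via SapienSceneManager.create_camera, and the scene is assumed to have been stepped and updated before rendering.

Example
from embodied_gen.utils.simulation import render_images

camera.take_picture()
images = render_images(camera, render_keys=["Color", "Mask"])
images["Color"].save("color.png")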

Source code in embodied_gen/utils/simulation.py
def render_images(
    camera: sapien.render.RenderCameraComponent,
    render_keys: list[
        Literal[
            "Color",
            "Segmentation",
            "Normal",
            "Mask",
            "Depth",
            "Foreground",
        ]
    ] = None,
) -> dict[str, Image.Image]:
    """Render images from a given SAPIEN camera.

    Args:
        camera (sapien.render.RenderCameraComponent): Camera to render from.
        render_keys (list[str], optional): Types of images to render.

    Returns:
        dict[str, Image.Image]: Dictionary of rendered images.
    """
    if render_keys is None:
        render_keys = [
            "Color",
            "Segmentation",
            "Normal",
            "Mask",
            "Depth",
            "Foreground",
        ]

    results: dict[str, Image.Image] = {}
    if "Color" in render_keys:
        color = camera.get_picture("Color")
        color_rgb = (np.clip(color[..., :3], 0, 1) * 255).astype(np.uint8)
        results["Color"] = Image.fromarray(color_rgb)

    if "Mask" in render_keys:
        alpha = (np.clip(color[..., 3], 0, 1) * 255).astype(np.uint8)
        results["Mask"] = Image.fromarray(alpha)

    if "Segmentation" in render_keys:
        seg_labels = camera.get_picture("Segmentation")
        label0 = seg_labels[..., 0].astype(np.uint8)
        seg_color = COLOR_PALETTE[label0]
        results["Segmentation"] = Image.fromarray(seg_color)

    if "Foreground" in render_keys:
        seg_labels = camera.get_picture("Segmentation")
        label0 = seg_labels[..., 0]
        mask = np.where((label0 > 1), 255, 0).astype(np.uint8)
        color = camera.get_picture("Color")
        color_rgb = (np.clip(color[..., :3], 0, 1) * 255).astype(np.uint8)
        foreground = np.concatenate([color_rgb, mask[..., None]], axis=-1)
        results["Foreground"] = Image.fromarray(foreground)

    if "Normal" in render_keys:
        normal = camera.get_picture("Normal")[..., :3]
        normal_img = (((normal + 1) / 2) * 255).astype(np.uint8)
        results["Normal"] = Image.fromarray(normal_img)

    if "Depth" in render_keys:
        position_map = camera.get_picture("Position")
        depth = -position_map[..., 2]
        alpha = torch.tensor(color[..., 3], dtype=torch.float32)
        norm_depth = DiffrastRender.normalize_map_by_mask(
            torch.tensor(depth), alpha
        )
        depth_img = (norm_depth * 255).to(torch.uint8).numpy()
        results["Depth"] = Image.fromarray(depth_img)

    return results

embodied_gen.utils.tags

embodied_gen.utils.trender

embodied_gen.utils.monkey_patches