File size: 5,413 Bytes
393d3de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
import gym
import einops
import numpy as np
from . import benchmark
from . import get_libero_path
from .envs.env_wrapper import OffScreenRenderEnv


# Success predicate(s) for each supported task, keyed by task name.
# Each predicate is a list whose first item is the predicate name and whose
# remaining items are its arguments (presumably object / region names from the
# task's BDDL file -- confirm against the BDDL definitions).
# NOTE: key order matters -- tasks are addressed by their position in this dict.
GOAL_PREDICATES = {
    "open_the_middle_drawer_of_the_cabinet": [
        ["open", "wooden_cabinet_1_middle_region"],
    ],
    "open_the_top_drawer_and_put_the_bowl_inside": [
        ["in", "akita_black_bowl_1", "wooden_cabinet_1_top_region"],
    ],
    "push_the_plate_to_the_front_of_the_stove": [
        ["on", "plate_1", "main_table_stove_front_region"],
    ],
    "put_the_bowl_on_the_plate": [
        ["on", "akita_black_bowl_1", "plate_1"],
    ],
    "put_the_bowl_on_the_stove": [
        ["on", "akita_black_bowl_1", "flat_stove_1_cook_region"],
    ],
    "put_the_bowl_on_top_of_the_cabinet": [
        ["on", "akita_black_bowl_1", "wooden_cabinet_1_top_side"],
    ],
    "put_the_cream_cheese_in_the_bowl": [
        ["on", "cream_cheese_1", "akita_black_bowl_1"],
    ],
    "put_the_wine_bottle_on_the_rack": [
        ["on", "wine_bottle_1", "wine_rack_1_top_region"],
    ],
    "put_the_wine_bottle_on_top_of_the_cabinet": [
        ["on", "wine_bottle_1", "wooden_cabinet_1_top_side"],
    ],
    "turn_on_the_stove": [
        ["turnon", "flat_stove_1"],
    ],
}

# Default side length (in pixels) of the square camera images.
IMAGE_SIZE = 224


class LiberoEnv(gym.Env):
    """
    A wrapper for OffScreenRenderEnv to initialize environment based on task suite and task name.

    A fresh ``OffScreenRenderEnv`` is built on every ``reset`` for the task
    selected by ``goal_idx`` (an index into ``GOAL_PREDICATES``' keys).
    Observations are the two camera images stacked as a float32 array of
    shape ``(2, 3, image_size, image_size)`` scaled to ``[0, 1]``.

    Seeding: ``seed(s)`` stores a base seed and resets the episode counter;
    each ``reset`` then seeds the underlying env with ``s + episode_number``.
    If no seed has been set, the underlying env is left unseeded.
    """

    metadata = {"render_modes": ["rgb_array"]}

    def __init__(
        self,
        task_suite_name="libero_goal",
        image_size=IMAGE_SIZE,
        id="libero_goal",
        max_steps=300,
    ):
        """
        Args:
            task_suite_name: key into ``benchmark.get_benchmark_dict()``
                selecting the task suite to instantiate.
            image_size: height and width (pixels) requested from the cameras.
            id: unused by this class; kept for interface compatibility.
            max_steps: number of ``step`` calls after which an episode is
                reported as done regardless of the underlying env.
        """
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(7,), dtype=np.float32)
        self.image_size = image_size
        self.max_steps = max_steps
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(2, 3, image_size, image_size), dtype=np.float32
        )
        self.task_names = list(GOAL_PREDICATES.keys())
        self.benchmark_dict = benchmark.get_benchmark_dict()
        self.task_suite = self.benchmark_dict[task_suite_name]()
        self.env = None
        self.goal_predicates = GOAL_PREDICATES
        self.steps = 0
        self.goal_idx = 0
        self.episodes = 0
        # Base seed; None means "never seeded" and reset() will skip seeding.
        self._seed = None
        self.finished_tasks = {task_name: False for task_name in self.task_names}

    def seed(self, seed=None):
        """Store the base seed used by subsequent ``reset`` calls."""
        self._seed = seed

        # reset the episode count every time we seed
        # this is done in the main loop for every eval_on_env
        self.episodes = 0

    def reset(self, goal_idx, seed=None):
        """
        Start a new episode for the task at position ``goal_idx``.

        Args:
            goal_idx: index into ``self.task_names``.
            seed: optional base seed; when given it is equivalent to calling
                ``self.seed(seed)`` immediately before this reset.

        Returns:
            float32 array of the two camera images, channel-first, in [0, 1].
        """
        if seed is not None:
            self.seed(seed)
        self.episodes += 1
        self.goal_idx = goal_idx
        self.steps = 0
        task_name = self.task_names[goal_idx]
        task_bddl_file = self._get_task_bddl_file(task_name)

        # Release the previous episode's env before building a new one so
        # simulator instances do not pile up across episodes.
        self.close()

        self.env = OffScreenRenderEnv(
            bddl_file_name=task_bddl_file,
            camera_heights=self.image_size,
            camera_widths=self.image_size,
        )
        if self._seed is not None:
            # Offset by the episode count so each episode gets its own seed.
            self.env.seed(self._seed + self.episodes)
        obs = self.env.reset()
        zero_action = np.zeros(7)
        for _ in range(20):
            # make sure objects are stable
            obs = self.env.step(zero_action)[0]
        self.finished_tasks = {name: False for name in self.task_names}
        return (self._get_img_obs(obs) / 255.0).astype(np.float32)

    def step(self, action):
        """
        Advance the underlying env by one action.

        Returns:
            ``(obs, reward, done, info)`` where ``obs`` is the float32 image
            stack in [0, 1], ``reward`` is the current task's reward from
            ``get_rewards`` and ``done`` is also set once ``max_steps`` steps
            have been taken. ``info`` is the underlying env's info dict with
            these keys added: ``state`` (the raw observation dict),
            ``task_rewards``, ``finished_tasks``, ``image`` (both views side
            by side, channel-last) and ``all_completions_ids``.
        """
        self.steps += 1
        obs, _, done, info = self.env.step(action)
        done = done or self.steps >= self.max_steps
        # NOTE: _get_img_obs below flips the image entries of this same dict,
        # so info["state"] ends up holding the flipped images.
        info["state"] = obs
        obs = self._get_img_obs(obs)
        reward, info["task_rewards"] = self.get_rewards()
        info["finished_tasks"] = self.finished_tasks.copy()
        info["image"] = einops.rearrange(obs, "V C H W -> H (V W) C")

        cur_task = self.task_names[self.goal_idx]
        # Despite the key name, this is the completion flag of the current task.
        info["all_completions_ids"] = self.finished_tasks[cur_task]
        obs = (obs / 255.0).astype(np.float32)
        return obs, reward, done, info

    def close(self):
        """Close the underlying env, if one exists. Safe to call repeatedly."""
        if self.env is not None:
            self.env.close()
            self.env = None

    def render(self, mode="rgb_array"):
        """Return both camera views side by side as a uint8, channel-last image.

        ``mode`` is accepted for interface compatibility but not consulted.
        """
        obs = self.env.env._get_observations()
        obs = self._get_img_obs(obs, channel_first=False)
        return np.concatenate((obs[0], obs[1]), axis=1).astype(np.uint8)

    def _get_img_obs(self, obs, flip=True, channel_first=True):
        """
        Stack the agent-view and eye-in-hand images from an observation dict.

        Args:
            obs: dict with ``agentview_image`` and
                ``robot0_eye_in_hand_image`` entries.
            flip: reverse each image along its first axis. NOTE: the flipped
                images are written back into ``obs``, so the caller's dict is
                modified.
            channel_first: rearrange the stack from ``V H W C`` to ``V C H W``.

        Returns:
            Array with the two views stacked on a new leading axis.
        """
        if flip:
            obs["agentview_image"] = obs["agentview_image"][::-1]
            obs["robot0_eye_in_hand_image"] = obs["robot0_eye_in_hand_image"][::-1]
        obs = np.stack(
            [obs["agentview_image"], obs["robot0_eye_in_hand_image"]], axis=0
        )
        if channel_first:
            obs = einops.rearrange(obs, "V H W C -> V C H W")
        return obs

    def _get_task_bddl_file(self, task_name):
        """Return the path of the BDDL file for ``task_name`` in the task suite.

        Raises:
            ValueError: if ``task_name`` is not among the suite's task names.
        """
        task_id = self.task_suite.get_task_names().index(task_name)
        task = self.task_suite.get_task(task_id)
        task_bddl_file = os.path.join(
            get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
        )
        return task_bddl_file

    def get_rewards(self):
        """
        Evaluate every task's goal predicates against the current env state.

        A task counts as completed when all of its predicates hold. Each
        task's reward is 1 only on the first call in which it is seen
        completed during the episode; ``self.finished_tasks`` is updated for
        all tasks as a side effect.

        Returns:
            ``(reward, task_rewards)``: the current task's reward and a
            one-entry dict mapping the current task name to that reward.
        """
        task_rewards = {}
        for task, goal_states in self.goal_predicates.items():
            task_completed = all(
                self.env.env._eval_predicate(predicate) for predicate in goal_states
            )
            task_rewards[task] = int(task_completed and not self.finished_tasks[task])
            self.finished_tasks[task] = self.finished_tasks[task] or task_completed

        cur_task = self.task_names[self.goal_idx]
        reward = task_rewards[cur_task]
        task_rewards = {cur_task: task_rewards[cur_task]}
        return reward, task_rewards