|
import os |
|
import imageio |
|
import numpy as np |
|
|
|
|
|
class VideoWriter:
    """Buffer image frames in memory and write them out as MP4 files.

    Frames are grouped by an integer-like index ``idx`` (e.g. one index per
    parallel environment).  Nothing is buffered or written unless
    ``save_video`` is true, so a disabled writer can be left in the calling
    code at negligible cost.

    The class is a context manager: leaving the ``with`` block calls
    :meth:`save`.

    Args:
        video_path: Directory the video files are written into.  It is
            created on :meth:`save` if it does not exist.
        save_video: Master switch; when false every method is a no-op.
        fps: Frame rate passed to the imageio writer.
        single_video: When true, all buffers are concatenated (in the order
            their indices were first seen) into ``video.mp4``; otherwise one
            ``<idx>.mp4`` file is written per index.
    """

    def __init__(self, video_path, save_video=False, fps=30, single_video=True):
        self.video_path = video_path
        self.save_video = save_video
        self.fps = fps
        # idx -> list of frames, in the order they were appended.
        self.image_buffer = {}
        # idx -> frame frozen at the first ``done`` observation (or None).
        self.last_images = {}
        self.single_video = single_video

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Saving happens even when the block raised, so the frames collected
        # up to the failure are still written out.
        self.save()

    def append_image(self, img, idx=0):
        """Directly append an image to the video buffer of ``idx``."""
        if not self.save_video:
            return
        self.image_buffer.setdefault(idx, []).append(img)

    def append_obs(self, obs, done, idx=0, camera_name="agentview_image"):
        """Append a camera observation to the video buffer of ``idx``.

        The frame is ``obs[camera_name]`` reversed along its first axis
        (presumably the camera image is delivered upside down -- confirm
        against the renderer).

        While ``done`` is false the frame is appended as is.  Once ``done``
        is true, the first such frame is cached and every subsequent call
        appends a tinted copy of that cached frame instead of the new
        observation, so a finished episode shows as a frozen, tinted image.

        Args:
            obs: Mapping that contains the image under ``camera_name``.
            done: Whether the episode belonging to ``idx`` has finished.
            idx: Buffer index the frame belongs to.
            camera_name: Key of the image inside ``obs``.
        """
        if not self.save_video:
            return

        frames = self.image_buffer.setdefault(idx, [])
        self.last_images.setdefault(idx, None)

        if not done:
            frames.append(obs[camera_name][::-1])
            return

        if self.last_images[idx] is None:
            self.last_images[idx] = obs[camera_name][::-1]

        frozen = np.copy(self.last_images[idx])
        # Overlay colour: 128 everywhere except the first and last channel,
        # which are zeroed (a mid green if the frame is 3-channel RGB/BGR).
        overlay = np.ones_like(frozen) * 128
        overlay[:, :, 0] = 0
        overlay[:, :, -1] = 0
        transparency = 0.7
        blended = frozen * (1 - transparency) + overlay * transparency

        frames.append(blended.astype(np.uint8))

    def reset(self):
        """Forget the cached ``done`` frames; buffered frames are kept."""
        if self.save_video:
            self.last_images = {}

    def append_vector_obs(self, obs, dones, camera_name="agentview_image"):
        """Append one observation per environment, using its position as idx.

        Args:
            obs: Indexable collection of observations, one per environment.
            dones: Indexable collection of done flags aligned with ``obs``.
            camera_name: Key of the image inside each observation.
        """
        if not self.save_video:
            return
        # Index-based on purpose: ``obs`` only has to support ``len`` and
        # integer indexing, exactly as before.
        for i in range(len(obs)):
            self.append_obs(obs[i], dones[i], i, camera_name)

    def _write_video(self, video_name, frames):
        """Write ``frames`` to ``video_name``, always closing the writer."""
        video_writer = imageio.get_writer(video_name, fps=self.fps)
        try:
            for im in frames:
                video_writer.append_data(im)
        finally:
            # Close even if a frame is rejected, so the encoder/file handle
            # is not leaked and the partial file is finalised.
            video_writer.close()

    def save(self):
        """Write all buffered frames to disk (no-op unless ``save_video``).

        Creates ``video_path`` if needed, then writes either a single
        ``video.mp4`` or one ``<idx>.mp4`` per buffer depending on
        ``single_video``.  The buffers are left untouched.
        """
        if not self.save_video:
            return

        os.makedirs(self.video_path, exist_ok=True)
        if self.single_video:
            video_name = os.path.join(self.video_path, "video.mp4")
            all_frames = (
                im for frames in self.image_buffer.values() for im in frames
            )
            self._write_video(video_name, all_frames)
        else:
            for idx, frames in self.image_buffer.items():
                video_name = os.path.join(self.video_path, f"{idx}.mp4")
                self._write_video(video_name, frames)
        print(f"Saved videos to {self.video_path}.")
|
|