import os
import math
import inspect
import numpy as np
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple, Union

import torch
from diffusers.callbacks import MultiPipelineCallbacks, PipelineCallback
from diffusers.models import AutoencoderKLCogVideoX
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler
from diffusers.utils import BaseOutput, logging
from diffusers.utils.torch_utils import randn_tensor
from diffusers.video_processor import VideoProcessor
from einops import rearrange
from PIL import Image
from torchvision import transforms

from .mvdit_transformer import Transformer3DModel

logger = logging.get_logger(__name__)  # pylint: disable=invalid-name
def get_1d_rotary_pos_embed(
    dim: int,
    pos: Union[np.ndarray, int],
    theta: float = 10000.0,
    use_real=False,
    linear_factor=1.0,
    ntk_factor=1.0,
    repeat_interleave_real=True,
    freqs_dtype=torch.float32,  # torch.float32, torch.float64 (flux)
):
    """
    Precompute the frequency tensor for complex exponentials (cis) with given dimensions.

    This function calculates a frequency tensor with complex exponentials using the given dimension 'dim' and the end
    index 'end'. The 'theta' parameter scales the frequencies. The returned tensor contains complex values in complex64
    data type.

    Args:
        dim (`int`): Dimension of the frequency tensor.
        pos (`np.ndarray` or `int`): Position indices for the frequency tensor. [S] or scalar
        theta (`float`, *optional*, defaults to 10000.0):
            Scaling factor for frequency computation. Defaults to 10000.0.
        use_real (`bool`, *optional*):
            If True, return real part and imaginary part separately. Otherwise, return complex numbers.
        linear_factor (`float`, *optional*, defaults to 1.0):
            Scaling factor for the context extrapolation. Defaults to 1.0.
        ntk_factor (`float`, *optional*, defaults to 1.0):
            Scaling factor for the NTK-Aware RoPE. Defaults to 1.0.
        repeat_interleave_real (`bool`, *optional*, defaults to `True`):
            If `True` and `use_real`, real part and imaginary part are each interleaved with themselves to reach `dim`.
            Otherwise, they are concatenated with themselves.
        freqs_dtype (`torch.float32` or `torch.float64`, *optional*, defaults to `torch.float32`):
            the dtype of the frequency tensor.

    Returns:
        `torch.Tensor`: Precomputed frequency tensor with complex exponentials. [S, D/2]
    """
    assert dim % 2 == 0

    if isinstance(pos, int):
        pos = torch.arange(pos)
    if isinstance(pos, np.ndarray):
        pos = torch.from_numpy(pos)  # type: ignore  # [S]

    theta = theta * ntk_factor
    freqs = (
        1.0
        / (theta ** (torch.arange(0, dim, 2, dtype=freqs_dtype, device=pos.device)[: (dim // 2)] / dim))
        / linear_factor
    )  # [D/2]
    freqs = torch.outer(pos, freqs)  # type: ignore  # [S, D/2]
    if use_real and repeat_interleave_real:
        freqs_cos = freqs.cos().repeat_interleave(2, dim=1).float()  # [S, D]
        freqs_sin = freqs.sin().repeat_interleave(2, dim=1).float()  # [S, D]
        return freqs_cos, freqs_sin
    elif use_real:
        freqs_cos = torch.cat([freqs.cos(), freqs.cos()], dim=-1).float()  # [S, D]
        freqs_sin = torch.cat([freqs.sin(), freqs.sin()], dim=-1).float()  # [S, D]
        return freqs_cos, freqs_sin
    else:
        freqs_cis = torch.polar(torch.ones_like(freqs), freqs)  # complex64  # [S, D/2]
        return freqs_cis
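
# Shape sketch (illustrative only, not used by the pipeline): with `use_real=True` the function
# returns a (cos, sin) pair, each of shape [S, dim], e.g.
#     cos, sin = get_1d_rotary_pos_embed(dim=64, pos=16, use_real=True)
#     # cos.shape == sin.shape == torch.Size([16, 64])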
def get_3d_rotary_pos_embed(
    embed_dim, crops_coords, grid_size, temporal_size, theta: int = 10000, use_real: bool = True
) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
    """
    RoPE for video tokens with 3D structure.

    Args:
        embed_dim: (`int`):
            The embedding dimension size, corresponding to hidden_size_head.
        crops_coords (`Tuple[int]`):
            The top-left and bottom-right coordinates of the crop.
        grid_size (`Tuple[int]`):
            The grid size of the spatial positional embedding (height, width).
        temporal_size (`int`):
            The size of the temporal dimension.
        theta (`float`):
            Scaling factor for frequency computation.

    Returns:
        `torch.Tensor`: positional embedding with shape `(temporal_size * grid_size[0] * grid_size[1], embed_dim/2)`.
    """
    if use_real is not True:
        raise ValueError(" `use_real = False` is not currently supported for get_3d_rotary_pos_embed")
    start, stop = crops_coords
    grid_size_h, grid_size_w = grid_size
    grid_h = np.linspace(start[0], stop[0], grid_size_h, endpoint=False, dtype=np.float32)
    grid_w = np.linspace(start[1], stop[1], grid_size_w, endpoint=False, dtype=np.float32)
    grid_t = np.linspace(0, temporal_size, temporal_size, endpoint=False, dtype=np.float32)

    # Compute dimensions for each axis
    dim_t = embed_dim // 4
    dim_h = embed_dim // 8 * 3
    dim_w = embed_dim // 8 * 3

    # Temporal frequencies
    freqs_t = get_1d_rotary_pos_embed(dim_t, grid_t, use_real=True)
    # Spatial frequencies for height and width
    freqs_h = get_1d_rotary_pos_embed(dim_h, grid_h, use_real=True)
    freqs_w = get_1d_rotary_pos_embed(dim_w, grid_w, use_real=True)

    # Broadcast and concatenate the temporal and spatial frequencies (height and width) into a 3D tensor
    def combine_time_height_width(freqs_t, freqs_h, freqs_w):
        freqs_t = freqs_t[:, None, None, :].expand(
            -1, grid_size_h, grid_size_w, -1
        )  # temporal_size, grid_size_h, grid_size_w, dim_t
        freqs_h = freqs_h[None, :, None, :].expand(
            temporal_size, -1, grid_size_w, -1
        )  # temporal_size, grid_size_h, grid_size_w, dim_h
        freqs_w = freqs_w[None, None, :, :].expand(
            temporal_size, grid_size_h, -1, -1
        )  # temporal_size, grid_size_h, grid_size_w, dim_w

        freqs = torch.cat(
            [freqs_t, freqs_h, freqs_w], dim=-1
        )  # temporal_size, grid_size_h, grid_size_w, (dim_t + dim_h + dim_w)
        freqs = freqs.view(
            temporal_size * grid_size_h * grid_size_w, -1
        )  # (temporal_size * grid_size_h * grid_size_w), (dim_t + dim_h + dim_w)
        return freqs

    t_cos, t_sin = freqs_t  # both t_cos and t_sin have shape: temporal_size, dim_t
    h_cos, h_sin = freqs_h  # both h_cos and h_sin have shape: grid_size_h, dim_h
    w_cos, w_sin = freqs_w  # both w_cos and w_sin have shape: grid_size_w, dim_w
    cos = combine_time_height_width(t_cos, h_cos, w_cos)
    sin = combine_time_height_width(t_sin, h_sin, w_sin)
    return cos, sin
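
# Shape sketch (illustrative): the per-axis channel split satisfies
#     dim_t + dim_h + dim_w == embed_dim // 4 + embed_dim // 8 * 3 + embed_dim // 8 * 3 == embed_dim
# for embed_dim divisible by 8, so e.g. embed_dim=64, grid_size=(30, 45), temporal_size=13 yields
# cos/sin tensors of shape (13 * 30 * 45, 64).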
def get_3d_motion_spatial_embed(
    embed_dim: int, num_joints: int, joints_mean: np.ndarray, joints_std: np.ndarray, theta: float = 10000.0
) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
    """
    RoPE-style spatial embedding for motion joints: each joint's mean (x, y, z) position is
    mean-centered per axis, encoded with 1D rotary frequencies, and the three axes are
    concatenated along the last dimension. Returns a (cos, sin) pair, each of shape
    [num_joints, embed_dim].
    """
    assert embed_dim % 2 == 0 and embed_dim % 3 == 0

    def create_rope_pe(dim, pos, freqs_dtype=torch.float32):
        if isinstance(pos, np.ndarray):
            pos = torch.from_numpy(pos)
        freqs = (
            1.0
            / (theta ** (torch.arange(0, dim, 2, dtype=freqs_dtype, device=pos.device)[: (dim // 2)] / dim))
        )  # [D/2]
        freqs = torch.outer(pos, freqs)  # type: ignore  # [S, D/2]
        freqs_cos = freqs.cos().repeat_interleave(2, dim=1).float()  # [S, D]
        freqs_sin = freqs.sin().repeat_interleave(2, dim=1).float()  # [S, D]
        return freqs_cos, freqs_sin

    # Create a positional encoding for each axis
    # relative_pos_x = joints_mean[:, 0] - joints_mean[0, 0]
    # relative_pos_y = joints_mean[:, 1] - joints_mean[0, 1]
    # relative_pos_z = joints_mean[:, 2] - joints_mean[0, 2]
    # normalized_pos_x = relative_pos_x / joints_std[:, 0].mean()
    # normalized_pos_y = relative_pos_y / joints_std[:, 1].mean()
    # normalized_pos_z = relative_pos_z / joints_std[:, 2].mean()
    pos_x = joints_mean[:, 0]
    pos_y = joints_mean[:, 1]
    pos_z = joints_mean[:, 2]
    normalized_pos_x = pos_x - pos_x.mean()
    normalized_pos_y = pos_y - pos_y.mean()
    normalized_pos_z = pos_z - pos_z.mean()

    freqs_cos_x, freqs_sin_x = create_rope_pe(embed_dim // 3, normalized_pos_x)
    freqs_cos_y, freqs_sin_y = create_rope_pe(embed_dim // 3, normalized_pos_y)
    freqs_cos_z, freqs_sin_z = create_rope_pe(embed_dim // 3, normalized_pos_z)

    freqs_cos = torch.cat([freqs_cos_x, freqs_cos_y, freqs_cos_z], dim=-1)
    freqs_sin = torch.cat([freqs_sin_x, freqs_sin_y, freqs_sin_z], dim=-1)
    return freqs_cos, freqs_sin
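
# Shape sketch (illustrative): for a 24-joint skeleton with `joints_mean` of shape (24, 3) and
# embed_dim=48, each axis contributes 48 // 3 = 16 channels, so the returned cos/sin tensors have
# shape (24, 48). Note that `joints_std` is only referenced by the commented-out variant above.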
# Similar to diffusers.pipelines.hunyuandit.pipeline_hunyuandit.get_resize_crop_region_for_grid
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
    tw = tgt_width
    th = tgt_height
    h, w = src
    r = h / w
    if r > (th / tw):
        resize_height = th
        resize_width = int(round(th / h * w))
    else:
        resize_width = tw
        resize_height = int(round(tw / w * h))

    crop_top = int(round((th - resize_height) / 2.0))
    crop_left = int(round((tw - resize_width) / 2.0))

    return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)
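
# Worked example (illustrative): for a source grid src=(30, 45) and a target of tgt_width=45,
# tgt_height=30, the aspect ratios match, so nothing is cropped and the function returns
# ((0, 0), (30, 45)), i.e. the full target region.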
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    """
    Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
    custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.

    Args:
        scheduler (`SchedulerMixin`):
            The scheduler to get timesteps from.
        num_inference_steps (`int`):
            The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
            must be `None`.
        device (`str` or `torch.device`, *optional*):
            The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
        timesteps (`List[int]`, *optional*):
            Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
            `num_inference_steps` and `sigmas` must be `None`.
        sigmas (`List[float]`, *optional*):
            Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
            `num_inference_steps` and `timesteps` must be `None`.

    Returns:
        `Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
        second element is the number of inference steps.
    """
    if timesteps is not None and sigmas is not None:
        raise ValueError('Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values')
    if timesteps is not None:
        accepts_timesteps = 'timesteps' in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f' timestep schedules. Please check whether you are using the correct scheduler.'
            )
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    elif sigmas is not None:
        accept_sigmas = 'sigmas' in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f' sigmas schedules. Please check whether you are using the correct scheduler.'
            )
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    else:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        timesteps = scheduler.timesteps
    return timesteps, num_inference_steps
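
# Usage sketch (illustrative): without custom `timesteps`/`sigmas` this simply forwards to the
# scheduler, e.g.
#     timesteps, num_inference_steps = retrieve_timesteps(pipe.scheduler, num_inference_steps=50, device=device)
#     # `timesteps` is the scheduler's schedule and num_inference_steps == 50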
@dataclass
class MTVCrafterPipelineOutput(BaseOutput):
    r"""
    Output class for the MTVCrafter pipeline.

    Args:
        frames (`torch.Tensor`, `np.ndarray`, or List[List[PIL.Image.Image]]):
            List of video outputs - It can be a nested list of length `batch_size,` with each sub-list containing
            denoised PIL image sequences of length `num_frames.` It can also be a NumPy array or Torch tensor of shape
            `(batch_size, num_frames, channels, height, width)`.
    """

    frames: torch.Tensor
class MTVCrafterPipeline(DiffusionPipeline):
    r"""
    Pipeline for MTVCrafter.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Args:
        vae ([`AutoencoderKLCogVideoX`]):
            Variational Auto-Encoder (VAE) Model to encode and decode videos to and from latent representations.
        transformer ([`Transformer3DModel`]):
            An image-conditioned `Transformer3DModel` to denoise the encoded video latents.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `transformer` to denoise the encoded video latents.
    """

    _callback_tensor_inputs = [
        'latents',
        'prompt_embeds',
        'negative_prompt_embeds',
    ]

    def __init__(
        self,
        vae: AutoencoderKLCogVideoX,
        transformer: Transformer3DModel,
        scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler],
    ):
        super().__init__()

        self.register_modules(
            vae=vae,
            transformer=transformer,
            scheduler=scheduler,
        )
        self.vae_scale_factor_spatial = (
            2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, 'vae') and self.vae is not None else 8
        )
        self.vae_scale_factor_temporal = (
            self.vae.config.temporal_compression_ratio if hasattr(self, 'vae') and self.vae is not None else 4
        )

        self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)
        self.normalize = transforms.Normalize([0.5], [0.5])
    @classmethod
    def from_pretrained(
        cls,
        model_path,
        transformer_model_path=None,
        scheduler_type='ddim',
        torch_dtype=None,
        **kwargs,
    ):
        if transformer_model_path is None:
            transformer_model_path = os.path.join(model_path, 'transformer')
        transformer = Transformer3DModel.from_pretrained(
            transformer_model_path, torch_dtype=torch_dtype, **kwargs
        )
        if scheduler_type == 'ddim':
            scheduler = CogVideoXDDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler_type == 'dpm':
            scheduler = CogVideoXDPMScheduler.from_pretrained(model_path, subfolder='scheduler')
        else:
            raise ValueError(f"Unsupported scheduler_type: {scheduler_type!r}. Expected 'ddim' or 'dpm'.")
        pipe = super().from_pretrained(
            model_path, transformer=transformer, scheduler=scheduler, torch_dtype=torch_dtype, **kwargs
        )
        return pipe
    def prepare_latents(
        self, batch_size, num_channels_latents, num_frames, height, width, dtype, device, generator, latents=None
    ):
        shape = (
            batch_size,
            (num_frames - 1) // self.vae_scale_factor_temporal + 1,
            num_channels_latents,
            height // self.vae_scale_factor_spatial,
            width // self.vae_scale_factor_spatial,
        )
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f'You have passed a list of generators of length {len(generator)}, but requested an effective batch'
                f' size of {batch_size}. Make sure the batch size matches the length of the generators.'
            )

        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            latents = latents.to(device)

        # scale the initial noise by the standard deviation required by the scheduler
        latents = latents * self.scheduler.init_noise_sigma
        return latents
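
    # Shape sketch (illustrative): with CogVideoX-style compression factors (temporal 4, spatial 8),
    # num_frames=49, height=480, width=720 and 16 latent channels, `prepare_latents` draws noise of shape
    #     (batch_size, (49 - 1) // 4 + 1, 16, 480 // 8, 720 // 8) == (batch_size, 13, 16, 60, 90)
    # The actual channel count comes from `self.vae.config.latent_channels` at call time.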
    def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
        latents = latents.permute(0, 2, 1, 3, 4)  # [batch_size, num_channels, num_frames, height, width]
        latents = 1 / self.vae.config.scaling_factor * latents

        frames = self.vae.decode(latents).sample
        return frames
    def prepare_extra_step_kwargs(self, generator, eta):
        # prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
        # eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
        # eta corresponds to η in DDIM paper and should be between [0, 1]
        accepts_eta = 'eta' in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            extra_step_kwargs['eta'] = eta

        # check if the scheduler accepts generator
        accepts_generator = 'generator' in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            extra_step_kwargs['generator'] = generator
        return extra_step_kwargs
    # Copied from diffusers.pipelines.latte.pipeline_latte.LattePipeline.check_inputs
    def check_inputs(
        self,
        height,
        width,
        callback_on_step_end_tensor_inputs,
    ):
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f'`height` and `width` have to be divisible by 8 but are {height} and {width}.')

        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
        ):
            raise ValueError(
                f'`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found '
                f'{[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}'
            )
    def _prepare_rotary_positional_embeddings(
        self,
        height: int,
        width: int,
        num_frames: int,
        device: torch.device,
        dtype: torch.dtype,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        grid_crops_coords = ((0, 0), (grid_height, grid_width))
        freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
            embed_dim=self.transformer.config.attention_head_dim,
            crops_coords=grid_crops_coords,
            grid_size=(grid_height, grid_width),
            temporal_size=num_frames,
        )

        freqs_cos = freqs_cos.to(device=device, dtype=dtype)
        freqs_sin = freqs_sin.to(device=device, dtype=dtype)
        return freqs_cos, freqs_sin
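
    # Grid sketch (illustrative, assuming a CogVideoX-style config with patch_size=2 and
    # attention_head_dim=64): for height=480 and width=720 the spatial grid is
    #     480 // (8 * 2) = 30  by  720 // (8 * 2) = 45
    # so the returned cos/sin tensors have shape (num_frames * 30 * 45, 64).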
    def _prepare_motion_embeddings(self, num_frames, num_joints, joints_mean, joints_std, device, dtype):
        time_embed = get_1d_rotary_pos_embed(
            self.transformer.config.attention_head_dim // 4, num_frames, use_real=True
        )
        time_embed_cos = time_embed[0][:, None, :].expand(-1, num_joints, -1).reshape(num_frames * num_joints, -1)
        time_embed_sin = time_embed[1][:, None, :].expand(-1, num_joints, -1).reshape(num_frames * num_joints, -1)

        spatial_motion_embed = get_3d_motion_spatial_embed(
            self.transformer.config.attention_head_dim // 4 * 3, num_joints, joints_mean, joints_std
        )
        spatial_embed_cos = (
            spatial_motion_embed[0][None, :, :].expand(num_frames, -1, -1).reshape(num_frames * num_joints, -1)
        )
        spatial_embed_sin = (
            spatial_motion_embed[1][None, :, :].expand(num_frames, -1, -1).reshape(num_frames * num_joints, -1)
        )

        motion_embed_cos = torch.cat([time_embed_cos, spatial_embed_cos], dim=-1).to(device=device, dtype=dtype)
        motion_embed_sin = torch.cat([time_embed_sin, spatial_embed_sin], dim=-1).to(device=device, dtype=dtype)
        return motion_embed_cos, motion_embed_sin
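
    # Dimension sketch (illustrative, assuming attention_head_dim=64): the temporal part uses
    # 64 // 4 = 16 channels and the joint-spatial part 64 // 4 * 3 = 48 channels, so the
    # concatenated motion RoPE has shape (num_frames * num_joints, 64), matching the image RoPE head dim.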
    @property
    def guidance_scale(self):
        return self._guidance_scale

    @property
    def num_timesteps(self):
        return self._num_timesteps

    @property
    def interrupt(self):
        return self._interrupt
    @torch.no_grad()
    def __call__(
        self,
        prompt: Optional[Union[str, List[str]]] = None,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        height: int = 480,
        width: int = 720,
        num_frames: int = 49,
        num_inference_steps: int = 50,
        timesteps: Optional[List[int]] = None,
        guidance_scale: float = 6,
        use_dynamic_cfg: bool = False,
        num_videos_per_prompt: int = 1,
        eta: float = 0.0,
        seed: Optional[int] = -1,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.FloatTensor] = None,
        prompt_embeds: Optional[torch.FloatTensor] = None,
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        output_type: str = 'pil',
        return_dict: bool = True,
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        callback_on_step_end_tensor_inputs: List[str] = ['latents'],
        max_sequence_length: int = 226,
        ref_images: Optional[torch.Tensor] = None,
        motion_embeds: Optional[torch.FloatTensor] = None,
        joint_mean: Optional[np.ndarray] = None,
        joint_std: Optional[np.ndarray] = None,
    ) -> Union[MTVCrafterPipelineOutput, Tuple]:
        """
        Function invoked when calling the pipeline for generation.

        Args:
            prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
            negative_prompt (`str` or `List[str]`, *optional*):
                The prompt or prompts not to guide the image generation. If not defined, one has to pass
                `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `guidance_scale` is
                less than `1`).
            height (`int`, *optional*, defaults to `480`):
                The height in pixels of the generated video.
            width (`int`, *optional*, defaults to `720`):
                The width in pixels of the generated video.
            num_frames (`int`, defaults to `49`):
                Number of frames to generate. Must be divisible by self.vae_scale_factor_temporal. Generated video will
                contain 1 extra frame because CogVideoX is conditioned with (num_seconds * fps + 1) frames where
                num_seconds is 6 and fps is 4. However, since videos can be saved at any fps, the only condition that
                needs to be satisfied is that of divisibility mentioned above.
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            timesteps (`List[int]`, *optional*):
                Custom timesteps to use for the denoising process with schedulers which support a `timesteps` argument
                in their `set_timesteps` method. If not defined, the default behavior when `num_inference_steps` is
                passed will be used. Must be in descending order.
            guidance_scale (`float`, *optional*, defaults to 6):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance]. Guidance scale is enabled by setting
                `guidance_scale > 1`. Higher guidance scale encourages to generate images that are closely linked to
                the text `prompt`, usually at the expense of lower image quality.
            num_videos_per_prompt (`int`, *optional*, defaults to 1):
                The number of videos to generate per prompt.
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
                One or a list of [torch generator(s)] to make generation deterministic.
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
                generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
                tensor will be generated by sampling using the supplied random `generator`.
            prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
                provided, text embeddings will be generated from `prompt` input argument.
            negative_prompt_embeds (`torch.FloatTensor`, *optional*):
                Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
                weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
                argument.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generated video. Choose between `PIL.Image.Image` or `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`MTVCrafterPipelineOutput`] instead of a plain tuple.
            callback_on_step_end (`Callable`, *optional*):
                A function that is called at the end of each denoising step during inference. The function is called
                with the following arguments: `callback_on_step_end(self: DiffusionPipeline, step: int, timestep: int,
                callback_kwargs: Dict)`. `callback_kwargs` will include a list of all tensors as specified by
                `callback_on_step_end_tensor_inputs`.
            callback_on_step_end_tensor_inputs (`List`, *optional*):
                The list of tensor inputs for the `callback_on_step_end` function. The tensors specified in the list
                will be passed as `callback_kwargs` argument. You will only be able to include variables listed in the
                `._callback_tensor_inputs` attribute of your pipeline class.
            max_sequence_length (`int`, defaults to `226`):
                Maximum sequence length in encoded prompt. Must be consistent with
                `self.transformer.config.max_text_seq_length` otherwise may lead to poor results.
        """
        if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
            callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

        height = height or self.transformer.config.sample_size * self.vae_scale_factor_spatial
        width = width or self.transformer.config.sample_size * self.vae_scale_factor_spatial
        # 720 * 480
        num_videos_per_prompt = 1

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            height,
            width,
            callback_on_step_end_tensor_inputs,
        )
        self._guidance_scale = guidance_scale
        self._interrupt = False

        # 2. Default call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        elif prompt is None:
            batch_size = 1
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        if seed is not None and seed > 0:
            generator = torch.Generator(device=device)
            generator.manual_seed(seed)

        do_classifier_free_guidance = guidance_scale > 1.0

        # 3. Prepare timesteps
        timesteps, num_inference_steps = retrieve_timesteps(self.scheduler, num_inference_steps, device, timesteps)
        self._num_timesteps = len(timesteps)

        # 4. Prepare latents.
        latent_channels = self.vae.config.latent_channels
        latents = self.prepare_latents(
            batch_size * num_videos_per_prompt,
            latent_channels,
            num_frames,
            height,
            width,
            self.vae.dtype,
            device,
            generator,
            latents,
        )  # [1, x, 16, h/8, w/8]

        if ref_images is not None:
            ref_images = rearrange(ref_images.unsqueeze(0), 'b f c h w -> b c f h w')
            ref_latents = self.vae.encode(
                ref_images.to(dtype=self.vae.dtype, device=self.vae.device)
            ).latent_dist.sample()
            ref_latents = rearrange(ref_latents, 'b c f h w -> b f c h w')
            if do_classifier_free_guidance:
                ref_latents = torch.cat([ref_latents, ref_latents], dim=0)

        if motion_embeds is not None:
            motion_embeds = motion_embeds.to(latents.dtype)
            if do_classifier_free_guidance:
                motion_embeds = torch.cat(
                    [self.transformer.unconditional_motion_token.unsqueeze(0), motion_embeds], dim=0
                )

        # 5. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 6. Create rotary embeds if required
        image_rotary_emb = (
            self._prepare_rotary_positional_embeddings(height, width, latents.size(1), device, dtype=latents.dtype)
            if self.transformer.config.use_rotary_positional_embeddings
            else None
        )
        motion_rotary_emb = self._prepare_motion_embeddings(
            latents.size(1), 24, joint_mean, joint_std, device, dtype=latents.dtype
        )
        # 7. Denoising loop
        num_warmup_steps = max(len(timesteps) - num_inference_steps * self.scheduler.order, 0)

        with self.progress_bar(total=num_inference_steps) as progress_bar:
            # for DPM-solver++
            old_pred_original_sample = None
            for i, t in enumerate(timesteps):
                if self.interrupt:
                    continue

                latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
                timestep = t.expand(latent_model_input.shape[0])

                if ref_images is not None:
                    latent_model_input = torch.cat([latent_model_input, ref_latents], dim=2)

                # predict noise model_output
                noise_pred = self.transformer(
                    hidden_states=latent_model_input,
                    timestep=timestep.long(),
                    image_rotary_emb=image_rotary_emb,
                    motion_rotary_emb=motion_rotary_emb,
                    motion_emb=motion_embeds,
                    return_dict=False,
                )[0]
                noise_pred = noise_pred.float()  # [b, f, c, h, w]

                # perform guidance
                if use_dynamic_cfg:
                    self._guidance_scale = 1 + guidance_scale * (
                        (1 - math.cos(math.pi * ((num_inference_steps - t.item()) / num_inference_steps) ** 5.0)) / 2
                    )
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)

                # compute the previous noisy sample x_t -> x_t-1
                if not isinstance(self.scheduler, CogVideoXDPMScheduler):
                    latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]
                else:
                    latents, old_pred_original_sample = self.scheduler.step(
                        noise_pred,
                        old_pred_original_sample,
                        t,
                        timesteps[i - 1] if i > 0 else None,
                        latents,
                        **extra_step_kwargs,
                        return_dict=False,
                    )
                latents = latents.to(self.vae.dtype)

                # call the callback, if provided
                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop('latents', latents)
                    prompt_embeds = callback_outputs.pop('prompt_embeds', prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop('negative_prompt_embeds', negative_prompt_embeds)

                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
        if not output_type == 'latent':
            video = self.decode_latents(latents)
            video = self.video_processor.postprocess_video(video=video, output_type=output_type)
        else:
            video = latents

        # Offload all models
        self.maybe_free_model_hooks()

        if not return_dict:
            return (video,)

        return MTVCrafterPipelineOutput(frames=video)
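

# Usage sketch (illustrative only; the checkpoint path and input shapes below are placeholders,
# not defined in this module): the pipeline is typically constructed via the overridden
# `from_pretrained` and called with reference frames, precomputed motion token embeddings, and
# joint statistics, e.g.
#     pipe = MTVCrafterPipeline.from_pretrained('/path/to/checkpoint', scheduler_type='dpm',
#                                               torch_dtype=torch.bfloat16).to('cuda')
#     out = pipe(height=480, width=720, num_frames=49, ref_images=ref_frames,
#                motion_embeds=motion_embeds, joint_mean=joint_mean, joint_std=joint_std)
#     video = out.frames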