import math
import os
import subprocess

import cv2
import gradio as gr
from huggingface_hub import snapshot_download
from moviepy.editor import VideoFileClip, concatenate_videoclips

# Pre-fetch the base Stable Diffusion checkpoint plus the ControlNet
# conditioning models that inference.py selects via --condition
# (depth / canny / openpose).
model_ids = [
    'runwayml/stable-diffusion-v1-5',
    'lllyasviel/sd-controlnet-depth',
    'lllyasviel/sd-controlnet-canny',
    'lllyasviel/sd-controlnet-openpose',
]
for model_id in model_ids:
    model_name = model_id.split('/')[-1]
    snapshot_download(model_id, local_dir=f'checkpoints/{model_name}')


def get_frame_count_in_duration(filepath):
    """Probe an uploaded video and cap the length slider at its frame count.

    Args:
        filepath: path to the uploaded video file.

    Returns:
        A ``gr.update`` setting ``maximum`` of the ``video_length`` slider
        to the clip's total frame count.
    """
    video = cv2.VideoCapture(filepath)
    try:
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the capture even if the property read raises.
        video.release()
    # NOTE: the original also computed fps/duration/width/height but never
    # used them; the duration division crashed when OpenCV reported fps == 0,
    # so those dead computations are dropped.
    return gr.update(maximum=frame_count)


def cut_mp4_into_chunks(input_file, chunk_size):
    """Split a video into consecutive subclips of at most ``chunk_size`` frames.

    Args:
        input_file: path to the source video.
        chunk_size: maximum number of frames per chunk.

    Returns:
        A list of ``(subclip, frame_count)`` tuples covering the whole video;
        the last chunk may be shorter than ``chunk_size``.
    """
    video = VideoFileClip(input_file)
    frame_count = int(video.fps * video.duration)
    num_chunks = (frame_count + chunk_size - 1) // chunk_size  # ceiling division

    chunks = []
    for i in range(num_chunks):
        start_frame = i * chunk_size
        end_frame = min((i + 1) * chunk_size, frame_count)
        # subclip() takes times in seconds, hence the frame -> seconds conversion.
        chunk = video.subclip(start_frame / video.fps, end_frame / video.fps)
        chunks.append((chunk, end_frame - start_frame))
    return chunks


def run_inference(prompt, video_path, condition, video_length):
    """Run inference.py over the input video in fixed-size chunks, then
    concatenate the per-chunk results into ``final_video.mp4``.

    Args:
        prompt: text prompt (also names inference.py's intermediate output file).
        video_path: path to the uploaded source video.
        condition: ControlNet condition type (e.g. "depth").
        video_length: slider value; unused here — chunking drives the length.

    Returns:
        ``("done", "final_video.mp4")`` for the (status, video) Gradio outputs.
    """
    chunk_size = 6  # frames handed to one inference.py invocation
    chunks = cut_mp4_into_chunks(video_path, chunk_size)
    processed_chunks = []
    output_dir = 'output/'
    os.makedirs(output_dir, exist_ok=True)

    # inference.py writes its result to <output_dir>/<prompt>.mp4.
    inference_output = os.path.join(output_dir, f"{prompt}.mp4")

    for i, (chunk, frame_count) in enumerate(chunks):
        # Remove a stale result so the rename below always grabs a fresh file.
        # (The original removed `video_path_output` here, a variable that is
        # unbound on the first iteration -> NameError.)
        if os.path.exists(inference_output):
            os.remove(inference_output)

        chunk_path = f'chunk_{i}.mp4'
        chunk.write_videofile(chunk_path)  # persist the subclip for inference.py
        print(f"Chunk {i}: Frame Count = {frame_count}")

        # Argument list + default shell=False: `prompt` and `condition` are
        # user input and must not be interpolated into a shell string
        # (command injection in the original shell=True version).
        subprocess.run([
            "python", "inference.py",
            "--prompt", prompt,
            "--condition", condition,
            "--video_path", chunk_path,
            "--output_path", output_dir,
            "--video_length", str(frame_count),
        ])

        # Rename the per-chunk result so the next iteration cannot overwrite it.
        renamed = os.path.join(output_dir, f"new_file_name_{i}.mp4")
        os.rename(inference_output, renamed)
        processed_chunks.append(renamed)

    # Stitch the processed chunks back together.
    final_path = "final_video.mp4"
    clips = [VideoFileClip(path) for path in processed_chunks]
    final_clip = concatenate_videoclips(clips)
    final_clip.write_videofile(final_path, codec="libx264")
    final_clip.close()
    for clip in clips:
        clip.close()  # release file handles the original leaked

    return "done", final_path


def working_run_inference(prompt, video_path, condition, video_length):
    """Run inference.py once over the whole video (no chunking).

    Args:
        prompt: text prompt; inference.py names its output <prompt>.mp4.
        video_path: path to the uploaded source video.
        condition: ControlNet condition type (e.g. "depth").
        video_length: requested length in frames; > 12 switches inference.py
            into its long-video mode.

    Returns:
        ``("done", path_to_result)`` for the (status, video) Gradio outputs.
    """
    output_dir = 'output/'
    os.makedirs(output_dir, exist_ok=True)

    # Delete any stale result so the returned path is always freshly generated.
    video_path_output = os.path.join(output_dir, f"{prompt}.mp4")
    if os.path.exists(video_path_output):
        os.remove(video_path_output)

    # Argument list + default shell=False guards against shell injection via
    # the user-supplied prompt/condition.
    command = [
        "python", "inference.py",
        "--prompt", prompt,
        "--condition", condition,
        "--video_path", video_path,
        "--output_path", output_dir,
        "--video_length", str(video_length),
    ]
    if video_length > 12:
        command.append("--is_long_video")
    subprocess.run(command)

    return "done", video_path_output


with gr.Blocks() as demo:
    with gr.Column():
        prompt = gr.Textbox(label="prompt")
        video_path = gr.Video(source="upload", type="filepath")
        condition = gr.Textbox(label="Condition", value="depth")
        video_length = gr.Slider(label="video length", minimum=1, maximum=15, step=1, value=2)
        #seed = gr.Number(label="seed", value=42)
        submit_btn = gr.Button("Submit")
        video_res = gr.Video(label="result")
        status = gr.Textbox(label="result")
    # Re-cap the length slider whenever a new video is uploaded.
    video_path.change(fn=get_frame_count_in_duration,
                      inputs=[video_path],
                      outputs=[video_length])
    submit_btn.click(fn=run_inference,
                     inputs=[prompt, video_path, condition, video_length],
                     outputs=[status, video_res])

demo.queue(max_size=12).launch()