import logging import shutil import tempfile import subprocess from pathlib import Path from moviepy.editor import VideoFileClip import gradio as gr import requests from urllib.parse import urlparse logging.basicConfig(level=logging.INFO) def download_file(url, destination): """Downloads a file from a url to a destination.""" response = requests.get(url) response.raise_for_status() with open(destination, 'wb') as f: f.write(response.content) def get_input_path(video_file, video_url, temp_dir): """Returns the path to the video file, downloading it if necessary.""" if video_file is not None: return Path(video_file.name) elif video_url: url_path = urlparse(video_url).path file_name = Path(url_path).name destination = temp_dir / file_name download_file(video_url, destination) return destination else: raise ValueError("No input was provided.") def get_output_path(input_path, temp_dir, res): """Returns the path to the output file, creating it if necessary.""" output_path = temp_dir / (Path(input_path).stem + f"_{res}.m3u8") return output_path def process_output(output): """Process output and display it appropriately.""" if isinstance(output, str): # If output is a string, assume it's an error message and display it as text. return gr.outputs.Textbox()(output) elif isinstance(output, Path): # If output is a Path, assume it's an error file and provide it as a downloadable file. return gr.outputs.File()(str(output)) elif isinstance(output, list): # If output is a list, assume it's a list of Paths or URLs and display it as a markdown list. return gr.outputs.Markdown()("\n".join(f"- {o}" for o in output)) else: raise TypeError("Unexpected output type") gr.Interface( convert_video, inputs=[video_file, quality, aspect_ratio, video_url, api_key, upload], outputs=gr.outputs.Any(), output_processor=process_output, allow_flagging=False, live=False, ).launch() def get_aspect_ratio(input_path, aspect_ratio): """Returns the aspect ratio of the video, calculating it if necessary.""" if aspect_ratio is not None: return aspect_ratio video = VideoFileClip(str(input_path)) return f"{video.size[0]}:{video.size[1]}" def upload_to_web3_storage(api_key, path): """Uploads a file to web3.storage using the HTTP API.""" headers = {"Authorization": f"Bearer {api_key}"} with open(path, 'rb') as f: response = requests.post( "https://api.web3.storage/upload", headers=headers, data=f, ) response.raise_for_status() # Raises an exception if the request failed try: cid = response.json()["value"]["cid"] return f"https://dweb.link/ipfs/{cid}" except KeyError: logging.exception("An error occurred while uploading to web3.storage.") return f"An error occurred while uploading to web3.storage: {response.json()}" def create_master_playlist(output_paths, temp_dir): """Creates a master playlist .m3u8 file that includes all other .m3u8 files.""" master_playlist_path = temp_dir / "master_playlist.m3u8" with open(master_playlist_path, 'w') as f: f.write("#EXTM3U\n") for path in output_paths: f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={1000*1000},RESOLUTION={path.stem.split('_')[-1]}\n") f.write(f"{path.name}\n") return master_playlist_path def convert_video(video_file, quality, aspect_ratio, video_url, api_key, upload): standard_resolutions = [4320, 2160, 1440, 1080, 720, 480] # 8K, 4K, 2K, Full HD, HD, SD in pixels with tempfile.TemporaryDirectory() as temp_dir: temp_dir = Path(temp_dir) input_path = get_input_path(video_file, video_url, temp_dir) aspect_ratio = get_aspect_ratio(input_path, aspect_ratio) video = VideoFileClip(str(input_path)) original_height = video.size[1] output_paths = [] # Define output_paths as an empty list for res in standard_resolutions: # Skip if resolution is higher than original if res > original_height: continue scale = "-1:" + str(res) # we scale the height to res and keep aspect ratio output_path = get_output_path(input_path, temp_dir, str(res) + 'p') # pass the resolution to create a unique output file ffmpeg_command = [ "ffmpeg", "-i", str(input_path), "-c:v", "libx264", "-crf", str(quality), "-vf", f"scale={scale},setsar={aspect_ratio}", "-hls_time", "6", "-hls_playlist_type", "vod", "-f", "hls", str(output_path) ] try: logging.info("Running command: %s", " ".join(ffmpeg_command)) subprocess.run(ffmpeg_command, check=True, timeout=600, stderr=subprocess.PIPE) except subprocess.CalledProcessError as e: logging.exception("ffmpeg command failed.") error_file_path = temp_dir / "error.txt" with open(error_file_path, 'w') as error_file: error_file.write("ffmpeg command failed:\n") error_file.write(e.stderr.decode()) return error_file_path except subprocess.TimeoutExpired: logging.exception("ffmpeg command timed out.") return "ffmpeg command timed out." except FileNotFoundError: logging.exception("ffmpeg is not installed.") return "ffmpeg is not installed." except Exception as e: logging.exception("An error occurred.") return f"An error occurred: {str(e)}" output_paths.append(output_path) # Append the output_path to output_paths if not output_paths: return "The video is smaller than the smallest standard resolution." # Create master playlist master_playlist_path = create_master_playlist(output_paths, temp_dir) output_copy_paths = [shutil.copy2(path, tempfile.gettempdir()) for path in output_paths] master_playlist_copy_path = shutil.copy2(master_playlist_path, tempfile.gettempdir()) if upload: return [upload_to_web3_storage(api_key, path) for path in [master_playlist_copy_path] + output_copy_paths] else: return [master_playlist_copy_path] + output_copy_paths def main(): video_file = gr.inputs.File(label="Video File") quality = gr.inputs.Dropdown( choices=["18", "23", "27", "28", "32"], label="Quality", default="27") aspect_ratio = gr.inputs.Dropdown( choices=["16:9", "1:1", "4:3", "3:2", "5:4", "21:9", "1.85:1", "2.35:1", "3:1", "360", "9:16", "16:9", "2:1", "1:2", "9:1"], label="Aspect Ratio", default="16:9") video_url = gr.inputs.Textbox(label="Video URL") api_key = gr.inputs.Textbox(label="web3.storage API Key") upload = gr.inputs.Checkbox(label="Upload to web3.storage", default=False) gr.Interface( convert_video, inputs=[video_file, quality, aspect_ratio, video_url, api_key, upload], outputs=gr.outputs.File(label="Download File"), allow_flagging=False, live=False, ).launch() if __name__ == "__main__": main()