"""Gradio app that transcodes a video into multi-resolution HLS renditions.

The input video (uploaded or downloaded from a URL) is re-encoded with ffmpeg
into one HLS playlist per standard resolution that does not exceed the
source height.  A master playlist referencing every rendition is written and
the results are copied to the system temp directory and, optionally,
uploaded to an FTP server.
"""
import logging
import shutil
import subprocess
import tempfile
from ftplib import FTP
from pathlib import Path
from urllib.parse import urlparse

import gradio as gr
import paramiko
import requests
from moviepy.editor import VideoFileClip

logging.basicConfig(level=logging.INFO)

# Target rendition heights in pixels, largest first (8K down to 144p).
STANDARD_RESOLUTIONS = (4320, 2160, 1440, 1080, 720, 480, 360, 240, 144)
DEFAULT_FTP_PORT = 21
# Applies to connecting and to each read, not to the download as a whole.
DOWNLOAD_TIMEOUT_SECONDS = 60
FFMPEG_TIMEOUT_SECONDS = 600


def download_file(url, destination):
    """Download ``url`` and write the response body to ``destination``.

    The body is streamed to disk in chunks so that large videos are never
    held in memory in full.

    Args:
        url: HTTP(S) URL to fetch.
        destination: Local path the body is written to.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    with requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT_SECONDS) as response:
        response.raise_for_status()
        with open(destination, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)


def get_input_path(video_file, video_url, temp_dir):
    """Return the local path of the input video, downloading it if necessary.

    Args:
        video_file: Uploaded file, or ``None``.  May be an object exposing the
            path as ``.name`` or a plain path string.
        video_url: URL to download when no file was uploaded.
        temp_dir: Directory in which a downloaded file is stored.

    Returns:
        A ``Path`` to the video on the local filesystem.

    Raises:
        ValueError: If neither a file nor a URL was provided.
    """
    if video_file is not None:
        # Accept both file-like wrappers (``.name``) and bare path strings.
        return Path(getattr(video_file, "name", video_file))
    if video_url:
        # A URL such as "https://host/" has no file name in its path; fall
        # back to a fixed name so the destination is never the directory.
        file_name = Path(urlparse(video_url).path).name or "downloaded_video"
        destination = Path(temp_dir) / file_name
        download_file(video_url, destination)
        return destination
    raise ValueError("No input was provided.")


def upload_to_ftp(server, port, username, password, source_file, destination_file):
    """Upload ``source_file`` to ``destination_file`` on an FTP server.

    Args:
        server: FTP host name.
        port: FTP port (anything ``int()`` accepts).
        username: Login user name.
        password: Login password.
        source_file: Local path of the file to send.
        destination_file: Remote path passed to the ``STOR`` command.
    """
    # Connect explicitly, then log in.  Passing host and credentials to the
    # FTP constructor would connect and log in on the default port first, and
    # a later connect() to the requested port would leave the session
    # unauthenticated.
    with FTP() as ftp:
        ftp.connect(server, int(port))
        ftp.login(username, password)
        with open(source_file, "rb") as f:
            ftp.storbinary(f"STOR {destination_file}", f)


def upload_to_sftp(server, port, username, password, source_file, destination_file):
    """Upload ``source_file`` to ``destination_file`` on an SFTP server.

    Args:
        server: SFTP host name.
        port: SFTP port (anything ``int()`` accepts).
        username: Login user name.
        password: Login password.
        source_file: Local path of the file to send.
        destination_file: Remote path to write.
    """
    transport = paramiko.Transport((server, int(port)))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(str(source_file), str(destination_file))
        finally:
            sftp.close()
    finally:
        # Always release the connection, even when connect/put fails.
        transport.close()


def get_output_path(input_path, temp_dir, res):
    """Return the playlist path for one rendition of ``input_path``.

    The name is ``<input stem>_<res>.m3u8`` inside ``temp_dir``.  Nothing is
    created on disk.
    """
    return Path(temp_dir) / f"{Path(input_path).stem}_{res}.m3u8"


def _probe_video_size(input_path):
    """Return ``(width, height)`` of the video, closing the clip afterwards."""
    clip = VideoFileClip(str(input_path))
    try:
        width, height = clip.size
        return width, height
    finally:
        clip.close()


def get_aspect_ratio(input_path, aspect_ratio):
    """Return ``aspect_ratio`` if given, else ``"<width>:<height>"`` of the video.

    An empty string is treated like ``None`` so that an empty value is never
    interpolated into the ffmpeg filter string.
    """
    if aspect_ratio:
        return aspect_ratio
    width, height = _probe_video_size(input_path)
    return f"{width}:{height}"


def create_master_playlist(output_paths, temp_dir):
    """Write a master ``.m3u8`` in ``temp_dir`` listing every rendition playlist.

    Args:
        output_paths: Paths of the per-resolution playlists.
        temp_dir: Directory in which ``master_playlist.m3u8`` is written.

    Returns:
        Path of the master playlist.
    """
    master_playlist_path = Path(temp_dir) / "master_playlist.m3u8"
    with open(master_playlist_path, "w", encoding="utf-8") as f:
        f.write("#EXTM3U\n")
        for path in output_paths:
            path = Path(path)
            # The label is the last "_"-separated part of the stem, e.g. "720p".
            # NOTE(review): BANDWIDTH is the same constant for every variant
            # and RESOLUTION is a "<height>p" label rather than WxH -- confirm
            # that the target players accept this.
            label = path.stem.split("_")[-1]
            f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={1000 * 1000},RESOLUTION={label}\n")
            f.write(f"{path.name}\n")
    return master_playlist_path


def _parse_port(value, default=DEFAULT_FTP_PORT):
    """Convert a user-supplied port to ``int``; blank or ``None`` gives ``default``."""
    text = "" if value is None else str(value).strip()
    return int(text) if text else default


def _transcode_rendition(input_path, output_path, res, quality, aspect_ratio):
    """Encode one HLS rendition with ffmpeg.

    Returns:
        ``None`` on success.  On failure, the value ``convert_video`` should
        return: a ``Path`` to a text file holding ffmpeg's stderr, or an error
        message string.
    """
    scale = "-1:" + str(res)  # fix the height to ``res``; width follows
    # NOTE(review): the selected ratio is passed to ``setsar`` (sample aspect
    # ratio) -- confirm that this, rather than the display ratio, is intended.
    ffmpeg_command = [
        "ffmpeg",
        "-i", str(input_path),
        "-c:v", "libx264",
        "-crf", str(quality),
        "-vf", f"scale={scale}:force_original_aspect_ratio=decrease,pad=ceil(iw/2)*2:ceil(ih/2)*2,setsar={aspect_ratio}",
        "-hls_time", "6",
        "-hls_playlist_type", "vod",
        "-f", "hls",
        str(output_path),
    ]
    try:
        subprocess.run(
            ffmpeg_command,
            check=True,
            timeout=FFMPEG_TIMEOUT_SECONDS,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error("ffmpeg command failed with the following error:\n%s", e.stderr)
        # Returned as a Path so that process_output can tell the error file
        # apart from a plain error message.
        error_file_path = Path(tempfile.gettempdir()) / "error.txt"
        with open(error_file_path, "w", encoding="utf-8") as error_file:
            error_file.write("ffmpeg command failed with the following error:\n" + (e.stderr or ""))
        return error_file_path
    except subprocess.TimeoutExpired:
        logging.exception("ffmpeg command timed out.")
        return "ffmpeg command timed out."
    except FileNotFoundError:
        logging.exception("ffmpeg is not installed.")
        return "ffmpeg is not installed."
    except Exception as e:
        logging.exception("An error occurred.")
        return f"An error occurred: {str(e)}"
    return None


def _find_segments(temp_dir, playlist_paths, input_path):
    """Return the ``.ts`` segment files ffmpeg wrote for the given playlists.

    Relies on the HLS muxer's default naming, where segments are named after
    the playlist stem followed by a counter (``out.m3u8`` -> ``out0.ts``).
    """
    stems = tuple(Path(p).stem for p in playlist_paths)
    return sorted(
        p
        for p in Path(temp_dir).iterdir()
        if p.suffix == ".ts" and p != input_path and p.name.startswith(stems)
    )


def convert_video(video_file, quality, aspect_ratio, video_url, ftp_server,
                  ftp_username, ftp_password, ftp_path, upload,
                  ftp_port=DEFAULT_FTP_PORT):
    """Transcode a video into HLS renditions and optionally upload them.

    One rendition is produced for every entry of ``STANDARD_RESOLUTIONS`` that
    is not taller than the source.  Playlists and their segments are copied to
    the system temp directory, because the working directory is deleted when
    this function returns.

    Args:
        video_file: Uploaded file or ``None`` (see ``get_input_path``).
        quality: x264 CRF value.
        aspect_ratio: Ratio passed to the ffmpeg filter; derived from the
            video when empty.
        video_url: URL to download when no file was uploaded.
        ftp_server: FTP host, used only when ``upload`` is true.
        ftp_username: FTP user name.
        ftp_password: FTP password.
        ftp_path: Remote directory for the uploaded files.
        upload: Whether to upload the results to the FTP server.
        ftp_port: FTP port; blank or ``None`` means 21.

    Returns:
        On success, a list of strings with the master playlist first and the
        rendition playlists after it: local paths, or remote paths when
        ``upload`` is true.  On failure, either an error message string or a
        ``Path`` to a text file containing ffmpeg's error output.

    Raises:
        ValueError: If neither ``video_file`` nor ``video_url`` is given.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        input_path = get_input_path(video_file, video_url, temp_dir)
        aspect_ratio = get_aspect_ratio(input_path, aspect_ratio)
        original_height = _probe_video_size(input_path)[1]

        output_paths = []
        for res in STANDARD_RESOLUTIONS:
            # Never upscale: skip renditions taller than the source.
            if res > original_height:
                continue
            output_path = get_output_path(input_path, temp_dir, f"{res}p")
            failure = _transcode_rendition(input_path, output_path, res, quality, aspect_ratio)
            if failure is not None:
                return failure
            output_paths.append(output_path)

        if not output_paths:
            return "The video is smaller than the smallest standard resolution."

        master_playlist_path = create_master_playlist(output_paths, temp_dir)
        segment_paths = _find_segments(temp_dir, output_paths, input_path)

        # Copy everything out before the temporary directory disappears.  The
        # segments are included because the playlists are unplayable without
        # them.
        persist_dir = tempfile.gettempdir()
        master_playlist_copy_path = Path(shutil.copy2(master_playlist_path, persist_dir))
        output_copy_paths = [Path(shutil.copy2(path, persist_dir)) for path in output_paths]
        segment_copy_paths = [Path(shutil.copy2(path, persist_dir)) for path in segment_paths]

    playlist_copies = [master_playlist_copy_path] + output_copy_paths
    if not upload:
        return [str(path) for path in playlist_copies]

    port = _parse_port(ftp_port)
    remote_dir = (ftp_path or "").rstrip("/")
    ftp_files = []
    # NOTE: upload_to_ftp opens one connection per file.
    for path in playlist_copies:
        destination_file = f"{remote_dir}/{path.name}"
        upload_to_ftp(ftp_server, port, ftp_username, ftp_password, path, destination_file)
        ftp_files.append(destination_file)
    for path in segment_copy_paths:
        upload_to_ftp(ftp_server, port, ftp_username, ftp_password, path, f"{remote_dir}/{path.name}")
    return ftp_files


def process_output(output):
    """Map a ``convert_video`` result onto the interface's three outputs.

    Args:
        output: An error message (``str``), an error file (``Path``) or a list
            of result paths/URLs.

    Returns:
        A ``(message, file_path, markdown)`` tuple, one value per output
        component declared in ``main``.

    Raises:
        TypeError: If ``output`` is none of the supported types.
    """
    if isinstance(output, str):
        # A string is an error message: show it as text.
        return output, None, ""
    if isinstance(output, Path):
        # A Path is an error file: offer it for download.
        return "See the attached error file for details.", str(output), ""
    if isinstance(output, list):
        # A list holds result paths or URLs: show it as a markdown list.
        return "", None, "\n".join(f"- {o}" for o in output)
    raise TypeError("Unexpected output type")


def _run_conversion(video_file, quality, aspect_ratio, video_url, ftp_server,
                    ftp_port, ftp_username, ftp_password, ftp_path, upload):
    """Interface callback: run ``convert_video`` and shape its result for the UI."""
    try:
        result = convert_video(
            video_file, quality, aspect_ratio, video_url, ftp_server,
            ftp_username, ftp_password, ftp_path, upload, ftp_port=ftp_port,
        )
    except Exception as e:
        # UI boundary: report the failure as a message instead of a traceback.
        logging.exception("Video conversion failed.")
        result = f"An error occurred: {str(e)}"
    return process_output(result)


def main():
    """Build the Gradio interface and launch it."""
    video_file = gr.File(label="Video File")
    quality = gr.Dropdown(
        choices=["18", "23", "27", "28", "32"], label="Quality", value="27")
    aspect_ratio = gr.Dropdown(
        choices=["16:9", "1:1", "4:3", "3:2", "5:4", "21:9", "1.85:1", "2.35:1",
                 "3:1", "360", "9:16", "2:1", "1:2", "9:1"],
        label="Aspect Ratio", value="16:9")
    video_url = gr.Textbox(label="Video URL")
    ftp_server = gr.Textbox(label="FTP Server")
    ftp_port = gr.Textbox(label="FTP Port", placeholder="21")
    ftp_username = gr.Textbox(label="FTP Username")
    ftp_password = gr.Textbox(label="FTP Password", type="password")
    ftp_path = gr.Textbox(label="FTP Path")
    upload = gr.Checkbox(label="Upload to FTP server", value=False)

    gr.Interface(
        _run_conversion,
        inputs=[video_file, quality, aspect_ratio, video_url, ftp_server,
                ftp_port, ftp_username, ftp_password, ftp_path, upload],
        # One component per value returned by process_output.
        outputs=[gr.Textbox(label="Message"), gr.File(label="Error file"), gr.Markdown()],
        allow_flagging="never",
        live=False,
    ).launch()


if __name__ == "__main__":
    main()