import os import importlib import importlib.util import shutil import subprocess import sys import re import logging from pathlib import Path logging.getLogger("torch.distributed.nn").setLevel(logging.ERROR) # sshh... logging.getLogger("xformers").addFilter( lambda record: "A matching Triton is not available" not in record.getMessage() ) python = sys.executable default_command_live = os.environ.get("LAUNCH_LIVE_OUTPUT") == "1" index_url = os.environ.get("INDEX_URL", "") modules_path = Path(__file__).resolve().parent script_path = modules_path.parent dir_repos = "repositories" def git_clone(url, dir, name, hash=None): try: import pygit2 pygit2.option(pygit2.GIT_OPT_SET_OWNER_VALIDATION, 0) try: repo = pygit2.Repository(dir) except: Path(dir).parent.mkdir(exist_ok=True) repo = pygit2.clone_repository(url, str(dir)) print(f"{name} cloned.") remote = repo.remotes["origin"] remote.fetch() commit = repo.get(hash) repo.checkout_tree(commit, strategy=pygit2.GIT_CHECKOUT_FORCE) print(f"{name} update check complete.") except Exception as e: print(f"Git clone failed for {name}: {str(e)}") def repo_dir(name): return str(Path(script_path) / dir_repos / name) def is_installed(package): try: spec = importlib.util.find_spec(package) except ModuleNotFoundError: return False return spec is not None def run( command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live ) -> str: if desc is not None: print(desc) run_kwargs = { "args": command, "shell": True, "env": os.environ if custom_env is None else custom_env, "encoding": "utf8", "errors": "ignore", } if not live: run_kwargs["stdout"] = run_kwargs["stderr"] = subprocess.PIPE result = subprocess.run(**run_kwargs) if result.returncode != 0: error_bits = [ f"{errdesc or 'Error running command'}.", f"Command: {command}", f"Error code: {result.returncode}", ] if result.stdout: error_bits.append(f"stdout: {result.stdout}") if result.stderr: error_bits.append(f"stderr: {result.stderr}") raise RuntimeError("\n".join(error_bits)) return result.stdout or "" def run_pip(command, desc=None, live=default_command_live): index_url_line = f" --index-url {index_url}" if index_url != "" else "" return run( f'"{python}" -m pip {command} --prefer-binary{index_url_line}', desc=f"Installing {desc}", errdesc=f"Couldn't install {desc}", live=live, ) def pip_rm(pkgs, desc=None, live=default_command_live): return run( f'"{python}" -m pip uninstall -y {pkgs}', desc=f"Uninstalling {desc}", errdesc=f"Couldn't uninstall {desc}", live=live, ) re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*") def requirements_met(requirements_file): """ Does a simple parse of a requirements.txt file to determine if all rerqirements in it are already installed. Returns True if so, False if not installed or parsing fails. """ import importlib.metadata import packaging.version with open(requirements_file, "r", encoding="utf8") as file: for line in file: if line.strip() == "" or line.startswith("--"): continue m = re.match(re_requirement, line) if m is None: return False package = m.group(1).strip() version_required = (m.group(2) or "").strip() try: version_installed = re.sub( "\+.*$", "", importlib.metadata.version(package) ) except Exception: return False if version_required == "": continue if packaging.version.parse(version_required) != packaging.version.parse( version_installed ): return False return True