# RuinedFooocus / modules / launch_util.py
# malizec's picture
# Upload folder using huggingface_hub
# 2de3774 verified
import os
import importlib
import importlib.util
import shutil
import subprocess
import sys
import re
import logging
from pathlib import Path
# Only show errors from torch.distributed.nn; anything less severe is muted.
logging.getLogger("torch.distributed.nn").setLevel(logging.ERROR)
# Drop xformers log records whose message mentions the missing Triton build;
# every other xformers record passes through unchanged.
logging.getLogger("xformers").addFilter(
    lambda record: "A matching Triton is not available" not in record.getMessage()
)

# Path of the running interpreter; used to build the pip command lines below.
python = sys.executable

# LAUNCH_LIVE_OUTPUT=1 makes subprocess output stream live by default.
default_command_live = os.getenv("LAUNCH_LIVE_OUTPUT") == "1"
# Optional package index override for pip; an empty string means "not set".
index_url = os.getenv("INDEX_URL", "")

# Directory containing this file, and the directory one level above it.
modules_path = Path(__file__).resolve().parent
script_path = modules_path.parent
# Name of the sub-directory (under script_path) that holds cloned repositories.
dir_repos = "repositories"
def git_clone(url, dir, name, hash=None):
    """Make sure a git checkout exists at ``dir`` and optionally pin it to a commit.

    The function is best-effort: any failure (including ``pygit2`` not being
    importable) is reported on stdout and swallowed, and ``None`` is returned.

    Args:
        url: Remote URL handed to ``pygit2.clone_repository`` when ``dir`` is
            not already a repository.
        dir: Working-tree directory of the repository.
        name: Human-readable label used in the printed messages.
        hash: Commit id to force-check-out after fetching. When ``None`` the
            repository is only cloned/fetched and the working tree is left
            as it is.
    """
    try:
        # Imported lazily so that a missing pygit2 is reported like any other
        # failure instead of breaking the import of this module.
        import pygit2

        # Turn off libgit2's repository-owner validation (option value 0).
        pygit2.option(pygit2.GIT_OPT_SET_OWNER_VALIDATION, 0)

        try:
            repo = pygit2.Repository(dir)
        except Exception:
            # Not an existing repository: create the parent directories
            # (all levels) and clone from scratch.
            Path(dir).parent.mkdir(parents=True, exist_ok=True)
            repo = pygit2.clone_repository(url, str(dir))
            print(f"{name} cloned.")

        repo.remotes["origin"].fetch()

        if hash is not None:
            commit = repo.get(hash)
            if commit is None:
                # Fail with a readable message instead of passing None on
                # to checkout_tree.
                raise ValueError(f"commit {hash} not found")
            repo.checkout_tree(commit, strategy=pygit2.GIT_CHECKOUT_FORCE)

        print(f"{name} update check complete.")
    except Exception as e:
        print(f"Git clone failed for {name}: {str(e)}")
def repo_dir(name):
    """Return, as a string, the path ``script_path / dir_repos / name``."""
    repos_root = Path(script_path).joinpath(dir_repos)
    return str(repos_root / name)
def is_installed(package):
    """Report whether ``package`` can be located by the import system.

    Returns ``True`` when ``importlib.util.find_spec`` yields a spec and
    ``False`` when it yields ``None`` or raises ``ModuleNotFoundError``
    (as happens for a dotted name whose parent package is missing).
    """
    try:
        return importlib.util.find_spec(package) is not None
    except ModuleNotFoundError:
        return False
def run(
    command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live
) -> str:
    """Execute ``command`` through the shell and return its captured stdout.

    Args:
        command: Shell command line to execute.
        desc: Optional message printed before the command starts.
        errdesc: Optional first line of the error raised on failure.
        custom_env: Environment for the child process; ``os.environ`` is used
            when this is ``None``.
        live: When true the child's stdout/stderr are not captured (they go
            to the parent's streams) and the returned string is empty.

    Returns:
        The captured stdout, or ``""`` when nothing was captured.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            lists the command, the exit code and any captured output.
    """
    if desc is not None:
        print(desc)

    child_env = custom_env if custom_env is not None else os.environ
    # Only redirect the streams into pipes when output is not meant to be live.
    stream_kwargs = {}
    if not live:
        stream_kwargs = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE}

    completed = subprocess.run(
        command,
        shell=True,
        env=child_env,
        encoding="utf8",
        errors="ignore",
        **stream_kwargs,
    )

    if completed.returncode == 0:
        return completed.stdout or ""

    report = [
        f"{errdesc or 'Error running command'}.",
        f"Command: {command}",
        f"Error code: {completed.returncode}",
    ]
    if completed.stdout:
        report.append(f"stdout: {completed.stdout}")
    if completed.stderr:
        report.append(f"stderr: {completed.stderr}")
    raise RuntimeError("\n".join(report))
def run_pip(command, desc=None, live=default_command_live):
    """Run ``pip <command> --prefer-binary`` with the current interpreter.

    When the module-level ``index_url`` is non-empty, ``--index-url`` is
    appended to the command line. ``desc`` is interpolated into the progress
    and error messages; the call is delegated to ``run`` and its result
    returned.
    """
    pip_command = f'"{python}" -m pip {command} --prefer-binary'
    if index_url != "":
        pip_command += f" --index-url {index_url}"

    progress_message = f"Installing {desc}"
    failure_message = f"Couldn't install {desc}"
    return run(pip_command, desc=progress_message, errdesc=failure_message, live=live)
def pip_rm(pkgs, desc=None, live=default_command_live):
    """Run ``pip uninstall -y <pkgs>`` with the current interpreter.

    ``desc`` is interpolated into the progress and error messages; the call
    is delegated to ``run`` and its result returned.
    """
    uninstall_command = f'"{python}" -m pip uninstall -y {pkgs}'
    progress_message = f"Uninstalling {desc}"
    failure_message = f"Couldn't uninstall {desc}"
    return run(
        uninstall_command, desc=progress_message, errdesc=failure_message, live=live
    )
# Matches "<package>" optionally followed by "== <version>". The pattern is not
# anchored at the end, so any other specifier (">=", "~=", markers, inline
# comments, ...) is left unmatched and the line is treated as an unpinned
# requirement.
re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*")


def requirements_met(requirements_file):
    """
    Does a simple parse of a requirements.txt file to determine if all requirements in it
    are already installed. Returns True if so, False if not installed or parsing fails.

    Blank lines, ``#`` comment lines and ``--option`` lines are skipped. Only
    ``==`` pins are compared; a requirement without one is satisfied by any
    installed version. When the pin has no local version segment (``+...``),
    the installed version's local segment is ignored; when the pin has one,
    the full versions must match. A version that cannot be parsed counts as
    "not met".
    """
    import importlib.metadata
    import packaging.version

    with open(requirements_file, "r", encoding="utf8") as file:
        for line in file:
            stripped = line.strip()
            # Nothing to verify on blank lines, comments or pip options.
            if not stripped or stripped.startswith(("#", "--")):
                continue

            m = re_requirement.match(line)
            if m is None:
                return False

            package = m.group(1).strip()
            version_required = (m.group(2) or "").strip()

            try:
                version_installed = importlib.metadata.version(package)
            except Exception:
                # Not installed (or its metadata is unreadable).
                return False

            if version_required == "":
                continue

            try:
                required = packaging.version.parse(version_required)
                if required.local is None:
                    # "==2.0.1" should accept an installed "2.0.1+cu118".
                    version_installed = re.sub(r"\+.*$", "", version_installed)
                installed = packaging.version.parse(version_installed)
            except (packaging.version.InvalidVersion, TypeError):
                return False

            if required != installed:
                return False

    return True