Spaces:
Running
on
Zero
Running
on
Zero
File size: 3,306 Bytes
5ccb60d 9202118 2fbd94c 5ccb60d a87bfa9 5ccb60d a592742 5ccb60d 15b57f6 ea5a4a5 a592742 ea5a4a5 46bfcc6 9202118 46bfcc6 d40aa0a 2f7594c 2fbd94c ba738f8 2fbd94c a1816dd 575e475 ea5a4a5 e6a600e a1816dd 5ccb60d 15b57f6 5ccb60d 15b57f6 9202118 46bfcc6 5ccb60d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# logging.py
import os
import uuid
import time
from huggingface_hub import CommitScheduler, HfApi
from PIL import Image
import numpy as np
from diffusers.utils import load_image
APP_VERSION = "0_4"
HF_DATASET_REPO = "LPX55/upscaler_logs" # Change to your dataset repo
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN") # Make sure this is set in your environment
LOG_DIR = "logs_" + APP_VERSION
IMAGE_DIR = os.path.join(LOG_DIR, "upscaler")
LOG_FILE = os.path.join(LOG_DIR, f"{int(time.time())}-logs.csv")
api = HfApi(token=HF_TOKEN)
scheduler = CommitScheduler(
repo_id=HF_DATASET_REPO,
repo_type="dataset",
folder_path=LOG_DIR,
every=5,
private=True,
token=HF_TOKEN,
path_in_repo="v" + APP_VERSION
)
# def cache_temp(img_id):
# api.upload_file(
# path_or_fileobj=os.path.join("/tmp/gradio", f"{img_id}"),
# path_in_repo="/v" + APP_VERSION + "/" + img_id,
# repo_id=HF_DATASET_REPO,
# repo_type="dataset",
# token=HF_TOKEN,
# )
def save_image(image_id, image_path: Image.Image) -> None:
os.makedirs(IMAGE_DIR, exist_ok=True)
print("Image ID: " + image_id)
print("Image ID Type: " + str(type(image_id)))
save_image_path = os.path.join(IMAGE_DIR, f"{image_id}")
print("Save image path: " + save_image_path)
try:
loaded = load_image(image_path)
cache_file = "/tmp/gradio/" + str(uuid.uuid4()) + ".png"
loaded.save(cache_file, "PNG")
print("Loaded Type: " + str(type(loaded)))
with scheduler.lock:
try:
# cache_temp(cache_file)
#print("Cache path: " + cache_file)
#print("Type: " + str(type(cache_file)))
img2 = Image.open(cache_file)
img2.save(save_image_path)
print(f"Saved image: {save_image_path}")
except Exception as e:
print(f"Error saving image: {str(e)}")
except Exception as e:
print(f"Error loading image: {str(e)}")
def log_params(
prompt, scale, steps, controlnet_conditioning_scale, guidance_scale, seed, guidance_end,
before_image, after_image, user=None
):
before_id = str(uuid.uuid4()) + "_before.png"
after_id = str(uuid.uuid4()) + "_after.png"
before_path = os.path.join(IMAGE_DIR, before_id)
after_path = os.path.join(IMAGE_DIR, after_id)
print("Type before: " + str(type(before_image)))
print("Type after: " + str(type(after_image)))
save_image(before_id, before_image)
save_image(after_id, after_image)
#print("Before path: " + before_path)
#print("After path: " + after_path)
is_new = not os.path.exists(LOG_FILE)
with open(LOG_FILE, "a", newline='') as f:
import csv
writer = csv.writer(f)
if is_new:
writer.writerow([
"timestamp", "user", "prompt", "scale", "steps", "controlnet_conditioning_scale",
"guidance_scale", "seed", "guidance_end", "before_image", "after_image"
])
writer.writerow([
time.strftime("%Y-%m-%dT%H:%M:%S"),
user or "anonymous",
prompt, scale, steps, controlnet_conditioning_scale,
guidance_scale, seed, guidance_end, before_path, after_path
])
|