LPX55
feat: add Gradio API integration and ONNX preprocessing functions
4cb6734
def infer_gradio_api(image_path):
from gradio_client import Client, handle_file
import numpy as np
import logging
logger = logging.getLogger(__name__)
client = Client("aiwithoutborders-xyz/OpenSight-Community-Forensics-Preview")
result_dict = client.predict(
input_image=handle_file(image_path),
api_name="/simple_predict"
)
logger.info(f"Debug: Raw result_dict from Gradio API (model_8): {result_dict}, type: {type(result_dict)}")
fake_probability = result_dict.get('Fake Probability', 0.0)
logger.info(f"Debug: Parsed result_dict: {result_dict}, Extracted fake_probability: {fake_probability}")
return {"probabilities": np.array([fake_probability])}
def preprocess_gradio_api(image):
temp_file_path = "./temp_gradio_input.png"
image.save(temp_file_path)
return temp_file_path
def postprocess_gradio_api(gradio_output, class_names):
import numpy as np
import logging
logger = logging.getLogger(__name__)
probabilities_array = None
if isinstance(gradio_output, dict) and "probabilities" in gradio_output:
probabilities_array = gradio_output["probabilities"]
elif isinstance(gradio_output, np.ndarray):
probabilities_array = gradio_output
else:
logger.warning(f"Unexpected output type for Gradio API post-processing: {type(gradio_output)}. Expected dict with 'probabilities' or numpy.ndarray.")
return {class_names[0]: 0.0, class_names[1]: 1.0}
logger.info(f"Debug: Probabilities array entering postprocess_gradio_api: {probabilities_array}, type: {type(probabilities_array)}, shape: {getattr(probabilities_array, 'shape', None)}")
if probabilities_array is None or probabilities_array.size == 0:
logger.warning("Probabilities array is None or empty after extracting from Gradio API output. Returning default scores.")
return {class_names[0]: 0.0, class_names[1]: 1.0}
fake_prob = float(probabilities_array.item())
real_prob = 1.0 - fake_prob
return {class_names[0]: fake_prob, class_names[1]: real_prob}
def preprocess_resize_256(image):
if image.mode != 'RGB':
image = image.convert('RGB')
return transforms.Resize((256, 256))(image)
def preprocess_resize_224(image):
if image.mode != 'RGB':
image = image.convert('RGB')
return transforms.Resize((224, 224))(image)
def postprocess_pipeline(prediction, class_names):
# Assumes HuggingFace pipeline output
return {pred['label']: float(pred['score']) for pred in prediction}
def postprocess_logits(outputs, class_names):
# Assumes model output with logits
logits = outputs.logits.cpu().numpy()[0]
probabilities = softmax(logits)
return {class_names[i]: probabilities[i] for i in range(len(class_names))}
def postprocess_binary_output(output, class_names):
# output can be a dictionary {"probabilities": numpy_array} or directly a numpy_array
import logging
logger = logging.getLogger(__name__)
probabilities_array = None
if isinstance(output, dict) and "probabilities" in output:
probabilities_array = output["probabilities"]
elif isinstance(output, np.ndarray):
probabilities_array = output
else:
logger.warning(f"Unexpected output type for binary post-processing: {type(output)}. Expected dict with 'probabilities' or numpy.ndarray.")
return {class_names[0]: 0.0, class_names[1]: 1.0}
logger.info(f"Debug: Probabilities array entering postprocess_binary_output: {probabilities_array}, type: {type(probabilities_array)}, shape: {getattr(probabilities_array, 'shape', None)}")
if probabilities_array is None:
logger.warning("Probabilities array is None after extracting from output. Returning default scores.")
return {class_names[0]: 0.0, class_names[1]: 1.0}
if probabilities_array.size == 1:
fake_prob = float(probabilities_array.item())
elif probabilities_array.size == 2:
fake_prob = float(probabilities_array[0])
else:
logger.warning(f"Unexpected probabilities array shape for binary post-processing: {probabilities_array.shape}. Expected size 1 or 2.")
return {class_names[0]: 0.0, class_names[1]: 1.0}
real_prob = 1.0 - fake_prob # Ensure Fake and Real sum to 1
return {class_names[0]: fake_prob, class_names[1]: real_prob}
def to_float_scalar(value):
if isinstance(value, np.ndarray):
return float(value.item()) # Convert numpy array scalar to Python float
return float(value) # Already a Python scalar or convertible type
import numpy as np
import io
from PIL import Image, ImageFilter, ImageChops
from torchvision import transforms
def softmax(vector):
e = np.exp(vector - np.max(vector)) # for numerical stability
probabilities = e / e.sum()
return [float(p.item()) for p in probabilities] # Convert numpy array elements to Python floats using .item()
def augment_image(img_pil, methods, rotate_degrees=0, noise_level=0, sharpen_strength=1):
for method in methods:
if method == "rotate":
img_pil = img_pil.rotate(rotate_degrees)
elif method == "add_noise":
noise = np.random.normal(0, noise_level, img_pil.size[::-1] + (3,)).astype(np.uint8)
img_pil = Image.fromarray(np.clip(np.array(img_pil) + noise, 0, 255).astype(np.uint8))
elif method == "sharpen":
img_pil = img_pil.filter(ImageFilter.UnsharpMask(radius=2, percent=sharpen_strength, threshold=3))
return img_pil, img_pil
def convert_pil_to_bytes(image, format='JPEG'):
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format=format)
img_byte_arr = img_byte_arr.getvalue()
return img_byte_arr