File size: 5,718 Bytes
4cb6734
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6555f50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67f3560
 
c08bf6c
67f3560
 
 
 
057dd29
 
67f3560
33581d9
5ccc3f6
 
 
 
33581d9
5ccc3f6
 
33581d9
5ccc3f6
67f3560
 
 
 
 
c08bf6c
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def infer_gradio_api(image_path):
    """Query the hosted OpenSight Gradio Space for a fake-probability score.

    Args:
        image_path: Path of the image file handed to ``handle_file`` for upload.

    Returns:
        ``{"probabilities": np.ndarray}`` where the array holds the single
        value stored under ``'Fake Probability'`` in the API response
        (``0.0`` when that key is missing).
    """
    from gradio_client import Client, handle_file
    import numpy as np
    import logging

    log = logging.getLogger(__name__)

    space_client = Client("aiwithoutborders-xyz/OpenSight-Community-Forensics-Preview")
    uploaded_image = handle_file(image_path)
    response = space_client.predict(input_image=uploaded_image, api_name="/simple_predict")
    log.info(f"Debug: Raw result_dict from Gradio API (model_8): {response}, type: {type(response)}")

    # NOTE(review): the response is used as a mapping here -- confirm the
    # endpoint always returns a dict.
    fake_score = response.get('Fake Probability', 0.0)
    log.info(f"Debug: Parsed result_dict: {response}, Extracted fake_probability: {fake_score}")

    return {"probabilities": np.array([fake_score])}

def preprocess_gradio_api(image):
    """Write ``image`` to a fixed PNG path and return that path.

    Args:
        image: Object exposing ``save(path)`` (a PIL image in practice).

    Returns:
        The string path the image was saved to.
    """
    # NOTE(review): every call writes to the same file, so concurrent callers
    # would overwrite each other's input -- confirm this is single-threaded.
    saved_path = "./temp_gradio_input.png"
    image.save(saved_path)
    return saved_path

def postprocess_gradio_api(gradio_output, class_names):
    """Turn the Gradio API result into a ``{class_name: probability}`` dict.

    Args:
        gradio_output: Either a dict carrying the scores under the
            ``"probabilities"`` key, or a ``numpy.ndarray`` of scores. The
            value may also be a list/tuple/scalar; it is converted with
            ``np.asarray``.
        class_names: Sequence with at least two entries. ``class_names[0]``
            receives the fake probability, ``class_names[1]`` its complement.

    Returns:
        ``{class_names[0]: fake_prob, class_names[1]: 1.0 - fake_prob}``.
        When the input has an unexpected type, is ``None``/empty, or holds
        more than two values, the default ``{class_names[0]: 0.0,
        class_names[1]: 1.0}`` is returned and a warning is logged.
    """
    import numpy as np
    import logging
    logger = logging.getLogger(__name__)

    default_scores = {class_names[0]: 0.0, class_names[1]: 1.0}

    if isinstance(gradio_output, dict) and "probabilities" in gradio_output:
        probabilities_array = gradio_output["probabilities"]
    elif isinstance(gradio_output, np.ndarray):
        probabilities_array = gradio_output
    else:
        logger.warning(f"Unexpected output type for Gradio API post-processing: {type(gradio_output)}. Expected dict with 'probabilities' or numpy.ndarray.")
        return default_scores

    logger.info(f"Debug: Probabilities array entering postprocess_gradio_api: {probabilities_array}, type: {type(probabilities_array)}, shape: {getattr(probabilities_array, 'shape', None)}")

    if probabilities_array is not None:
        # Accept lists, tuples and bare scalars as well as arrays; without
        # this the ``.size`` lookup below fails on non-ndarray values.
        probabilities_array = np.asarray(probabilities_array)

    if probabilities_array is None or probabilities_array.size == 0:
        logger.warning("Probabilities array is None or empty after extracting from Gradio API output. Returning default scores.")
        return default_scores

    if probabilities_array.size > 2:
        # ``.item()`` only works on single-element arrays; anything larger
        # than a [fake, real] pair is not a binary result.
        logger.warning(f"Unexpected probabilities array shape for Gradio API post-processing: {probabilities_array.shape}. Expected size 1 or 2.")
        return default_scores

    # Size 1: the lone value is the fake probability. Size 2: the first entry
    # is taken as the fake probability, matching postprocess_binary_output.
    fake_prob = float(probabilities_array.flat[0])
    real_prob = 1.0 - fake_prob
    return {class_names[0]: fake_prob, class_names[1]: real_prob}

def preprocess_resize_256(image):
    """Return ``image`` as RGB, resized to 256x256.

    Images whose ``mode`` is not ``'RGB'`` are converted first; the resize is
    done with ``transforms.Resize((256, 256))``.
    """
    rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
    resize_to_256 = transforms.Resize((256, 256))
    return resize_to_256(rgb_image)

def preprocess_resize_224(image):
    """Return ``image`` as RGB, resized to 224x224.

    Images whose ``mode`` is not ``'RGB'`` are converted first; the resize is
    done with ``transforms.Resize((224, 224))``.
    """
    rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
    resize_to_224 = transforms.Resize((224, 224))
    return resize_to_224(rgb_image)

def postprocess_pipeline(prediction, class_names):
    """Map each prediction's ``'label'`` to its ``'score'`` as a float.

    Args:
        prediction: Iterable of mappings with ``'label'`` and ``'score'``
            keys (assumes HuggingFace pipeline output).
        class_names: Unused; kept so all post-processors share a signature.

    Returns:
        Dict of ``label -> float(score)``; a repeated label keeps its last
        score.
    """
    scores_by_label = {}
    for entry in prediction:
        label = entry['label']
        scores_by_label[label] = float(entry['score'])
    return scores_by_label

def postprocess_logits(outputs, class_names):
    """Convert a model output carrying ``logits`` into per-class probabilities.

    Args:
        outputs: Object whose ``logits`` attribute supports
            ``.cpu().numpy()``; only the first row is used.
        class_names: Indexable collection of class labels; entry ``i`` is
            paired with probability ``i``.

    Returns:
        Dict of ``class_names[i] -> softmax(logits)[i]``.
    """
    # Assumes model output with logits
    first_row = outputs.logits.cpu().numpy()[0]
    scores = softmax(first_row)

    scores_by_class = {}
    for index in range(len(class_names)):
        label = class_names[index]
        scores_by_class[label] = scores[index]
    return scores_by_class

def postprocess_binary_output(output, class_names):
    """Convert a binary-classifier output into a ``{class_name: probability}`` dict.

    Args:
        output: Either a dict ``{"probabilities": values}`` or a
            ``numpy.ndarray`` directly. ``values`` may be an array of any
            shape (e.g. ``(2,)`` or ``(1, 2)``), a list/tuple, or a scalar.
        class_names: Sequence with at least two entries. ``class_names[0]``
            receives the fake probability, ``class_names[1]`` its complement.

    Returns:
        ``{class_names[0]: fake_prob, class_names[1]: 1.0 - fake_prob}``,
        where ``fake_prob`` is the only element (size 1) or the first element
        (size 2) of the probabilities. For any other input the default
        ``{class_names[0]: 0.0, class_names[1]: 1.0}`` is returned and a
        warning is logged.
    """
    import logging
    logger = logging.getLogger(__name__)

    default_scores = {class_names[0]: 0.0, class_names[1]: 1.0}

    if isinstance(output, dict) and "probabilities" in output:
        probabilities_array = output["probabilities"]
    elif isinstance(output, np.ndarray):
        probabilities_array = output
    else:
        logger.warning(f"Unexpected output type for binary post-processing: {type(output)}. Expected dict with 'probabilities' or numpy.ndarray.")
        return default_scores

    logger.info(f"Debug: Probabilities array entering postprocess_binary_output: {probabilities_array}, type: {type(probabilities_array)}, shape: {getattr(probabilities_array, 'shape', None)}")

    if probabilities_array is None:
        logger.warning("Probabilities array is None after extracting from output. Returning default scores.")
        return default_scores

    # Accept lists, tuples and bare scalars as well as arrays; without this
    # the ``.size`` lookup below fails on non-ndarray values.
    probabilities_array = np.asarray(probabilities_array)

    if probabilities_array.size not in (1, 2):
        logger.warning(f"Unexpected probabilities array shape for binary post-processing: {probabilities_array.shape}. Expected size 1 or 2.")
        return default_scores

    # ``.flat[0]`` reads the first element whatever the shape, so a batched
    # (1, 2) output works the same as a flat (2,) one. Plain ``[0]`` would
    # return a whole row there and ``float()`` would raise.
    fake_prob = float(probabilities_array.flat[0])
    real_prob = 1.0 - fake_prob  # Ensure Fake and Real sum to 1
    return {class_names[0]: fake_prob, class_names[1]: real_prob}

def to_float_scalar(value):
    """Return ``value`` as a built-in ``float``.

    A ``numpy.ndarray`` is unwrapped with ``.item()`` first (so it must hold
    exactly one element); anything else is passed straight to ``float()``.
    """
    scalar = value.item() if isinstance(value, np.ndarray) else value
    return float(scalar)
import numpy as np
import io
from PIL import Image, ImageFilter, ImageChops
from torchvision import transforms

def softmax(vector):
    """Compute the softmax of a 1-D score vector.

    The maximum is subtracted before exponentiating so large scores do not
    overflow.

    Returns:
        A list of built-in ``float`` probabilities, one per input element.
    """
    shifted = vector - np.max(vector)
    exponentials = np.exp(shifted)
    normalised = exponentials / exponentials.sum()
    # ``.item()`` unwraps each NumPy scalar into a plain Python number.
    return [float(entry.item()) for entry in normalised]

def augment_image(img_pil, methods, rotate_degrees=0, noise_level=0, sharpen_strength=1):
    """Apply the requested augmentations to a PIL image, in the order given.

    Args:
        img_pil: Source PIL image.
        methods: Iterable of method names. Recognised values are
            ``"rotate"``, ``"add_noise"`` and ``"sharpen"``; any other name
            is ignored.
        rotate_degrees: Angle passed to ``Image.rotate``.
        noise_level: Standard deviation of the zero-mean Gaussian noise added
            to every pixel value by ``"add_noise"``.
        sharpen_strength: Passed as ``percent`` to ``ImageFilter.UnsharpMask``.

    Returns:
        A 2-tuple containing the augmented image twice (the same object in
        both positions).
    """
    for method in methods:
        if method == "rotate":
            img_pil = img_pil.rotate(rotate_degrees)
        elif method == "add_noise":
            # Do the arithmetic in float: casting signed noise to uint8 first
            # would wrap negative samples, and a uint8 + uint8 sum would
            # overflow before the clip could take effect.
            pixels = np.asarray(img_pil, dtype=np.float64)
            # Sample noise in the image's own array shape so greyscale and
            # RGBA inputs work as well as RGB.
            noise = np.random.normal(0, noise_level, pixels.shape)
            noisy = np.clip(np.rint(pixels + noise), 0, 255).astype(np.uint8)
            img_pil = Image.fromarray(noisy)
        elif method == "sharpen":
            img_pil = img_pil.filter(ImageFilter.UnsharpMask(radius=2, percent=sharpen_strength, threshold=3))
    return img_pil, img_pil

def convert_pil_to_bytes(image, format='JPEG'):
    """Encode ``image`` into an in-memory buffer and return the raw bytes.

    Args:
        image: Object exposing ``save(fp, format=...)`` (a PIL image in
            practice).
        format: Format name forwarded to ``image.save``.

    Returns:
        ``bytes`` holding everything ``image.save`` wrote to the buffer.
    """
    with io.BytesIO() as buffer:
        image.save(buffer, format=format)
        return buffer.getvalue()