import cv2
import numpy as np
from PIL import Image
import tempfile
import os
import subprocess
import sys
import json
from typing import Dict, List, Tuple, Optional
import logging

logging.getLogger('deepface').setLevel(logging.ERROR)

try:
    from deepface import DeepFace
    DEEPFACE_AVAILABLE = True
except ImportError:
    DEEPFACE_AVAILABLE = False
    print("Warning: DeepFace not available. Face comparison will be disabled.")


def run_deepface_in_subprocess(img1_path: str, img2_path: str) -> dict:
    """
    Run DeepFace verification in a separate process to avoid TensorFlow conflicts.
    """
    script_content = f'''
import sys
import json
from deepface import DeepFace

try:
    result = DeepFace.verify(img1_path="{img1_path}", img2_path="{img2_path}")
    print(json.dumps(result))
except Exception as e:
    print(json.dumps({{"error": str(e)}}))
'''

    try:
        # Write the one-off verification script to a temporary file.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as script_file:
            script_file.write(script_content)
            script_path = script_file.name

        # Run it with the current interpreter; the timeout keeps a hung
        # TensorFlow initialization from blocking the caller indefinitely.
        result = subprocess.run([sys.executable, script_path],
                                capture_output=True, text=True, timeout=30)

        os.unlink(script_path)

        if result.returncode == 0:
            return json.loads(result.stdout.strip())
        else:
            return {"error": f"Subprocess failed: {result.stderr}"}

    except Exception as e:
        return {"error": str(e)}

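# Usage sketch (hypothetical image paths): on success the helper returns the dict
# produced by DeepFace.verify (including keys such as "verified" and "distance");
# on any failure it returns {"error": "<message>"}, so callers should check for
# the "error" key before reading other fields.
#
#   result = run_deepface_in_subprocess("person_a.jpg", "person_b.jpg")
#   if "error" not in result and result.get("verified"):
#       print("Faces match")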

class FaceComparison:
    """
    Handles face detection and comparison on full images.
    Only responsible for determining if faces match - does not handle segmentation.
    """

    def __init__(self):
        """
        Initialize face comparison using DeepFace's default verification threshold.
        """
        self.available = DEEPFACE_AVAILABLE
        self.face_match_result = None
        self.comparison_log = []

    def extract_faces(self, image_path: str) -> List[np.ndarray]:
        """
        Extract faces from the full image using DeepFace (exactly like the working script).

        Args:
            image_path: Path to the image

        Returns:
            List of face arrays
        """
        if not self.available:
            return []

        try:
            faces = DeepFace.extract_faces(img_path=image_path, detector_backend='opencv')
            if len(faces) == 0:
                return []
            return [f['face'] for f in faces]

        except Exception as e:
            print(f"Error extracting faces from {image_path}: {str(e)}")
            return []

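    # Note: DeepFace.extract_faces is generally documented to return each face as
    # a float RGB array normalized to [0, 1]; compare_all_faces below relies on
    # that assumption when it scales by 255 and converts RGB -> BGR for cv2.imwrite.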
    def compare_all_faces(self, image1_path: str, image2_path: str) -> Tuple[bool, List[str]]:
        """
        Compare all faces between two images (exactly like the working script).

        Args:
            image1_path: Path to first image
            image2_path: Path to second image

        Returns:
            Tuple of (match_found, log_messages)
        """
        if not self.available:
            return False, ["Face comparison not available - DeepFace not installed"]

        log_messages = []

        try:
            faces1 = self.extract_faces(image1_path)
            faces2 = self.extract_faces(image2_path)

            match_found = False

            log_messages.append(f"Found {len(faces1)} face(s) in Image 1 and {len(faces2)} face(s) in Image 2")

            if len(faces1) == 0 or len(faces2) == 0:
                log_messages.append("❌ No faces found in one or both images")
                return False, log_messages

            # Compare every face in image 1 against every face in image 2.
            for idx1, face1 in enumerate(faces1):
                for idx2, face2 in enumerate(faces2):

                    with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp1, \
                         tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp2:

                        # Convert float RGB faces to uint8 BGR before writing JPEGs.
                        face1_uint8 = (face1 * 255).astype(np.uint8)
                        face2_uint8 = (face2 * 255).astype(np.uint8)

                        cv2.imwrite(temp1.name, cv2.cvtColor(face1_uint8, cv2.COLOR_RGB2BGR))
                        cv2.imwrite(temp2.name, cv2.cvtColor(face2_uint8, cv2.COLOR_RGB2BGR))

                        try:
                            result = run_deepface_in_subprocess(temp1.name, temp2.name)

                            # Fall back to an in-process call if the subprocess failed.
                            if "error" in result:
                                result = DeepFace.verify(img1_path=temp1.name, img2_path=temp2.name)

                            similarity = 1 - result['distance']

                            log_messages.append(f"Comparing Face1-{idx1} to Face2-{idx2} | Similarity: {similarity:.3f}")

                            if result['verified']:
                                log_messages.append(f"✅ Match found between Face1-{idx1} and Face2-{idx2}")
                                match_found = True
                            else:
                                log_messages.append(f"❌ No match between Face1-{idx1} and Face2-{idx2}")

                        except Exception as e:
                            log_messages.append(f"❌ Error comparing Face1-{idx1} to Face2-{idx2}: {str(e)}")

                        # Best-effort cleanup of the temporary face crops.
                        try:
                            os.unlink(temp1.name)
                            os.unlink(temp2.name)
                        except OSError:
                            pass

            if not match_found:
                log_messages.append("❌ No matching faces found between the two images.")

            return match_found, log_messages

        except Exception as e:
            log_messages.append(f"Error in face comparison: {str(e)}")
            return False, log_messages

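    # Usage sketch (hypothetical file names, not part of this module's API):
    #   fc = FaceComparison()
    #   matched, log = fc.compare_all_faces("id_photo.jpg", "selfie.jpg")
    #   print("\n".join(log))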
    def run_face_comparison(self, img1_path: str, img2_path: str) -> Tuple[bool, List[str]]:
        """
        Run face comparison and store results for later use.

        Args:
            img1_path: Path to first image
            img2_path: Path to second image

        Returns:
            Tuple of (faces_match, log_messages)
        """
        faces_match, log_messages = self.compare_all_faces(img1_path, img2_path)

        # Cache the outcome so filter_human_regions_by_face_match can use it later.
        self.face_match_result = faces_match
        self.comparison_log = log_messages

        return faces_match, log_messages

    def filter_human_regions_by_face_match(self, masks: Dict[str, np.ndarray]) -> Tuple[Dict[str, np.ndarray], List[str]]:
        """
        Filter human regions based on previously computed face comparison results.
        This only includes/excludes human regions - fine-grained segmentation happens elsewhere.

        Args:
            masks: Dictionary of semantic masks

        Returns:
            Tuple of (filtered_masks, log_messages)
        """
        if not self.available:
            return masks, ["Face comparison not available - DeepFace not installed"]

        if self.face_match_result is None:
            return masks, ["No face comparison results available. Run face comparison first."]

        filtered_masks = {}
        log_messages = []

        # Identify which mask labels correspond to human or biological regions.
        human_labels = [label for label in masks.keys() if 'l3_human' in label.lower()]
        bio_labels = [label for label in masks.keys() if 'l2_bio' in label.lower()]

        log_messages.append(f"Found human labels: {human_labels}")
        log_messages.append(f"Found bio labels: {bio_labels}")

        # Non-human regions are always kept.
        for label, mask in masks.items():
            if not any(human_term in label.lower() for human_term in ['l3_human', 'l2_bio']):
                filtered_masks[label] = mask
                log_messages.append(f"✅ Including non-human region: {label}")
            else:
                log_messages.append(f"Found human/bio region: {label}")

        # Human/bio regions are kept only when the faces matched.
        if self.face_match_result:
            log_messages.append("✅ Faces matched! Including human regions in color matching.")

            for label in human_labels + bio_labels:
                if label in masks:
                    filtered_masks[label] = masks[label]
                    log_messages.append(f"✅ Including human region (faces matched): {label}")
        else:
            log_messages.append("❌ No face match found. Excluding human regions from color matching.")

            for label in human_labels + bio_labels:
                log_messages.append(f"❌ Excluding human region (no face match): {label}")

        log_messages.append(f"Final filtered masks: {list(filtered_masks.keys())}")

        return filtered_masks, log_messages
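

# Minimal end-to-end sketch. The image paths and the empty `masks` dict below are
# hypothetical placeholders; in practice the masks come from a separate
# segmentation step that is not part of this module.
if __name__ == "__main__":
    comparator = FaceComparison()
    matched, log = comparator.run_face_comparison("image1.jpg", "image2.jpg")
    print("\n".join(log))

    # An empty dict is used only so the sketch runs standalone.
    example_masks: Dict[str, np.ndarray] = {}
    filtered, filter_log = comparator.filter_human_regions_by_face_match(example_masks)
    print("\n".join(filter_log))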