Spaces:
Runtime error
Runtime error
# docker build -t reward-simulator .docker run -p 7860:7860 -v $(pwd)/data:/app/data reward-simulator | |
from PIL import Image | |
import numpy as np | |
import io | |
import faiss | |
import requests | |
import torch | |
from request import get_ft, get_topk | |
from flickrapi import FlickrAPI | |
from flask import Flask, request, render_template, jsonify, send_from_directory | |
app = Flask(__name__) | |
PRESET_IMAGES = { | |
1: "static/1.webp", | |
2: "static/2.webp", | |
3: "static/3.webp" | |
} | |
# Add Flickr configuration | |
FLICKR_API_KEY = '80ef21a6f7eb0984ea613c316a89ca69' | |
FLICKR_API_SECRET = '4d0e8ce6734f4b3f' | |
flickr = FlickrAPI(FLICKR_API_KEY, FLICKR_API_SECRET, format='parsed-json', store_token=False) | |
def get_photo_id(url): | |
"""Extract photo ID from Flickr URL""" | |
try: | |
return url.split('/')[-1].split('_')[0] | |
except: | |
return None | |
def get_other_info(url): | |
"""Get author information from Flickr""" | |
try: | |
photo_id = get_photo_id(url) | |
if photo_id: | |
photo_info = flickr.photos.getInfo(photo_id=photo_id) | |
license = photo_info['photo']['license'] | |
owner = photo_info['photo']['owner'] | |
flickr_url = f"https://www.flickr.com/photos/{owner.get('nsid', '')}/{photo_id}" | |
return { | |
'username': owner.get('username', ''), | |
'realname': owner.get('realname', ''), | |
'nsid': owner.get('nsid', ''), | |
'flickr_url': flickr_url, | |
'license': license | |
} | |
except: | |
pass | |
return { | |
'username': 'Unknown', | |
'realname': 'Unknown', | |
'nsid': '', | |
'flickr_url': '', | |
'license': 'Unknown' | |
} | |
def load_model(): | |
"""Load DINOv2 model once and cache it""" | |
torch.hub.set_dir('static') | |
model = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14') | |
model.eval() | |
model.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu')) | |
return model | |
def load_index(index_path): | |
"""Load FAISS index once and cache it""" | |
return faiss.read_index(index_path) | |
def distance_to_similarity(distances, temp=1e-4): | |
"""Convert distance to similarity""" | |
for ii in range(len(distances)): | |
contribs = distances[ii].max() - distances[ii] | |
contribs = contribs / temp | |
sum_contribs = np.exp(contribs).sum() | |
distances[ii] = np.exp(contribs) / sum_contribs | |
return distances | |
import os | |
import os | |
from PIL import Image | |
import numpy as np | |
def calculate_rewards(subscription, num_generations, author_share, ro_share, num_users_k, similarities, num_authors=1800): | |
"""Calculate raw similarity (distance) between two static images""" | |
try: | |
if os.path.exists("static/1.webp") and os.path.exists("static/2.webp"): | |
image1 = Image.open("static/1.webp") | |
image2 = Image.open("static/2.webp") | |
features1 = get_ft(model, image1) | |
features2 = get_ft(model, image1) # temporaire : remettre image2 | |
euclid = float(np.linalg.norm(features1 - features2)) | |
else: | |
euclid = 0.0 | |
except Exception as e: | |
print(f"Erreur lors du chargement des images : {e}") | |
euclid = 0.0 | |
rewards = [{ | |
'raw_similarity': euclid | |
}] | |
return rewards | |
# Global variables for model and index | |
model = None | |
index = None | |
urls = None | |
def init_model(): | |
global model, index, urls | |
model = load_model() | |
index = load_index("data/openimages_index.bin") | |
with open("data/openimages_urls.txt", "r") as f: | |
urls = f.readlines() | |
def home(): | |
return render_template('index.html') | |
def serve_static(filename): | |
return send_from_directory('static', filename) | |
DEFAULT_PARAMS = { | |
'subscription': 12, | |
'num_generations': 60, | |
'author_share': 5, | |
'ro_share': 10, | |
'num_users_k': 500, | |
'num_neighbors': 10, | |
'num_authors': 2000 | |
} | |
def select_preset(preset_id): | |
if preset_id not in PRESET_IMAGES: | |
return jsonify({'error': 'Invalid preset ID'}), 400 | |
try: | |
image_path = PRESET_IMAGES[preset_id] | |
image = Image.open(image_path).convert('RGB') | |
# Use default parameters for presets | |
params = DEFAULT_PARAMS.copy() | |
# Get features and search | |
features = get_ft(model, image) | |
distances, indices = get_topk(index, features, topk=params['num_neighbors']) | |
# Collect valid results first | |
valid_results = [] | |
valid_similarities = [] | |
for i in range(params['num_neighbors']): | |
image_url = urls[indices[0][i]].strip() | |
try: | |
response = requests.head(image_url) | |
if response.status_code == 200: | |
valid_results.append({ | |
'index': i, | |
'url': image_url | |
}) | |
valid_similarities.append(distances[0][i]) | |
except requests.RequestException: | |
continue | |
# Renormalize similarities for valid results | |
if valid_similarities: | |
similarities = distance_to_similarity(np.array([valid_similarities]), temp=1e-5) | |
# Calculate rewards with renormalized similarities | |
rewards = calculate_rewards( | |
params['subscription'], | |
params['num_generations'], | |
params['author_share'], | |
params['ro_share'], | |
params['num_users_k'], | |
similarities, | |
params['num_authors'] | |
) | |
# Build final results | |
results = [] | |
for i, result in enumerate(valid_results): | |
other_info = get_other_info(result['url']) | |
results.append({ | |
'image_url': result['url'], | |
'rewards': rewards[i], | |
'other': other_info | |
}) | |
return jsonify({'results': results}) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def process_images(): | |
if 'image1' not in request.files or 'image2' not in request.files: | |
return jsonify({'error': 'Two images must be provided (image1 and image2)'}), 400 | |
try: | |
# Charger les deux images | |
image_file1 = request.files['image1'] | |
image1 = Image.open(io.BytesIO(image_file1.read())).convert('RGB') | |
image_file2 = request.files['image2'] | |
image2 = Image.open(io.BytesIO(image_file2.read())).convert('RGB') | |
# Extraire les features des deux images | |
features1 = get_ft(model, image1) | |
features2 = get_ft(model, image2) | |
# Calculer la distance euclidienne entre les deux feature vectors | |
distance = float(np.linalg.norm(features1 - features2)) # Convertir en float Python natif pour JSON | |
# Retourner la distance | |
return jsonify({'distance': distance}) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
if __name__ == '__main__': | |
init_model() | |
app.run(host='0.0.0.0', port=7860) | |