"""Sharelock backend.

A Flask + Flask-SocketIO service that stores client-encrypted secrets
(optionally with a file attachment) in process memory, hands out short
links and QR codes for them, records per-access analytics, and hosts
ephemeral chat rooms over WebSockets.

All state lives in module-level dictionaries, so it is lost on restart
and is not shared between processes.
"""
import base64
import hashlib
import html
import io
import json
import os
import random
import secrets
import string
import tempfile
import threading
import time
import uuid

import qrcode
import requests
from flask import Flask, jsonify, request, send_file
from flask_cors import CORS
from flask_socketio import SocketIO, emit, join_room, leave_room
from PIL import Image
from werkzeug.utils import secure_filename

app = Flask(__name__)
CORS(app)

# Configure SocketIO for production deployment
socketio = SocketIO(
    app,
    cors_allowed_origins="*",
    async_mode='threading',
    logger=False,
    engineio_logger=False,
)

# In-memory storage for chat rooms
CHAT_ROOMS = {}  # { room_id: { admin_session, active_sessions, settings, created_at, expires_at } }
CHAT_MESSAGES = {}  # { room_id: [message, ...] }

# In-memory storage for secrets
SECRETS = {}  # { id: { data, file_data, file_type, expire_at, view_once, theme, analytics, etc. } }
SHORT_LINKS = {}  # { short_id: full_id }
ANALYTICS = {}  # { secret_id: [analytics_entries] }

# Configuration
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB
ALLOWED_EXTENSIONS = {
    'image': ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg'],
    'video': ['mp4', 'avi', 'mov', 'wmv', 'flv', 'webm'],
    'audio': ['mp3', 'wav', 'ogg', 'aac', 'm4a'],
    'document': ['pdf', 'txt', 'doc', 'docx', 'rtf', 'odt'],
}

_SHORT_ID_ALPHABET = string.ascii_lowercase + string.digits
_UNKNOWN_LOCATION = {
    'country': 'Unknown',
    'city': 'Unknown',
    'region': 'Unknown',
    'timezone': 'Unknown',
}


def get_file_type(filename):
    """Return the ALLOWED_EXTENSIONS category for *filename*.

    Returns 'unknown' when the name is empty, has no extension, or the
    extension is not listed in ALLOWED_EXTENSIONS.
    """
    if not filename:
        return 'unknown'
    ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
    for file_type, extensions in ALLOWED_EXTENSIONS.items():
        if ext in extensions:
            return file_type
    return 'unknown'


def generate_short_id():
    """Generate a 6-character lowercase alphanumeric ID.

    Uniqueness is NOT checked here; callers compare against SHORT_LINKS.
    The ID is the only thing guarding a short link, so it is drawn from
    the `secrets` CSPRNG rather than the predictable `random` generator.
    """
    return ''.join(secrets.choice(_SHORT_ID_ALPHABET) for _ in range(6))


def get_client_ip(request):
    """Return the client IP for *request*.

    Prefers the first entry of X-Forwarded-For, then X-Real-IP, then the
    socket peer address. The headers are client-controlled unless a
    trusted proxy overwrites them.
    """
    forwarded_for = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        return forwarded_for.split(',')[0].strip()
    real_ip = request.headers.get('X-Real-IP')
    if real_ip:
        return real_ip
    return request.remote_addr


def get_location_info(ip):
    """Return location information for *ip* (mock implementation).

    Loopback and 192.168.* addresses map to 'Local'; everything else,
    including a missing address, maps to 'Unknown'. A fresh dict is
    returned on every call.
    """
    # In production, use a real geolocation service like ipapi.co, ipstack.com, etc.
    if isinstance(ip, str) and (ip == '127.0.0.1' or ip.startswith('192.168.')):
        return {
            'country': 'Local',
            'city': 'Local',
            'region': 'Local',
            'timezone': 'Local',
        }

    # Example with ipapi.co (uncomment for production)
    # response = requests.get(f'https://ipapi.co/{ip}/json/', timeout=5)
    # if response.status_code == 200:
    #     data = response.json()
    #     return {
    #         'country': data.get('country_name', 'Unknown'),
    #         'city': data.get('city', 'Unknown'),
    #         'region': data.get('region', 'Unknown'),
    #         'timezone': data.get('timezone', 'Unknown')
    #     }
    return dict(_UNKNOWN_LOCATION)


def generate_qr_code(data):
    """Render *data* as a QR code and return it as a PNG data URI."""
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(data)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")

    # Convert to base64
    buffer = io.BytesIO()
    img.save(buffer, format='PNG')
    img_str = base64.b64encode(buffer.getvalue()).decode()
    return f"data:image/png;base64,{img_str}"


def record_access(secret_id, request):
    """Append an analytics entry for *secret_id* to ANALYTICS and return it.

    The entry holds the timestamp, client IP, mock location, user agent,
    a desktop/mobile guess derived from the user agent, referer and
    accept-language header.
    """
    ip = get_client_ip(request)
    user_agent = request.headers.get('User-Agent', '')

    # Determine device type
    lowered_agent = user_agent.lower()
    is_mobile = any(
        marker in lowered_agent
        for marker in ('mobile', 'android', 'iphone', 'ipad')
    )

    analytics_entry = {
        'timestamp': time.time(),
        'ip': ip,
        'location': get_location_info(ip),
        'user_agent': user_agent,
        'device_type': 'mobile' if is_mobile else 'desktop',
        'referer': request.headers.get('Referer', ''),
        'accept_language': request.headers.get('Accept-Language', ''),
    }
    ANALYTICS.setdefault(secret_id, []).append(analytics_entry)
    return analytics_entry


def _remove_secret(secret_id):
    """Delete *secret_id* from SECRETS along with every short link to it."""
    SECRETS.pop(secret_id, None)
    for short_id, full_id in list(SHORT_LINKS.items()):
        if full_id == secret_id:
            del SHORT_LINKS[short_id]


def _truncate(text, limit=100):
    """Return *text* cut to *limit* characters plus '...' when it is longer."""
    return text[:limit] + "..." if len(text) > limit else text


@app.route("/")
def index():
    """Health check endpoint"""
    return jsonify({
        "status": "running",
        "service": "Sharelock Backend",
        "version": "2.0.0",
        "features": [
            "End-to-end encryption",
            "File uploads (5MB max)",
            "QR code generation",
            "Analytics tracking",
            "Short URLs",
            "Self-destruct messages",
            "Real-time chat rooms"
        ]
    })


@app.route("/api/store", methods=["POST"])
def store():
    """Store a secret submitted as form data, with an optional file.

    Form fields: data (required), ttl (seconds, default 300), view_once,
    delay_seconds, theme, password_hint, and an optional 'file' upload.
    Responds with the secret id, short id/URL, QR code and expiry time.
    Returns 400 for missing data, non-integer numbers, oversized files or
    unsupported file types.
    """
    try:
        form = request.form
        data = form.get("data")
        if not data:
            return jsonify({"error": "Data is required"}), 400

        # Parse parameters; malformed numbers are a client error, not a 500
        try:
            ttl = int(form.get("ttl", 300))
            delay_seconds = int(form.get("delay_seconds", 0))
        except ValueError:
            return jsonify({"error": "ttl and delay_seconds must be integers"}), 400
        view_once = form.get("view_once", "false").lower() == "true"
        theme = form.get("theme", "default")
        password_hint = form.get("password_hint", "")

        # Handle file upload
        file_data = None
        file_type = None
        file_name = None
        if 'file' in request.files:
            file = request.files['file']
            if file and file.filename:
                # Check file size by seeking to the end, then rewind
                file.seek(0, os.SEEK_END)
                file_size = file.tell()
                file.seek(0)
                if file_size > MAX_FILE_SIZE:
                    return jsonify({"error": f"File too large. Max size: {MAX_FILE_SIZE/1024/1024:.1f}MB"}), 400

                # Process file
                file_name = secure_filename(file.filename)
                file_type = get_file_type(file_name)
                if file_type == 'unknown':
                    return jsonify({"error": "File type not supported"}), 400

                # Read and encode file
                file_content = file.read()
                file_data = base64.b64encode(file_content).decode('utf-8')

        # Generate IDs
        secret_id = str(uuid.uuid4())
        short_id = generate_short_id()
        # Ensure short_id is unique
        while short_id in SHORT_LINKS:
            short_id = generate_short_id()

        # Store secret
        now = time.time()
        SECRETS[secret_id] = {
            "data": data,
            "file_data": file_data,
            "file_type": file_type,
            "file_name": file_name,
            "expire_at": now + ttl,
            "view_once": view_once,
            "delay_seconds": delay_seconds,
            "theme": theme,
            "password_hint": password_hint,
            "created_at": now,
            "creator_ip": get_client_ip(request),
            "access_count": 0
        }

        # Store short link mapping
        SHORT_LINKS[short_id] = secret_id

        # Generate QR code
        base_url = request.host_url.rstrip('/')
        secret_url = f"{base_url}/tools/sharelock?id={secret_id}"
        qr_code = generate_qr_code(secret_url)

        return jsonify({
            "id": secret_id,
            "short_id": short_id,
            "short_url": f"{base_url}/s/{short_id}",
            "qr_code": qr_code,
            "expires_at": SECRETS[secret_id]["expire_at"],
            "has_file": file_data is not None
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/fetch/<secret_id>")
def fetch(secret_id):
    """Return a stored secret by full id or short id.

    With ?verify_only=true the access is not recorded, the access count
    is not incremented, and a view-once secret is not deleted. Returns
    404 when unknown and 410 (after deleting it) when expired.
    """
    try:
        # Check if it's a short link
        secret_id = SHORT_LINKS.get(secret_id, secret_id)

        secret = SECRETS.get(secret_id)
        if not secret:
            return jsonify({"error": "Secret not found"}), 404

        # Check expiration
        if time.time() > secret["expire_at"]:
            _remove_secret(secret_id)
            return jsonify({"error": "Secret has expired"}), 410

        verify_only = request.args.get('verify_only', 'false').lower() == 'true'

        # Only record access and increment count if NOT verify_only
        if not verify_only:
            record_access(secret_id, request)
            secret["access_count"] += 1

        # Prepare response
        response = {
            "data": secret["data"],
            "theme": secret.get("theme", "default"),
            "delay_seconds": secret.get("delay_seconds", 0),
            "password_hint": secret.get("password_hint", ""),
            "access_count": secret["access_count"]
        }

        # Include file data if present
        if secret.get("file_data"):
            response["file_data"] = secret["file_data"]
            response["file_type"] = secret.get("file_type", "unknown")
            response["file_name"] = secret.get("file_name", "unknown")

        # Handle view-once deletion (only if not verify_only)
        if secret["view_once"] and not verify_only:
            _remove_secret(secret_id)

        return jsonify(response)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/analytics/<secret_id>")
def get_analytics(secret_id):
    """Return the recorded accesses for a secret.

    The secret may already be deleted as long as analytics exist for it.
    Unless ?verify_only=true is passed, this request is itself recorded
    as an access before the list is built.
    """
    try:
        verify_only = request.args.get('verify_only', 'false').lower() == 'true'

        # Verify secret exists or existed
        if secret_id not in SECRETS and secret_id not in ANALYTICS:
            return jsonify({"error": "Secret not found"}), 404

        # Only record access if NOT verify_only
        if not verify_only:
            # Record access analytics for the analytics request itself
            record_access(secret_id, request)

        # Format analytics for frontend
        formatted_analytics = [
            {
                "timestamp": entry["timestamp"],
                "datetime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(entry["timestamp"])),
                "ip": entry["ip"],
                "location": entry["location"],
                "device_type": entry["device_type"],
                "user_agent": _truncate(entry["user_agent"]),
            }
            for entry in ANALYTICS.get(secret_id, [])
        ]

        return jsonify({
            "secret_id": secret_id,
            "total_accesses": len(formatted_analytics),
            "analytics": formatted_analytics
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/secrets")
def list_secrets():
    """List all active (non-expired) secrets (for dashboard)"""
    try:
        current_time = time.time()

        # Build the secret -> short id lookup once; the first short id
        # found for a secret wins.
        short_by_secret = {}
        for s_id, full_id in SHORT_LINKS.items():
            short_by_secret.setdefault(full_id, s_id)

        active_secrets = [
            {
                "id": secret_id,
                "short_id": short_by_secret.get(secret_id),
                "created_at": secret["created_at"],
                "expires_at": secret["expire_at"],
                "view_once": secret["view_once"],
                "has_file": secret.get("file_data") is not None,
                "file_type": secret.get("file_type"),
                "theme": secret.get("theme", "default"),
                "access_count": secret.get("access_count", 0),
                "preview": _truncate(secret["data"]),
            }
            for secret_id, secret in list(SECRETS.items())
            if current_time <= secret["expire_at"]
        ]

        return jsonify({
            "secrets": active_secrets,
            "total": len(active_secrets)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/delete/<secret_id>", methods=["DELETE"])
def delete_secret(secret_id):
    """Manually delete a secret and its short link.

    ?verify_only=true only suppresses the analytics entry for this
    request; the secret is deleted either way.
    """
    try:
        verify_only = request.args.get('verify_only', 'false').lower() == 'true'

        if secret_id not in SECRETS:
            return jsonify({"error": "Secret not found"}), 404

        # Only record access if NOT verify_only
        if not verify_only:
            # Record access analytics for the delete request
            record_access(secret_id, request)

        _remove_secret(secret_id)

        return jsonify({"message": "Secret deleted successfully"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/s/<short_id>")
def redirect_short_link(short_id):
    """Serve an HTML page that redirects a short link to the full URL."""
    if short_id not in SHORT_LINKS:
        return jsonify({"error": "Short link not found"}), 404

    secret_id = SHORT_LINKS[short_id]
    base_url = request.host_url.rstrip('/')

    # NOTE(review): the page markup was reconstructed around the surviving
    # text; the target mirrors the URL that store() and get_qr_code() build.
    # Confirm against the frontend route.
    # host_url derives from the request's Host header, so escape it before
    # placing it inside HTML attributes.
    target = html.escape(f"{base_url}/tools/sharelock?id={secret_id}", quote=True)

    return f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Sharelock - Redirecting...</title>
    <meta http-equiv="refresh" content="0; url={target}">
</head>
<body>
    <p>Redirecting to secure message...</p>
    <p>If you are not redirected automatically, <a href="{target}">click here</a>.</p>
</body>
</html>
"""


@app.route("/api/qr/<secret_id>")
def get_qr_code(secret_id):
    """Generate QR code for a secret"""
    try:
        if secret_id not in SECRETS:
            return jsonify({"error": "Secret not found"}), 404

        base_url = request.host_url.rstrip('/')
        secret_url = f"{base_url}/tools/sharelock?id={secret_id}"
        qr_code = generate_qr_code(secret_url)

        return jsonify({"qr_code": qr_code})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/stats")
def get_stats():
    """Get overall statistics: counts of secrets, accesses, file types and themes."""
    try:
        total_secrets = len(SECRETS)
        total_accesses = sum(len(entries) for entries in ANALYTICS.values())

        file_types = {}
        themes = {}
        for secret in list(SECRETS.values()):
            # store() always writes the "file_type" key (None for text-only
            # secrets), so a .get() default never applies; a None dict key
            # also breaks sorted-key JSON serialisation.
            file_type = secret.get("file_type") or "text"
            file_types[file_type] = file_types.get(file_type, 0) + 1

            theme = secret.get("theme", "default")
            themes[theme] = themes.get(theme, 0) + 1

        return jsonify({
            "total_secrets": total_secrets,
            "total_accesses": total_accesses,
            "file_types": file_types,
            "themes": themes,
            "active_short_links": len(SHORT_LINKS)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/cleanup", methods=["POST"])
def cleanup_expired():
    """Clean up expired secrets and their short links; report how many."""
    try:
        current_time = time.time()

        # Snapshot first so deletion never happens while iterating SECRETS
        expired_secrets = [
            secret_id
            for secret_id, secret in list(SECRETS.items())
            if current_time > secret["expire_at"]
        ]
        for secret_id in expired_secrets:
            _remove_secret(secret_id)
        expired_count = len(expired_secrets)

        return jsonify({
            "message": f"Cleaned up {expired_count} expired secrets",
            "expired_count": expired_count
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Chat endpoints

@app.route("/api/chat/create", methods=["POST"])
def create_chat_room():
    """Create a chat room and return its id and the admin session token.

    Form fields: ttl (seconds, default 3600), max_receivers (default 5),
    password (optional) and allow_files (default true).
    """
    try:
        form = request.form
        try:
            ttl = int(form.get("ttl", 3600))
            max_receivers = int(form.get("max_receivers", 5))
        except ValueError:
            return jsonify({"error": "ttl and max_receivers must be integers"}), 400
        password = form.get("password", "")
        allow_files = form.get("allow_files", "true").lower() == "true"

        room_id = str(uuid.uuid4())
        admin_session = str(uuid.uuid4())

        now = time.time()
        CHAT_ROOMS[room_id] = {
            "admin_session": admin_session,
            "created_at": now,
            "expires_at": now + ttl,
            "settings": {
                "max_receivers": max_receivers,
                "password": password,
                "allow_files": allow_files,
                "burn_on_admin_exit": True
            },
            "active_sessions": {},
            "receiver_counter": 0
        }
        CHAT_MESSAGES[room_id] = []

        # Return only IDs - let frontend create URLs
        return jsonify({
            "room_id": room_id,
            "admin_session": admin_session,
            "expires_at": CHAT_ROOMS[room_id]["expires_at"]
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/chat/join/<room_id>")
def join_chat_room(room_id):
    """Validate a join request and assign a session id and role.

    Query parameters: password, and admin (the admin session token).
    A caller presenting the room's admin session becomes "admin"; anyone
    else becomes a numbered "receiver" if the room is not full.
    """
    try:
        password = request.args.get("password", "")
        admin_session = request.args.get("admin", "")

        if room_id not in CHAT_ROOMS:
            return jsonify({"error": "Chat room not found"}), 404

        room = CHAT_ROOMS[room_id]

        # Check if expired
        if time.time() > room["expires_at"]:
            return jsonify({"error": "Chat room has expired"}), 410

        # Check password
        if room["settings"]["password"] and password != room["settings"]["password"]:
            return jsonify({"error": "Wrong password"}), 403

        if admin_session and admin_session == room["admin_session"]:
            # Only admin if the session matches the room's admin session
            role = "admin"
            session_id = admin_session
            receiver_number = None
        else:
            # Everyone else is a receiver
            active_receivers = sum(
                1 for s in room["active_sessions"].values() if s["role"] == "receiver"
            )
            if active_receivers >= room["settings"]["max_receivers"]:
                return jsonify({"error": "Chat room is full"}), 403

            role = "receiver"
            session_id = str(uuid.uuid4())
            room["receiver_counter"] += 1
            receiver_number = room["receiver_counter"]

        return jsonify({
            "session_id": session_id,
            "role": role,
            "receiver_number": receiver_number,
            "room_settings": room["settings"],
            "expires_at": room["expires_at"]
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# WebSocket events

def _active_session(room_id, session_id):
    """Return the active-session record for the pair, or None if absent."""
    room = CHAT_ROOMS.get(room_id)
    if room is None:
        return None
    return room["active_sessions"].get(session_id)


@socketio.on('join_chat')
def handle_join_chat(data):
    """Register a socket in a room and notify the other participants."""
    room_id = data['room_id']
    session_id = data['session_id']

    room = CHAT_ROOMS.get(room_id)
    if room is None:
        return

    # Derive the role from the session token instead of trusting the
    # client-supplied 'role': otherwise any client could claim "admin"
    # and destroy the room by leaving it.
    role = "admin" if session_id == room["admin_session"] else "receiver"

    join_room(room_id)

    # Add to active sessions
    now = time.time()
    room["active_sessions"][session_id] = {
        "role": role,
        "receiver_number": data.get('receiver_number'),
        "joined_at": now,
        "last_seen": now
    }

    # Notify others
    emit('user_joined', {
        'role': role,
        'receiver_number': data.get('receiver_number'),
        'active_count': len(room["active_sessions"])
    }, room=room_id, include_self=False)


@socketio.on('start_typing')
def handle_start_typing(data):
    """Tell the rest of the room that this session started typing."""
    room_id = data['room_id']
    session = _active_session(room_id, data['session_id'])
    if session is not None:
        emit('user_typing', {
            'role': session["role"],
            'receiver_number': session.get("receiver_number")
        }, room=room_id, include_self=False)


@socketio.on('stop_typing')
def handle_stop_typing(data):
    """Tell the rest of the room that this session stopped typing."""
    room_id = data['room_id']
    session = _active_session(room_id, data['session_id'])
    if session is not None:
        emit('user_stopped_typing', {
            'role': session["role"],
            'receiver_number': session.get("receiver_number")
        }, room=room_id, include_self=False)


@socketio.on('send_chat_message')
def handle_chat_message(data):
    """Store a text message from an active session and broadcast it."""
    room_id = data['room_id']
    session = _active_session(room_id, data['session_id'])
    if session is None:
        return

    message = {
        "id": str(uuid.uuid4()),
        "sender_role": session["role"],
        "sender_number": session.get("receiver_number"),
        "content": data['content'],
        "timestamp": time.time(),
        "type": "text"
    }
    CHAT_MESSAGES.setdefault(room_id, []).append(message)

    # Broadcast to room
    emit('new_message', message, room=room_id)


@socketio.on('leave_chat')
def handle_leave_chat(data):
    """Remove a session from its room.

    When the admin leaves and burn_on_admin_exit is set, the room and
    its messages are deleted after the remaining members are notified.
    """
    room_id = data['room_id']
    session_id = data['session_id']

    session = _active_session(room_id, session_id)
    if session is not None:
        room = CHAT_ROOMS[room_id]
        del room["active_sessions"][session_id]

        # If admin left and burn_on_admin_exit is true
        if session["role"] == "admin" and room["settings"]["burn_on_admin_exit"]:
            emit('room_closing', {'reason': 'Admin left the room'}, room=room_id)
            del CHAT_ROOMS[room_id]
            CHAT_MESSAGES.pop(room_id, None)
        else:
            emit('user_left', {
                'role': session["role"],
                'receiver_number': session.get("receiver_number"),
                'active_count': len(room["active_sessions"])
            }, room=room_id)

    leave_room(room_id)


# Error handlers

@app.errorhandler(404)
def not_found(error):
    """Return a JSON body for unknown endpoints."""
    return jsonify({"error": "Endpoint not found"}), 404


@app.errorhandler(500)
def internal_error(error):
    """Return a JSON body for unhandled server errors."""
    return jsonify({"error": "Internal server error"}), 500


if __name__ == "__main__":
    print("🔐 Sharelock Backend Starting...")
    print("📊 Features enabled:")
    print(" ✅ End-to-end encryption")
    print(" ✅ File uploads (5MB max)")
    print(" ✅ QR code generation")
    print(" ✅ Analytics tracking")
    print(" ✅ Short URLs")
    print(" ✅ Self-destruct messages")
    print(" ✅ Multiple themes")
    print(" ✅ Password hints")
    print(" ✅ verify_only parameter support")
    print(" ✅ Real-time chat rooms")
    print("🚀 Server running on http://0.0.0.0:7860")

    # allow_unsafe_werkzeug=True is needed for HuggingFace Spaces
    socketio.run(app, host="0.0.0.0", port=7860, debug=False, allow_unsafe_werkzeug=True)