from cryptography.fernet import Fernet import json import os import base64 import hashlib from typing import Dict, Any, Optional from datetime import datetime, timedelta class SecureStorage: """Hassas verileri şifreleyerek saklamak için sınıf""" def __init__(self, storage_dir: str = "./secure_data"): """ Args: storage_dir: Şifreli verilerin saklanacağı dizin """ # Depolama dizinini oluştur os.makedirs(storage_dir, exist_ok=True) self.storage_dir = storage_dir # Şifreleme anahtarı self.key = os.environ.get("ENCRYPTION_KEY") if not self.key: # Anahtar yoksa, yeni bir anahtar oluştur ve kaydet self.key = Fernet.generate_key().decode() print(f"YENİ ŞİFRELEME ANAHTARI OLUŞTURULDU! Bunu güvenli bir yerde saklayın: {self.key}") # Fernet şifreleme nesnesi self.fernet = Fernet(self.key.encode() if isinstance(self.key, str) else self.key) def store_data(self, data: Dict[str, Any], session_id: str, ttl_days: int = 30) -> str: """ Hassas verileri şifreleyerek saklar Args: data: Saklanacak veriler session_id: Benzersiz oturum kimliği ttl_days: Verilerin saklanacağı süre (gün) Returns: Dosya yolu """ # Meta verileri ekle data_with_meta = { "data": data, "created_at": datetime.now().isoformat(), "expires_at": (datetime.now() + timedelta(days=ttl_days)).isoformat(), "session_id": session_id } # JSON'a dönüştür ve şifrele json_data = json.dumps(data_with_meta) encrypted_data = self.fernet.encrypt(json_data.encode()) # Dosyaya kaydet file_path = os.path.join(self.storage_dir, f"{session_id}.enc") with open(file_path, "wb") as f: f.write(encrypted_data) return file_path def retrieve_data(self, session_id: str) -> Optional[Dict[str, Any]]: """ Şifreli verileri getirir ve çözer Args: session_id: Benzersiz oturum kimliği Returns: Çözülmüş veriler veya None (eğer yoksa) """ file_path = os.path.join(self.storage_dir, f"{session_id}.enc") if not os.path.exists(file_path): return None try: # Şifreli veriyi oku with open(file_path, "rb") as f: encrypted_data = f.read() # Şifreyi çöz decrypted_data = self.fernet.decrypt(encrypted_data).decode() data_with_meta = json.loads(decrypted_data) # Süre kontrolü expires_at = datetime.fromisoformat(data_with_meta["expires_at"]) if datetime.now() > expires_at: # Süresi dolmuş veriler os.remove(file_path) # Dosyayı sil return None return data_with_meta["data"] except Exception as e: print(f"Veri çözme hatası: {str(e)}") return None def delete_data(self, session_id: str) -> bool: """ Şifreli verileri siler Args: session_id: Benzersiz oturum kimliği Returns: Başarılı olup olmadığı """ file_path = os.path.join(self.storage_dir, f"{session_id}.enc") if os.path.exists(file_path): os.remove(file_path) return True return False