Spaces:
Running
Running
from cryptography.fernet import Fernet | |
import json | |
import os | |
import base64 | |
import hashlib | |
from typing import Dict, Any, Optional | |
from datetime import datetime, timedelta | |
class SecureStorage: | |
"""Hassas verileri şifreleyerek saklamak için sınıf""" | |
def __init__(self, storage_dir: str = "./secure_data"): | |
""" | |
Args: | |
storage_dir: Şifreli verilerin saklanacağı dizin | |
""" | |
# Depolama dizinini oluştur | |
os.makedirs(storage_dir, exist_ok=True) | |
self.storage_dir = storage_dir | |
# Şifreleme anahtarı | |
self.key = os.environ.get("ENCRYPTION_KEY") | |
if not self.key: | |
# Anahtar yoksa, yeni bir anahtar oluştur ve kaydet | |
self.key = Fernet.generate_key().decode() | |
print(f"YENİ ŞİFRELEME ANAHTARI OLUŞTURULDU! Bunu güvenli bir yerde saklayın: {self.key}") | |
# Fernet şifreleme nesnesi | |
self.fernet = Fernet(self.key.encode() if isinstance(self.key, str) else self.key) | |
def store_data(self, data: Dict[str, Any], session_id: str, ttl_days: int = 30) -> str: | |
""" | |
Hassas verileri şifreleyerek saklar | |
Args: | |
data: Saklanacak veriler | |
session_id: Benzersiz oturum kimliği | |
ttl_days: Verilerin saklanacağı süre (gün) | |
Returns: | |
Dosya yolu | |
""" | |
# Meta verileri ekle | |
data_with_meta = { | |
"data": data, | |
"created_at": datetime.now().isoformat(), | |
"expires_at": (datetime.now() + timedelta(days=ttl_days)).isoformat(), | |
"session_id": session_id | |
} | |
# JSON'a dönüştür ve şifrele | |
json_data = json.dumps(data_with_meta) | |
encrypted_data = self.fernet.encrypt(json_data.encode()) | |
# Dosyaya kaydet | |
file_path = os.path.join(self.storage_dir, f"{session_id}.enc") | |
with open(file_path, "wb") as f: | |
f.write(encrypted_data) | |
return file_path | |
def retrieve_data(self, session_id: str) -> Optional[Dict[str, Any]]: | |
""" | |
Şifreli verileri getirir ve çözer | |
Args: | |
session_id: Benzersiz oturum kimliği | |
Returns: | |
Çözülmüş veriler veya None (eğer yoksa) | |
""" | |
file_path = os.path.join(self.storage_dir, f"{session_id}.enc") | |
if not os.path.exists(file_path): | |
return None | |
try: | |
# Şifreli veriyi oku | |
with open(file_path, "rb") as f: | |
encrypted_data = f.read() | |
# Şifreyi çöz | |
decrypted_data = self.fernet.decrypt(encrypted_data).decode() | |
data_with_meta = json.loads(decrypted_data) | |
# Süre kontrolü | |
expires_at = datetime.fromisoformat(data_with_meta["expires_at"]) | |
if datetime.now() > expires_at: | |
# Süresi dolmuş veriler | |
os.remove(file_path) # Dosyayı sil | |
return None | |
return data_with_meta["data"] | |
except Exception as e: | |
print(f"Veri çözme hatası: {str(e)}") | |
return None | |
def delete_data(self, session_id: str) -> bool: | |
""" | |
Şifreli verileri siler | |
Args: | |
session_id: Benzersiz oturum kimliği | |
Returns: | |
Başarılı olup olmadığı | |
""" | |
file_path = os.path.join(self.storage_dir, f"{session_id}.enc") | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
return True | |
return False |