VoiceToWrite / secure_storage.py
Seicas's picture
Upload 4 files
acabbdd verified
raw
history blame
3.88 kB
from cryptography.fernet import Fernet
import json
import os
import base64
import hashlib
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
class SecureStorage:
"""Hassas verileri şifreleyerek saklamak için sınıf"""
def __init__(self, storage_dir: str = "./secure_data"):
"""
Args:
storage_dir: Şifreli verilerin saklanacağı dizin
"""
# Depolama dizinini oluştur
os.makedirs(storage_dir, exist_ok=True)
self.storage_dir = storage_dir
# Şifreleme anahtarı
self.key = os.environ.get("ENCRYPTION_KEY")
if not self.key:
# Anahtar yoksa, yeni bir anahtar oluştur ve kaydet
self.key = Fernet.generate_key().decode()
print(f"YENİ ŞİFRELEME ANAHTARI OLUŞTURULDU! Bunu güvenli bir yerde saklayın: {self.key}")
# Fernet şifreleme nesnesi
self.fernet = Fernet(self.key.encode() if isinstance(self.key, str) else self.key)
def store_data(self, data: Dict[str, Any], session_id: str, ttl_days: int = 30) -> str:
"""
Hassas verileri şifreleyerek saklar
Args:
data: Saklanacak veriler
session_id: Benzersiz oturum kimliği
ttl_days: Verilerin saklanacağı süre (gün)
Returns:
Dosya yolu
"""
# Meta verileri ekle
data_with_meta = {
"data": data,
"created_at": datetime.now().isoformat(),
"expires_at": (datetime.now() + timedelta(days=ttl_days)).isoformat(),
"session_id": session_id
}
# JSON'a dönüştür ve şifrele
json_data = json.dumps(data_with_meta)
encrypted_data = self.fernet.encrypt(json_data.encode())
# Dosyaya kaydet
file_path = os.path.join(self.storage_dir, f"{session_id}.enc")
with open(file_path, "wb") as f:
f.write(encrypted_data)
return file_path
def retrieve_data(self, session_id: str) -> Optional[Dict[str, Any]]:
"""
Şifreli verileri getirir ve çözer
Args:
session_id: Benzersiz oturum kimliği
Returns:
Çözülmüş veriler veya None (eğer yoksa)
"""
file_path = os.path.join(self.storage_dir, f"{session_id}.enc")
if not os.path.exists(file_path):
return None
try:
# Şifreli veriyi oku
with open(file_path, "rb") as f:
encrypted_data = f.read()
# Şifreyi çöz
decrypted_data = self.fernet.decrypt(encrypted_data).decode()
data_with_meta = json.loads(decrypted_data)
# Süre kontrolü
expires_at = datetime.fromisoformat(data_with_meta["expires_at"])
if datetime.now() > expires_at:
# Süresi dolmuş veriler
os.remove(file_path) # Dosyayı sil
return None
return data_with_meta["data"]
except Exception as e:
print(f"Veri çözme hatası: {str(e)}")
return None
def delete_data(self, session_id: str) -> bool:
"""
Şifreli verileri siler
Args:
session_id: Benzersiz oturum kimliği
Returns:
Başarılı olup olmadığı
"""
file_path = os.path.join(self.storage_dir, f"{session_id}.enc")
if os.path.exists(file_path):
os.remove(file_path)
return True
return False