Spaces:
Sleeping
Sleeping
| ##########modules/auth/auth.py | |
| import os | |
| import streamlit as st | |
| from azure.cosmos import CosmosClient, exceptions | |
| from azure.cosmos.exceptions import CosmosHttpResponseError | |
| import bcrypt | |
| import base64 | |
| from ..database.sql_db import ( | |
| get_user, | |
| get_student_user, | |
| get_admin_user, | |
| create_student_user, | |
| update_student_user, | |
| delete_student_user, | |
| record_login, | |
| record_logout | |
| ) | |
| import logging | |
| from datetime import datetime, timezone | |
| logger = logging.getLogger(__name__) | |
| def clean_and_validate_key(key): | |
| """Limpia y valida la clave de CosmosDB""" | |
| key = key.strip() | |
| while len(key) % 4 != 0: | |
| key += '=' | |
| try: | |
| base64.b64decode(key) | |
| return key | |
| except: | |
| raise ValueError("La clave proporcionada no es v谩lida") | |
| # Verificar las variables de entorno | |
| endpoint = os.getenv("COSMOS_ENDPOINT") | |
| key = os.getenv("COSMOS_KEY") | |
| if not endpoint or not key: | |
| raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas") | |
| key = clean_and_validate_key(key) | |
| def authenticate_user(username, password): | |
| """Autentica un usuario y registra el inicio de sesi贸n""" | |
| try: | |
| user_item = get_user(username) | |
| if not user_item: | |
| logger.warning(f"Usuario no encontrado: {username}") | |
| return False, None | |
| if verify_password(user_item['password'], password): | |
| logger.info(f"Usuario autenticado: {username}, Rol: {user_item['role']}") | |
| try: | |
| session_id = record_login(username) | |
| if session_id: | |
| st.session_state.session_id = session_id | |
| st.session_state.username = username | |
| st.session_state.login_time = datetime.now(timezone.utc).isoformat() | |
| logger.info(f"Sesi贸n iniciada: {session_id}") | |
| else: | |
| logger.warning("No se pudo registrar la sesi贸n") | |
| except Exception as e: | |
| logger.error(f"Error al registrar inicio de sesi贸n: {str(e)}") | |
| return True, user_item['role'] | |
| logger.warning(f"Contrase帽a incorrecta para usuario: {username}") | |
| return False, None | |
| except Exception as e: | |
| logger.error(f"Error durante la autenticaci贸n del usuario: {str(e)}") | |
| return False, None | |
| def authenticate_student(username, password): | |
| """Autentica un estudiante""" | |
| success, role = authenticate_user(username, password) | |
| if success and role == 'Estudiante': | |
| return True, role | |
| return False, None | |
| def authenticate_admin(username, password): | |
| """Autentica un administrador""" | |
| success, role = authenticate_user(username, password) | |
| if success and role == 'Administrador': | |
| return True, role | |
| return False, None | |
| def register_student(username, password, additional_info=None): | |
| """Registra un nuevo estudiante""" | |
| try: | |
| if get_student_user(username): | |
| logger.warning(f"Estudiante ya existe: {username}") | |
| return False | |
| hashed_password = hash_password(password) | |
| # Asegurarse que additional_info tenga el rol correcto | |
| if not additional_info: | |
| additional_info = {} | |
| additional_info['role'] = 'Estudiante' | |
| success = create_student_user(username, hashed_password, additional_info) | |
| if success: | |
| logger.info(f"Nuevo estudiante registrado: {username}") | |
| return True | |
| logger.error(f"Error al crear estudiante: {username}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error al registrar estudiante: {str(e)}") | |
| return False | |
| def update_student_info(username, new_info): | |
| """Actualiza la informaci贸n de un estudiante""" | |
| try: | |
| if 'password' in new_info: | |
| new_info['password'] = hash_password(new_info['password']) | |
| success = update_student_user(username, new_info) | |
| if success: | |
| logger.info(f"Informaci贸n actualizada: {username}") | |
| return True | |
| logger.error(f"Error al actualizar: {username}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error en actualizaci贸n: {str(e)}") | |
| return False | |
| def delete_student(username): | |
| """Elimina un estudiante""" | |
| try: | |
| success = delete_student_user(username) | |
| if success: | |
| logger.info(f"Estudiante eliminado: {username}") | |
| return True | |
| logger.error(f"Error al eliminar: {username}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error en eliminaci贸n: {str(e)}") | |
| return False | |
| def logout(): | |
| """Cierra la sesi贸n del usuario""" | |
| try: | |
| if 'session_id' in st.session_state and 'username' in st.session_state: | |
| success = record_logout( | |
| st.session_state.username, | |
| st.session_state.session_id | |
| ) | |
| if success: | |
| logger.info(f"Sesi贸n cerrada: {st.session_state.username}") | |
| else: | |
| logger.warning(f"Error al registrar cierre de sesi贸n: {st.session_state.username}") | |
| except Exception as e: | |
| logger.error(f"Error en logout: {str(e)}") | |
| finally: | |
| st.session_state.clear() | |
| def hash_password(password): | |
| """Hashea una contrase帽a""" | |
| return bcrypt.hashpw( | |
| password.encode('utf-8'), | |
| bcrypt.gensalt() | |
| ).decode('utf-8') | |
| def verify_password(stored_password, provided_password): | |
| """Verifica una contrase帽a""" | |
| return bcrypt.checkpw( | |
| provided_password.encode('utf-8'), | |
| stored_password.encode('utf-8') | |
| ) | |
| __all__ = [ | |
| 'authenticate_user', | |
| 'authenticate_admin', | |
| 'authenticate_student', | |
| 'register_student', | |
| 'update_student_info', | |
| 'delete_student', | |
| 'logout', | |
| 'hash_password', | |
| 'verify_password' | |
| ] |