Spaces:
Running
Running
from flask import Flask, render_template, request, jsonify, session, redirect, url_for | |
from pymongo import MongoClient | |
from werkzeug.security import generate_password_hash, check_password_hash | |
from datetime import datetime, timedelta | |
import os | |
from functools import wraps | |
import uuid | |
import base64 | |
import random | |
from dotenv import load_dotenv | |
# Load variables from a local .env file (if any) before the environment is read.
load_dotenv()

app = Flask(__name__)

# Prefer a stable signing key from the environment. A key regenerated on every
# start invalidates all existing session cookies on restart (defeating the
# 30-day lifetime configured below) and differs between worker processes.
# The random fallback keeps the app bootable when SECRET_KEY is not set.
app.secret_key = os.getenv('SECRET_KEY') or os.urandom(24)

# Configure session to be permanent and last for 30 days
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)
app.config['SESSION_COOKIE_SECURE'] = False  # Set to True in production with HTTPS
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'

# MongoDB connection
MONGO_URI = os.getenv('MONGO_URI')
client = MongoClient(MONGO_URI)
db = client.startpage

# Collections
users = db.users
user_bookmarks = db.bookmarks
user_notes = db.notes
site_stats = db.site_stats
vault_passwords = db.vault_passwords
def login_required(f):
    """Decorate view ``f`` so it only runs for logged-in sessions.

    The returned wrapper answers with a JSON error and HTTP 401 when the
    session holds no ``user_id``; otherwise it calls ``f`` with the original
    positional and keyword arguments and returns its result.
    """
    # wraps() copies f's __name__ onto the wrapper. Flask derives endpoint
    # names from the view function's name, so without it every protected view
    # would be called "decorated_function" and registration would collide.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            return jsonify({'error': 'Authentication required'}), 401
        return f(*args, **kwargs)
    return decorated_function
def index():
    """Render the landing page and count the visit in today's statistics.

    Sessions without a ``user_id`` get ``auth.html``; logged-in sessions get
    ``index.html`` with the stored username (``'User'`` when none is stored).
    """
    day_key = datetime.now().strftime('%Y-%m-%d')
    visit_increment = {'$inc': {'page_visits': 1}}
    # upsert=True creates the document for this date on the first visit.
    site_stats.update_one({'date': day_key}, visit_increment, upsert=True)

    if 'user_id' in session:
        display_name = session.get('username', 'User')
        return render_template('index.html', username=display_name)
    return render_template('auth.html')
def register():
    """Create an account from a JSON body and log the new user in.

    Expects a JSON object with non-empty string ``username`` and ``password``
    fields. On success the user document and a set of default bookmarks are
    inserted, the session is made permanent and populated with ``user_id``
    and ``username``.

    Returns:
        A JSON success message, or a JSON error with HTTP 400 when the body
        is not a JSON object, a field is missing or not a string, or the
        username is already taken.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so the client always receives this API's JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Username and password required'}), 400

    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400
    # Only plain strings are accepted: a JSON object in `username` would be
    # passed to MongoDB as a query operator, and a non-string password cannot
    # be hashed.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Username and password required'}), 400

    # NOTE(review): this check-then-insert is not atomic; two concurrent
    # requests could register the same name unless the collection has a
    # unique index on `username` - confirm.
    if users.find_one({'username': username}):
        return jsonify({'error': 'Username already exists'}), 400

    user_id = str(uuid.uuid4())
    now = datetime.now()
    users.insert_one({
        'user_id': user_id,
        'username': username,
        'password': generate_password_hash(password),
        'created_at': now,
    })

    # Add default bookmarks for new user
    default_sites = (
        ('YouTube', 'https://youtube.com', 'play_circle'),
        ('GitHub', 'https://github.com', 'code'),
        ('Gmail', 'https://gmail.com', 'mail'),
        ('Google Drive', 'https://drive.google.com', 'cloud'),
        ('Netflix', 'https://netflix.com', 'movie'),
        ('Reddit', 'https://reddit.com', 'forum'),
        ('Twitter', 'https://twitter.com', 'alternate_email'),
        ('LinkedIn', 'https://linkedin.com', 'work'),
    )
    user_bookmarks.insert_many([
        {'name': name, 'url': url, 'icon': icon, 'user_id': user_id, 'created_at': now}
        for name, url, icon in default_sites
    ])

    # Make session permanent so it persists across browser sessions
    session.permanent = True
    session['user_id'] = user_id
    session['username'] = username
    return jsonify({'message': 'Registration successful'})
def login():
    """Authenticate a user from a JSON body and populate the session.

    Expects a JSON object with string ``username`` and ``password`` fields
    and an optional ``rememberMe`` flag (defaults to true). On success the
    session stores ``user_id``, ``username`` and ``remember_me``, and is made
    permanent when ``rememberMe`` is truthy.

    Returns:
        A JSON success message; a JSON error with HTTP 400 when credentials
        are missing or malformed; a JSON error with HTTP 401 when they do not
        match a stored user.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so the client always receives this API's JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Username and password required'}), 400

    username = data.get('username')
    password = data.get('password')
    # Default to True for backward compatibility; coerced so the session only
    # ever stores a real boolean.
    remember_me = bool(data.get('rememberMe', True))

    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400
    # Only plain strings are accepted: a JSON object in `username` (e.g.
    # {"$ne": null}) would be passed to MongoDB as a query operator and match
    # arbitrary users.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Username and password required'}), 400

    user = users.find_one({'username': username})
    if not user or not check_password_hash(user['password'], password):
        return jsonify({'error': 'Invalid credentials'}), 401

    # Set session permanence based on user choice
    session.permanent = remember_me
    session['user_id'] = user['user_id']
    session['username'] = username
    session['remember_me'] = remember_me
    return jsonify({'message': 'Login successful'})
def logout():
    """Log the user out by removing everything stored in the session."""
    session.clear()
    confirmation = {'message': 'Logout successful'}
    return jsonify(confirmation)
def get_bookmarks():
    """Return the session user's bookmarks as a JSON array.

    Reads ``session['user_id']`` directly; MongoDB's ``_id`` field is left
    out of every returned document.
    """
    owner_filter = {'user_id': session['user_id']}
    without_object_id = {'_id': 0}
    cursor = user_bookmarks.find(owner_filter, without_object_id)
    return jsonify(list(cursor))
def save_bookmarks():
    """Replace all of the session user's bookmarks with the submitted list.

    Expects a JSON object whose optional ``bookmarks`` field is a list of
    objects. Each object is stored with ``user_id`` set to the session user
    and ``created_at`` set to the time of this request.

    Returns:
        A JSON success message, or a JSON error with HTTP 400 when the body
        is not a JSON object or ``bookmarks`` is not a list of objects.
    """
    user_id = session['user_id']

    # silent=True yields None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid request body'}), 400

    bookmarks = data.get('bookmarks', [])
    if not isinstance(bookmarks, list) or not all(isinstance(item, dict) for item in bookmarks):
        return jsonify({'error': 'Bookmarks must be a list of objects'}), 400

    # Build every document BEFORE deleting anything, so a malformed payload
    # can no longer wipe the user's existing bookmarks and then fail midway.
    now = datetime.now()
    documents = [
        {**bookmark, 'user_id': user_id, 'created_at': now}
        for bookmark in bookmarks
    ]

    user_bookmarks.delete_many({'user_id': user_id})
    # insert_many rejects an empty list, so skip it when nothing was submitted.
    if documents:
        user_bookmarks.insert_many(documents)
    return jsonify({'message': 'Bookmarks saved'})
def get_notes():
    """Return the session user's notes as a JSON array.

    Reads ``session['user_id']`` directly; MongoDB's ``_id`` field is left
    out of every returned document.
    """
    owner_filter = {'user_id': session['user_id']}
    without_object_id = {'_id': 0}
    cursor = user_notes.find(owner_filter, without_object_id)
    return jsonify(list(cursor))
def save_notes():
    """Replace all of the session user's notes with the submitted list.

    Expects a JSON object whose optional ``notes`` field is a list of
    objects. Each object is stored with ``user_id`` set to the session user
    and ``updated_at`` set to the time of this request.

    Returns:
        A JSON success message, or a JSON error with HTTP 400 when the body
        is not a JSON object or ``notes`` is not a list of objects.
    """
    user_id = session['user_id']

    # silent=True yields None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid request body'}), 400

    notes = data.get('notes', [])
    if not isinstance(notes, list) or not all(isinstance(item, dict) for item in notes):
        return jsonify({'error': 'Notes must be a list of objects'}), 400

    # Build every document BEFORE deleting anything, so a malformed payload
    # can no longer wipe the user's existing notes and then fail midway.
    now = datetime.now()
    documents = [
        {**note, 'user_id': user_id, 'updated_at': now}
        for note in notes
    ]

    user_notes.delete_many({'user_id': user_id})
    # insert_many rejects an empty list, so skip it when nothing was submitted.
    if documents:
        user_notes.insert_many(documents)
    return jsonify({'message': 'Notes saved'})
def track_search():
    """Add one to today's search counter and acknowledge with a JSON message."""
    day_key = datetime.now().strftime('%Y-%m-%d')
    search_increment = {'$inc': {'search_count': 1}}
    # upsert=True creates the document for this date on the first search.
    site_stats.update_one({'date': day_key}, search_increment, upsert=True)
    return jsonify({'message': 'Search tracked'})
def get_stats():
    """Return today's ``page_visits`` and ``search_count`` as JSON.

    Either counter is reported as 0 when today's document, or that field in
    it, does not exist.
    """
    day_key = datetime.now().strftime('%Y-%m-%d')
    # A missing document behaves like an empty one: both counters fall back to 0.
    todays_doc = site_stats.find_one({'date': day_key}) or {}
    counters = {
        field: todays_doc.get(field, 0)
        for field in ('page_visits', 'search_count')
    }
    return jsonify(counters)
def change_password():
    """Change the session user's password after verifying the current one.

    Expects a JSON object with string ``currentPassword`` and ``newPassword``
    fields. The new password must be at least 6 characters long.

    Returns:
        A JSON success message, or a JSON error with HTTP 400 when a field is
        missing or malformed, the current password does not match, or the new
        password is too short.
    """
    # silent=True yields None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Current password and new password required'}), 400

    current_password = data.get('currentPassword')
    new_password = data.get('newPassword')
    if not current_password or not new_password:
        return jsonify({'error': 'Current password and new password required'}), 400
    # Non-string values (numbers, lists, objects) cannot be hashed or
    # measured with len(); reject them instead of failing later.
    if not isinstance(current_password, str) or not isinstance(new_password, str):
        return jsonify({'error': 'Current password and new password required'}), 400

    user_id = session['user_id']
    user = users.find_one({'user_id': user_id})
    if not user or not check_password_hash(user['password'], current_password):
        return jsonify({'error': 'Current password is incorrect'}), 400

    if len(new_password) < 6:
        return jsonify({'error': 'Password must be at least 6 characters long'}), 400

    # Update password
    users.update_one(
        {'user_id': user_id},
        {'$set': {
            'password': generate_password_hash(new_password),
            'updated_at': datetime.now(),
        }},
    )
    return jsonify({'message': 'Password changed successfully'})
def get_developer_stats():
    """Return site-wide totals plus a 12-month visit/search history as JSON.

    The response contains ``totalUsers``, ``totalVisits``, ``totalSearches``
    and ``monthlyData``: one ``{'month', 'visits', 'searches'}`` entry per
    calendar month for the current month and the 11 before it, oldest first.
    Months without any stats documents report zeros.
    """
    user_total = users.count_documents({})

    # All-time totals are summed over every daily stats document.
    daily_docs = list(site_stats.find({}))
    visit_total = sum(doc.get('page_visits', 0) for doc in daily_docs)
    search_total = sum(doc.get('search_count', 0) for doc in daily_docs)

    now = datetime.now()
    # Counting months as year * 12 + (month - 1) turns "step back across a
    # year boundary" into plain integer arithmetic.
    this_month_index = now.year * 12 + (now.month - 1)
    day_format = '%Y-%m-%d'

    monthly_data = []
    for months_back in range(12):
        month_index = this_month_index - months_back
        following_index = month_index + 1
        first_day = datetime(month_index // 12, month_index % 12 + 1, 1)
        next_first_day = datetime(following_index // 12, following_index % 12 + 1, 1)

        # `date` is stored as a YYYY-MM-DD string, so the month is selected
        # with a half-open string range [first day, first day of next month).
        pipeline = [
            {'$match': {'date': {
                '$gte': first_day.strftime(day_format),
                '$lt': next_first_day.strftime(day_format),
            }}},
            {'$group': {
                '_id': None,
                'total_visits': {'$sum': '$page_visits'},
                'total_searches': {'$sum': '$search_count'},
            }},
        ]
        grouped = list(site_stats.aggregate(pipeline))

        if grouped:
            visits = grouped[0]['total_visits']
            searches = grouped[0]['total_searches']
        else:
            visits = 0
            searches = 0
        monthly_data.append({
            'month': first_day.strftime('%b'),
            'visits': visits,
            'searches': searches,
        })

    # Entries were collected newest-first; flip them into chronological order.
    monthly_data.reverse()

    return jsonify({
        'totalUsers': user_total,
        'totalVisits': visit_total,
        'totalSearches': search_total,
        'monthlyData': monthly_data,
    })
# Vault API endpoints | |
def authenticate_vault():
    """Re-check the session user's account password before vault access.

    Expects a JSON object with a string ``password`` field, which is verified
    against the stored hash of the user identified by ``session['user_id']``.

    Returns:
        A JSON success message; a JSON error with HTTP 400 when the password
        is missing or malformed; a JSON error with HTTP 401 when it does not
        match.
    """
    # silent=True yields None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Password required'}), 400

    password = data.get('password')
    # A non-string value cannot be checked against the hash; treat it as missing.
    if not password or not isinstance(password, str):
        return jsonify({'error': 'Password required'}), 400

    user_id = session['user_id']
    user = users.find_one({'user_id': user_id})
    if not user or not check_password_hash(user['password'], password):
        return jsonify({'error': 'Invalid password'}), 401
    return jsonify({'message': 'Authentication successful'})
def get_vault_passwords():
    """Return the session user's vault entries as a JSON array.

    Each entry's ``password`` field is base64-decoded for display. Entries
    whose password is missing or cannot be decoded are returned unchanged.
    MongoDB's ``_id`` field is left out of every entry.
    """
    user_id = session['user_id']
    entries = list(vault_passwords.find({'user_id': user_id}, {'_id': 0}))

    # NOTE(review): base64 is a reversible encoding, not encryption. Anyone
    # with read access to the collection can recover these passwords.
    for entry in entries:
        try:
            entry['password'] = base64.b64decode(entry['password'].encode()).decode()
        except (KeyError, AttributeError, ValueError):
            # Best effort: keep the stored value when the field is absent,
            # not a string, or not valid base64 / UTF-8 (binascii.Error and
            # UnicodeDecodeError are both ValueError subclasses).
            continue
    return jsonify(entries)
def save_vault_password():
    """Create or update one vault entry for the session user.

    Expects a JSON object with non-empty ``title`` and string ``password``,
    and optional ``username``, ``website``, ``notes`` and ``id``. With an
    ``id`` the matching entry owned by the user is updated; without one a new
    entry with a freshly generated UUID is inserted.

    Returns:
        A JSON success message; a JSON error with HTTP 400 for a missing or
        malformed field; a JSON error with HTTP 404 when ``id`` does not
        match any entry owned by the user.
    """
    user_id = session['user_id']

    # silent=True yields None for a missing or malformed JSON body instead of raising.
    data = request.get_json(silent=True)

    # Validate required fields
    if not isinstance(data, dict) or not data.get('title') or not data.get('password'):
        return jsonify({'error': 'Title and password are required'}), 400
    if not isinstance(data['password'], str):
        return jsonify({'error': 'Password must be a string'}), 400

    entry_id = data.get('id')
    # The id ends up in a MongoDB filter; a JSON object there would be
    # interpreted as a query operator, so only strings are accepted.
    if entry_id and not isinstance(entry_id, str):
        return jsonify({'error': 'Invalid id'}), 400

    # Simple base64 encoding for password storage (demo purposes)
    # NOTE(review): base64 is reversible encoding, not encryption.
    encoded_password = base64.b64encode(data['password'].encode()).decode()
    now = datetime.now()

    password_entry = {
        'user_id': user_id,
        'title': data.get('title'),
        'username': data.get('username', ''),
        'password': encoded_password,
        'website': data.get('website', ''),
        'notes': data.get('notes', ''),
        'updated_at': now,
    }

    if entry_id:
        # Updating: created_at is deliberately not part of the $set, so the
        # original creation time is no longer overwritten on every edit.
        password_entry['id'] = entry_id
        result = vault_passwords.update_one(
            {'user_id': user_id, 'id': entry_id},
            {'$set': password_entry},
        )
        # Without a match nothing was stored; report it instead of claiming success.
        if result.matched_count == 0:
            return jsonify({'error': 'Password not found'}), 404
    else:
        password_entry['id'] = str(uuid.uuid4())
        password_entry['created_at'] = now
        vault_passwords.insert_one(password_entry)

    return jsonify({'message': 'Password saved successfully'})
def delete_vault_password(password_id):
    """Delete the vault entry ``password_id`` if it belongs to the session user.

    Returns a JSON success message when a document was removed, otherwise a
    JSON error with HTTP 404.
    """
    # Filtering on user_id as well keeps users from deleting each other's entries.
    owned_entry = {'user_id': session['user_id'], 'id': password_id}
    outcome = vault_passwords.delete_one(owned_entry)

    if outcome.deleted_count <= 0:
        return jsonify({'error': 'Password not found'}), 404
    return jsonify({'message': 'Password deleted successfully'})
if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server execute code. It is therefore opt-in through
    # FLASK_DEBUG instead of being hard-coded on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(debug=debug_enabled, port=7860)