import os
import io
import json
import time
import uuid
import atexit
import base64
import logging

import numpy as np
import pandas as pd
from datetime import datetime
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
from scipy import stats
from apscheduler.schedulers.background import BackgroundScheduler
import matplotlib
matplotlib.use('Agg')  # Use a non-interactive backend (no display on the server)
import matplotlib.pyplot as plt

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Configuration
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MAX_FILE_SIZE = 512 * 1024 * 1024  # 512 MB
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls', 'json', 'parquet', 'tsv'}
FILE_EXPIRY_HOURS = 1

# Ensure directories exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)

# In-memory registry of uploaded files, keyed by session ID, then file ID
file_storage = {}


def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def get_file_age(filepath):
    """Get file age in hours."""
    if os.path.exists(filepath):
        file_time = os.path.getmtime(filepath)
        return (time.time() - file_time) / 3600
    return float('inf')


def cleanup_old_files():
    """Remove files older than FILE_EXPIRY_HOURS and prune stale registry entries.

    Runs on the scheduler's background thread.
    """
    try:
        for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER]:
            for root, dirs, files in os.walk(folder):
                for file in files:
                    filepath = os.path.join(root, file)
                    if get_file_age(filepath) > FILE_EXPIRY_HOURS:
                        os.remove(filepath)
                        logger.info(f"Cleaned up old file: {filepath}")

        # Clean up expired file_storage entries
        current_time = datetime.now()
        sessions_to_remove = []
        for session_id, files in file_storage.items():
            files_to_remove = []
            for file_id, file_info in files.items():
                file_time = datetime.fromisoformat(file_info['timestamp'])
                if (current_time - file_time).total_seconds() > FILE_EXPIRY_HOURS * 3600:
                    files_to_remove.append(file_id)
            for file_id in files_to_remove:
                del files[file_id]
            if not files:
                sessions_to_remove.append(session_id)
        for session_id in sessions_to_remove:
            del file_storage[session_id]
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")


# Schedule automatic cleanup every 15 minutes
scheduler = BackgroundScheduler()
scheduler.add_job(func=cleanup_old_files, trigger="interval", minutes=15)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())


def load_data_file(filepath, filename):
    """Load data from the supported file formats."""
    try:
        file_ext = filename.rsplit('.', 1)[1].lower()
        if file_ext == 'csv':
            return pd.read_csv(filepath)
        elif file_ext in ['xlsx', 'xls']:
            return pd.read_excel(filepath)
        elif file_ext == 'json':
            return pd.read_json(filepath)
        elif file_ext == 'parquet':
            return pd.read_parquet(filepath)
        elif file_ext == 'tsv':
            return pd.read_csv(filepath, sep='\t')
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")
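# For reference, a sketch of the file_storage layout that the handlers below
# rely on. The values here are hypothetical; real entries are created by the
# /api/upload endpoint further down.
#
#   file_storage = {
#       "<session-id>": {
#           "<file-id>": {
#               "filename": "sales.csv",
#               "filepath": "/tmp/uploads/<session-id>/<file-id>_sales.csv",
#               "size": 10240,
#               "timestamp": "2024-01-01T12:00:00",
#           }
#       }
#   }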
def perform_basic_statistics(df, columns=None):
    """Perform basic statistical analysis."""
    if columns:
        df = df[columns]

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(exclude=[np.number]).columns.tolist()

    result = {
        'numeric_summary': {},
        'categorical_summary': {},
        'general_info': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'numeric_columns': len(numeric_cols),
            'categorical_columns': len(categorical_cols),
            # Cast numpy integers to plain ints so the result serializes to JSON
            'missing_values': {col: int(n) for col, n in df.isnull().sum().items()}
        }
    }

    # Numeric statistics
    if numeric_cols:
        numeric_stats = df[numeric_cols].describe()
        result['numeric_summary'] = numeric_stats.to_dict()

    # Categorical statistics
    if categorical_cols:
        for col in categorical_cols:
            result['categorical_summary'][col] = {
                'unique_values': int(df[col].nunique()),
                'top_values': {k: int(v) for k, v in df[col].value_counts().head(10).items()},
                'missing_count': int(df[col].isnull().sum())
            }

    return result


def perform_groupby_analysis(df, group_column, target_column, operation='mean', filters=None):
    """Perform a group-by aggregation, optionally after applying row filters."""
    # Apply filters if provided
    if filters:
        for f in filters:
            col, op, val = f['column'], f['operator'], f['value']
            if op == '>':
                df = df[df[col] > val]
            elif op == '<':
                df = df[df[col] < val]
            elif op == '==':
                df = df[df[col] == val]
            elif op == '!=':
                df = df[df[col] != val]
            elif op == '>=':
                df = df[df[col] >= val]
            elif op == '<=':
                df = df[df[col] <= val]

    # Perform the group-by operation
    grouped = df.groupby(group_column)[target_column]
    if operation == 'mean':
        result = grouped.mean()
    elif operation == 'sum':
        result = grouped.sum()
    elif operation == 'count':
        result = grouped.count()
    elif operation == 'max':
        result = grouped.max()
    elif operation == 'min':
        result = grouped.min()
    elif operation == 'std':
        result = grouped.std()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

    return {
        'result': result.to_dict(),
        'operation': operation,
        'group_column': group_column,
        'target_column': target_column,
        'total_groups': len(result)
    }


def perform_correlation_analysis(df, columns=None, method='pearson'):
    """Perform correlation analysis on the numeric columns."""
    if columns:
        df = df[columns]

    # Only numeric columns can be correlated
    numeric_df = df.select_dtypes(include=[np.number])
    if numeric_df.empty:
        raise ValueError("No numeric columns found for correlation analysis")

    correlation_matrix = numeric_df.corr(method=method)
    return {
        'correlation_matrix': correlation_matrix.to_dict(),
        'method': method,
        'columns': numeric_df.columns.tolist()
    }


def detect_outliers(df, columns=None, method='iqr'):
    """Detect outliers in numeric columns using the IQR or z-score method."""
    if columns:
        df = df[columns]

    numeric_df = df.select_dtypes(include=[np.number])
    outliers = {}

    for col in numeric_df.columns:
        if method == 'iqr':
            Q1 = numeric_df[col].quantile(0.25)
            Q3 = numeric_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outlier_indices = numeric_df[(numeric_df[col] < lower_bound) |
                                         (numeric_df[col] > upper_bound)].index.tolist()
        elif method == 'zscore':
            # Compute z-scores on the non-null values and keep their original
            # index, so the boolean mask lines up with the right rows
            col_values = numeric_df[col].dropna()
            z_scores = np.abs(stats.zscore(col_values))
            outlier_indices = col_values[z_scores > 3].index.tolist()
        else:
            raise ValueError(f"Unsupported method: {method}")

        outliers[col] = {
            'count': len(outlier_indices),
            'indices': outlier_indices[:100],  # Limit to the first 100
            'percentage': (len(outlier_indices) / len(numeric_df)) * 100
        }

    return outliers
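# Worked example for the IQR method above, on hypothetical data: for the
# values [1, 2, 3, 4, 100], Q1 = 2 and Q3 = 4, so IQR = 2 and the bounds are
# 2 - 1.5 * 2 = -1.0 and 4 + 1.5 * 2 = 7.0; only the value 100 falls outside.
#
#   df = pd.DataFrame({'x': [1, 2, 3, 4, 100]})
#   detect_outliers(df, method='iqr')
#   # -> {'x': {'count': 1, 'indices': [4], 'percentage': 20.0}}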
def generate_visualization(df, chart_type, x_column, y_column=None, group_column=None):
    """Generate a visualization and return it as a base64-encoded PNG."""
    plt.figure(figsize=(10, 6))

    try:
        if chart_type == 'histogram':
            plt.hist(df[x_column], bins=30, alpha=0.7)
            plt.xlabel(x_column)
            plt.ylabel('Frequency')
            plt.title(f'Histogram of {x_column}')

        elif chart_type == 'scatter':
            if not y_column:
                raise ValueError("Y column required for scatter plot")
            plt.scatter(df[x_column], df[y_column], alpha=0.6)
            plt.xlabel(x_column)
            plt.ylabel(y_column)
            plt.title(f'{x_column} vs {y_column}')

        elif chart_type == 'bar':
            if group_column:
                # Mean per group for numeric data, otherwise group counts
                grouped = (df.groupby(group_column)[x_column].mean()
                           if pd.api.types.is_numeric_dtype(df[x_column])
                           else df[group_column].value_counts())
            else:
                grouped = df[x_column].value_counts().head(20)
            grouped.plot(kind='bar')
            plt.xlabel(group_column or x_column)
            plt.ylabel(f'Mean {x_column}' if pd.api.types.is_numeric_dtype(df[x_column]) else 'Count')
            plt.title('Bar Chart')
            plt.xticks(rotation=45)

        elif chart_type == 'line':
            if y_column:
                plt.plot(df[x_column], df[y_column])
                plt.xlabel(x_column)
                plt.ylabel(y_column)
            else:
                df[x_column].plot()
                plt.ylabel(x_column)
            plt.title('Line Chart')

        elif chart_type == 'box':
            if group_column:
                df.boxplot(column=x_column, by=group_column)
            else:
                df.boxplot(column=x_column)
            plt.title('Box Plot')

        plt.tight_layout()

        # Convert the plot to a base64 string
        img_buffer = io.BytesIO()
        plt.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
        img_buffer.seek(0)
        img_base64 = base64.b64encode(img_buffer.getvalue()).decode()
        plt.close()

        return img_base64
    except Exception as e:
        plt.close()
        raise Exception(f"Error generating visualization: {str(e)}")


def parse_natural_language_query(query, df_columns):
    """Parse a simple natural-language query into an analysis request."""
    query_lower = query.lower()

    # Keywords that map to aggregation operations
    operations = {
        'average': 'mean', 'mean': 'mean', 'avg': 'mean',
        'sum': 'sum', 'total': 'sum',
        'count': 'count', 'number': 'count',
        'max': 'max', 'maximum': 'max', 'highest': 'max',
        'min': 'min', 'minimum': 'min', 'lowest': 'min'
    }

    # Find the requested operation (default: mean)
    operation = 'mean'
    for keyword, op in operations.items():
        if keyword in query_lower:
            operation = op
            break

    # Find columns mentioned in the query
    mentioned_columns = [col for col in df_columns if col.lower() in query_lower]

    # Simple parsing patterns
    if 'by' in query_lower and len(mentioned_columns) >= 2:
        # Group-by analysis
        target_col = mentioned_columns[0]
        group_col = mentioned_columns[-1]
        return {
            'analysisType': 'groupby',
            'parameters': {
                'groupByColumn': group_col,
                'targetColumn': target_col,
                'operation': operation
            }
        }
    elif 'correlation' in query_lower:
        return {
            'analysisType': 'correlation',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }
    elif any(word in query_lower for word in ['chart', 'plot', 'graph', 'visualize']):
        chart_type = 'bar'  # default
        if 'scatter' in query_lower:
            chart_type = 'scatter'
        elif 'line' in query_lower:
            chart_type = 'line'
        elif 'histogram' in query_lower:
            chart_type = 'histogram'
        return {
            'analysisType': 'visualization',
            'parameters': {
                'chartType': chart_type,
                'xColumn': mentioned_columns[0] if mentioned_columns else None,
                'yColumn': mentioned_columns[1] if len(mentioned_columns) > 1 else None
            }
        }
    else:
        # Default to basic statistics
        return {
            'analysisType': 'statistics',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }
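# Example of the parser above on hypothetical columns: "average price by
# region" matches the keyword "average", mentions both columns, and contains
# "by", so it parses to a group-by request:
#
#   parse_natural_language_query('average price by region', ['price', 'region'])
#   # -> {'analysisType': 'groupby',
#   #     'parameters': {'groupByColumn': 'region',
#   #                    'targetColumn': 'price',
#   #                    'operation': 'mean'}}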
@app.route('/api/health', methods=['GET'])
def health_check():
    return jsonify({'status': 'healthy', 'timestamp': datetime.now().isoformat()})


@app.route('/api/upload', methods=['POST'])
def upload_file():
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        session_id = request.form.get('sessionId')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'File type not supported'}), 400

        # Check the file size by seeking to the end of the stream
        file.seek(0, 2)
        file_size = file.tell()
        file.seek(0)  # Reset to the beginning
        if file_size > MAX_FILE_SIZE:
            return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024 * 1024)}MB'}), 400

        # Generate a unique file ID and a secure filename
        file_id = str(uuid.uuid4())
        filename = secure_filename(file.filename)

        # Create the session directory
        session_dir = os.path.join(UPLOAD_FOLDER, session_id)
        os.makedirs(session_dir, exist_ok=True)

        # Save the file
        filepath = os.path.join(session_dir, f"{file_id}_{filename}")
        file.save(filepath)

        # Record the file in the session registry
        if session_id not in file_storage:
            file_storage[session_id] = {}
        file_storage[session_id][file_id] = {
            'filename': filename,
            'filepath': filepath,
            'size': file_size,
            'timestamp': datetime.now().isoformat()
        }

        return jsonify({
            'fileId': file_id,
            'filename': filename,
            'size': file_size,
            'message': 'File uploaded successfully'
        })
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/preview/<file_id>', methods=['GET'])
def preview_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]

        # Load the data and build a preview
        df = load_data_file(file_info['filepath'], file_info['filename'])
        preview_data = {
            'columns': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict(),
            'shape': df.shape,
            'head': df.head(5).to_dict('records'),
            'missing_values': {col: int(n) for col, n in df.isnull().sum().items()}
        }
        return jsonify(preview_data)
    except Exception as e:
        logger.error(f"Preview error: {str(e)}")
        return jsonify({'error': str(e)}), 500
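# Example client calls for the endpoints above (the session ID, file name,
# and file ID are hypothetical; the port matches app.run() at the bottom of
# this file):
#
#   curl -F "file=@sales.csv" -F "sessionId=demo" http://localhost:7860/api/upload
#   curl "http://localhost:7860/api/preview/<fileId>?sessionId=demo"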
@app.route('/api/analyze', methods=['POST'])
def analyze_data():
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        analysis_type = data.get('analysisType')
        parameters = data.get('parameters', {})
        natural_query = data.get('naturalQuery')

        if not all([session_id, file_id]):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Fall back to the natural-language parser when no explicit type is given
        if natural_query and not analysis_type:
            parsed_query = parse_natural_language_query(natural_query, df.columns.tolist())
            analysis_type = parsed_query['analysisType']
            parameters = parsed_query['parameters']

        result = {}
        if analysis_type == 'statistics':
            result = perform_basic_statistics(df, parameters.get('columns'))
        elif analysis_type == 'groupby':
            result = perform_groupby_analysis(
                df,
                parameters.get('groupByColumn'),
                parameters.get('targetColumn'),
                parameters.get('operation', 'mean'),
                parameters.get('filters')
            )
        elif analysis_type == 'correlation':
            result = perform_correlation_analysis(
                df,
                parameters.get('columns'),
                parameters.get('method', 'pearson')
            )
        elif analysis_type == 'outliers':
            result = detect_outliers(
                df,
                parameters.get('columns'),
                parameters.get('method', 'iqr')
            )
        elif analysis_type == 'visualization':
            chart_base64 = generate_visualization(
                df,
                parameters.get('chartType', 'bar'),
                parameters.get('xColumn'),
                parameters.get('yColumn'),
                parameters.get('groupColumn')
            )
            result = {
                'chart': chart_base64,
                'chartType': parameters.get('chartType', 'bar')
            }
        else:
            return jsonify({'error': 'Invalid analysis type'}), 400

        # Save the result to the processed folder
        result_id = str(uuid.uuid4())
        result_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_result.json")
        with open(result_filepath, 'w') as f:
            json.dump(result, f, indent=2, default=str)

        return jsonify({
            'resultId': result_id,
            'result': result,
            'analysisType': analysis_type,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/files/<session_id>', methods=['GET'])
def list_files(session_id):
    try:
        if session_id not in file_storage:
            return jsonify({'files': []})

        files = []
        for file_id, file_info in file_storage[session_id].items():
            # Only report files that still exist on disk
            if os.path.exists(file_info['filepath']):
                files.append({
                    'fileId': file_id,
                    'filename': file_info['filename'],
                    'size': file_info['size'],
                    'timestamp': file_info['timestamp']
                })
        return jsonify({'files': files})
    except Exception as e:
        logger.error(f"List files error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/file/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]

        # Remove the file from the filesystem
        if os.path.exists(file_info['filepath']):
            os.remove(file_info['filepath'])

        # Remove the registry entry
        del file_storage[session_id][file_id]
        return jsonify({'message': 'File deleted successfully'})
    except Exception as e:
        logger.error(f"Delete error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/download/<result_id>', methods=['GET'])
def download_result(result_id):
    try:
        session_id = request.args.get('sessionId')
        format_type = request.args.get('format', 'json')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400

        result_filepath = os.path.join(PROCESSED_FOLDER, session_id, f"{result_id}_result.json")
        if not os.path.exists(result_filepath):
            return jsonify({'error': 'Result not found'}), 404

        if format_type == 'json':
            return send_file(result_filepath, as_attachment=True,
                             download_name=f"analysis_result_{result_id}.json")
        else:
            return jsonify({'error': 'Format not supported'}), 400
    except Exception as e:
        logger.error(f"Download error: {str(e)}")
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=False)
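# Example /api/analyze request (the session and file IDs are hypothetical),
# using the keys that analyze_data() reads from the JSON body:
#
#   curl -X POST http://localhost:7860/api/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"sessionId": "demo", "fileId": "<fileId>",
#             "analysisType": "groupby",
#             "parameters": {"groupByColumn": "region",
#                            "targetColumn": "price",
#                            "operation": "mean"}}'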