Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from flask import Flask, request, render_template, redirect, url_for, session, flash, send_from_directory, send_file | |
| from werkzeug.utils import secure_filename | |
| from utils.file_to_text import extract_text_based_on_format, preprocess_text | |
| from utils.anoter_to_json import process_uploaded_json | |
| from utils.json_to_spacy import convert_json_to_spacy | |
| from utils.model import train_model | |
| import zipfile | |
| app = Flask(__name__) | |
| app.secret_key = 'your_secret_key' | |
| # Folder paths | |
| app.config['UPLOAD_FOLDER'] = 'uploads' | |
| app.config['JSON_FOLDER'] = 'JSON' | |
| app.config['DATA_FOLDER'] = 'data' | |
| app.config['MODELS_FOLDER'] = 'Models' | |
| # Allowed file extensions | |
| ALLOWED_EXTENSIONS = {'pdf', 'docx', 'rsf', 'odt', 'png', 'jpg', 'jpeg', 'json'} | |
| # Function to check file extensions | |
| def allowed_file(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| def index(): | |
| return render_template('upload.html') | |
| # API for uploading Resume files | |
| def upload_file(): | |
| try: | |
| if 'file' not in request.files: | |
| flash('No file part', 'error') | |
| return redirect(request.url) | |
| file = request.files['file'] | |
| if file.filename == '': | |
| flash('No selected file', 'error') | |
| return redirect(request.url) | |
| if file and allowed_file(file.filename): | |
| filename = secure_filename(file.filename) | |
| file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(file_path) | |
| # Handle text extraction for non-JSON files | |
| if not filename.lower().endswith('.json'): | |
| return process_other_files(file_path, filename) | |
| flash('File type not allowed', 'error') | |
| except Exception as e: | |
| flash(f"Error: {str(e)}", 'error') | |
| return redirect(request.url) | |
| # Process non-JSON files, extract text and save to 'resume_text.txt' | |
| def process_other_files(file_path, filename): | |
| try: | |
| extracted_text, _ = extract_text_based_on_format(file_path) | |
| cleaned_text = preprocess_text(extracted_text) | |
| os.makedirs(app.config['DATA_FOLDER'], exist_ok=True) | |
| resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt') | |
| with open(resume_file_path, 'w', encoding='utf-8') as f: | |
| f.write(cleaned_text) | |
| session['uploaded_file'] = filename | |
| return render_template('text.html', text=cleaned_text) | |
| except Exception as e: | |
| flash(f"Error processing file {filename}: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| # API to handle the text editing and saving | |
| def edit_text(): | |
| try: | |
| # Get the edited text from the form | |
| edited_text = request.form['edited_text'] | |
| # Save the edited text back to 'resume_text.txt' | |
| resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt') | |
| with open(resume_file_path, 'w', encoding='utf-8') as f: | |
| f.write(edited_text) | |
| flash('Text edited successfully', 'success') | |
| # Pass the edited text back to the template | |
| return render_template('text.html', text=edited_text) | |
| except Exception as e: | |
| flash(f"Error saving edited text: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| # API for downloading the 'resume_text.txt' file | |
| def download_file(): | |
| try: | |
| return send_from_directory(app.config['DATA_FOLDER'], 'resume_text.txt', as_attachment=True) | |
| except Exception as e: | |
| flash(f"Error downloading file: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| def save_and_download(): | |
| try: | |
| # Get the edited text from the form | |
| edited_text = request.form['edited_text'] | |
| # Save the edited text back to 'resume_text.txt' | |
| resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt') | |
| with open(resume_file_path, 'w', encoding='utf-8') as f: | |
| f.write(edited_text) | |
| flash('Text edited successfully', 'success') | |
| # Now send the file as a download | |
| return send_from_directory(app.config['DATA_FOLDER'], 'resume_text.txt', as_attachment=True) | |
| except Exception as e: | |
| flash(f"Error saving and downloading file: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| # API for uploading and processing JSON files | |
| def upload_json_file(): | |
| try: | |
| if 'file' not in request.files: | |
| flash('No file part', 'error') | |
| return redirect(request.url) | |
| file = request.files['file'] | |
| if file.filename == '': | |
| flash('No selected file', 'error') | |
| return redirect(request.url) | |
| if file and file.filename.lower().endswith('.json'): | |
| filename = secure_filename(file.filename) | |
| json_path = os.path.join(app.config['JSON_FOLDER'], filename) | |
| os.makedirs(app.config['JSON_FOLDER'], exist_ok=True) | |
| file.save(json_path) | |
| session['uploaded_json'] = filename | |
| flash(f'JSON file {filename} uploaded successfully') | |
| else: | |
| flash('File type not allowed', 'error') | |
| except Exception as e: | |
| flash(f"Error: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| # Process uploaded JSON file and save formatted data | |
| def process_json_file(): | |
| try: | |
| json_folder = app.config['JSON_FOLDER'] | |
| json_files = os.listdir(json_folder) | |
| if not json_files: | |
| flash('No JSON files found in the folder', 'error') | |
| return redirect(request.referrer) | |
| filename = json_files[0] # Modify logic if needed to handle multiple files | |
| json_path = os.path.join(json_folder, filename) | |
| if not os.path.exists(json_path): | |
| flash(f'JSON file {filename} not found', 'error') | |
| return redirect(request.referrer) | |
| process_uploaded_json(json_path) | |
| os.makedirs(app.config['DATA_FOLDER'], exist_ok=True) | |
| processed_file_path = os.path.join(app.config['DATA_FOLDER'], f'Processed_{filename}') | |
| flash(f'JSON file {filename} processed successfully') | |
| except Exception as e: | |
| flash(f"Error processing JSON file: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| # API for removing uploaded JSON files | |
| def remove_all_json_files(): | |
| try: | |
| json_folder = app.config['JSON_FOLDER'] | |
| for filename in os.listdir(json_folder): | |
| file_path = os.path.join(json_folder, filename) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| session.pop('uploaded_json', None) | |
| flash('All JSON files removed successfully') | |
| except Exception as e: | |
| flash(f"Error removing files: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| # API for removing non-JSON files | |
| def remove_file(): | |
| try: | |
| upload_folder = app.config['UPLOAD_FOLDER'] | |
| # Check if the folder exists | |
| if os.path.exists(upload_folder): | |
| # Loop through all files in the upload folder and remove them | |
| for filename in os.listdir(upload_folder): | |
| file_path = os.path.join(upload_folder, filename) | |
| # Check if it is a file and remove it | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| # Clear session data related to uploaded files | |
| session.pop('uploaded_file', None) | |
| flash('All files removed successfully') | |
| else: | |
| flash(f"Upload folder does not exist", 'error') | |
| except Exception as e: | |
| flash(f"Error removing files: {str(e)}", 'error') | |
| return redirect(url_for('index')) | |
| # HTML render routes (modify to fit your structure) | |
| def ner_preview(): | |
| return render_template('anoter.html') | |
| def json_file(): | |
| return render_template('savejson.html') | |
| def spacy_file(): | |
| return render_template('saveSpacy.html') | |
| # @app.route('/text', methods=['GET']) | |
| # def spacy_file(): | |
| # return render_template('text.html') | |
| def to_sapcy(): | |
| try: | |
| # Path to the JSON file | |
| json_file_path = 'data/Json_Data.json' | |
| # Convert the JSON file to a .spacy file | |
| spacy_file_path = 'data/Spacy_data.spacy' | |
| # Call the conversion function | |
| convert_json_to_spacy(json_file_path, spacy_file_path) | |
| flash('Model training data converted successfully', 'success') | |
| except Exception as e: | |
| flash(f"Error during conversion: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| def train_model_endpoint(): | |
| try: | |
| # Get the number of epochs and model version from the request | |
| epochs = int(request.form.get('epochs', 10)) # Default to 10 if not provided | |
| version = request.form.get('model_version', 'v1') # Default to 'v1' if not provided | |
| # Call the training function with user-defined parameters | |
| model_path = f"./Models/ner_model_{version}" | |
| train_model(epochs, model_path) | |
| flash('Model training completed successfully', 'success') | |
| except Exception as e: | |
| flash(f"Error during training: {str(e)}", 'error') | |
| return redirect(url_for('index')) | |
| # API for removing all files from specific folders | |
| def remove_files(): | |
| try: | |
| # Define folders to clear | |
| folders_to_clear = [app.config['UPLOAD_FOLDER'], app.config['JSON_FOLDER']] | |
| for folder_path in folders_to_clear: | |
| # Remove all files from the specified folder | |
| for filename in os.listdir(folder_path): | |
| file_path = os.path.join(folder_path, filename) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| # Clear session variables related to the removed folders | |
| session.pop('uploaded_file', None) | |
| session.pop('uploaded_json', None) | |
| flash('All files removed from folder successfully') | |
| except Exception as e: | |
| flash(f"Error removing files: {str(e)}", 'error') | |
| return redirect(url_for('index')) | |
| # API for downloading the latest trained model | |
| def download_latest_model(): | |
| try: | |
| models_dir = app.config['MODELS_FOLDER'] | |
| model_files = os.listdir(models_dir) | |
| if not model_files: | |
| flash('No model files found', 'error') | |
| return redirect(request.referrer) | |
| # Sort model files and get the latest one | |
| latest_model_file = sorted(model_files, reverse=True)[0] | |
| # Full path to the latest model file | |
| model_path = os.path.join(models_dir, latest_model_file) | |
| if not os.path.exists(model_path): | |
| flash('Model file not found on the server', 'error') | |
| return redirect(request.referrer) | |
| # Create a zip file with the model | |
| zip_filename = os.path.join(models_dir, f"{latest_model_file}.zip") | |
| with zipfile.ZipFile(zip_filename, 'w') as zipf: | |
| zipf.write(model_path, os.path.basename(model_path)) | |
| # Send the zip file as a download | |
| return send_file(zip_filename, as_attachment=True) | |
| except Exception as e: | |
| flash(f"Error while downloading the model: {str(e)}", 'error') | |
| return redirect(request.referrer) | |
| if __name__ == '__main__': | |
| app.run(debug=True) | |