import os
import gradio as gr
from transformers import pipeline
import spacy
import lib.read_pdf
import pandas as pd
import re
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import io

# Initialize spaCy model
nlp = spacy.load('en_core_web_sm')
nlp.add_pipe('sentencizer')
def split_in_sentences(text):
    doc = nlp(text)
    return [str(sent).strip() for sent in doc.sents]

def make_spans(text, results):
    results_list = [res['label'] for res in results]
    facts_spans = list(zip(split_in_sentences(text), results_list))
    return facts_spans
# Initialize pipelines
summarizer = pipeline("summarization", model="human-centered-summarization/financial-summarization-pegasus")
fin_model = pipeline("sentiment-analysis", model='yiyanghkust/finbert-tone', tokenizer='yiyanghkust/finbert-tone')
fin_model_bis = pipeline("sentiment-analysis", model='ProsusAI/finbert', tokenizer='ProsusAI/finbert')
def summarize_text(text):
    resp = summarizer(text)
    return resp[0]['summary_text']

def text_to_sentiment(text):
    sentiment = fin_model(text)[0]["label"]
    return sentiment

def fin_ext(text):
    results = fin_model(split_in_sentences(text))
    return make_spans(text, results)

def fin_ext_bis(text):
    results = fin_model_bis(split_in_sentences(text))
    return make_spans(text, results)
def extract_and_summarize(pdf1, pdf2):
    if not pdf1 or not pdf2:
        return [], []
    pdf1_path = os.path.join(PDF_FOLDER, pdf1)
    pdf2_path = os.path.join(PDF_FOLDER, pdf2)
    # Extract and format paragraphs
    paragraphs_1 = lib.read_pdf.extract_and_format_paragraphs(pdf1_path)
    paragraphs_2 = lib.read_pdf.extract_and_format_paragraphs(pdf2_path)
    start_keyword = "Main risks to"
    end_keywords = ["4. Appendix", "Annex:", "4. Annex", "Detailed tables", "ACKNOWLEDGEMENTS", "STATISTICAL ANNEX", "PROSPECTS BY MEMBER STATES"]
    start_index1, end_index1 = lib.read_pdf.find_text_range(paragraphs_1, start_keyword, end_keywords)
    start_index2, end_index2 = lib.read_pdf.find_text_range(paragraphs_2, start_keyword, end_keywords)
    paragraphs_1 = lib.read_pdf.extract_relevant_text(paragraphs_1, start_index1, end_index1)
    paragraphs_2 = lib.read_pdf.extract_relevant_text(paragraphs_2, start_index2, end_index2)
    paragraphs_1 = lib.read_pdf.split_text_into_paragraphs(paragraphs_1, 0)
    paragraphs_2 = lib.read_pdf.split_text_into_paragraphs(paragraphs_2, 0)
    return paragraphs_1, paragraphs_2
# Gradio interface setup
PDF_FOLDER = "data"

def get_pdf_files(folder):
    return [f for f in os.listdir(folder) if f.endswith('.pdf')]

def show(name):
    return f"{name}"

def get_excel_files(folder):
    return [f for f in os.listdir(folder) if f.endswith('.xlsx')]

def get_sheet_names(file):
    xls = pd.ExcelFile(os.path.join(PDF_FOLDER, file))
    return gr.update(choices=xls.sheet_names)
def process_and_compare(file1, sheet1, file2, sheet2):
    def process_file(file_path, sheet_name):
        # Extract year from file name
        year = int(re.search(r'(\d{4})', file_path).group(1))
        # Load the Excel file
        df = pd.read_excel(os.path.join(PDF_FOLDER, file_path), sheet_name=sheet_name, index_col=0)
        # Define expected columns based on extracted year
        historical_col = f'Historical {year - 1}'
        baseline_cols = [f'Baseline {year}', f'Baseline {year + 1}', f'Baseline {year + 2}']
        adverse_cols = [f'Adverse {year}', f'Adverse {year + 1}', f'Adverse {year + 2}']
        level_deviation_col = f'Level Deviation {year + 2}'
        # Drop rows and reset index
        df = df.iloc[4:].reset_index(drop=True)
        # Define the new column names
        new_columns = ['Country', 'Code', historical_col] + baseline_cols + adverse_cols + ['Adverse Cumulative', 'Adverse Minimum', level_deviation_col]
        # Ensure the number of columns matches
        if len(df.columns) == len(new_columns):
            df.columns = new_columns
        else:
            raise ValueError(f"Expected {len(new_columns)} columns, but found {len(df.columns)} columns in the data.")
        return df

    # Process both files
    df1 = process_file(file1, sheet1)
    df2 = process_file(file2, sheet2)
    year1 = int(re.search(r'(\d{4})', file1).group(1))
    year2 = int(re.search(r'(\d{4})', file2).group(1))

    # Calculate the differences
    # historical_col1 = f'Historical {int(year1) - 1}'
    # historical_col2 = f'Historical {int(year2) - 1}'
    # df1['Historical vs Adverse'] = df1[historical_col1] - df1['Adverse Cumulative']
    # df2['Historical vs Adverse'] = df2[historical_col2] - df2['Adverse Cumulative']
    # Merge dataframes on 'Country'; each file's overlapping columns get its own year suffix
    merged_df = pd.merge(df1, df2, on='Country', suffixes=(f'_{year1}', f'_{year2}'))
    merged_df['Difference adverse cumulative growth'] = merged_df[f'Adverse Cumulative_{year2}'] - merged_df[f'Adverse Cumulative_{year1}']
    # Ensure data types are correct
    merged_df['Country'] = merged_df['Country'].astype(str)
    merged_df['Difference adverse cumulative growth'] = pd.to_numeric(merged_df['Difference adverse cumulative growth'], errors='coerce')

    # Create histogram plot with color coding
    fig, ax = plt.subplots(figsize=(12, 8))
    colors = plt.get_cmap('tab20').colors  # Use a colormap with multiple colors
    num_countries = len(merged_df['Country'])
    bars = ax.bar(merged_df['Country'], merged_df['Difference adverse cumulative growth'], color=colors[:num_countries])

    # Add a legend
    handles = [patches.Patch(color=color, label=country) for color, country in zip(colors[:num_countries], merged_df['Country'])]
    ax.legend(handles=handles, title='Countries', bbox_to_anchor=(1.05, 1), loc='upper left')
    ax.set_title(f'Histogram of Difference between Adverse cumulative growth of {year2} and {year1} for {sheet1}')
    ax.set_xlabel('Country')
    ax.set_ylabel('Difference')
    plt.xticks(rotation=90)
    # Save plot to a file (create the output folder if it does not exist yet)
    os.makedirs('output', exist_ok=True)
    file_path = 'output/plot.png'
    plt.savefig(file_path, format='png', bbox_inches='tight')
    plt.close()
    return file_path
stored_paragraphs_1 = []
stored_paragraphs_2 = []
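# ---------------------------------------------------------------------------
# The UI callbacks below reference process_paragraph_*_sum / _sent / _sent_tone
# helpers that are not defined anywhere in this file as extracted. What follows
# is a minimal sketch of what they could look like, assuming the dropdown value
# is the "Paragraph N: ..." label built in update_paragraphs(); get_paragraph()
# is a hypothetical helper that maps that label back to the full stored paragraph.
def get_paragraph(selection, paragraphs):
    # Hypothetical helper: "Paragraph 3: ..." -> paragraphs[2]; falls back to the raw selection.
    match = re.match(r'Paragraph (\d+):', selection or '')
    if match and 0 < int(match.group(1)) <= len(paragraphs):
        return paragraphs[int(match.group(1)) - 1]
    return selection

def process_paragraph_1_sum(selection):
    return summarize_text(get_paragraph(selection, stored_paragraphs_1))

def process_paragraph_1_sent(selection):
    return text_to_sentiment(get_paragraph(selection, stored_paragraphs_1))

def process_paragraph_1_sent_tone(selection):
    return fin_ext(get_paragraph(selection, stored_paragraphs_1))

def process_paragraph_1_sent_tone_bis(selection):
    return fin_ext_bis(get_paragraph(selection, stored_paragraphs_1))

def process_paragraph_2_sum(selection):
    return summarize_text(get_paragraph(selection, stored_paragraphs_2))

def process_paragraph_2_sent(selection):
    return text_to_sentiment(get_paragraph(selection, stored_paragraphs_2))

def process_paragraph_2_sent_tone(selection):
    return fin_ext(get_paragraph(selection, stored_paragraphs_2))

def process_paragraph_2_sent_tone_bis(selection):
    return fin_ext_bis(get_paragraph(selection, stored_paragraphs_2))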
with gr.Blocks() as demo:
    with gr.Tab("Financial Report Text Analysis"):
        gr.Markdown("## Financial Report Paragraph Selection and Analysis on adverse macro-economic scenario")
        with gr.Row():
            # Select PDFs from the data folder
            with gr.Column():
                pdf1 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 1")
                pdf2 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 2")
            with gr.Column():
                b1 = gr.Button("Extract and Display Paragraphs")
                paragraph_1_dropdown = gr.Dropdown(label="Select Paragraph from PDF 1")
                paragraph_2_dropdown = gr.Dropdown(label="Select Paragraph from PDF 2")
        def update_paragraphs(pdf1, pdf2):
            global stored_paragraphs_1, stored_paragraphs_2
            stored_paragraphs_1, stored_paragraphs_2 = extract_and_summarize(pdf1, pdf2)
            updated_dropdown_1 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_1)]
            updated_dropdown_2 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_2)]
            return gr.update(choices=updated_dropdown_1), gr.update(choices=updated_dropdown_2)

        b1.click(fn=update_paragraphs, inputs=[pdf1, pdf2], outputs=[paragraph_1_dropdown, paragraph_2_dropdown])

        with gr.Row():
            # Process the selected paragraph from PDF 1
            with gr.Column():
                gr.Markdown("### PDF 1 Analysis")
                selected_paragraph_1 = gr.Textbox(label="Selected Paragraph 1 Content", lines=4)
                paragraph_1_dropdown.change(show, paragraph_1_dropdown, selected_paragraph_1)
                summarize_btn1 = gr.Button("Summarize Text from PDF 1")
                summary_textbox_1 = gr.Textbox(label="Summary for PDF 1", lines=2)
                summarize_btn1.click(fn=lambda p: process_paragraph_1_sum(p), inputs=paragraph_1_dropdown, outputs=summary_textbox_1)
                sentiment_btn1 = gr.Button("Classify Financial Tone from PDF 1")
                sentiment_textbox_1 = gr.Textbox(label="Classification for PDF 1", lines=1)
                sentiment_btn1.click(fn=lambda p: process_paragraph_1_sent(p), inputs=paragraph_1_dropdown, outputs=sentiment_textbox_1)
                analyze_btn1 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone")
                fin_spans_1 = gr.HighlightedText(label="Financial Tone Analysis for PDF 1")
                analyze_btn1.click(fn=lambda p: process_paragraph_1_sent_tone(p), inputs=paragraph_1_dropdown, outputs=fin_spans_1)
                analyze_btn1_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert")
                fin_spans_1_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 1 bis")
                analyze_btn1_.click(fn=lambda p: process_paragraph_1_sent_tone_bis(p), inputs=paragraph_1_dropdown, outputs=fin_spans_1_)

            # Process the selected paragraph from PDF 2
            with gr.Column():
                gr.Markdown("### PDF 2 Analysis")
                selected_paragraph_2 = gr.Textbox(label="Selected Paragraph 2 Content", lines=4)
                paragraph_2_dropdown.change(show, paragraph_2_dropdown, selected_paragraph_2)
                summarize_btn2 = gr.Button("Summarize Text from PDF 2")
                summary_textbox_2 = gr.Textbox(label="Summary for PDF 2", lines=2)
                summarize_btn2.click(fn=lambda p: process_paragraph_2_sum(p), inputs=paragraph_2_dropdown, outputs=summary_textbox_2)
                sentiment_btn2 = gr.Button("Classify Financial Tone from PDF 2")
                sentiment_textbox_2 = gr.Textbox(label="Classification for PDF 2", lines=1)
                sentiment_btn2.click(fn=lambda p: process_paragraph_2_sent(p), inputs=paragraph_2_dropdown, outputs=sentiment_textbox_2)
                analyze_btn2 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone")
                fin_spans_2 = gr.HighlightedText(label="Financial Tone Analysis for PDF 2")
                analyze_btn2.click(fn=lambda p: process_paragraph_2_sent_tone(p), inputs=paragraph_2_dropdown, outputs=fin_spans_2)
                analyze_btn2_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert")
                fin_spans_2_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 2 bis")
                analyze_btn2_.click(fn=lambda p: process_paragraph_2_sent_tone_bis(p), inputs=paragraph_2_dropdown, outputs=fin_spans_2_)
| with gr.Tab("Financial Report Table Analysis"): | |
| # New tab content goes here | |
| gr.Markdown("## Excel Data Comparison") | |
| with gr.Row(): | |
| with gr.Column(): | |
| file1 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 1") | |
| file2 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 2") | |
| sheet = gr.Dropdown(choices=[], label="Select Sheet for File 1 and 2") | |
| with gr.Column(): | |
| result = gr.Image(label="Comparison pLot") | |
        def update_sheets(file):
            return get_sheet_names(file)

        file1.change(fn=update_sheets, inputs=file1, outputs=sheet)
        b2 = gr.Button("Compare Data")
        b2.click(fn=process_and_compare, inputs=[file1, sheet, file2, sheet], outputs=result)

demo.launch()