import os import gradio as gr from transformers import pipeline import spacy import lib.read_pdf import pandas as pd import re import matplotlib.pyplot as plt import matplotlib.patches as patches import io # Initialize spaCy model nlp = spacy.load('en_core_web_sm') nlp.add_pipe('sentencizer') def split_in_sentences(text): doc = nlp(text) return [str(sent).strip() for sent in doc.sents] def make_spans(text, results): results_list = [res['label'] for res in results] facts_spans = list(zip(split_in_sentences(text), results_list)) return facts_spans # Initialize pipelines summarizer = pipeline("summarization", model="human-centered-summarization/financial-summarization-pegasus") fin_model = pipeline("sentiment-analysis", model='yiyanghkust/finbert-tone', tokenizer='yiyanghkust/finbert-tone') fin_model_bis = pipeline("sentiment-analysis", model='ProsusAI/finbert', tokenizer='ProsusAI/finbert') def summarize_text(text): resp = summarizer(text) return resp[0]['summary_text'] def text_to_sentiment(text): sentiment = fin_model(text)[0]["label"] return sentiment def fin_ext(text): results = fin_model(split_in_sentences(text)) return make_spans(text, results) def fin_ext_bis(text): results = fin_model_bis(split_in_sentences(text)) return make_spans(text, results) def extract_and_summarize(pdf1, pdf2): if not pdf1 or not pdf2: return [], [] pdf1_path = os.path.join(PDF_FOLDER, pdf1) pdf2_path = os.path.join(PDF_FOLDER, pdf2) # Extract and format paragraphs paragraphs_1 = lib.read_pdf.extract_and_format_paragraphs(pdf1_path) paragraphs_2 = lib.read_pdf.extract_and_format_paragraphs(pdf2_path) start_keyword = "Main risks to" end_keywords = ["4. Appendix", "Annex:", "4. Annex", "Detailed tables", "ACKNOWLEDGEMENTS", "STATISTICAL ANNEX", "PROSPECTS BY MEMBER STATES"] start_index1, end_index1 = lib.read_pdf.find_text_range(paragraphs_1, start_keyword, end_keywords) start_index2, end_index2 = lib.read_pdf.find_text_range(paragraphs_2, start_keyword, end_keywords) paragraphs_1 = lib.read_pdf.extract_relevant_text(paragraphs_1, start_index1, end_index1) paragraphs_2 = lib.read_pdf.extract_relevant_text(paragraphs_2, start_index2, end_index2) paragraphs_1 = lib.read_pdf.split_text_into_paragraphs(paragraphs_1, 0) paragraphs_2 = lib.read_pdf.split_text_into_paragraphs(paragraphs_2, 0) return paragraphs_1, paragraphs_2 # Gradio interface setup PDF_FOLDER = "data" def get_pdf_files(folder): return [f for f in os.listdir(folder) if f.endswith('.pdf')] def show(name): return f"{name}" def get_excel_files(folder): return [f for f in os.listdir(folder) if f.endswith('.xlsx')] def get_sheet_names(file): xls = pd.ExcelFile(os.path.join(PDF_FOLDER, file)) return gr.update(choices=xls.sheet_names) def process_and_compare(file1, sheet1, file2, sheet2): def process_file(file_path, sheet_name): # Extract year from file name year = int(re.search(r'(\d{4})', file_path).group(1)) # Load the Excel file df = pd.read_excel(os.path.join(PDF_FOLDER, file_path), sheet_name=sheet_name, index_col=0) # Define expected columns based on extracted year historical_col = f'Historical {year - 1}' baseline_cols = [f'Baseline {year}', f'Baseline {year + 1}', f'Baseline {year + 2}'] adverse_cols = [f'Adverse {year}', f'Adverse {year + 1}', f'Adverse {year + 2}'] level_deviation_col = f'Level Deviation {year + 2}' # Drop rows and reset index df = df.iloc[4:].reset_index(drop=True) # Define the new column names new_columns = ['Country', 'Code', historical_col] + baseline_cols + adverse_cols + ['Adverse Cumulative', 'Adverse Minimum', level_deviation_col] # Ensure the number of columns matches if len(df.columns) == len(new_columns): df.columns = new_columns else: raise ValueError(f"Expected {len(new_columns)} columns, but found {len(df.columns)} columns in the data.") return df # Process both files df1 = process_file(file1, sheet1) df2 = process_file(file2, sheet2) year1 = int(re.search(r'(\d{4})', file1).group(1)) year2 = int(re.search(r'(\d{4})', file2).group(1)) # Calculate the differences # historical_col1 = f'Historical {int(year1) - 1}' # historical_col2 = f'Historical {int(year2) - 1}' # df1['Historical vs Adverse'] = df1[historical_col1] - df1['Adverse Cumulative'] # df2['Historical vs Adverse'] = df2[historical_col2] - df2['Adverse Cumulative'] # Merge dataframes on 'Country' merged_df = pd.merge(df2, df1, on='Country', suffixes=(f'_{year1}', f'_{year2}')) merged_df['Difference adverse cumulative growth'] = merged_df[f'Adverse Cumulative_{year2}'] - merged_df[f'Adverse Cumulative_{year1}'] # Ensure data types are correct merged_df['Country'] = merged_df['Country'].astype(str) merged_df['Difference adverse cumulative growth'] = pd.to_numeric(merged_df['Difference adverse cumulative growth'], errors='coerce') # Create histogram plot with color coding fig, ax = plt.subplots(figsize=(12, 8)) colors = plt.get_cmap('tab20').colors # Use a colormap with multiple colors num_countries = len(merged_df['Country']) bars = ax.bar(merged_df['Country'], merged_df['Difference adverse cumulative growth'], color=colors[:num_countries]) # Add a legend handles = [patches.Patch(color=color, label=country) for color, country in zip(colors[:num_countries], merged_df['Country'])] ax.legend(handles=handles, title='Countries', bbox_to_anchor=(1.05, 1), loc='upper left') ax.set_title(f'Histogram of Difference between Adverse cumulative growth of {year2} and {year1} for {sheet1}') ax.set_xlabel('Country') ax.set_ylabel('Difference') plt.xticks(rotation=90) # Save plot to a file file_path = 'output/plot.png' plt.savefig(file_path, format='png', bbox_inches='tight') plt.close() return file_path stored_paragraphs_1 = [] stored_paragraphs_2 = [] with gr.Blocks() as demo: with gr.Tab("Financial Report Text Analysis"): gr.Markdown("## Financial Report Paragraph Selection and Analysis on adverse macro-economy scenario") with gr.Row(): # Upload PDFs with gr.Column(): pdf1 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 1") pdf2 = gr.Dropdown(choices=get_pdf_files(PDF_FOLDER), label="Select PDF 2") with gr.Column(): b1 = gr.Button("Extract and Display Paragraphs") paragraph_1_dropdown = gr.Dropdown(label="Select Paragraph from PDF 1") paragraph_2_dropdown = gr.Dropdown(label="Select Paragraph from PDF 2") def update_paragraphs(pdf1, pdf2): global stored_paragraphs_1, stored_paragraphs_2 stored_paragraphs_1, stored_paragraphs_2 = extract_and_summarize(pdf1, pdf2) updated_dropdown_1 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_1)] updated_dropdown_2 = [f"Paragraph {i+1}: {p[:100]}..." for i, p in enumerate(stored_paragraphs_2)] return gr.update(choices=updated_dropdown_1), gr.update(choices=updated_dropdown_2) b1.click(fn=update_paragraphs, inputs=[pdf1, pdf2], outputs=[paragraph_1_dropdown, paragraph_2_dropdown]) with gr.Row(): # Process the selected paragraph from PDF 1 with gr.Column(): gr.Markdown("### PDF 1 Analysis") selected_paragraph_1 = gr.Textbox(label="Selected Paragraph 1 Content", lines=4) selected_paragraph_1.change(show, paragraph_1_dropdown, selected_paragraph_1) summarize_btn1 = gr.Button("Summarize Text from PDF 1") summary_textbox_1 = gr.Textbox(label="Summary for PDF 1", lines=2) summarize_btn1.click(fn=lambda p: process_paragraph_1_sum(p), inputs=paragraph_1_dropdown, outputs=summary_textbox_1) sentiment_btn1 = gr.Button("Classify Financial Tone from PDF 1") sentiment_textbox_1 = gr.Textbox(label="Classification for PDF 1", lines=1) sentiment_btn1.click(fn=lambda p: process_paragraph_1_sent(p), inputs=paragraph_1_dropdown, outputs=sentiment_textbox_1) analyze_btn1 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone") fin_spans_1 = gr.HighlightedText(label="Financial Tone Analysis for PDF 1") analyze_btn1.click(fn=lambda p: process_paragraph_1_sent_tone(p), inputs=paragraph_1_dropdown, outputs=fin_spans_1) analyze_btn1_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert") fin_spans_1_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 1 bis") analyze_btn1_.click(fn=lambda p: process_paragraph_1_sent_tone_bis(p), inputs=paragraph_1_dropdown, outputs=fin_spans_1_) # Process the selected paragraph from PDF 2 with gr.Column(): gr.Markdown("### PDF 2 Analysis") selected_paragraph_2 = gr.Textbox(label="Selected Paragraph 2 Content", lines=4) selected_paragraph_2.change(show, paragraph_2_dropdown, selected_paragraph_2) summarize_btn2 = gr.Button("Summarize Text from PDF 2") summary_textbox_2 = gr.Textbox(label="Summary for PDF 2", lines=2) summarize_btn2.click(fn=lambda p: process_paragraph_2_sum(p), inputs=paragraph_2_dropdown, outputs=summary_textbox_2) sentiment_btn2 = gr.Button("Classify Financial Tone from PDF 2") sentiment_textbox_2 = gr.Textbox(label="Classification for PDF 2", lines=1) sentiment_btn2.click(fn=lambda p: process_paragraph_2_sent(p), inputs=paragraph_2_dropdown, outputs=sentiment_textbox_2) analyze_btn2 = gr.Button("Analyze Financial Tone on each sentence with yiyanghkust/finbert-tone") fin_spans_2 = gr.HighlightedText(label="Financial Tone Analysis for PDF 2") analyze_btn2.click(fn=lambda p: process_paragraph_2_sent_tone(p), inputs=paragraph_2_dropdown, outputs=fin_spans_2) analyze_btn2_ = gr.Button("Analyze Financial Tone on each sentence with ProsusAI/finbert") fin_spans_2_ = gr.HighlightedText(label="Financial Tone Analysis for PDF 2 bis") analyze_btn2_.click(fn=lambda p: process_paragraph_2_sent_tone_bis(p), inputs=paragraph_2_dropdown, outputs=fin_spans_2_) with gr.Tab("Financial Report Table Analysis"): # New tab content goes here gr.Markdown("## Excel Data Comparison") with gr.Row(): with gr.Column(): file1 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 1") file2 = gr.Dropdown(choices=get_excel_files(PDF_FOLDER), label="Select Excel File 2") sheet = gr.Dropdown(choices=[], label="Select Sheet for File 1 and 2") with gr.Column(): result = gr.Image(label="Comparison pLot") def update_sheets(file): return get_sheet_names(file) file1.change(fn=update_sheets, inputs=file1, outputs=sheet) b1 = gr.Button("Compare Data") b1.click(fn=process_and_compare, inputs=[file1, sheet, file2, sheet], outputs=result) demo.launch()