# ---------------------------------------------------------------------------------------
# Imports and Options
# ---------------------------------------------------------------------------------------
import io
import os
import re

import fitz  # PyMuPDF
import matplotlib.pyplot as plt
import pandas as pd
import requests
import streamlit as st
import torch
from docling_core.types.doc import DoclingDocument
from docling_core.types.doc.document import DocTagsDocument
from PIL import Image
from transformers import AutoProcessor, AutoModelForVision2Seq

# import logging
# logging.basicConfig(level=logging.INFO)

# Seed the session-state keys the UI relies on so later reads never KeyError.
if 'pdf_processed' not in st.session_state:
    st.session_state['pdf_processed'] = False
if 'markdown_texts' not in st.session_state:
    st.session_state['markdown_texts'] = []
if 'df' not in st.session_state:
    st.session_state['df'] = pd.DataFrame()

# ---------------------------------------------------------------------------------------
# API Configuration
# ---------------------------------------------------------------------------------------
API_URL = "https://api.stack-ai.com/inference/v0/run/2df89a6c-a4af-4576-880e-27058e498f02/67acad8b0603ba4631db38e7"

# SECURITY: a bearer token committed to source control should be treated as
# leaked. Set STACK_AI_API_TOKEN in the environment to override it; the literal
# fallback is kept only so existing deployments keep working and should be
# rotated and removed.
API_TOKEN = os.environ.get("STACK_AI_API_TOKEN", "a9e4979e-cdbe-49ea-a193-53562a784805")

# Upper bound (seconds) for a single inference request; without a timeout
# `requests` waits forever on a stalled connection.
API_TIMEOUT_SECONDS = 300

headers = {
    'Authorization': f'Bearer {API_TOKEN}',
    'Content-Type': 'application/json'
}


# ---------------------------------------------------------------------------------------
# Survey Analysis Class
# ---------------------------------------------------------------------------------------
class SurveyAnalysis:
    """Send document text to the inference API and collect per-topic summaries."""

    def __init__(self, api_key=None):
        """Store an optional API key.

        When ``api_key`` is given it replaces the module-level bearer token
        for requests made by this instance.
        """
        self.api_key = api_key

    def prepare_llm_input(self, survey_response, topics):
        """Build the prompt for one document.

        Args:
            survey_response: Text of the document (one page of markdown).
            topics: Mapping of topic name -> topic description.

        Returns:
            The prompt string sent to the API as ``in-0``.
        """
        # Create topic description string from user input
        topic_descriptions = "\n".join(
            f"- **{topic}**: {description}" for topic, description in topics.items()
        )

        llm_input = f"""
Your task is to review PDF docling and extract information related to the provided topics.
Here are the topic descriptions:

{topic_descriptions}

**Instructions:**
- Extract and summarize the PDF focusing only on the provided topics.
- If a topic is not mentioned in the notes, it should not be included in the Topic_Summary.
- Use **exact quotes** from the original text for each point in your Topic_Summary.
- Exclude erroneous content.
- Do not add additional explanations or instructions.

**Format your response as follows:**
[Topic]
- "Exact quote"
- "Exact quote"
- "Exact quote"

**Meeting Notes:**
{survey_response}
"""
        return llm_input

    def query_api(self, payload):
        """POST ``payload`` to ``API_URL`` and return the decoded JSON body.

        Raises:
            requests.RequestException: On connection errors or timeout.
            ValueError: If the response body is not valid JSON.
        """
        request_headers = dict(headers)
        if self.api_key:
            request_headers['Authorization'] = f'Bearer {self.api_key}'
        response = requests.post(
            API_URL,
            headers=request_headers,
            json=payload,
            timeout=API_TIMEOUT_SECONDS,
        )
        return response.json()

    def extract_meeting_notes(self, response):
        """Return the ``outputs['out-0']`` text from an API response, or ``''``.

        Tolerates a non-dict response and a missing or null ``outputs`` /
        ``out-0`` entry so downstream regex parsing always receives a value
        it can handle.
        """
        if not isinstance(response, dict):
            return ''
        outputs = response.get('outputs') or {}
        if not isinstance(outputs, dict):
            return ''
        return outputs.get('out-0') or ''

    def process_dataframe(self, df, topics):
        """Query the API once per row and append a ``Topic_Summary`` column.

        Args:
            df: DataFrame with a ``Document_Text`` column.
            topics: Mapping of topic name -> topic description.

        Returns:
            A copy of ``df`` with a fresh 0..n-1 index and an added
            ``Topic_Summary`` column (an empty frame yields an empty column).
        """
        summaries = []
        for document_text in df['Document_Text']:
            payload = {
                "user_id": "",
                "in-0": self.prepare_llm_input(document_text, topics)
            }
            response = self.query_api(payload)
            summaries.append(self.extract_meeting_notes(response))

        return df.reset_index(drop=True).assign(Topic_Summary=summaries)


# ---------------------------------------------------------------------------------------
# Function to Extract Excerpts
# ---------------------------------------------------------------------------------------
def extract_excerpts(processed_df):
    """Explode each ``Topic_Summary`` into one row per quoted excerpt.

    A summary is expected to consist of sections that start with ``[Topic]``
    followed by lines of the form ``- "quote"``.

    Args:
        processed_df: DataFrame with ``Document_Text`` and ``Topic_Summary``.

    Returns:
        DataFrame with columns ``Document_Text``, ``Topic_Summary``,
        ``Excerpt`` and ``Topic``. The columns are present even when no
        excerpt was found, so callers can index them unconditionally.
    """
    columns = ['Document_Text', 'Topic_Summary', 'Excerpt', 'Topic']
    new_rows = []
    for _, row in processed_df.iterrows():
        topic_summary = row['Topic_Summary']
        # A failed API call can leave a non-string (None/NaN) summary.
        if not isinstance(topic_summary, str):
            continue

        # Split the Topic_Summary by topic
        for section in re.split(r'\n(?=\[)', topic_summary):
            # Extract the topic
            topic_match = re.match(r'\[([^\]]+)\]', section)
            if not topic_match:
                continue
            topic = topic_match.group(1)

            # Extract all excerpts within the section
            for excerpt in re.findall(r'- "([^"]+)"', section):
                new_rows.append({
                    'Document_Text': row['Document_Text'],
                    'Topic_Summary': row['Topic_Summary'],
                    'Excerpt': excerpt,
                    'Topic': topic
                })
    return pd.DataFrame(new_rows, columns=columns)


#------------------------------------------------------------------------
# Streamlit Configuration
#------------------------------------------------------------------------
# Set page configuration
st.set_page_config(
    page_title="Choose Your Own Adventure (Topic Extraction) PDF Analysis App",
    page_icon=":bar_chart:",
    layout="centered",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': 'mailto:support@mtss.ai',
        'About': "This app is built to support PDF analysis"
    }
)

#------------------------------------------------------------------------
# Sidebar
#------------------------------------------------------------------------
# Sidebar with image
with st.sidebar:
    # Set the desired width in pixels
    image_width = 300
    # Define the path to the image
    # image_path = "steelcase_small.png"
    image_path = "mtss.ai_small.png"
    # Display the image
    st.image(image_path, width=image_width)

    # Additional sidebar content
    with st.expander("**MTSS.ai**", expanded=True):
        st.write("""
        - **Support**: Cheyne LeVesseur PhD
        - **Email**: support@mtss.ai
        """)
    st.divider()
    st.subheader('Instructions')

    Instructions = """
    - **Step 1**: Upload your PDF file.
    - **Step 2**: Review the processed text.
    - **Step 3**: Add your topics and descriptions of interest.
    - **Step 4**: Review the extracted excerpts and classifications, and topic distribution and frequency.
    - **Step 5**: Review bar charts of topics.
    - **Step 6**: Download the processed data as a CSV file.
    """
    st.markdown(Instructions)


# Load SmolDocling model using transformers
@st.cache_resource
def load_smol_docling():
    """Load the SmolDocling model and processor once per server process.

    Returns:
        ``(model, processor)``; the model is moved to CUDA when available,
        otherwise it stays on the CPU.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    processor = AutoProcessor.from_pretrained("ds4sd/SmolDocling-256M-preview")
    model = AutoModelForVision2Seq.from_pretrained(
        "ds4sd/SmolDocling-256M-preview",
        torch_dtype=torch.float32
    ).to(device)
    return model, processor


model, processor = load_smol_docling()

# # Convert PDF to images
# def convert_pdf_to_images(pdf_file):
#     images = []
#     doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
#     for page_number in range(len(doc)):
#         page = doc.load_page(page_number)
#         pix = page.get_pixmap(dpi=300)  # Higher DPI for clarity
#         img_data = pix.tobytes("png")
#         image = Image.open(io.BytesIO(img_data))
#         images.append(image)
#     return images


# Improved PDF to image conversion
def convert_pdf_to_images(pdf_file, dpi=150, max_size=1600):
    """Render every page of a PDF to an RGB PIL image.

    Args:
        pdf_file: Binary file-like object containing the PDF.
        dpi: Resolution used when rasterising each page.
        max_size: Longest allowed edge in pixels; larger pages are
            downscaled in place, preserving aspect ratio.

    Returns:
        List of ``PIL.Image.Image``, one per page, in page order.
    """
    # The stream may already have been consumed by an earlier run; rewind
    # when the object supports it so we always read the whole file.
    if hasattr(pdf_file, "seek"):
        pdf_file.seek(0)
    pdf_bytes = pdf_file.read()

    images = []
    # Context manager closes the document even if rendering a page fails.
    with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
        for page in doc:
            pix = page.get_pixmap(dpi=dpi)
            img_data = pix.tobytes("png")
            image = Image.open(io.BytesIO(img_data)).convert("RGB")
            # Resize image to max dimension
            image.thumbnail((max_size, max_size), Image.LANCZOS)
            images.append(image)
    return images


# Extract structured markdown text using SmolDocling (transformers)
# def extract_markdown_from_image(image):
#     prompt_text = "Convert this page to docling."
#     device = "cuda" if torch.cuda.is_available() else "cpu"
#     # Prepare inputs
#     messages = [
#         {
#             "role": "user",
#             "content": [
#                 {"type": "image"},
#                 {"type": "text", "text": prompt_text}
#             ]
#         }
#     ]
#     prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
#     inputs = processor(text=prompt, images=[image], return_tensors="pt").to(device)
#     # Generate outputs
#     generated_ids = model.generate(**inputs, max_new_tokens=1024)
#     prompt_length = inputs.input_ids.shape[1]
#     trimmed_generated_ids = generated_ids[:, prompt_length:]
#     doctags = processor.batch_decode(trimmed_generated_ids, skip_special_tokens=False)[0].lstrip()
#     # Clean the output
#     doctags = doctags.replace("", "").strip()
#     # Populate document
#     doctags_doc = DocTagsDocument.from_doctags_and_image_pairs([doctags], [image])
#     # Create a docling document
#     doc = DoclingDocument(name="ExtractedDocument")
#     doc.load_from_doctags(doctags_doc)
#     # Export as markdown
#     markdown_text = doc.export_to_markdown()
#     return markdown_text


def extract_markdown_from_image(image, max_new_tokens=1024):
    """Convert one page image to markdown with SmolDocling.

    Args:
        image: PIL image of a single page.
        max_new_tokens: Generation budget for the model; output of pages
            needing more tokens than this is cut off.

    Returns:
        Markdown text exported from the generated DocTags.
    """
    # start_time = time.time()
    prompt_text = "Convert this page to docling."
    device = "cuda" if torch.cuda.is_available() else "cpu"

    messages = [{"role": "user", "content": [{"type": "image"}, {"type": "text", "text": prompt_text}]}]
    prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
    inputs = processor(text=prompt, images=[image], return_tensors="pt").to(device)

    with torch.no_grad():  # <-- Crucial for speed
        generated_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)

    # Drop the echoed prompt tokens so only newly generated text is decoded.
    prompt_length = inputs.input_ids.shape[1]
    trimmed_generated_ids = generated_ids[:, prompt_length:]
    doctags = processor.batch_decode(trimmed_generated_ids, skip_special_tokens=False)[0].lstrip()

    # The previous `replace("", "")` was a no-op; the tag it was meant to strip
    # appears to have been lost. NOTE(review): assumes the marker is the
    # model's "<end_of_utterance>" token - confirm against the model card.
    doctags = doctags.replace("<end_of_utterance>", "").strip()

    doctags_doc = DocTagsDocument.from_doctags_and_image_pairs([doctags], [image])
    doc = DoclingDocument(name="ExtractedDocument")
    doc.load_from_doctags(doctags_doc)
    markdown_text = doc.export_to_markdown()

    # processing_time = time.time() - start_time
    # logging.info(f"Inference took {processing_time:.2f} seconds")
    return markdown_text


# Streamlit UI
st.title("Choose Your Own Adventure (Topic Extraction) PDF Analysis App")

uploaded_file = st.file_uploader("Upload PDF file", type=["pdf"])

if uploaded_file:
    # Identify the upload so cached results are never reused for a different
    # PDF that the user swapped in without pressing the reset button.
    file_key = (uploaded_file.name, uploaded_file.size)
    if st.session_state.get('pdf_file_key') != file_key:
        st.session_state['pdf_processed'] = False

    if not st.session_state['pdf_processed']:
        with st.spinner("Processing PDF..."):
            # Rewind: the upload buffer may have been read on an earlier run.
            uploaded_file.seek(0)
            images = convert_pdf_to_images(uploaded_file)

            markdown_texts = [extract_markdown_from_image(image) for image in images]

            df = pd.DataFrame({'Document_Text': markdown_texts})

            # Save results into session state
            st.session_state['markdown_texts'] = markdown_texts
            st.session_state['df'] = df
            st.session_state['pdf_file_key'] = file_key
            st.session_state['pdf_processed'] = True

        st.success("PDF processed successfully!")
    else:
        st.success("PDF already processed. Using cached results.")

    # Use cached dataframe for further processing
    df = st.session_state['df']

    if df.empty or df['Document_Text'].isnull().all():
        st.error("No meaningful text extracted from the PDF.")
        st.stop()

    st.markdown("### Extracted Markdown Preview")
    st.write(df.head())

    if st.button("Reset / Upload New PDF"):
        st.session_state['pdf_processed'] = False
        st.session_state['markdown_texts'] = []
        st.session_state['df'] = pd.DataFrame()
        st.session_state.pop('pdf_file_key', None)
        # `st.experimental_rerun` is gone from recent Streamlit releases;
        # prefer `st.rerun` and fall back only on older versions.
        rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
        rerun()

    # ---------------------------------------------------------------------------------------
    # User Input for Topics
    # ---------------------------------------------------------------------------------------
    st.markdown("### Enter Topics and Descriptions")

    num_topics = st.number_input("Number of topics", min_value=1, max_value=10, value=1, step=1)

    # Only topics with both a name and a description are sent to the model.
    topics = {}
    for i in range(int(num_topics)):
        topic = st.text_input(f"Topic {i+1} Name", key=f"topic_{i}")
        description = st.text_area(f"Topic {i+1} Description", key=f"description_{i}")
        if topic and description:
            topics[topic] = description

    # Add a button to execute the analysis
    if st.button("Run Analysis"):
        if not topics:
            st.warning("Please enter at least one topic and description.")
            st.stop()

        # ---------------------------------------------------------------------------------------
        # Run the topic extraction and split the summaries into excerpts
        # ---------------------------------------------------------------------------------------
        analyzer = SurveyAnalysis()
        try:
            processed_df = analyzer.process_dataframe(df, topics)
        except (requests.RequestException, ValueError) as exc:
            # Network failure or a non-JSON reply: report it instead of
            # surfacing a raw traceback.
            st.error(f"Topic extraction request failed: {exc}")
            st.stop()

        df_VIP_extracted = extract_excerpts(processed_df)

        # No excerpt at all is a normal outcome (topics absent from the PDF),
        # not a schema problem, so report it separately from missing columns.
        if df_VIP_extracted.empty:
            st.warning("No excerpts matching the provided topics were found in the document.")
            st.stop()

        required_columns = ['Document_Text', 'Topic_Summary', 'Excerpt', 'Topic']
        missing_columns = [col for col in required_columns if col not in df_VIP_extracted.columns]

        if missing_columns:
            st.error(f"Missing columns after processing: {missing_columns}")
            st.stop()

        df_VIP_extracted = df_VIP_extracted[required_columns]

        st.markdown("### Processed Meeting Notes")
        st.dataframe(df_VIP_extracted)

        st.write(f"**Number of meeting notes analyzed:** {len(df)}")
        st.write(f"**Number of excerpts extracted:** {len(df_VIP_extracted)}")

        # CSV download
        csv = df_VIP_extracted.to_csv(index=False)
        st.download_button(
            "Download data as CSV",
            data=csv,
            file_name='extracted_meeting_notes.csv',
            mime='text/csv'
        )

        # Topic distribution visualization
        topic_counts = df_VIP_extracted['Topic'].value_counts()
        frequency_table = pd.DataFrame({'Topic': topic_counts.index, 'Count': topic_counts.values})
        frequency_table['Percentage'] = (frequency_table['Count'] / frequency_table['Count'].sum() * 100).round(0)

        st.markdown("### Topic Distribution")
        st.dataframe(frequency_table)

        fig, ax = plt.subplots(figsize=(10, 5))
        ax.bar(frequency_table['Topic'], frequency_table['Count'], color='#3d9aa1')
        ax.set_ylabel('Count')
        ax.set_title('Frequency of Topics')
        st.pyplot(fig)
        # pyplot keeps every figure alive until closed; release it so reruns
        # of the script do not accumulate figures in memory.
        plt.close(fig)

else:
    st.info("Please upload a PDF file to begin.")