awacke1's picture
Update app.py
331cb9f verified
raw
history blame
9.84 kB
import logging
import os
import base64
import datetime
import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
st.set_page_config(page_title="Presidio PHI De-identification", layout="wide", initial_sidebar_state="expanded", menu_items={"About": "https://microsoft.github.io/presidio/"})
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")
def get_timestamp_prefix() -> str:
"""🕒 Stamps time like a boss with Central flair!"""
central = pytz.timezone("US/Central")
return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple[object, RecognizerRegistry]:
"""🤖 Fires up NLP engines with a spark of genius!"""
registry = RecognizerRegistry()
if model_family.lower() == "flair":
from flair.models import SequenceTagger
tagger = SequenceTagger.load(model_path)
registry.load_predefined_recognizers()
registry.add_recognizer_from_dict({"name": "flair_recognizer", "supported_language": "en", "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"], "model": model_path, "package": "flair"})
return tagger, registry
elif model_family.lower() == "huggingface":
from transformers import pipeline
nlp = pipeline("ner", model=model_path, tokenizer=model_path)
registry.load_predefined_recognizers()
registry.add_recognizer_from_dict({"name": "huggingface_recognizer", "supported_language": "en", "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"], "model": model_path, "package": "transformers"})
return nlp, registry
raise ValueError(f"Model family {model_family} not supported")
def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
"""🔍 Unleashes the PHI-sniffing bloodhound!"""
nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
return AnalyzerEngine(registry=registry)
def get_supported_entities(model_family: str, model_path: str) -> list[str]:
"""📋 Lists what secrets we’re hunting—PHI beware!"""
if model_family.lower() == "huggingface":
return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"]
elif model_family.lower() == "flair":
return ["PERSON", "LOCATION", "ORGANIZATION"]
return ["PERSON", "LOCATION", "ORGANIZATION"]
# Feature Spotlight: 🕵️‍♂️ The Great PHI Hunt Begins!
# With a flick of the wrist, we summon models to sniff out sensitive data in PDFs, making privacy a breeze! 😎
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list[RecognizerResult]:
"""🦸 Swoops in to spot PHI with laser precision!"""
results = analyzer.analyze(text=text, entities=entities, language=language, score_threshold=score_threshold, return_decision_process=return_decision_process)
filtered_results = []
for result in results:
text_snippet = text[result.start:result.end].lower()
if any(word.lower() in text_snippet for word in allow_list):
continue
if any(word.lower() in text_snippet for word in deny_list) or not deny_list:
filtered_results.append(result)
return filtered_results
def anonymize(text: str, operator: str, analyze_results: list[RecognizerResult], mask_char: str = "*", number_of_chars: int = 15) -> dict:
"""🕵️‍♀️ Cloaks PHI in a disguise—poof, it’s gone!"""
anonymizer = AnonymizerEngine()
operator_config = {"DEFAULT": OperatorConfig(operator, {})}
if operator == "mask":
operator_config["DEFAULT"] = OperatorConfig(operator, {"masking_char": mask_char, "chars_to_mask": number_of_chars})
return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=operator_config)
def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
"""🚨 Builds a naughty list to catch sneaky PHI!"""
if not deny_list:
return None
return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
def save_pdf(pdf_input) -> str:
"""💾 Drops PDFs onto disk like hot cakes!"""
original_name = pdf_input.name
with open(original_name, "wb") as f:
f.write(pdf_input.read())
return original_name
# Feature Spotlight: 📄 PDF Magic Unleashed!
# Upload a PDF, zap the PHI, and grab a shiny new file—all with a timestamp swagger! ✨
def read_pdf(pdf_path: str) -> str:
"""📖 Slurps up PDF text like a thirsty camel!"""
reader = PdfReader(pdf_path)
return "".join(page.extract_text() or "" + "\n" for page in reader.pages)
def create_pdf(text: str, input_path: str, output_filename: str) -> str:
"""🖨️ Crafts a fresh PDF with PHI-proof swagger!"""
reader = PdfReader(input_path)
writer = PdfWriter()
for page in reader.pages:
writer.add_page(page)
with open(output_filename, "wb") as f:
writer.write(f)
return output_filename
# Sidebar setup
st.sidebar.header("PHI De-identification with Presidio")
model_list = [
("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model package", [model[0] for model in model_list], index=0, help="Pick your PHI-hunting hero!")
st.sidebar.markdown(f"[View model on HuggingFace]({next(url for model, url in model_list if model == st_model)})")
st_model_package = st_model.split("/")[0]
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may take a sec to wake up!")
st_operator = st.sidebar.selectbox("De-identification approach", ["replace", "redact", "mask"], index=0, help="Choose how to zap PHI!")
st_threshold = st.sidebar.slider("Acceptance threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Add analysis explanations", False)
with st.sidebar.expander("Allowlists and denylists"):
st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")
# Main panel
col1, col2 = st.columns(2)
with col1:
st.subheader("Input")
uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
if uploaded_file:
try:
pdf_path = save_pdf(uploaded_file)
if not pdf_path:
raise ValueError("PDF save flopped!")
text = read_pdf(pdf_path)
if not text:
raise ValueError("No text in that PDF!")
analyzer = analyzer_engine(*analyzer_params)
st_analyze_results = analyze(
analyzer=analyzer,
text=text,
entities=get_supported_entities(*analyzer_params),
language="en",
score_threshold=st_threshold,
return_decision_process=st_return_decision_process,
allow_list=st_allow_list,
deny_list=st_deny_list,
)
phi_types = set(res.entity_type for res in st_analyze_results)
if phi_types:
st.success(f"Removed PHI types: {', '.join(phi_types)}")
else:
st.info("No PHI detected")
anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
timestamp = get_timestamp_prefix()
output_filename = f"{timestamp}_{uploaded_file.name}"
pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
if not pdf_output:
raise ValueError("PDF creation tanked!")
with open(output_filename, "rb") as f:
pdf_bytes = f.read()
b64 = base64.b64encode(pdf_bytes).decode()
st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
with col2:
st.subheader("Findings")
if st_analyze_results:
df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
df["text"] = [text[res.start:res.end] for res in st_analyze_results]
df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
{"entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
)
if st_return_decision_process:
analysis_explanation_df = pd.DataFrame.from_records([r.analysis_explanation.to_dict() for r in st_analyze_results])
df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
else:
st.text("No findings")
if os.path.exists(pdf_path):
os.remove(pdf_path)
except Exception as e:
st.error(f"Oops, something broke: {str(e)}")
logger.error(f"Processing error: {str(e)}")