Spaces:
Sleeping
Sleeping
File size: 9,840 Bytes
35c70df 77c02fb 331cb9f 35c70df 331cb9f 35c70df 331cb9f 35c70df 545e6f3 35c70df 331cb9f 35c70df 331cb9f 55290a8 331cb9f 35c70df bbda733 77c02fb 331cb9f 77c02fb cabea79 331cb9f bbda733 35c70df bbda733 331cb9f cabea79 bbda733 77c02fb 331cb9f bbda733 331cb9f bbda733 331cb9f bbda733 77c02fb bbda733 331cb9f bbda733 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import logging
import os
import base64
import datetime
import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
# Page-level Streamlit configuration, environment loading and the module logger.
st.set_page_config(
    page_title="Presidio PHI De-identification",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
)

# Pull settings from a local .env file (if any) into the process environment.
dotenv.load_dotenv()

logger = logging.getLogger("presidio-streamlit")
def get_timestamp_prefix() -> str:
    """Return the current US Central time formatted as a filename prefix.

    The format is ``HHMM{AM|PM}_DD-MM-YY`` (12-hour clock), upper-cased,
    e.g. ``0315PM_07-02-25``.

    Returns:
        The formatted timestamp string.
    """
    # Local import: zoneinfo is stdlib, so no third-party timezone package
    # is needed and the file's top-level imports stay untouched.
    from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

    try:
        # Canonical name for US Central time; legacy "US/*" aliases are not
        # shipped by every tz database package.
        zone = ZoneInfo("America/Chicago")
    except ZoneInfoNotFoundError:
        # No tz database on this host: a UTC stamp is still a usable,
        # unique-enough filename prefix, so degrade instead of crashing.
        zone = datetime.timezone.utc

    # `datetime` is imported as a module at file level, hence datetime.datetime.
    return datetime.datetime.now(zone).strftime("%I%M%p_%d-%m-%y").upper()
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple[object, RecognizerRegistry]:
    """Load the NER model for *model_family* and build a recognizer registry.

    Args:
        model_family: "flair" or "huggingface" (case-insensitive).
        model_path: Model identifier handed to the model loader.

    Returns:
        A ``(model, registry)`` tuple: the loaded Flair tagger or transformers
        NER pipeline, and a registry holding the predefined recognizers plus
        one model-specific recognizer entry.

    Raises:
        ValueError: If *model_family* is neither "flair" nor "huggingface".
    """
    registry = RecognizerRegistry()
    family = model_family.lower()

    if family == "flair":
        # Imported lazily so the heavy dependency is only needed when selected.
        from flair.models import SequenceTagger

        model = SequenceTagger.load(model_path)
        recognizer_spec = {
            "name": "flair_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"],
            "model": model_path,
            "package": "flair",
        }
    elif family == "huggingface":
        from transformers import pipeline

        model = pipeline("ner", model=model_path, tokenizer=model_path)
        recognizer_spec = {
            "name": "huggingface_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
            "model": model_path,
            "package": "transformers",
        }
    else:
        raise ValueError(f"Model family {model_family} not supported")

    registry.load_predefined_recognizers()
    # NOTE(review): confirm `add_recognizer_from_dict` exists and accepts these
    # keys in the installed presidio-analyzer version.
    registry.add_recognizer_from_dict(recognizer_spec)
    return model, registry
def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """Build an ``AnalyzerEngine`` backed by the registry for the chosen model.

    Args:
        model_family: Model family name forwarded to ``nlp_engine_and_registry``.
        model_path: Model identifier forwarded to ``nlp_engine_and_registry``.

    Returns:
        An ``AnalyzerEngine`` constructed with the returned registry only.
    """
    # The loaded model object is not passed to the engine; only the registry is.
    _model, recognizer_registry = nlp_engine_and_registry(model_family, model_path)
    return AnalyzerEngine(registry=recognizer_registry)
def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """Return the entity types to request for the given model family.

    Args:
        model_family: Model family name; compared case-insensitively.
        model_path: Accepted for signature parity with the other model
            helpers; not consulted here.

    Returns:
        A new list of entity names. "huggingface" additionally includes
        ``DATE_TIME``; every other family gets the three base entities.
    """
    supported = ["PERSON", "LOCATION", "ORGANIZATION"]
    if model_family.lower() == "huggingface":
        supported.append("DATE_TIME")
    return supported
# Feature Spotlight: 🕵️‍♂️ The Great PHI Hunt Begins!
# With a flick of the wrist, we summon models to sniff out sensitive data in PDFs, making privacy a breeze! 😎
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list[RecognizerResult]:
    """Run the analyzer over *text* and filter findings by allow/deny lists.

    Matching is case-insensitive substring matching against the text span of
    each finding.

    Args:
        analyzer: Engine whose ``analyze`` method produces the findings.
        text: Text to scan.
        entities: Entity types to request from the analyzer.
        language: Language code forwarded to the analyzer.
        score_threshold: Minimum score forwarded to the analyzer.
        return_decision_process: Forwarded to the analyzer unchanged.
        allow_list: Terms that suppress a finding whose span contains them.
            ``None`` and blank entries are ignored.
        deny_list: When it has at least one non-blank term, only findings
            whose span contains one of the terms are kept. ``None`` and
            blank entries are ignored.

    Returns:
        The findings that survive both filters, in analyzer order.
    """
    results = analyzer.analyze(
        text=text,
        entities=entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
    )

    # Normalise once, outside the loop. Blank entries are dropped because the
    # empty string is a substring of every span and would match everything.
    allowed_terms = [term.lower() for term in (allow_list or []) if term and term.strip()]
    denied_terms = [term.lower() for term in (deny_list or []) if term and term.strip()]

    filtered_results = []
    for result in results:
        span_text = text[result.start:result.end].lower()
        if any(term in span_text for term in allowed_terms):
            continue
        # NOTE(review): a non-empty deny list *restricts* output to findings
        # containing a denied term, so every other detection is left
        # un-anonymized. Confirm this is intended; a deny list is more commonly
        # used to add terms to flag rather than to narrow the results.
        if not denied_terms or any(term in span_text for term in denied_terms):
            filtered_results.append(result)
    return filtered_results
def anonymize(text: str, operator: str, analyze_results: list[RecognizerResult], mask_char: str = "*", number_of_chars: int = 15) -> dict:
    """Anonymize *text* by applying one operator to every analyzer finding.

    Args:
        text: The original text.
        operator: Name of the anonymizer operator used as the ``DEFAULT``
            for all entity types (the UI offers "replace", "redact", "mask").
        analyze_results: Findings to anonymize.
        mask_char: Masking character; used only when *operator* is "mask".
        number_of_chars: Number of characters to mask; used only when
            *operator* is "mask".

    Returns:
        Whatever ``AnonymizerEngine.anonymize`` returns. NOTE(review): the
        caller in this file reads a ``.text`` attribute from it, so the
        ``dict`` annotation looks inaccurate; it is left as-is because the
        engine's result type is not imported here.
    """
    operator_params = {}
    if operator == "mask":
        operator_params = {
            "masking_char": mask_char,
            "chars_to_mask": number_of_chars,
            # Presidio's mask operator validates `from_end` as a bool in at
            # least some releases (confirm against the installed version);
            # False keeps the mask-from-start behaviour.
            "from_end": False,
        }

    anonymizer = AnonymizerEngine()
    return anonymizer.anonymize(
        text=text,
        analyzer_results=analyze_results,
        operators={"DEFAULT": OperatorConfig(operator, operator_params)},
    )
def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
    """Build a ``GENERIC_PII`` pattern recognizer from a deny list.

    Args:
        deny_list: Terms the recognizer should flag.

    Returns:
        A ``PatternRecognizer`` for the given terms, or ``None`` when
        *deny_list* is ``None`` or empty.
    """
    return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list) if deny_list else None
def save_pdf(pdf_input) -> str:
    """Write an uploaded file into the current working directory.

    Args:
        pdf_input: File-like object exposing ``name`` and ``read()``.

    Returns:
        The file name (no directory part) the content was written to.

    Raises:
        ValueError: If the upload's name has no usable file-name component.
    """
    # The name comes from the client, so never trust it as a path: keep only
    # the final component (treating both separator styles as separators) so
    # the write cannot land outside the working directory.
    safe_name = os.path.basename(str(pdf_input.name).replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError(f"Invalid upload file name: {pdf_input.name!r}")

    with open(safe_name, "wb") as destination:
        destination.write(pdf_input.read())
    return safe_name
# Feature Spotlight: 📄 PDF Magic Unleashed!
# Upload a PDF, zap the PHI, and grab a shiny new file—all with a timestamp swagger! ✨
def read_pdf(pdf_path: str) -> str:
    """Extract the text of every page of a PDF.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The page texts in order, each followed by a newline, or an empty
        string when no page yields any non-whitespace text (so callers can
        test the result for truthiness).
    """
    reader = PdfReader(pdf_path)
    page_texts = [page.extract_text() or "" for page in reader.pages]

    if not any(page_text.strip() for page_text in page_texts):
        return ""

    # Parenthesised on purpose: `x or "" + "\n"` parses as `x or "\n"`, which
    # would drop the separator after every page that actually has text.
    return "".join((page_text or "") + "\n" for page_text in page_texts)
def _pdf_escape(line: str) -> bytes:
    """Encode one line as the body of a PDF literal string (escaping \\, ( and ))."""
    encoded = line.encode("cp1252", errors="replace")
    return encoded.replace(b"\\", b"\\\\").replace(b"(", b"\\(").replace(b")", b"\\)")


def _pdf_paginate(text: str, columns: int, rows: int) -> list[list[str]]:
    """Wrap *text* to *columns* characters and split it into pages of *rows* lines."""
    import textwrap

    wrapped: list[str] = []
    for raw_line in (text or "").splitlines():
        # Tabs and other non-printable characters have no glyph in the page
        # font; render them as spaces so the content stream stays valid.
        cleaned = "".join(ch if ch.isprintable() else " " for ch in raw_line.expandtabs(4))
        # textwrap returns [] for blank lines; keep them as empty rows.
        wrapped.extend(textwrap.wrap(cleaned, width=columns) or [""])
    # Always produce at least one (possibly empty) page.
    return [wrapped[i:i + rows] for i in range(0, len(wrapped), rows)] or [[]]


def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """Write *text* to a new, text-only PDF file.

    The output is generated purely from *text*. Nothing is copied from the
    source document, so content that was removed from *text* cannot reappear
    in the output. The original layout, fonts and images are not preserved.

    Args:
        text: The (already anonymized) text to render.
        input_path: Path of the source PDF. Kept for interface compatibility;
            deliberately not read.
        output_filename: Path of the PDF file to create.

    Returns:
        *output_filename*.
    """
    page_width, page_height = 612, 792  # US Letter, in points
    margin = 54
    font_size, leading = 10, 12
    # Courier is monospaced at 600/1000 em per glyph, so a fixed column count
    # is guaranteed to fit between the margins.
    columns = (page_width - 2 * margin) * 1000 // (font_size * 600)
    rows = (page_height - 2 * margin) // leading

    pages = _pdf_paginate(text, columns, rows)

    # Object numbering: 1 catalog, 2 page tree, 3 font, then for page k
    # (0-based) the page object is 4 + 2k and its content stream is 5 + 2k.
    kids = b" ".join(b"%d 0 R" % (4 + 2 * k) for k in range(len(pages)))
    objects = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        b"<< /Type /Pages /Kids [" + kids + b"] /Count %d >>" % len(pages),
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Courier /Encoding /WinAnsiEncoding >>",
    ]
    for k, page_lines in enumerate(pages):
        stream = (
            b"BT\n/F1 %d Tf\n%d TL\n%d %d Td\n"
            % (font_size, leading, margin, page_height - margin - font_size)
            + b"".join(b"(" + _pdf_escape(line) + b") Tj\nT*\n" for line in page_lines)
            + b"ET"
        )
        objects.append(
            b"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 %d %d] "
            b"/Resources << /Font << /F1 3 0 R >> >> /Contents %d 0 R >>"
            % (page_width, page_height, 5 + 2 * k)
        )
        objects.append(b"<< /Length %d >>\nstream\n" % len(stream) + stream + b"\nendstream")

    document = bytearray(b"%PDF-1.4\n")
    offsets = []
    for number, body in enumerate(objects, start=1):
        offsets.append(len(document))
        document += b"%d 0 obj\n" % number + body + b"\nendobj\n"

    xref_offset = len(document)
    document += b"xref\n0 %d\n" % (len(objects) + 1)
    document += b"0000000000 65535 f \n"  # each xref entry is exactly 20 bytes
    for offset in offsets:
        document += b"%010d 00000 n \n" % offset
    document += b"trailer\n<< /Size %d /Root 1 0 R >>\n" % (len(objects) + 1)
    document += b"startxref\n%d\n" % xref_offset
    document += b"%%EOF\n"

    with open(output_filename, "wb") as f:
        f.write(bytes(document))
    return output_filename
# Sidebar setup: model choice, anonymization operator, threshold and word lists.
st.sidebar.header("PHI De-identification with Presidio")

# (display name, model card URL) pairs; the text before the first "/" is the
# package/family name.
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
_model_urls = dict(model_list)

st_model = st.sidebar.selectbox(
    "NER model package",
    list(_model_urls),
    index=0,
    help="Pick your PHI-hunting hero!",
)
st.sidebar.markdown(f"[View model on HuggingFace]({_model_urls[st_model]})")

# Split "<package>/<rest>"; for HuggingFace entries the prefix is only a
# label, so the model id is everything after it.
st_model_package, _, _model_rest = st_model.partition("/")
if st_model_package.lower() == "huggingface":
    st_model = _model_rest
analyzer_params = (st_model_package, st_model)

st.sidebar.warning("Models may take a sec to wake up!")
st_operator = st.sidebar.selectbox(
    "De-identification approach",
    ["replace", "redact", "mask"],
    index=0,
    help="Choose how to zap PHI!",
)
st_threshold = st.sidebar.slider("Acceptance threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Add analysis explanations", False)

with st.sidebar.expander("Allowlists and denylists"):
    st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
    st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")
# Main panel: upload a PDF, analyze and anonymize its text, offer the result
# for download (left column) and list the findings (right column).
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded_file:
        # Every file this run writes to disk is tracked here so the `finally`
        # block can delete it on success *and* on failure. Both files hold
        # document content and must not be left behind.
        pdf_path = None
        output_filename = None
        try:
            pdf_path = save_pdf(uploaded_file)
            if not pdf_path:
                raise ValueError("PDF save flopped!")
            text = read_pdf(pdf_path)
            if not text:
                raise ValueError("No text in that PDF!")

            analyzer = analyzer_engine(*analyzer_params)
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )

            # Sorted so the message is stable across reruns (set order is not).
            phi_types = sorted({res.entity_type for res in st_analyze_results})
            if phi_types:
                st.success(f"Removed PHI types: {', '.join(phi_types)}")
            else:
                st.info("No PHI detected")

            anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
            timestamp = get_timestamp_prefix()
            # Base name only: the upload's name is client-supplied.
            output_filename = f"{timestamp}_{os.path.basename(uploaded_file.name)}"
            pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
            if not pdf_output:
                raise ValueError("PDF creation tanked!")

            # The bytes are embedded in the link itself, so the file on disk
            # is no longer needed once it has been read.
            with open(output_filename, "rb") as f:
                pdf_bytes = f.read()
            b64 = base64.b64encode(pdf_bytes).decode()
            st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)

            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[res.start:res.end] for res in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {"entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
                    )
                    if st_return_decision_process:
                        analysis_explanation_df = pd.DataFrame.from_records([r.analysis_explanation.to_dict() for r in st_analyze_results])
                        df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
        except Exception as e:
            # Top-level UI boundary: show the problem to the user and keep the
            # traceback in the log.
            st.error(f"Oops, something broke: {str(e)}")
            logger.exception("Processing error: %s", e)
        finally:
            for leftover in (pdf_path, output_filename):
                if leftover and os.path.exists(leftover):
                    try:
                        os.remove(leftover)
                    except OSError:
                        logger.warning("Could not remove temporary file %s", leftover)