File size: 9,840 Bytes
35c70df
 
 
 
 
 
 
 
77c02fb
331cb9f
 
 
35c70df
331cb9f
35c70df
 
 
331cb9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35c70df
 
 
545e6f3
35c70df
 
331cb9f
 
35c70df
331cb9f
55290a8
331cb9f
 
 
 
 
35c70df
 
 
 
 
 
 
 
 
bbda733
77c02fb
 
331cb9f
77c02fb
cabea79
331cb9f
 
bbda733
 
 
 
 
 
 
 
 
 
 
 
 
35c70df
bbda733
331cb9f
cabea79
bbda733
77c02fb
 
331cb9f
 
 
 
 
bbda733
 
 
 
 
 
331cb9f
bbda733
 
331cb9f
bbda733
 
 
 
77c02fb
 
bbda733
331cb9f
bbda733
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import logging
import os
import base64
import datetime
import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

st.set_page_config(page_title="Presidio PHI De-identification", layout="wide", initial_sidebar_state="expanded", menu_items={"About": "https://microsoft.github.io/presidio/"})
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")

def get_timestamp_prefix() -> str:
    """Return the current US Central time formatted as a file-name prefix.

    The format is ``HHMM`` on a 12-hour clock, followed by ``AM``/``PM``, an
    underscore and ``DD-MM-YY`` (for example ``0315PM_07-02-25``).

    Returns:
        The upper-cased timestamp string.

    Raises:
        zoneinfo.ZoneInfoNotFoundError: If no IANA time zone database is
            available on the host.
    """
    # Local import: the file does not import zoneinfo at module level.
    from zoneinfo import ZoneInfo

    # The original referenced the never-imported ``pytz`` and called ``now`` on
    # the ``datetime`` *module*; both raised before a timestamp was produced.
    # "America/Chicago" is the canonical IANA name for US Central time.
    central = ZoneInfo("America/Chicago")
    return datetime.datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()

def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple[object, RecognizerRegistry]:
    """Load the NER model for a model family and build a recognizer registry.

    The model library (``flair`` or ``transformers``) is imported lazily so
    only the selected family has to be installed and loaded.

    Args:
        model_family: ``"flair"`` or ``"huggingface"`` (case-insensitive).
        model_path: Model identifier handed to the family's loader.

    Returns:
        A ``(model, registry)`` pair: the loaded tagger/pipeline and a
        registry holding the predefined recognizers plus one model entry.

    Raises:
        ValueError: If ``model_family`` is neither of the supported families.
    """
    registry = RecognizerRegistry()
    family = model_family.lower()

    if family == "flair":
        from flair.models import SequenceTagger

        model = SequenceTagger.load(model_path)
        recognizer_conf = {
            "name": "flair_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"],
            "model": model_path,
            "package": "flair",
        }
    elif family == "huggingface":
        from transformers import pipeline

        model = pipeline("ner", model=model_path, tokenizer=model_path)
        recognizer_conf = {
            "name": "huggingface_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
            "model": model_path,
            "package": "transformers",
        }
    else:
        raise ValueError(f"Model family {model_family} not supported")

    # Both families share the same registry setup once the model is loaded.
    registry.load_predefined_recognizers()
    # NOTE(review): confirm the registry accepts the "model"/"package" keys in
    # this dict; that contract is not visible from this file.
    registry.add_recognizer_from_dict(recognizer_conf)
    return model, registry

def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """Build an ``AnalyzerEngine`` backed by the registry for the chosen model.

    Args:
        model_family: Model family name, forwarded to ``nlp_engine_and_registry``.
        model_path: Model identifier, forwarded to ``nlp_engine_and_registry``.

    Returns:
        An analyzer constructed with the returned registry. The loaded model
        object itself is not handed to the analyzer.
    """
    _loaded_model, recognizers = nlp_engine_and_registry(model_family, model_path)
    return AnalyzerEngine(registry=recognizers)

def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """Return the entity types to request for a model family.

    Args:
        model_family: Model family name (case-insensitive).
        model_path: Accepted for signature symmetry; not consulted here.

    Returns:
        ``PERSON``, ``LOCATION`` and ``ORGANIZATION`` for every family, with
        ``DATE_TIME`` added for the ``huggingface`` family.
    """
    entities = ["PERSON", "LOCATION", "ORGANIZATION"]
    if model_family.lower() == "huggingface":
        entities.append("DATE_TIME")
    return entities

# Feature Spotlight: 🕵️‍♂️ The Great PHI Hunt Begins!
# With a flick of the wrist, we summon models to sniff out sensitive data in PDFs, making privacy a breeze! 😎

def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list[RecognizerResult]:
    """Run the analyzer over ``text`` and apply the allow and deny lists.

    Deny-list words are *added* to what gets detected, through an ad-hoc
    recognizer. Previously a non-empty deny list discarded every detection
    that did not contain a deny word, so adding one let all other PHI through.

    Args:
        analyzer: Engine used to detect entities.
        text: Text to scan.
        entities: Entity types to request; the caller's list is not mutated.
        language: Language code passed to the analyzer.
        score_threshold: Minimum score passed to the analyzer.
        return_decision_process: Whether to request analysis explanations.
        allow_list: Words that suppress a detection when they occur inside the
            detected span (case-insensitive substring match). ``None`` and
            blank entries are ignored.
        deny_list: Words that must always be flagged. ``None`` and blank
            entries are ignored.

    Returns:
        The detections that do not contain any allow-listed word.
    """
    allowed = [word.lower() for word in (allow_list or []) if word and word.strip()]
    denied = [word for word in (deny_list or []) if word and word.strip()]

    extra_kwargs = {}
    if denied:
        deny_recognizer = create_ad_hoc_deny_list_recognizer(denied)
        extra_kwargs["ad_hoc_recognizers"] = [deny_recognizer]
        if entities is not None:
            # The analyzer only reports requested entity types, so the deny
            # recognizer's type has to be requested as well.
            entities = [*entities, *(e for e in deny_recognizer.supported_entities if e not in entities)]

    results = analyzer.analyze(
        text=text,
        entities=entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
        **extra_kwargs,
    )
    return [
        result
        for result in results
        if not any(word in text[result.start:result.end].lower() for word in allowed)
    ]

def anonymize(text: str, operator: str, analyze_results: list[RecognizerResult], mask_char: str = "*", number_of_chars: int = 15) -> dict:
    """Anonymize the detected spans of ``text`` with a single operator.

    Args:
        text: The original text.
        operator: Anonymizer operator name applied to every entity type
            (registered under the ``"DEFAULT"`` key).
        analyze_results: Detections to anonymize.
        mask_char: Masking character, used only by the ``mask`` operator.
        number_of_chars: Number of characters to mask, used only by ``mask``.

    Returns:
        Whatever ``AnonymizerEngine.anonymize`` returns; the caller in this
        file reads its ``.text`` attribute.
        NOTE(review): the ``dict`` annotation looks inaccurate. Confirm the
        real result type and tighten it.
    """
    params = {}
    if operator == "mask":
        # The mask operator validates all three parameters; leaving out
        # ``from_end`` makes it reject the configuration. False masks from
        # the start of each span.
        params = {"masking_char": mask_char, "chars_to_mask": number_of_chars, "from_end": False}
    operator_config = {"DEFAULT": OperatorConfig(operator, params)}
    return AnonymizerEngine().anonymize(text=text, analyzer_results=analyze_results, operators=operator_config)

def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
    """Build a ``GENERIC_PII`` pattern recognizer from a deny list.

    Args:
        deny_list: Words to flag. ``None`` or an empty list means no recognizer.

    Returns:
        A ``PatternRecognizer`` for the given words, or ``None`` when
        ``deny_list`` is empty or missing.
    """
    return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list) if deny_list else None

def save_pdf(pdf_input) -> str:
    """Write an uploaded file into the current working directory.

    Args:
        pdf_input: File-like upload exposing ``.name`` and ``.read()``.

    Returns:
        The file name the upload was written to (relative to the working
        directory).

    Raises:
        ValueError: If the upload's name has no usable file-name component.
    """
    # The name comes from the client, so drop any directory part (either
    # separator style) to keep the write inside the working directory.
    safe_name = os.path.basename(str(pdf_input.name).replace("\\", "/"))
    if not safe_name or safe_name in {".", ".."}:
        raise ValueError(f"Invalid upload file name: {pdf_input.name!r}")
    with open(safe_name, "wb") as f:
        f.write(pdf_input.read())
    return safe_name

# Feature Spotlight: 📄 PDF Magic Unleashed!
# Upload a PDF, zap the PHI, and grab a shiny new file—all with a timestamp swagger! ✨

def read_pdf(pdf_path: str) -> str:
    """Extract the text of every page of a PDF.

    Args:
        pdf_path: Path of the PDF to read.

    Returns:
        The page texts concatenated in page order, each followed by a newline.
        A page with no extractable text contributes just the newline.
    """
    reader = PdfReader(pdf_path)
    # The parentheses matter: ``x or "" + "\n"`` parses as ``x or "\n"``,
    # which only added the newline for empty pages and glued the last word of
    # one page to the first word of the next.
    return "".join((page.extract_text() or "") + "\n" for page in reader.pages)

def _pdf_text_lines(text: str, width: int) -> list[str]:
    """Split ``text`` into printable lines of at most ``width`` characters."""
    # Local import: the file does not import textwrap at module level.
    import textwrap

    lines = []
    for raw_line in text.splitlines():
        # Tabs and control characters have no glyph in the page font.
        printable = "".join(ch if ch.isprintable() else " " for ch in raw_line.expandtabs(4))
        wrapped = textwrap.wrap(printable, width=width, replace_whitespace=False, drop_whitespace=False)
        lines.extend(wrapped or [""])  # keep blank lines as vertical spacing
    return lines


def _pdf_literal(line: str) -> bytes:
    """Encode one line as an escaped PDF literal string, e.g. ``(text)``."""
    encoded = line.encode("cp1252", errors="replace")  # unmappable chars become "?"
    # Backslash first, so the escapes added for parentheses are not re-escaped.
    for special in (b"\\", b"(", b")"):
        encoded = encoded.replace(special, b"\\" + special)
    return b"(" + encoded + b")"


def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """Write ``text`` into a new, text-only PDF.

    The previous implementation ignored ``text`` and copied the pages of
    ``input_path`` unchanged, so the "de-identified" file still contained every
    piece of PHI. The output is now built solely from ``text``: US Letter
    pages, 10pt Courier, hard-wrapped lines. The original layout and images
    are intentionally not carried over.

    Args:
        text: The (anonymized) text to render.
        input_path: Kept for interface compatibility; no longer read.
        output_filename: Path of the PDF to create.

    Returns:
        ``output_filename``.
    """
    page_width, page_height, margin = 612, 792, 54  # points; 0.75in margins
    font_size, leading = 10, 12
    chars_per_line = (page_width - 2 * margin) * 10 // (font_size * 6)  # Courier glyphs are 0.6em wide
    lines_per_page = (page_height - 2 * margin) // leading

    lines = _pdf_text_lines(text or "", chars_per_line)
    # Always emit at least one (possibly blank) page.
    pages = [lines[i:i + lines_per_page] for i in range(0, len(lines), lines_per_page)] or [[]]

    # Object numbers: 1 catalog, 2 page tree, 3 font, then for page k
    # (0-based) the page object is 4 + 2k and its content stream 5 + 2k.
    kids = b" ".join(b"%d 0 R" % (4 + 2 * k) for k in range(len(pages)))
    bodies = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        b"<< /Type /Pages /Kids [" + kids + b"] /Count %d >>" % len(pages),
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Courier /Encoding /WinAnsiEncoding >>",
    ]
    for k, page_lines in enumerate(pages):
        bodies.append(
            b"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 %d %d] "
            b"/Resources << /Font << /F1 3 0 R >> >> /Contents %d 0 R >>"
            % (page_width, page_height, 5 + 2 * k)
        )
        operators = [
            b"BT",
            b"/F1 %d Tf" % font_size,
            b"%d TL" % leading,
            b"%d %d Td" % (margin, page_height - margin - font_size),
        ]
        operators.extend(_pdf_literal(line) + b" Tj T*" for line in page_lines)
        operators.append(b"ET")
        stream = b"\n".join(operators)
        bodies.append(b"<< /Length %d >>\nstream\n" % len(stream) + stream + b"\nendstream")

    document = bytearray(b"%PDF-1.4\n")
    offsets = []
    for number, body in enumerate(bodies, start=1):
        offsets.append(len(document))  # byte offset needed by the xref table
        document += b"%d 0 obj\n" % number + body + b"\nendobj\n"

    xref_offset = len(document)
    document += b"xref\n0 %d\n" % (len(bodies) + 1)
    document += b"0000000000 65535 f \n"
    for offset in offsets:
        document += b"%010d 00000 n \n" % offset  # each entry is exactly 20 bytes
    document += b"trailer\n<< /Size %d /Root 1 0 R >>\nstartxref\n%d\n%%%%EOF\n" % (len(bodies) + 1, xref_offset)

    with open(output_filename, "wb") as f:
        f.write(document)
    return output_filename

# Sidebar setup: model choice, anonymization operator, threshold and word lists.
st.sidebar.header("PHI De-identification with Presidio")

# Each entry pairs the selectbox label ("<package>/<model id>") with the URL
# of its model card.
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox(
    "NER model package",
    [label for label, _url in model_list],
    index=0,
    help="Pick your PHI-hunting hero!",
)
st.sidebar.markdown(f"[View model on HuggingFace]({dict(model_list)[st_model]})")

# The first path segment of the label names the model family. For HuggingFace
# labels that segment is only a prefix and is stripped from the model id;
# other labels are used as the model id unchanged.
st_model_package = st_model.split("/")[0]
if st_model_package.lower() == "huggingface":
    st_model = "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may take a sec to wake up!")

st_operator = st.sidebar.selectbox(
    "De-identification approach",
    ["replace", "redact", "mask"],
    index=0,
    help="Choose how to zap PHI!",
)
st_threshold = st.sidebar.slider("Acceptance threshold", min_value=0.0, max_value=1.0, value=0.35)
st_return_decision_process = st.sidebar.checkbox("Add analysis explanations", value=False)

with st.sidebar.expander("Allowlists and denylists"):
    st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
    st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")

# Main panel: upload a PDF, detect and anonymize PHI, offer the result for
# download (left column) and list the findings (right column).
import html  # local to this script section; escapes the download link's file name

col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded_file:
        # Set before the try so the finally clause can always clean up.
        pdf_path = None
        try:
            pdf_path = save_pdf(uploaded_file)
            if not pdf_path:
                raise ValueError("PDF save flopped!")
            text = read_pdf(pdf_path)
            # Whitespace-only output (e.g. scanned pages) has nothing to analyze.
            if not text.strip():
                raise ValueError("No text in that PDF!")
            analyzer = analyzer_engine(*analyzer_params)
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            # Sorted so the message is stable across reruns (set order is not).
            phi_types = sorted({res.entity_type for res in st_analyze_results})
            if phi_types:
                st.success(f"Removed PHI types: {', '.join(phi_types)}")
            else:
                st.info("No PHI detected")
            anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
            timestamp = get_timestamp_prefix()
            # The upload name is client-controlled: keep only its file-name part.
            output_filename = f"{timestamp}_{os.path.basename(uploaded_file.name)}"
            pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
            if not pdf_output:
                raise ValueError("PDF creation tanked!")
            with open(output_filename, "rb") as f:
                pdf_bytes = f.read()
            b64 = base64.b64encode(pdf_bytes).decode()
            # Escape the name: it is interpolated into raw HTML, and a quote
            # in it would otherwise break out of the attribute.
            download_name = html.escape(output_filename, quote=True)
            st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{download_name}">Download de-identified PDF</a>', unsafe_allow_html=True)
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[res.start:res.end] for res in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {"entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
                    )
                    if st_return_decision_process:
                        analysis_explanation_df = pd.DataFrame.from_records([r.analysis_explanation.to_dict() for r in st_analyze_results])
                        df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
        except Exception as e:
            st.error(f"Oops, something broke: {str(e)}")
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Processing error: %s", e)
        finally:
            # The saved upload still contains the original PHI, so remove it
            # whether or not processing succeeded.
            if pdf_path and os.path.exists(pdf_path):
                os.remove(pdf_path)