Spaces:
Running
Running
File size: 7,847 Bytes
35c70df 77c02fb 5b4c45e 35c70df 545e6f3 35c70df 545e6f3 35c70df 545e6f3 35c70df 55290a8 bbda733 35c70df cabea79 77c02fb cabea79 77c02fb cabea79 77c02fb cabea79 77c02fb cabea79 77c02fb cabea79 77c02fb cabea79 35c70df bbda733 77c02fb bbda733 77c02fb cabea79 bbda733 545e6f3 bbda733 35c70df bbda733 cabea79 bbda733 cabea79 77c02fb cabea79 bbda733 77c02fb bbda733 77c02fb bbda733 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
import base64
import datetime
import html
import logging
import os
import textwrap
from zoneinfo import ZoneInfo

import dotenv
import pandas as pd
import streamlit as st
from PyPDF2 import PdfReader, PdfWriter
from streamlit_tags import st_tags

from presidio_helpers import (
    analyzer_engine,
    get_supported_entities,
    analyze,
    anonymize,
)
# Environment variables and the module logger do not depend on Streamlit,
# so they are prepared before the page itself is configured.
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")

# Page-level settings: title, wide layout, open sidebar, and an "About" link
# pointing at the Presidio documentation.
_page_settings = {
    "page_title": "Presidio PHI De-identification",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
    "menu_items": {"About": "https://microsoft.github.io/presidio/"},
}
st.set_page_config(**_page_settings)
# Sidebar
st.sidebar.header("PHI De-identification with Presidio")
model_help_text = "Select Named Entity Recognition (NER) model for PHI detection."
# Each entry is (selectbox label, model card URL). The first path segment of
# the label names the model package ("flair" or "HuggingFace").
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    (
        "HuggingFace/StanfordAIMI/stanford-deidentifier-base",
        "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base",
    ),
]
st_model = st.sidebar.selectbox(
    "NER model package",
    [model[0] for model in model_list],
    index=0,
    help=model_help_text,
)
# Display HuggingFace link for selected model
selected_model_url = next(url for model, url in model_list if model == st_model)
st.sidebar.markdown(f"[View model on HuggingFace]({selected_model_url})")
# Extract model package
st_model_package = st_model.split("/")[0]
# Strip the leading "HuggingFace/" prefix so only the hub path remains.
# This must be an equality test: the previous `not in ("huggingface")` compared
# against a plain string (no trailing comma, so not a tuple) and therefore
# performed a substring match, which only worked by accident.
if st_model_package.lower() == "huggingface":
    st_model = "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Note: Models might take some time to download on first run.")
# How detected PHI is rewritten in the output.
_operator_choices = ["replace", "redact", "mask"]
st_operator = st.sidebar.selectbox(
    label="De-identification approach",
    options=_operator_choices,
    index=0,
    help="Select PHI manipulation method.",
)

# Score threshold passed to the analyzer.
st_threshold = st.sidebar.slider(
    "Acceptance threshold",
    min_value=0.0,
    max_value=1.0,
    value=0.35,
)

# When ticked, the findings table also shows the analysis explanations.
st_return_decision_process = st.sidebar.checkbox(
    label="Add analysis explanations",
    value=False,
)

# Allow and deny lists
_lists_expander = st.sidebar.expander("Allowlists and denylists", expanded=False)
with _lists_expander:
    st_allow_list = st_tags(
        label="Add words to allowlist",
        text="Enter word and press enter.",
    )
    st_deny_list = st_tags(
        label="Add words to denylist",
        text="Enter word and press enter.",
    )
# PDF processing functions
def get_timestamp_prefix():
    """Return the current US Central time formatted as a file-name prefix.

    The result has the shape ``HHMM{AM|PM}_DD-MM-YY`` (12-hour clock),
    upper-cased, for example ``0315PM_07-02-25``.

    Returns:
        str: The formatted timestamp.
    """
    # "America/Chicago" is the canonical tz database name for US Central time.
    # The standard-library zoneinfo replaces pytz, which this file never imported.
    central = ZoneInfo("America/Chicago")
    # `datetime` is imported as a module here, so the class must be qualified.
    now = datetime.datetime.now(tz=central)
    return now.strftime("%I%M%p_%d-%m-%y").upper()
def save_pdf(pdf_input):
    """Save an uploaded PDF into the current working directory.

    Only the final path component of ``pdf_input.name`` is used as the file
    name, so a client-supplied name containing directory parts (``../``,
    ``sub/dir/`` or Windows-style ``..\\``) cannot write outside the working
    directory.

    Args:
        pdf_input: Object exposing a ``name`` string and a ``read()`` method
            returning the file's bytes (the caller passes the value returned
            by ``st.file_uploader``).

    Returns:
        The name of the file that was written, or ``None`` if saving failed
        (the failure is reported through ``st.error``).
    """
    try:
        # Normalise backslashes first so basename also strips Windows-style
        # directory parts when running on POSIX.
        safe_name = os.path.basename(pdf_input.name.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise ValueError("uploaded file has no usable name")
        with open(safe_name, "wb") as f:
            f.write(pdf_input.read())
        return safe_name
    except Exception as e:
        st.error(f"Failed to save PDF: {str(e)}")
        return None
def read_pdf(pdf_path):
    """Extract the text of every page of a PDF using PyPDF2.

    Each page's text is followed by a newline; a page for which
    ``extract_text()`` returns nothing contributes only that newline.

    Returns:
        The concatenated text, or ``None`` if reading failed (the failure is
        reported through ``st.error``).
    """
    try:
        pages = PdfReader(pdf_path).pages
        return "".join((page.extract_text() or "") + "\n" for page in pages)
    except Exception as e:
        st.error(f"Failed to read PDF: {str(e)}")
        return None
def _pdf_literal(line):
    """Encode one line of text as the escaped body of a PDF literal string."""
    # cp1252 matches the font's /WinAnsiEncoding; anything outside it becomes "?".
    raw = line.encode("cp1252", errors="replace")
    # Backslash must be escaped first so the escapes added for parentheses survive.
    return raw.replace(b"\\", b"\\\\").replace(b"(", b"\\(").replace(b")", b"\\)")


def _render_text_pdf(text, page_size=(612, 792), margin=54, font_size=10):
    """Return the bytes of a minimal text-only PDF containing ``text``.

    Lines are wrapped to the printable width and split across as many pages
    as needed. The built-in Courier font is used so that the wrap width can
    be derived from a fixed glyph width.
    """
    width, height = page_size
    leading = font_size * 1.2
    # Courier glyphs are 0.6 em wide, so the character budget per line is exact.
    max_chars = max(1, int((width - 2 * margin) / (font_size * 0.6)))
    lines_per_page = max(1, int((height - 2 * margin) / leading))

    lines = []
    for raw_line in text.splitlines():
        # wrap() returns [] for blank lines; keep them as empty output lines.
        lines.extend(textwrap.wrap(raw_line, width=max_chars) or [""])
    pages = [
        lines[i:i + lines_per_page] for i in range(0, len(lines), lines_per_page)
    ] or [[]]  # always emit at least one (possibly blank) page

    # Object numbers are 1-based list positions: 1 = catalog, 2 = page tree,
    # 3 = font, followed by a (page, content stream) pair for every page.
    objects = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        b"",  # page tree, filled in once the page object numbers are known
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Courier"
        b" /Encoding /WinAnsiEncoding >>",
    ]
    kids = []
    start_y = height - margin - font_size
    for page_lines in pages:
        header = f"BT /F1 {font_size} Tf {leading:g} TL {margin} {start_y} Td"
        stream = b"\n".join(
            [header.encode("ascii")]
            + [b"(" + _pdf_literal(line) + b") Tj T*" for line in page_lines]
            + [b"ET"]
        )
        page_number = len(objects) + 1
        content_number = page_number + 1
        page_dict = (
            f"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 {width} {height}]"
            f" /Resources << /Font << /F1 3 0 R >> >>"
            f" /Contents {content_number} 0 R >>"
        )
        objects.append(page_dict.encode("ascii"))
        objects.append(
            b"<< /Length %d >>\nstream\n" % len(stream) + stream + b"\nendstream"
        )
        kids.append(f"{page_number} 0 R")
    page_tree = f"<< /Type /Pages /Kids [{' '.join(kids)}] /Count {len(kids)} >>"
    objects[1] = page_tree.encode("ascii")

    out = bytearray(b"%PDF-1.4\n")
    offsets = []
    for number, body in enumerate(objects, start=1):
        offsets.append(len(out))
        out += b"%d 0 obj\n" % number + body + b"\nendobj\n"
    xref_at = len(out)
    out += b"xref\n0 %d\n" % (len(objects) + 1)
    # Every cross-reference entry must be exactly 20 bytes, hence the
    # trailing space before the newline.
    out += b"0000000000 65535 f \n"
    for offset in offsets:
        out += b"%010d 00000 n \n" % offset
    out += b"trailer\n<< /Size %d /Root 1 0 R >>\nstartxref\n%d\n%%%%EOF\n" % (
        len(objects) + 1,
        xref_at,
    )
    return bytes(out)


def create_pdf(text, input_path, output_filename):
    """Create a PDF that contains only the anonymized text.

    The previous implementation ignored ``text`` and copied the pages of
    ``input_path`` unchanged, so the "de-identified" output still carried all
    of the original content. The output is now rendered from ``text`` alone;
    the original layout, images and fonts are intentionally not carried over,
    and characters outside cp1252 are written as ``?``.

    Args:
        text: Anonymized text to render.
        input_path: Path of the source PDF. Kept for backward compatibility;
            it is no longer read.
        output_filename: Path the new PDF is written to.

    Returns:
        ``output_filename`` on success, or ``None`` on failure (the failure
        is reported through ``st.error``).
    """
    try:
        # Render fully before opening the file so that a rendering error
        # does not leave a truncated PDF behind.
        pdf_bytes = _render_text_pdf(text)
        with open(output_filename, "wb") as f:
            f.write(pdf_bytes)
        return output_filename
    except Exception as e:
        st.error(f"Failed to create PDF: {str(e)}")
        return None
# Main panel
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded_file:
        # Every file written to disk is tracked here so the `finally` clause
        # can remove it even when processing fails part-way; the uploaded
        # copy contains the original PHI and must not be left behind.
        pdf_path = None
        output_filename = None
        try:
            # Save PDF to disk
            pdf_path = save_pdf(uploaded_file)
            if not pdf_path:
                raise ValueError("Failed to save PDF")
            # Read PDF
            text = read_pdf(pdf_path)
            # Whitespace-only output (e.g. just the per-page newlines) means
            # nothing was actually extracted.
            if not text or not text.strip():
                raise ValueError("No text extracted from PDF")
            # Initialize analyzer
            try:
                analyzer = analyzer_engine(*analyzer_params)
            except Exception as e:
                st.error(f"Failed to load model: {str(e)}")
                st.info("Ensure models are downloaded and check network/permissions.")
                raise
            # Analyze
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            # Process results (sorted so the message is stable between reruns)
            phi_types = sorted({res.entity_type for res in st_analyze_results})
            if phi_types:
                st.success(f"Removed PHI types: {', '.join(phi_types)}")
            else:
                st.info("No PHI detected")
            # Anonymize
            anonymized_result = anonymize(
                text=text,
                operator=st_operator,
                analyze_results=st_analyze_results,
            )
            # Generate output filename with timestamp; only the base name of
            # the upload is used so the output stays in the working directory.
            timestamp = get_timestamp_prefix()
            upload_name = os.path.basename(uploaded_file.name.replace("\\", "/"))
            output_filename = f"{timestamp}_{upload_name}"
            # Create new PDF
            pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
            if not pdf_output:
                raise ValueError("Failed to generate PDF")
            # Generate base64 download link
            try:
                with open(output_filename, "rb") as f:
                    pdf_bytes = f.read()
                b64 = base64.b64encode(pdf_bytes).decode()
                # The file name originates from the client and is rendered
                # with unsafe_allow_html, so it must be attribute-escaped.
                download_name = html.escape(output_filename, quote=True)
                href = f'<a href="data:application/pdf;base64,{b64}" download="{download_name}">Download de-identified PDF</a>'
                st.markdown(href, unsafe_allow_html=True)
            except Exception as e:
                st.error(f"Error generating download link: {str(e)}")
                raise
            # Display findings
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[res.start:res.end] for res in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {
                            "entity_type": "Entity type",
                            "text": "Text",
                            "start": "Start",
                            "end": "End",
                            "score": "Confidence",
                        },
                        axis=1,
                    )
                    if st_return_decision_process:
                        analysis_explanation_df = pd.DataFrame.from_records(
                            [r.analysis_explanation.to_dict() for r in st_analyze_results]
                        )
                        df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
        except Exception as e:
            st.error(f"An error occurred: {str(e)}")
            # logger.exception records the traceback along with the message.
            logger.exception("Processing error: %s", e)
        finally:
            # Clean up temporary files. The download link embeds the PDF
            # bytes, so the generated file is no longer needed on disk either.
            for leftover in (pdf_path, output_filename):
                if leftover and os.path.exists(leftover):
                    try:
                        os.remove(leftover)
                    except OSError as cleanup_error:
                        logger.warning("Could not remove %s: %s", leftover, cleanup_error)