|
import os |
|
import re |
|
import pandas as pd |
|
import numpy as np |
|
import pandas as pd |
|
import string |
|
import random |
|
from xml.etree.ElementTree import Element, SubElement, tostring, parse |
|
from xml.dom import minidom |
|
import uuid |
|
from typing import List, Tuple, Dict, Set |
|
from gradio_image_annotation import image_annotator |
|
from gradio_image_annotation.image_annotator import AnnotatedImageData |
|
from pymupdf import Document, Rect |
|
import pymupdf |
|
from PIL import ImageDraw, Image |
|
from datetime import datetime, timezone, timedelta |
|
from collections import defaultdict |
|
import gradio as gr |
|
|
|
from tools.config import OUTPUT_FOLDER, MAX_IMAGE_PIXELS, INPUT_FOLDER, COMPRESS_REDACTED_PDF |
|
from tools.file_conversion import is_pdf, convert_annotation_json_to_review_df, convert_review_df_to_annotation_json, process_single_page_for_image_conversion, multiply_coordinates_by_page_sizes, convert_annotation_data_to_dataframe, remove_duplicate_images_with_blank_boxes, fill_missing_ids, divide_coordinates_by_page_sizes, save_pdf_with_or_without_compression, fill_missing_ids_in_list |
|
from tools.helper_functions import get_file_name_without_type, detect_file_type |
|
from tools.file_redaction import redact_page_with_pymupdf |
|
|
|
if not MAX_IMAGE_PIXELS: Image.MAX_IMAGE_PIXELS = None |
|
|
|
def decrease_page(number:int, all_annotations:dict): |
|
''' |
|
Decrease page number for review redactions page. |
|
''' |
|
if not all_annotations: |
|
raise Warning("No annotator object loaded") |
|
|
|
if number > 1: |
|
return number - 1, number - 1 |
|
elif number <= 1: |
|
|
|
raise Warning("At first page") |
|
else: |
|
raise Warning("At first page") |
|
|
|
def increase_page(number:int, all_annotations:dict): |
|
''' |
|
Increase page number for review redactions page. |
|
''' |
|
|
|
if not all_annotations: |
|
raise Warning("No annotator object loaded") |
|
|
|
|
|
max_pages = len(all_annotations) |
|
|
|
if number < max_pages: |
|
return number + 1, number + 1 |
|
|
|
|
|
else: |
|
raise Warning("At last page") |
|
|
|
def update_zoom(current_zoom_level:int, annotate_current_page:int, decrease:bool=True): |
|
if decrease == False: |
|
if current_zoom_level >= 70: |
|
current_zoom_level -= 10 |
|
else: |
|
if current_zoom_level < 110: |
|
current_zoom_level += 10 |
|
|
|
return current_zoom_level, annotate_current_page |
|
|
|
def update_dropdown_list_based_on_dataframe(df:pd.DataFrame, column:str) -> List["str"]: |
|
''' |
|
Gather unique elements from a string pandas Series, then append 'ALL' to the start and return the list. |
|
''' |
|
if isinstance(df, pd.DataFrame): |
|
|
|
if column not in df.columns or df[column].empty or df[column].isna().all(): |
|
return ["ALL"] |
|
elif column != "page": |
|
entities = df[column].astype(str).unique().tolist() |
|
entities_for_drop = sorted(entities) |
|
entities_for_drop.insert(0, "ALL") |
|
else: |
|
|
|
try: |
|
entities = df[column].astype(int).unique() |
|
entities_for_drop = sorted(entities) |
|
entities_for_drop = [str(e) for e in entities_for_drop] |
|
entities_for_drop.insert(0, "ALL") |
|
except ValueError: |
|
return ["ALL"] |
|
|
|
return entities_for_drop |
|
else: |
|
return ["ALL"] |
|
|
|
def get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object:AnnotatedImageData, |
|
recogniser_dataframe_base:pd.DataFrame, |
|
recogniser_dropdown_value:str, |
|
text_dropdown_value:str, |
|
page_dropdown_value:str, |
|
review_df:pd.DataFrame=list(), |
|
page_sizes:List[str]=list()): |
|
''' |
|
Create a filtered recogniser dataframe and associated dropdowns based on current information in the image annotator and review data frame. |
|
''' |
|
|
|
recogniser_entities_list = ["Redaction"] |
|
recogniser_dataframe_out = recogniser_dataframe_base |
|
recogniser_dataframe_out_gr = gr.Dataframe() |
|
review_dataframe = review_df |
|
|
|
try: |
|
|
|
|
|
review_dataframe = convert_annotation_json_to_review_df(page_image_annotator_object, review_df, page_sizes) |
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "label") |
|
recogniser_entities_drop = gr.Dropdown(value=recogniser_dropdown_value, choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
|
|
recogniser_entities_list = [entity for entity in recogniser_entities_for_drop.copy() if entity != 'Redaction' and entity != 'ALL'] |
|
recogniser_entities_list.insert(0, 'Redaction') |
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "text") |
|
text_entities_drop = gr.Dropdown(value=text_dropdown_value, choices=text_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "page") |
|
page_entities_drop = gr.Dropdown(value=page_dropdown_value, choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
recogniser_dataframe_out_gr = gr.Dataframe(review_dataframe[["page", "label", "text", "id"]], show_search="filter", col_count=(4, "fixed"), type="pandas", headers=["page", "label", "text", "id"], show_fullscreen_button=True, wrap=True, max_height=400, static_columns=[0,1,2,3]) |
|
|
|
recogniser_dataframe_out = review_dataframe[["page", "label", "text", "id"]] |
|
|
|
except Exception as e: |
|
print("Could not extract recogniser information:", e) |
|
recogniser_dataframe_out = recogniser_dataframe_base[["page", "label", "text", "id"]] |
|
|
|
label_choices = review_dataframe["label"].astype(str).unique().tolist() |
|
text_choices = review_dataframe["text"].astype(str).unique().tolist() |
|
page_choices = review_dataframe["page"].astype(str).unique().tolist() |
|
|
|
recogniser_entities_drop = gr.Dropdown(value=recogniser_dropdown_value, choices=label_choices, allow_custom_value=True, interactive=True) |
|
recogniser_entities_list = ["Redaction"] |
|
text_entities_drop = gr.Dropdown(value=text_dropdown_value, choices=text_choices, allow_custom_value=True, interactive=True) |
|
page_entities_drop = gr.Dropdown(value=page_dropdown_value, choices=page_choices, allow_custom_value=True, interactive=True) |
|
|
|
return recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list, text_entities_drop, page_entities_drop |
|
|
|
def update_recogniser_dataframes(page_image_annotator_object:AnnotatedImageData, recogniser_dataframe_base:pd.DataFrame, recogniser_entities_dropdown_value:str="ALL", text_dropdown_value:str="ALL", page_dropdown_value:str="ALL", review_df:pd.DataFrame=list(), page_sizes:list[str]=list()): |
|
''' |
|
Update recogniser dataframe information that appears alongside the pdf pages on the review screen. |
|
''' |
|
recogniser_entities_list = ["Redaction"] |
|
recogniser_dataframe_out = pd.DataFrame() |
|
recogniser_dataframe_out_gr = gr.Dataframe() |
|
|
|
|
|
if recogniser_dataframe_base.empty: |
|
recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list, text_entities_drop, page_entities_drop = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes) |
|
elif recogniser_dataframe_base.iloc[0,0] == "": |
|
recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_dropdown_value, recogniser_entities_list, text_entities_drop, page_entities_drop = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes) |
|
else: |
|
recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_dropdown, recogniser_entities_list, text_dropdown, page_dropdown = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes) |
|
|
|
review_dataframe, text_entities_drop, page_entities_drop = update_entities_df_recogniser_entities(recogniser_entities_dropdown_value, recogniser_dataframe_out, page_dropdown_value, text_dropdown_value) |
|
|
|
recogniser_dataframe_out_gr = gr.Dataframe(review_dataframe[["page", "label", "text", "id"]], show_search="filter", col_count=(4, "fixed"), type="pandas", headers=["page", "label", "text", "id"], show_fullscreen_button=True, wrap=True, max_height=400, static_columns=[0,1,2,3]) |
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(recogniser_dataframe_out, "label") |
|
recogniser_entities_drop = gr.Dropdown(value=recogniser_entities_dropdown_value, choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
recogniser_entities_list_base = recogniser_dataframe_out["label"].astype(str).unique().tolist() |
|
|
|
|
|
recogniser_entities_list = [entity for entity in recogniser_entities_list_base if entity != 'Redaction'] |
|
recogniser_entities_list.insert(0, 'Redaction') |
|
|
|
return recogniser_entities_list, recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, text_entities_drop, page_entities_drop |
|
|
|
def undo_last_removal(backup_review_state:pd.DataFrame, backup_image_annotations_state:list[dict], backup_recogniser_entity_dataframe_base:pd.DataFrame): |
|
|
|
if backup_image_annotations_state: |
|
return backup_review_state, backup_image_annotations_state, backup_recogniser_entity_dataframe_base |
|
else: |
|
raise Warning("No actions have been taken to undo") |
|
|
|
def update_annotator_page_from_review_df( |
|
review_df: pd.DataFrame, |
|
image_file_paths:List[str], |
|
page_sizes:List[dict], |
|
current_image_annotations_state:List[str], |
|
current_page_annotator:object, |
|
selected_recogniser_entity_df_row:pd.DataFrame, |
|
input_folder:str, |
|
doc_full_file_name_textbox:str |
|
) -> Tuple[object, List[dict], int, List[dict], pd.DataFrame, int]: |
|
''' |
|
Update the visible annotation object and related objects with the latest review file information, |
|
optimising by processing only the current page's data. |
|
''' |
|
|
|
out_image_annotations_state: List[dict] = list(current_image_annotations_state) |
|
out_current_page_annotator: dict = current_page_annotator |
|
|
|
|
|
|
|
gradio_annotator_current_page_number: int = 1 |
|
annotate_previous_page: int = 0 |
|
|
|
if not selected_recogniser_entity_df_row.empty and 'page' in selected_recogniser_entity_df_row.columns: |
|
try: |
|
selected_page= selected_recogniser_entity_df_row['page'].iloc[0] |
|
gradio_annotator_current_page_number = int(selected_page) |
|
annotate_previous_page = gradio_annotator_current_page_number |
|
except (IndexError, ValueError, TypeError): |
|
print("Warning: Could not extract valid page number from selected_recogniser_entity_df_row. Defaulting to page 1.") |
|
gradio_annotator_current_page_number = 1 |
|
|
|
|
|
if gradio_annotator_current_page_number <= 0: gradio_annotator_current_page_number = 1 |
|
|
|
page_max_reported = len(page_sizes) |
|
if gradio_annotator_current_page_number > page_max_reported: |
|
print("current page is greater than highest page:", page_max_reported) |
|
gradio_annotator_current_page_number = page_max_reported |
|
|
|
page_num_reported_zero_indexed = gradio_annotator_current_page_number - 1 |
|
|
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
if not page_sizes_df.empty: |
|
|
|
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
|
page_sizes_df.dropna(subset=["page"], inplace=True) |
|
if not page_sizes_df.empty: |
|
page_sizes_df["page"] = page_sizes_df["page"].astype(int) |
|
else: |
|
print("Warning: Page sizes DataFrame became empty after processing.") |
|
|
|
if not review_df.empty: |
|
|
|
|
|
if 'page' in review_df.columns: |
|
review_df['page'] = pd.to_numeric(review_df['page'], errors='coerce').fillna(-1).astype(int) |
|
|
|
current_image_path = out_image_annotations_state[page_num_reported_zero_indexed]['image'] |
|
|
|
replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image(doc_full_file_name_textbox, current_image_path, page_sizes_df, gradio_annotator_current_page_number, input_folder) |
|
|
|
|
|
page_sizes = page_sizes_df.to_dict(orient='records') |
|
review_df.loc[review_df["page"]==gradio_annotator_current_page_number, 'image'] = replaced_image_path |
|
images_list = list(page_sizes_df["image_path"]) |
|
images_list[page_num_reported_zero_indexed] = replaced_image_path |
|
out_image_annotations_state[page_num_reported_zero_indexed]['image'] = replaced_image_path |
|
|
|
current_page_review_df = review_df[review_df['page'] == gradio_annotator_current_page_number].copy() |
|
current_page_review_df = multiply_coordinates_by_page_sizes(current_page_review_df, page_sizes_df) |
|
|
|
else: |
|
print(f"Warning: 'page' column not found in review_df. Cannot filter for page {gradio_annotator_current_page_number}. Skipping update from review_df.") |
|
current_page_review_df = pd.DataFrame() |
|
|
|
if not current_page_review_df.empty: |
|
|
|
|
|
current_page_annotations_list = list() |
|
|
|
|
|
expected_annotation_keys = ['label', 'color', 'xmin', 'ymin', 'xmax', 'ymax', 'text', 'id'] |
|
|
|
|
|
for key in expected_annotation_keys: |
|
if key not in current_page_review_df.columns: |
|
|
|
|
|
default_value = np.nan if key in ['xmin', 'ymin', 'xmax', 'ymax'] else '' |
|
current_page_review_df[key] = default_value |
|
|
|
|
|
|
|
current_page_annotations_list_raw = current_page_review_df[expected_annotation_keys].to_dict(orient='records') |
|
|
|
current_page_annotations_list = current_page_annotations_list_raw |
|
|
|
|
|
page_state_entry_found = False |
|
for i, page_state_entry in enumerate(out_image_annotations_state): |
|
|
|
|
|
match = re.search(r"(\d+)\.png$", page_state_entry['image']) |
|
if match: page_no = int(match.group(1)) |
|
else: page_no = 0 |
|
|
|
if 'image' in page_state_entry and page_no == page_num_reported_zero_indexed: |
|
|
|
out_image_annotations_state[i]['boxes'] = current_page_annotations_list |
|
|
|
|
|
|
|
if 'image' in current_page_review_df.columns and not current_page_review_df.empty: |
|
|
|
out_image_annotations_state[i]['image'] = current_page_review_df['image'].iloc[0] |
|
page_state_entry_found = True |
|
break |
|
|
|
if not page_state_entry_found: |
|
print(f"Warning: Entry for page {gradio_annotator_current_page_number} not found in current_image_annotations_state. Cannot update page annotations.") |
|
|
|
|
|
|
|
current_image_path = None |
|
if len(out_image_annotations_state) > page_num_reported_zero_indexed and 'image' in out_image_annotations_state[page_num_reported_zero_indexed]: |
|
current_image_path = out_image_annotations_state[page_num_reported_zero_indexed]['image'] |
|
else: |
|
print(f"Warning: Could not get image path from state for page index {page_num_reported_zero_indexed}.") |
|
|
|
|
|
|
|
if current_image_path and not page_sizes_df.empty: |
|
try: |
|
replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image( |
|
doc_full_file_name_textbox, current_image_path, page_sizes_df, |
|
gradio_annotator_current_page_number, input_folder |
|
) |
|
|
|
|
|
if len(out_image_annotations_state) > page_num_reported_zero_indexed: |
|
out_image_annotations_state[page_num_reported_zero_indexed]['image'] = replaced_image_path |
|
|
|
if 'page' in review_df.columns and 'image' in review_df.columns: |
|
review_df.loc[review_df["page"]==gradio_annotator_current_page_number, 'image'] = replaced_image_path |
|
|
|
except Exception as e: |
|
print(f"Error during image path replacement for page {gradio_annotator_current_page_number}: {e}") |
|
|
|
|
|
|
|
if not page_sizes_df.empty: |
|
page_sizes = page_sizes_df.to_dict(orient='records') |
|
else: |
|
page_sizes = list() |
|
|
|
|
|
|
|
try: |
|
out_image_annotations_state = remove_duplicate_images_with_blank_boxes(out_image_annotations_state) |
|
except Exception as e: |
|
print(f"Error during duplicate removal: {e}. Proceeding without duplicate removal.") |
|
|
|
|
|
|
|
if len(out_image_annotations_state) > page_num_reported_zero_indexed: |
|
out_current_page_annotator = out_image_annotations_state[page_num_reported_zero_indexed] |
|
else: |
|
print(f"Warning: Cannot select current page annotator object for index {page_num_reported_zero_indexed}.") |
|
out_current_page_annotator = {} |
|
|
|
|
|
final_page_number_returned = gradio_annotator_current_page_number |
|
|
|
return (out_current_page_annotator, |
|
out_image_annotations_state, |
|
final_page_number_returned, |
|
page_sizes, |
|
review_df, |
|
annotate_previous_page) |
|
|
|
|
|
|
|
def _generate_unique_ids( |
|
num_ids_to_generate: int, |
|
existing_ids_set: Set[str] |
|
) -> List[str]: |
|
""" |
|
Generates a specified number of unique, 12-character alphanumeric IDs. |
|
|
|
This is a batch-oriented, performant version of the original |
|
`fill_missing_ids_in_list` logic, designed to work efficiently |
|
with DataFrames. |
|
|
|
Args: |
|
num_ids_to_generate (int): The number of unique IDs to create. |
|
existing_ids_set (Set[str]): A set of IDs that are already in use and |
|
should be avoided. |
|
|
|
Returns: |
|
List[str]: A list of newly generated unique IDs. |
|
""" |
|
id_length = 12 |
|
character_set = string.ascii_letters + string.digits |
|
|
|
newly_generated_ids = set() |
|
|
|
|
|
|
|
while len(newly_generated_ids) < num_ids_to_generate: |
|
candidate_id = ''.join(random.choices(character_set, k=id_length)) |
|
|
|
|
|
if candidate_id not in existing_ids_set and candidate_id not in newly_generated_ids: |
|
newly_generated_ids.add(candidate_id) |
|
|
|
return list(newly_generated_ids) |
|
|
|
def _merge_horizontally_adjacent_boxes( |
|
df: pd.DataFrame, |
|
x_merge_threshold: int = 0.02 |
|
) -> pd.DataFrame: |
|
""" |
|
Merges horizontally adjacent bounding boxes within the same line. |
|
|
|
Args: |
|
df (pd.DataFrame): DataFrame containing annotation boxes with columns |
|
like 'page', 'line', 'xmin', 'xmax', etc. |
|
x_merge_threshold (int): The maximum pixel gap on the x-axis to |
|
consider two boxes as adjacent. |
|
|
|
Returns: |
|
pd.DataFrame: A new DataFrame with adjacent boxes merged. |
|
""" |
|
if df.empty: |
|
return df |
|
|
|
|
|
df_sorted = df.sort_values(by=['page', 'line', 'xmin']).copy() |
|
|
|
|
|
|
|
prev_xmax = df_sorted['xmax'].shift(1) |
|
prev_page = df_sorted['page'].shift(1) |
|
prev_line = df_sorted['line'].shift(1) |
|
|
|
|
|
|
|
is_adjacent = ( |
|
(df_sorted['page'] == prev_page) & |
|
(df_sorted['line'] == prev_line) & |
|
(df_sorted['xmin'] - prev_xmax <= x_merge_threshold) |
|
) |
|
|
|
|
|
|
|
df_sorted['merge_group'] = (~is_adjacent).cumsum() |
|
|
|
|
|
|
|
agg_funcs = { |
|
'xmin': 'min', |
|
'ymin': 'min', |
|
'xmax': 'max', |
|
'ymax': 'max', |
|
'text': lambda s: ' '.join(s.astype(str)), |
|
|
|
'page': 'first', |
|
'line': 'first', |
|
'image': 'first', |
|
'label': 'first', |
|
'color': 'first', |
|
} |
|
|
|
merged_df = df_sorted.groupby('merge_group').agg(agg_funcs).reset_index(drop=True) |
|
|
|
print(f"Merged {len(df)} annotations into {len(merged_df)}.") |
|
|
|
return merged_df |
|
|
|
def create_annotation_objects_from_filtered_ocr_results_with_words( |
|
filtered_ocr_results_with_words_df: pd.DataFrame, |
|
ocr_results_with_words_df_base: pd.DataFrame, |
|
page_sizes: List[Dict], |
|
existing_annotations_df: pd.DataFrame, |
|
existing_annotations_list: List[Dict], |
|
existing_recogniser_entity_df: pd.DataFrame, |
|
redaction_label:str = "Redaction", |
|
colour_label:str = '(0, 0, 0)', |
|
progress:gr.Progress=gr.Progress()) -> Tuple[List[Dict], List[Dict], pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: |
|
""" |
|
This function processes filtered OCR results with words to create new annotation objects. It merges these new annotations with existing ones, ensuring that horizontally adjacent boxes are combined for cleaner redactions. The function also updates the existing recogniser entity DataFrame and returns the updated annotations in both DataFrame and list-of-dicts formats. |
|
|
|
Args: |
|
filtered_ocr_results_with_words_df (pd.DataFrame): A DataFrame containing filtered OCR results with words. |
|
ocr_results_with_words_df_base (pd.DataFrame): The base DataFrame of OCR results with words. |
|
page_sizes (List[Dict]): A list of dictionaries containing page sizes. |
|
existing_annotations_df (pd.DataFrame): A DataFrame of existing annotations. |
|
existing_annotations_list (List[Dict]): A list of dictionaries representing existing annotations. |
|
existing_recogniser_entity_df (pd.DataFrame): A DataFrame of existing recogniser entities. |
|
progress (gr.Progress, optional): A progress tracker. Defaults to gr.Progress(track_tqdm=True). |
|
|
|
Returns: |
|
Tuple[List[Dict], List[Dict], pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: A tuple containing the updated annotations list, updated existing annotations list, updated annotations DataFrame, updated existing annotations DataFrame, updated recogniser entity DataFrame, and the original existing recogniser entity DataFrame. |
|
""" |
|
|
|
|
|
|
|
fallback_colour = '(0, 0, 0,)' |
|
try: |
|
valid = False |
|
if isinstance(colour_label, str): |
|
label_str = colour_label.strip() |
|
match = re.match(r"^\(\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,?\s*\)$", label_str) |
|
if match: |
|
r_val, g_val, b_val = (int(match.group(1)), int(match.group(2)), int(match.group(3))) |
|
if 0 <= r_val <= 255 and 0 <= g_val <= 255 and 0 <= b_val <= 255: |
|
valid = True |
|
elif isinstance(colour_label, (tuple, list)) and len(colour_label) == 3: |
|
r_val, g_val, b_val = colour_label |
|
if all(isinstance(v, int) for v in (r_val, g_val, b_val)) and all(0 <= v <= 255 for v in (r_val, g_val, b_val)): |
|
colour_label = f'({r_val}, {g_val}, {b_val},)' |
|
valid = True |
|
if not valid: |
|
colour_label = fallback_colour |
|
except Exception: |
|
colour_label = fallback_colour |
|
|
|
progress(0.2, desc="Identifying new redactions to add") |
|
print("Identifying new redactions to add") |
|
if filtered_ocr_results_with_words_df.empty: |
|
print("No new annotations to add.") |
|
updated_annotations_df = existing_annotations_df.copy() |
|
else: |
|
|
|
filtered_ocr_results_with_words_df.index = filtered_ocr_results_with_words_df["index"] |
|
new_annotations_df = ocr_results_with_words_df_base.loc[filtered_ocr_results_with_words_df.index].copy() |
|
|
|
if new_annotations_df.empty: |
|
print("No new annotations to add.") |
|
updated_annotations_df = existing_annotations_df.copy() |
|
else: |
|
page_to_image_map = {item['page']: item['image_path'] for item in page_sizes} |
|
|
|
|
|
new_annotations_df = new_annotations_df.assign( |
|
image=lambda df: df['page'].map(page_to_image_map), |
|
label= redaction_label, |
|
color= colour_label |
|
).rename(columns={ |
|
'word_x0': 'xmin', |
|
'word_y0': 'ymin', |
|
'word_x1': 'xmax', |
|
'word_y1': 'ymax', |
|
'word_text': 'text' |
|
}) |
|
|
|
progress(0.3, desc="Checking for adjacent annotations to merge...") |
|
print("Checking for adjacent annotations to merge...") |
|
new_annotations_df = _merge_horizontally_adjacent_boxes(new_annotations_df) |
|
|
|
progress(0.4, desc="Creating new redaction IDs...") |
|
print("Creating new redaction IDs...") |
|
existing_ids = set(existing_annotations_df['id'].dropna()) if 'id' in existing_annotations_df.columns else set() |
|
num_new_ids = len(new_annotations_df) |
|
new_id_list = _generate_unique_ids(num_new_ids, existing_ids) |
|
new_annotations_df['id'] = new_id_list |
|
|
|
annotation_cols = ['image', 'page', 'label', 'color', 'xmin', 'ymin', 'xmax', 'ymax', 'text', 'id'] |
|
new_annotations_df = new_annotations_df[annotation_cols] |
|
|
|
key_cols = ['page', 'label', 'xmin', 'ymin', 'xmax', 'ymax', 'text'] |
|
|
|
progress(0.5, desc="Checking for duplicate redactions") |
|
|
|
if existing_annotations_df.empty or not all(col in existing_annotations_df.columns for col in key_cols): |
|
unique_new_df = new_annotations_df |
|
else: |
|
|
|
merged = pd.merge( |
|
new_annotations_df, |
|
existing_annotations_df[key_cols].drop_duplicates(), |
|
on=key_cols, |
|
how='left', |
|
indicator=True |
|
) |
|
unique_new_df = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge']) |
|
|
|
|
|
print(f"Found {len(unique_new_df)} new unique annotations to add.") |
|
gr.Info(f"Found {len(unique_new_df)} new unique annotations to add.") |
|
updated_annotations_df = pd.concat([existing_annotations_df, unique_new_df], ignore_index=True) |
|
|
|
|
|
updated_recogniser_entity_df = pd.DataFrame() |
|
if not updated_annotations_df.empty: |
|
updated_recogniser_entity_df = updated_annotations_df[["page", "label", "text", "id"]] |
|
|
|
if not page_sizes: |
|
print("Warning: page_sizes is empty. No pages to process.") |
|
return [], existing_annotations_list, pd.DataFrame(), existing_annotations_df, pd.DataFrame(), existing_recogniser_entity_df |
|
|
|
all_pages_df = pd.DataFrame(page_sizes).rename(columns={'image_path': 'image'}) |
|
|
|
if not updated_annotations_df.empty: |
|
page_to_image_map = {item['page']: item['image_path'] for item in page_sizes} |
|
updated_annotations_df['image'] = updated_annotations_df['page'].map(page_to_image_map) |
|
merged_df = pd.merge(all_pages_df[['image']], updated_annotations_df, on='image', how='left') |
|
else: |
|
merged_df = all_pages_df[['image']] |
|
|
|
|
|
|
|
image_order = all_pages_df['image'].tolist() |
|
|
|
|
|
|
|
merged_df['image'] = pd.Categorical(merged_df['image'], categories=image_order, ordered=True) |
|
|
|
|
|
merged_df = merged_df.sort_values('image') |
|
|
|
|
|
final_annotations_list = list() |
|
box_cols = ['label', 'color', 'xmin', 'ymin', 'xmax', 'ymax', 'text', 'id'] |
|
|
|
|
|
|
|
|
|
for image_path, group in merged_df.groupby('image', sort=False): |
|
|
|
|
|
|
|
|
|
|
|
if pd.isna(group.iloc[0].get('id')): |
|
boxes = list() |
|
else: |
|
valid_box_cols = [col for col in box_cols if col in group.columns] |
|
|
|
sorted_group = group.sort_values(by=['ymin', 'xmin']) |
|
boxes = sorted_group[valid_box_cols].to_dict('records') |
|
|
|
final_annotations_list.append({ |
|
"image": image_path, |
|
"boxes": boxes |
|
}) |
|
|
|
progress(1.0, desc="Completed annotation processing") |
|
|
|
return final_annotations_list, existing_annotations_list, updated_annotations_df, existing_annotations_df, updated_recogniser_entity_df, existing_recogniser_entity_df |
|
|
|
def exclude_selected_items_from_redaction(review_df: pd.DataFrame, |
|
selected_rows_df: pd.DataFrame, |
|
image_file_paths:List[str], |
|
page_sizes:List[dict], |
|
image_annotations_state:dict, |
|
recogniser_entity_dataframe_base:pd.DataFrame): |
|
''' |
|
Remove selected items from the review dataframe from the annotation object and review dataframe. |
|
''' |
|
|
|
backup_review_state = review_df |
|
backup_image_annotations_state = image_annotations_state |
|
backup_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base |
|
|
|
if not selected_rows_df.empty and not review_df.empty: |
|
use_id = ( |
|
"id" in selected_rows_df.columns |
|
and "id" in review_df.columns |
|
and not selected_rows_df["id"].isnull().all() |
|
and not review_df["id"].isnull().all() |
|
) |
|
|
|
selected_merge_cols = ["id"] if use_id else ["label", "page", "text"] |
|
|
|
|
|
selected_subset = selected_rows_df[selected_merge_cols].drop_duplicates(subset=selected_merge_cols) |
|
|
|
|
|
merged_df = review_df.merge(selected_subset, on=selected_merge_cols, how='left', indicator=True) |
|
out_review_df = merged_df[merged_df['_merge'] == 'left_only'].drop(columns=['_merge']) |
|
|
|
out_image_annotations_state = convert_review_df_to_annotation_json(out_review_df, image_file_paths, page_sizes) |
|
|
|
out_recogniser_entity_dataframe_base = out_review_df[["page", "label", "text", "id"]] |
|
|
|
|
|
else: |
|
out_review_df = review_df |
|
out_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base |
|
out_image_annotations_state = image_annotations_state |
|
|
|
return out_review_df, out_image_annotations_state, out_recogniser_entity_dataframe_base, backup_review_state, backup_image_annotations_state, backup_recogniser_entity_dataframe_base |
|
|
|
def replace_annotator_object_img_np_array_with_page_sizes_image_path( |
|
all_image_annotations:List[dict], |
|
page_image_annotator_object:AnnotatedImageData, |
|
page_sizes:List[dict], |
|
page:int): |
|
|
|
''' |
|
Check if the image value in an AnnotatedImageData dict is a placeholder or np.array. If either of these, replace the value with the file path of the image that is hopefully already loaded into the app related to this page. |
|
''' |
|
|
|
page_zero_index = page - 1 |
|
|
|
if isinstance(all_image_annotations[page_zero_index]["image"], np.ndarray) or "placeholder_image" in all_image_annotations[page_zero_index]["image"] or isinstance(page_image_annotator_object['image'], np.ndarray): |
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
page_sizes_df[["page"]] = page_sizes_df[["page"]].apply(pd.to_numeric, errors="coerce") |
|
|
|
|
|
matching_paths = page_sizes_df.loc[page_sizes_df['page'] == page, "image_path"].unique() |
|
|
|
if matching_paths.size > 0: |
|
image_path = matching_paths[0] |
|
page_image_annotator_object['image'] = image_path |
|
all_image_annotations[page_zero_index]["image"] = image_path |
|
else: |
|
print(f"No image path found for page {page}.") |
|
|
|
return page_image_annotator_object, all_image_annotations |
|
|
|
def replace_placeholder_image_with_real_image(doc_full_file_name_textbox:str, current_image_path:str, page_sizes_df:pd.DataFrame, page_num_reported:int, input_folder:str): |
|
''' If image path is still not valid, load in a new image an overwrite it. Then replace all items in the image annotation object for all pages based on the updated information.''' |
|
|
|
page_num_reported_zero_indexed = page_num_reported - 1 |
|
|
|
if not os.path.exists(current_image_path): |
|
|
|
page_num, replaced_image_path, width, height = process_single_page_for_image_conversion(doc_full_file_name_textbox, page_num_reported_zero_indexed, input_folder=input_folder) |
|
|
|
|
|
page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"] = width |
|
page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_height"] = height |
|
page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_path"] = replaced_image_path |
|
|
|
else: |
|
if not page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"].isnull().all(): |
|
width = page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"].max() |
|
height = page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_height"].max() |
|
else: |
|
image = Image.open(current_image_path) |
|
width = image.width |
|
height = image.height |
|
|
|
page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_width"] = width |
|
page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_height"] = height |
|
|
|
page_sizes_df.loc[page_sizes_df['page']==page_num_reported, "image_path"] = current_image_path |
|
|
|
replaced_image_path = current_image_path |
|
|
|
return replaced_image_path, page_sizes_df |
|
|
|
def update_annotator_object_and_filter_df( |
|
all_image_annotations:List[AnnotatedImageData], |
|
gradio_annotator_current_page_number:int, |
|
recogniser_entities_dropdown_value:str="ALL", |
|
page_dropdown_value:str="ALL", |
|
page_dropdown_redaction_value:str="1", |
|
text_dropdown_value:str="ALL", |
|
recogniser_dataframe_base:pd.DataFrame=None, |
|
zoom:int=100, |
|
review_df:pd.DataFrame=None, |
|
page_sizes:List[dict]=list(), |
|
doc_full_file_name_textbox:str='', |
|
input_folder:str=INPUT_FOLDER |
|
) -> Tuple[image_annotator, gr.Number, gr.Number, int, str, gr.Dataframe, pd.DataFrame, List[str], List[str], List[dict], List[AnnotatedImageData]]: |
|
''' |
|
Update a gradio_image_annotation object with new annotation data for the current page |
|
and update filter dataframes, optimizing by processing only the current page's data for display. |
|
''' |
|
|
|
zoom_str = str(zoom) + '%' |
|
|
|
|
|
if review_df is None or not isinstance(review_df, pd.DataFrame): |
|
review_df = pd.DataFrame(columns=["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax", "text", "id"]) |
|
if recogniser_dataframe_base is None: |
|
recogniser_dataframe_base = gr.Dataframe(pd.DataFrame(data={"page":[], "label":[], "text":[], "id":[]})) |
|
|
|
|
|
|
|
if not all_image_annotations: |
|
print("No all_image_annotation object found") |
|
|
|
|
|
blank_annotator = image_annotator( |
|
value = None, boxes_alpha=0.1, box_thickness=1, label_list=list(), label_colors=list(), |
|
show_label=False, height=zoom_str, width=zoom_str, box_min_size=1, |
|
box_selected_thickness=2, handle_size=4, sources=None, |
|
show_clear_button=False, show_share_button=False, show_remove_button=False, |
|
handles_cursor=True, interactive=True, use_default_label=True |
|
) |
|
blank_df_out_gr = gr.Dataframe(pd.DataFrame(columns=["page", "label", "text", "id"])) |
|
blank_df_modified = pd.DataFrame(columns=["page", "label", "text", "id"]) |
|
|
|
return (blank_annotator, gr.Number(value=1), gr.Number(value=1), 1, |
|
recogniser_entities_dropdown_value, blank_df_out_gr, blank_df_modified, |
|
[], [], [], [], []) |
|
|
|
|
|
page_num_reported = max(1, gradio_annotator_current_page_number) |
|
page_max_reported = len(all_image_annotations) |
|
if page_num_reported > page_max_reported: |
|
page_num_reported = page_max_reported |
|
|
|
page_num_reported_zero_indexed = page_num_reported - 1 |
|
annotate_previous_page = page_num_reported |
|
|
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
if not page_sizes_df.empty: |
|
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
|
page_sizes_df.dropna(subset=["page"], inplace=True) |
|
if not page_sizes_df.empty: |
|
page_sizes_df["page"] = page_sizes_df["page"].astype(int) |
|
else: |
|
print("Warning: Page sizes DataFrame became empty after processing.") |
|
|
|
|
|
|
|
if len(all_image_annotations) > page_num_reported_zero_indexed: |
|
|
|
page_object_to_update = all_image_annotations[page_num_reported_zero_indexed] |
|
|
|
|
|
updated_page_object, all_image_annotations_after_img_replace = replace_annotator_object_img_np_array_with_page_sizes_image_path( |
|
all_image_annotations, page_object_to_update, page_sizes, page_num_reported) |
|
|
|
all_image_annotations = all_image_annotations_after_img_replace |
|
|
|
|
|
current_image_path = updated_page_object.get('image') |
|
|
|
if current_image_path and not page_sizes_df.empty: |
|
try: |
|
replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image( |
|
doc_full_file_name_textbox, current_image_path, page_sizes_df, |
|
page_num_reported, input_folder=input_folder |
|
) |
|
|
|
|
|
|
|
if len(all_image_annotations) > page_num_reported_zero_indexed: |
|
all_image_annotations[page_num_reported_zero_indexed]['image'] = replaced_image_path |
|
|
|
|
|
if 'page' in review_df.columns and 'image' in review_df.columns: |
|
|
|
review_df['page'] = pd.to_numeric(review_df['page'], errors='coerce').fillna(-1).astype(int) |
|
review_df.loc[review_df["page"]==page_num_reported, 'image'] = replaced_image_path |
|
|
|
|
|
except Exception as e: |
|
print(f"Error during image path replacement for page {page_num_reported}: {e}") |
|
else: |
|
print(f"Warning: Page index {page_num_reported_zero_indexed} out of bounds for all_image_annotations list.") |
|
|
|
|
|
|
|
if not page_sizes_df.empty: |
|
page_sizes = page_sizes_df.to_dict(orient='records') |
|
else: |
|
page_sizes = list() |
|
|
|
|
|
current_page_image_annotator_object = None |
|
if len(all_image_annotations) > page_num_reported_zero_indexed: |
|
page_data_for_display = all_image_annotations[page_num_reported_zero_indexed] |
|
|
|
|
|
|
|
current_page_annotations_df = convert_annotation_data_to_dataframe([page_data_for_display]) |
|
|
|
if not current_page_annotations_df.empty and not page_sizes_df.empty: |
|
|
|
try: |
|
|
|
page_size_row = page_sizes_df[page_sizes_df['page'] == page_num_reported] |
|
if not page_size_row.empty: |
|
current_page_annotations_df = multiply_coordinates_by_page_sizes( |
|
current_page_annotations_df, page_size_row, |
|
xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax" |
|
) |
|
|
|
except Exception as e: |
|
print(f"Warning: Error during coordinate multiplication for page {page_num_reported}: {e}. Using original coordinates.") |
|
|
|
|
|
if "color" not in current_page_annotations_df.columns: |
|
current_page_annotations_df['color'] = '(0, 0, 0)' |
|
|
|
|
|
processed_current_page_annotations_list = current_page_annotations_df[["xmin", "xmax", "ymin", "ymax", "label", "color", "text", "id"]].to_dict(orient='records') |
|
|
|
|
|
current_page_image_annotator_object: AnnotatedImageData = { |
|
'image': page_data_for_display.get('image'), |
|
'boxes': processed_current_page_annotations_list |
|
} |
|
|
|
|
|
|
|
|
|
try: |
|
recogniser_entities_list, recogniser_dataframe_out_gr, recogniser_dataframe_modified, recogniser_entities_dropdown_value, text_entities_drop, page_entities_drop = update_recogniser_dataframes( |
|
all_image_annotations, |
|
recogniser_dataframe_base, |
|
recogniser_entities_dropdown_value, |
|
text_dropdown_value, |
|
page_dropdown_value, |
|
review_df.copy(), |
|
page_sizes |
|
) |
|
|
|
recogniser_colour_list = [(0, 0, 0) for _ in range(len(recogniser_entities_list))] |
|
|
|
except Exception as e: |
|
print(f"Error calling update_recogniser_dataframes: {e}. Returning empty/default filter data.") |
|
recogniser_entities_list = list() |
|
recogniser_colour_list = list() |
|
recogniser_dataframe_out_gr = gr.Dataframe(pd.DataFrame(columns=["page", "label", "text", "id"])) |
|
recogniser_dataframe_modified = pd.DataFrame(columns=["page", "label", "text", "id"]) |
|
text_entities_drop = list() |
|
page_entities_drop = list() |
|
|
|
|
|
|
|
page_number_reported_gradio_comp = gr.Number(label = "Current page", value=page_num_reported, precision=0) |
|
|
|
|
|
|
|
if current_page_image_annotator_object is None: |
|
|
|
|
|
print("Warning: Could not prepare annotator object for the current page.") |
|
out_image_annotator = image_annotator(value=None, interactive=False) |
|
else: |
|
out_image_annotator = image_annotator( |
|
value = current_page_image_annotator_object, |
|
boxes_alpha=0.1, |
|
box_thickness=1, |
|
label_list=recogniser_entities_list, |
|
label_colors=recogniser_colour_list, |
|
show_label=False, |
|
height=zoom_str, |
|
width=zoom_str, |
|
box_min_size=1, |
|
box_selected_thickness=2, |
|
handle_size=4, |
|
sources=None, |
|
show_clear_button=False, |
|
show_share_button=False, |
|
show_remove_button=False, |
|
handles_cursor=True, |
|
interactive=True |
|
) |
|
|
|
page_entities_drop_redaction_list = list() |
|
all_pages_in_doc_list = [str(i) for i in range(1, len(page_sizes) + 1)] |
|
page_entities_drop_redaction_list.extend(all_pages_in_doc_list) |
|
|
|
page_entities_drop_redaction = gr.Dropdown(value = page_dropdown_redaction_value, choices=page_entities_drop_redaction_list, label="Page", allow_custom_value=True) |
|
|
|
return (out_image_annotator, |
|
page_number_reported_gradio_comp, |
|
page_number_reported_gradio_comp, |
|
page_num_reported, |
|
recogniser_entities_dropdown_value, |
|
recogniser_dataframe_out_gr, |
|
recogniser_dataframe_modified, |
|
text_entities_drop, |
|
page_entities_drop, |
|
page_entities_drop_redaction, |
|
page_sizes, |
|
all_image_annotations) |
|
|
|
def update_all_page_annotation_object_based_on_previous_page( |
|
page_image_annotator_object:AnnotatedImageData, |
|
current_page:int, |
|
previous_page:int, |
|
all_image_annotations:List[AnnotatedImageData], |
|
page_sizes:List[dict]=list(), |
|
clear_all:bool=False |
|
): |
|
''' |
|
Overwrite image annotations on the page we are moving from with modifications. |
|
''' |
|
|
|
if current_page > len(page_sizes): |
|
raise Warning("Selected page is higher than last page number") |
|
elif current_page <= 0: |
|
raise Warning("Selected page is lower than first page") |
|
|
|
|
|
previous_page_zero_index = previous_page -1 |
|
|
|
if not current_page: current_page = 1 |
|
|
|
|
|
page_image_annotator_object, all_image_annotations = replace_annotator_object_img_np_array_with_page_sizes_image_path(all_image_annotations, page_image_annotator_object, page_sizes, previous_page) |
|
|
|
if clear_all == False: all_image_annotations[previous_page_zero_index] = page_image_annotator_object |
|
else: all_image_annotations[previous_page_zero_index]["boxes"] = list() |
|
|
|
return all_image_annotations, current_page, current_page |
|
|
|
def apply_redactions_to_review_df_and_files(page_image_annotator_object:AnnotatedImageData, |
|
file_paths:List[str], |
|
doc:Document, |
|
all_image_annotations:List[AnnotatedImageData], |
|
current_page:int, |
|
review_file_state:pd.DataFrame, |
|
output_folder:str = OUTPUT_FOLDER, |
|
save_pdf:bool=True, |
|
page_sizes:List[dict]=list(), |
|
COMPRESS_REDACTED_PDF:bool=COMPRESS_REDACTED_PDF, |
|
progress=gr.Progress(track_tqdm=True)): |
|
''' |
|
Apply modified redactions to a pymupdf and export review files. |
|
''' |
|
|
|
output_files = list() |
|
output_log_files = list() |
|
pdf_doc = list() |
|
review_df = review_file_state |
|
|
|
page_image_annotator_object = all_image_annotations[current_page - 1] |
|
|
|
|
|
page_image_annotator_object, all_image_annotations = replace_annotator_object_img_np_array_with_page_sizes_image_path(all_image_annotations, page_image_annotator_object, page_sizes, current_page) |
|
page_image_annotator_object['image'] = all_image_annotations[current_page - 1]["image"] |
|
|
|
if not page_image_annotator_object: |
|
print("No image annotations object found for page") |
|
return doc, all_image_annotations, output_files, output_log_files, review_df |
|
|
|
if isinstance(file_paths, str): |
|
file_paths = [file_paths] |
|
|
|
for file_path in file_paths: |
|
file_name_without_ext = get_file_name_without_type(file_path) |
|
file_name_with_ext = os.path.basename(file_path) |
|
|
|
file_extension = os.path.splitext(file_path)[1].lower() |
|
|
|
if save_pdf == True: |
|
|
|
if (is_pdf(file_path) == False) & (file_extension not in '.csv'): |
|
image = Image.open(file_paths[-1]) |
|
|
|
draw = ImageDraw.Draw(image) |
|
|
|
for img_annotation_box in page_image_annotator_object['boxes']: |
|
coords = [img_annotation_box["xmin"], |
|
img_annotation_box["ymin"], |
|
img_annotation_box["xmax"], |
|
img_annotation_box["ymax"]] |
|
|
|
fill = img_annotation_box["color"] |
|
|
|
|
|
if isinstance(fill, tuple) and len(fill) == 3: |
|
|
|
if all(isinstance(c, int) and 0 <= c <= 255 for c in fill): |
|
pass |
|
|
|
else: |
|
print(f"Invalid color values: {fill}. Defaulting to black.") |
|
fill = (0, 0, 0) |
|
else: |
|
print(f"Invalid fill format: {fill}. Defaulting to black.") |
|
fill = (0, 0, 0) |
|
|
|
|
|
if image.mode not in ("RGB", "RGBA"): |
|
image = image.convert("RGB") |
|
|
|
draw = ImageDraw.Draw(image) |
|
|
|
draw.rectangle(coords, fill=fill) |
|
|
|
output_image_path = output_folder + file_name_without_ext + "_redacted.png" |
|
image.save(output_folder + file_name_without_ext + "_redacted.png") |
|
|
|
output_files.append(output_image_path) |
|
|
|
doc = [image] |
|
|
|
elif file_extension in '.csv': |
|
pdf_doc = list() |
|
|
|
|
|
elif is_pdf(file_path) == True: |
|
pdf_doc = pymupdf.open(file_path) |
|
orig_pdf_file_path = file_path |
|
|
|
output_files.append(orig_pdf_file_path) |
|
|
|
number_of_pages = pdf_doc.page_count |
|
original_cropboxes = list() |
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
page_sizes_df[["page"]] = page_sizes_df[["page"]].apply(pd.to_numeric, errors="coerce") |
|
|
|
for i in progress.tqdm(range(0, number_of_pages), desc="Saving redacted pages to file", unit = "pages"): |
|
|
|
image_loc = all_image_annotations[i]['image'] |
|
|
|
|
|
if isinstance(image_loc, np.ndarray): |
|
image = Image.fromarray(image_loc.astype('uint8')) |
|
elif isinstance(image_loc, Image.Image): |
|
image = image_loc |
|
elif isinstance(image_loc, str): |
|
if not os.path.exists(image_loc): |
|
image=page_sizes_df.loc[page_sizes_df['page']==i, "image_path"] |
|
try: |
|
image = Image.open(image_loc) |
|
except Exception as e: |
|
image = None |
|
|
|
pymupdf_page = pdf_doc.load_page(i) |
|
original_cropboxes.append(pymupdf_page.cropbox) |
|
pymupdf_page.set_cropbox(pymupdf_page.mediabox) |
|
|
|
pymupdf_page = redact_page_with_pymupdf(page=pymupdf_page, page_annotations=all_image_annotations[i], image=image, original_cropbox=original_cropboxes[-1], page_sizes_df= page_sizes_df) |
|
else: |
|
print("File type not recognised.") |
|
|
|
progress(0.9, "Saving output files") |
|
|
|
|
|
if pdf_doc: |
|
out_pdf_file_path = output_folder + file_name_without_ext + "_redacted.pdf" |
|
save_pdf_with_or_without_compression(pdf_doc, out_pdf_file_path, COMPRESS_REDACTED_PDF) |
|
output_files.append(out_pdf_file_path) |
|
|
|
else: |
|
print("PDF input not found. Outputs not saved to PDF.") |
|
|
|
|
|
else: |
|
if is_pdf(file_path) == True: |
|
orig_pdf_file_path = file_path |
|
output_files.append(orig_pdf_file_path) |
|
|
|
try: |
|
|
|
review_df = convert_annotation_json_to_review_df(all_image_annotations, review_file_state.copy(), page_sizes=page_sizes) |
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
page_sizes_df .loc[:, "page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
|
review_df = divide_coordinates_by_page_sizes(review_df, page_sizes_df) |
|
|
|
review_df = review_df[["image", "page", "label","color", "xmin", "ymin", "xmax", "ymax", "text", "id"]] |
|
|
|
out_review_file_file_path = output_folder + file_name_with_ext + '_review_file.csv' |
|
|
|
review_df.to_csv(out_review_file_file_path, index=None) |
|
output_files.append(out_review_file_file_path) |
|
|
|
except Exception as e: |
|
print("In apply redactions function, could not save annotations to csv file:", e) |
|
|
|
return doc, all_image_annotations, output_files, output_log_files, review_df |
|
|
|
def get_boxes_json(annotations:AnnotatedImageData): |
|
return annotations["boxes"] |
|
|
|
def update_all_entity_df_dropdowns(df:pd.DataFrame, label_dropdown_value:str, page_dropdown_value:str, text_dropdown_value:str): |
|
''' |
|
Update all dropdowns based on rows that exist in a dataframe |
|
''' |
|
|
|
if isinstance(label_dropdown_value, str): |
|
label_dropdown_value = [label_dropdown_value] |
|
if isinstance(page_dropdown_value, str): |
|
page_dropdown_value = [page_dropdown_value] |
|
if isinstance(text_dropdown_value, str): |
|
text_dropdown_value = [text_dropdown_value] |
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") |
|
recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") |
|
text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") |
|
page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
return recogniser_entities_drop, text_entities_drop, page_entities_drop |
|
|
|
def update_entities_df_recogniser_entities(choice:str, df:pd.DataFrame, page_dropdown_value:str, text_dropdown_value:str): |
|
''' |
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
''' |
|
|
|
if isinstance(choice, str): |
|
choice = [choice] |
|
if isinstance(page_dropdown_value, str): |
|
page_dropdown_value = [page_dropdown_value] |
|
if isinstance(text_dropdown_value, str): |
|
text_dropdown_value = [text_dropdown_value] |
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
if not "ALL" in page_dropdown_value: |
|
filtered_df = filtered_df[filtered_df["page"].astype(str).isin(page_dropdown_value)] |
|
|
|
if not "ALL" in text_dropdown_value: |
|
filtered_df = filtered_df[filtered_df["text"].astype(str).isin(text_dropdown_value)] |
|
|
|
if not "ALL" in choice: |
|
filtered_df = filtered_df[filtered_df["label"].astype(str).isin(choice)] |
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") |
|
recogniser_entities_drop = gr.Dropdown(value=choice[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") |
|
text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") |
|
page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
return filtered_df, text_entities_drop, page_entities_drop |
|
|
|
def update_entities_df_page(choice:str, df:pd.DataFrame, label_dropdown_value:str, text_dropdown_value:str): |
|
''' |
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
''' |
|
if isinstance(choice, str): |
|
choice = [choice] |
|
elif not isinstance(choice, list): |
|
choice = [str(choice)] |
|
if isinstance(label_dropdown_value, str): |
|
label_dropdown_value = [label_dropdown_value] |
|
elif not isinstance(label_dropdown_value, list): |
|
label_dropdown_value = [str(label_dropdown_value)] |
|
if isinstance(text_dropdown_value, str): |
|
text_dropdown_value = [text_dropdown_value] |
|
elif not isinstance(text_dropdown_value, list): |
|
text_dropdown_value = [str(text_dropdown_value)] |
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
if not "ALL" in text_dropdown_value: |
|
filtered_df = filtered_df[filtered_df["text"].astype(str).isin(text_dropdown_value)] |
|
|
|
if not "ALL" in label_dropdown_value: |
|
filtered_df = filtered_df[filtered_df["label"].astype(str).isin(label_dropdown_value)] |
|
|
|
if not "ALL" in choice: |
|
filtered_df = filtered_df[filtered_df["page"].astype(str).isin(choice)] |
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") |
|
recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") |
|
text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") |
|
page_entities_drop = gr.Dropdown(value=choice[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
return filtered_df, recogniser_entities_drop, text_entities_drop |
|
|
|
def update_redact_choice_df_from_page_dropdown(choice:str, df:pd.DataFrame): |
|
''' |
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
''' |
|
if isinstance(choice, str): |
|
choice = [choice] |
|
elif not isinstance(choice, list): |
|
choice = [str(choice)] |
|
|
|
if "index" not in df.columns: |
|
df["index"] = df.index |
|
|
|
filtered_df = df[["page", "line", "word_text", "word_x0", "word_y0", "word_x1", "word_y1", "index"]].copy() |
|
|
|
|
|
if not "ALL" in choice: |
|
filtered_df = filtered_df.loc[filtered_df["page"].astype(str).isin(choice)] |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") |
|
page_entities_drop = gr.Dropdown(value=choice[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
return filtered_df |
|
|
|
def update_entities_df_text(choice:str, df:pd.DataFrame, label_dropdown_value:str, page_dropdown_value:str): |
|
''' |
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
''' |
|
if isinstance(choice, str): |
|
choice = [choice] |
|
if isinstance(label_dropdown_value, str): |
|
label_dropdown_value = [label_dropdown_value] |
|
if isinstance(page_dropdown_value, str): |
|
page_dropdown_value = [page_dropdown_value] |
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
if not "ALL" in page_dropdown_value: |
|
filtered_df = filtered_df[filtered_df["page"].astype(str).isin(page_dropdown_value)] |
|
|
|
if not "ALL" in label_dropdown_value: |
|
filtered_df = filtered_df[filtered_df["label"].astype(str).isin(label_dropdown_value)] |
|
|
|
if not "ALL" in choice: |
|
filtered_df = filtered_df[filtered_df["text"].astype(str).isin(choice)] |
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "label") |
|
recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "text") |
|
text_entities_drop = gr.Dropdown(value=choice[0], choices=text_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(filtered_df, "page") |
|
page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
return filtered_df, recogniser_entities_drop, page_entities_drop |
|
|
|
def reset_dropdowns(df:pd.DataFrame): |
|
''' |
|
Return Gradio dropdown objects with value 'ALL'. |
|
''' |
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "label") |
|
recogniser_entities_drop = gr.Dropdown(value="ALL", choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "text") |
|
text_entities_drop = gr.Dropdown(value="ALL", choices=text_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "page") |
|
page_entities_drop = gr.Dropdown(value="ALL", choices=page_entities_for_drop, allow_custom_value=True, interactive=True) |
|
|
|
return recogniser_entities_drop, text_entities_drop, page_entities_drop |
|
|
|
def increase_bottom_page_count_based_on_top(page_number:int): |
|
return int(page_number) |
|
|
|
def df_select_callback_dataframe_row_ocr_with_words(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
row_value_page = int(evt.row_value[0]) |
|
row_value_line = int(evt.row_value[1]) |
|
row_value_text = evt.row_value[2] |
|
|
|
row_value_x0 = evt.row_value[3] |
|
row_value_y0 = evt.row_value[4] |
|
row_value_x1 = evt.row_value[5] |
|
row_value_y1 = evt.row_value[6] |
|
row_value_index = evt.row_value[7] |
|
|
|
row_value_df = pd.DataFrame(data={"page":[row_value_page], "line":[row_value_line], "word_text":[row_value_text], |
|
"word_x0":[row_value_x0], "word_y0":[row_value_y0], "word_x1":[row_value_x1], "word_y1":[row_value_y1], "index":row_value_index |
|
}) |
|
|
|
return row_value_df, row_value_text |
|
|
|
def df_select_callback_dataframe_row(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
row_value_page = int(evt.row_value[0]) |
|
row_value_label = evt.row_value[1] |
|
row_value_text = evt.row_value[2] |
|
row_value_id = evt.row_value[3] |
|
|
|
row_value_df = pd.DataFrame(data={"page":[row_value_page], "label":[row_value_label], "text":[row_value_text], "id":[row_value_id]}) |
|
|
|
return row_value_df, row_value_text |
|
|
|
def df_select_callback_textract_api(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
row_value_job_id = evt.row_value[0] |
|
|
|
row_value_job_type = evt.row_value[2] |
|
|
|
row_value_df = pd.DataFrame(data={"job_id":[row_value_job_id], "label":[row_value_job_type]}) |
|
|
|
return row_value_job_id, row_value_job_type, row_value_df |
|
|
|
def df_select_callback_cost(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
row_value_code = evt.row_value[0] |
|
|
|
|
|
|
|
|
|
return row_value_code |
|
|
|
def df_select_callback_ocr(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
row_value_page = int(evt.row_value[0]) |
|
row_value_text = evt.row_value[1] |
|
|
|
row_value_df = pd.DataFrame(data={"page":[row_value_page], "text":[row_value_text]}) |
|
|
|
return row_value_page, row_value_df |
|
|
|
|
|
def store_duplicate_selection(evt: gr.SelectData): |
|
if not evt.empty: |
|
selected_index = evt.index[0] |
|
else: |
|
selected_index = None |
|
|
|
return selected_index |
|
|
|
def get_all_rows_with_same_text(df: pd.DataFrame, text: str): |
|
''' |
|
Get all rows with the same text as the selected row |
|
''' |
|
if text: |
|
|
|
return df.loc[df["text"] == text] |
|
else: |
|
return pd.DataFrame(columns=["page", "label", "text", "id"]) |
|
|
|
def get_all_rows_with_same_text_redact(df: pd.DataFrame, text: str): |
|
''' |
|
Get all rows with the same text as the selected row for redaction tasks |
|
''' |
|
if "index" not in df.columns: |
|
df["index"] = df.index |
|
|
|
if text and not df.empty: |
|
|
|
return df.loc[df["word_text"] == text] |
|
else: |
|
return pd.DataFrame(columns=["page", "line", "label", "word_text", "word_x0", "word_y0", "word_x1", "word_y1", "index"]) |
|
|
|
def update_selected_review_df_row_colour( |
|
redaction_row_selection: pd.DataFrame, |
|
review_df: pd.DataFrame, |
|
previous_id: str = "", |
|
previous_colour: str = '(0, 0, 0)', |
|
colour: str = '(1, 0, 255)' |
|
) -> tuple[pd.DataFrame, str, str]: |
|
''' |
|
Update the colour of a single redaction box based on the values in a selection row |
|
(Optimized Version) |
|
''' |
|
|
|
|
|
if "color" not in review_df.columns: |
|
review_df["color"] = previous_colour if previous_id else '(0, 0, 0)' |
|
|
|
|
|
if "id" not in review_df.columns: |
|
|
|
|
|
|
|
print("Warning: 'id' column not found. Calling fill_missing_ids.") |
|
review_df = fill_missing_ids(review_df) |
|
|
|
|
|
|
|
if previous_id and previous_id in review_df["id"].values: |
|
review_df.loc[review_df["id"] == previous_id, "color"] = previous_colour |
|
|
|
|
|
|
|
|
|
|
|
review_df.loc[review_df["color"] == colour, "color"] = '(0, 0, 0)' |
|
|
|
|
|
if not redaction_row_selection.empty and not review_df.empty: |
|
use_id = ( |
|
"id" in redaction_row_selection.columns |
|
and "id" in review_df.columns |
|
and not redaction_row_selection["id"].isnull().all() |
|
and not review_df["id"].isnull().all() |
|
) |
|
|
|
selected_merge_cols = ["id"] if use_id else ["label", "page", "text"] |
|
|
|
|
|
|
|
merged_reviews = review_df.merge( |
|
redaction_row_selection[selected_merge_cols], |
|
on=selected_merge_cols, |
|
how="inner" |
|
) |
|
|
|
if not merged_reviews.empty: |
|
|
|
|
|
|
|
new_previous_colour = str(merged_reviews["color"].iloc[0]) |
|
new_previous_id = merged_reviews["id"].iloc[0] |
|
|
|
|
|
|
|
if use_id: |
|
|
|
review_df.loc[review_df["id"].isin(merged_reviews["id"]), "color"] = colour |
|
else: |
|
|
|
|
|
def create_merge_key(df, cols): |
|
return df[cols].astype(str).agg('_'.join, axis=1) |
|
|
|
review_df_key = create_merge_key(review_df, selected_merge_cols) |
|
merged_reviews_key = create_merge_key(merged_reviews, selected_merge_cols) |
|
|
|
review_df.loc[review_df_key.isin(merged_reviews_key), "color"] = colour |
|
|
|
previous_colour = new_previous_colour |
|
previous_id = new_previous_id |
|
else: |
|
|
|
print("No reviews found matching selection criteria") |
|
|
|
|
|
|
|
previous_colour = '(0, 0, 0)' |
|
previous_id = '' |
|
|
|
else: |
|
|
|
review_df.loc[review_df["color"] == colour, "color"] = '(0, 0, 0)' |
|
previous_colour = '(0, 0, 0)' |
|
previous_id = '' |
|
|
|
|
|
|
|
|
|
if set(["image", "page", "label", "color", "xmin","ymin", "xmax", "ymax", "text", "id"]).issubset(review_df.columns): |
|
review_df = review_df[["image", "page", "label", "color", "xmin","ymin", "xmax", "ymax", "text", "id"]] |
|
else: |
|
print("Warning: Not all expected columns are present in review_df for reordering.") |
|
|
|
|
|
return review_df, previous_id, previous_colour |
|
|
|
def update_boxes_color(images: list, redaction_row_selection: pd.DataFrame, colour: tuple = (0, 255, 0)): |
|
""" |
|
Update the color of bounding boxes in the images list based on redaction_row_selection. |
|
|
|
Parameters: |
|
- images (list): List of dictionaries containing image paths and box metadata. |
|
- redaction_row_selection (pd.DataFrame): DataFrame with 'page', 'label', and optionally 'text' columns. |
|
- colour (tuple): RGB tuple for the new color. |
|
|
|
Returns: |
|
- Updated list with modified colors. |
|
""" |
|
|
|
selection_set = set(zip(redaction_row_selection["page"], redaction_row_selection["label"])) |
|
|
|
for page_idx, image_obj in enumerate(images): |
|
if "boxes" in image_obj: |
|
for box in image_obj["boxes"]: |
|
if (page_idx, box["label"]) in selection_set: |
|
box["color"] = colour |
|
|
|
return images |
|
|
|
def update_other_annotator_number_from_current(page_number_first_counter:int): |
|
return page_number_first_counter |
|
|
|
def convert_image_coords_to_adobe(pdf_page_width:float, pdf_page_height:float, image_width:float, image_height:float, x1:float, y1:float, x2:float, y2:float): |
|
''' |
|
Converts coordinates from image space to Adobe PDF space. |
|
|
|
Parameters: |
|
- pdf_page_width: Width of the PDF page |
|
- pdf_page_height: Height of the PDF page |
|
- image_width: Width of the source image |
|
- image_height: Height of the source image |
|
- x1, y1, x2, y2: Coordinates in image space |
|
- page_sizes: List of dicts containing sizes of page as pymupdf page or PIL image |
|
|
|
Returns: |
|
- Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space |
|
''' |
|
|
|
|
|
|
|
|
|
scale_width = pdf_page_width / image_width |
|
scale_height = pdf_page_height / image_height |
|
|
|
|
|
pdf_x1 = x1 * scale_width |
|
pdf_x2 = x2 * scale_width |
|
|
|
|
|
|
|
pdf_y1 = pdf_page_height - (y1 * scale_height) |
|
pdf_y2 = pdf_page_height - (y2 * scale_height) |
|
|
|
|
|
if pdf_y1 > pdf_y2: |
|
pdf_y1, pdf_y2 = pdf_y2, pdf_y1 |
|
|
|
return pdf_x1, pdf_y1, pdf_x2, pdf_y2 |
|
|
|
def convert_pymupdf_coords_to_adobe(x1: float, y1: float, x2: float, y2: float, pdf_page_height: float): |
|
""" |
|
Converts coordinates from PyMuPDF (fitz) space to Adobe PDF space. |
|
|
|
Parameters: |
|
- x1, y1, x2, y2: Coordinates in PyMuPDF space |
|
- pdf_page_height: Total height of the PDF page |
|
|
|
Returns: |
|
- Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space |
|
""" |
|
|
|
|
|
adobe_y1 = pdf_page_height - y2 |
|
adobe_y2 = pdf_page_height - y1 |
|
|
|
return x1, adobe_y1, x2, adobe_y2 |
|
|
|
def create_xfdf(review_file_df:pd.DataFrame, pdf_path:str, pymupdf_doc:object, image_paths:List[str]=list(), document_cropboxes:List=list(), page_sizes:List[dict]=list()): |
|
''' |
|
Create an xfdf file from a review csv file and a pdf |
|
''' |
|
xfdf_root = Element('xfdf', xmlns="http://ns.adobe.com/xfdf/", **{'xml:space':"preserve"}) |
|
annots = SubElement(xfdf_root, 'annots') |
|
|
|
if page_sizes: |
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
if not page_sizes_df.empty and "mediabox_width" not in review_file_df.columns: |
|
review_file_df = review_file_df.merge(page_sizes_df, how="left", on="page") |
|
if "xmin" in review_file_df.columns and review_file_df["xmin"].max() <= 1: |
|
if "mediabox_width" in review_file_df.columns and "mediabox_height" in review_file_df.columns: |
|
review_file_df["xmin"] = review_file_df["xmin"] * review_file_df["mediabox_width"] |
|
review_file_df["xmax"] = review_file_df["xmax"] * review_file_df["mediabox_width"] |
|
review_file_df["ymin"] = review_file_df["ymin"] * review_file_df["mediabox_height"] |
|
review_file_df["ymax"] = review_file_df["ymax"] * review_file_df["mediabox_height"] |
|
elif "image_width" in review_file_df.columns and not page_sizes_df.empty : |
|
review_file_df = multiply_coordinates_by_page_sizes(review_file_df, page_sizes_df, xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax") |
|
|
|
for _, row in review_file_df.iterrows(): |
|
page_num_reported = int(row["page"]) |
|
page_python_format = page_num_reported - 1 |
|
pymupdf_page = pymupdf_doc.load_page(page_python_format) |
|
|
|
if document_cropboxes and page_python_format < len(document_cropboxes): |
|
match = re.findall(r"[-+]?\d*\.\d+|\d+", document_cropboxes[page_python_format]) |
|
if match and len(match) == 4: |
|
rect_values = list(map(float, match)) |
|
pymupdf_page.set_cropbox(Rect(*rect_values)) |
|
|
|
pdf_page_height = pymupdf_page.mediabox.height |
|
redact_annot = SubElement(annots, 'redact') |
|
redact_annot.set('opacity', "0.500000") |
|
redact_annot.set('interior-color', "#000000") |
|
|
|
now = datetime.now(timezone(timedelta(hours=1))) |
|
date_str = now.strftime("D:%Y%m%d%H%M%S") + now.strftime("%z")[:3] + "'" + now.strftime("%z")[3:] + "'" |
|
redact_annot.set('date', date_str) |
|
|
|
annot_id = str(uuid.uuid4()) |
|
redact_annot.set('name', annot_id) |
|
redact_annot.set('page', str(page_python_format)) |
|
redact_annot.set('mimetype', "Form") |
|
|
|
x1_pdf, y1_pdf, x2_pdf, y2_pdf = row['xmin'], row['ymin'], row['xmax'], row['ymax'] |
|
adobe_x1, adobe_y1, adobe_x2, adobe_y2 = convert_pymupdf_coords_to_adobe( |
|
x1_pdf, y1_pdf, x2_pdf, y2_pdf, pdf_page_height |
|
) |
|
redact_annot.set('rect', f"{adobe_x1:.6f},{adobe_y1:.6f},{adobe_x2:.6f},{adobe_y2:.6f}") |
|
|
|
redact_annot.set('subject', str(row['label'])) |
|
redact_annot.set('title', str(row.get('label', 'Unknown'))) |
|
|
|
contents_richtext = SubElement(redact_annot, 'contents-richtext') |
|
body_attrs = { |
|
'xmlns': "http://www.w3.org/1999/xhtml", |
|
'{http://www.xfa.org/schema/xfa-data/1.0/}APIVersion': "Acrobat:25.1.0", |
|
'{http://www.xfa.org/schema/xfa-data/1.0/}spec': "2.0.2" |
|
} |
|
body = SubElement(contents_richtext, 'body', attrib=body_attrs) |
|
p_element = SubElement(body, 'p', dir="ltr") |
|
span_attrs = { |
|
'dir': "ltr", |
|
'style': "font-size:10.0pt;text-align:left;color:#000000;font-weight:normal;font-style:normal" |
|
} |
|
span_element = SubElement(p_element, 'span', attrib=span_attrs) |
|
span_element.text = str(row['text']).strip() |
|
|
|
pdf_ops_for_black_fill_and_outline = [ |
|
"1 w", |
|
"0 g", |
|
"0 G", |
|
"1 0 0 1 0 0 cm", |
|
f"{adobe_x1:.2f} {adobe_y1:.2f} m", |
|
f"{adobe_x2:.2f} {adobe_y1:.2f} l", |
|
f"{adobe_x2:.2f} {adobe_y2:.2f} l", |
|
f"{adobe_x1:.2f} {adobe_y2:.2f} l", |
|
"h", |
|
"B" |
|
] |
|
data_content_string = "\n".join(pdf_ops_for_black_fill_and_outline) + "\n" |
|
data_element = SubElement(redact_annot, 'data') |
|
data_element.set('MODE', "filtered") |
|
data_element.set('encoding', "ascii") |
|
data_element.set('length', str(len(data_content_string.encode('ascii')))) |
|
data_element.text = data_content_string |
|
|
|
rough_string = tostring(xfdf_root, encoding='unicode', method='xml') |
|
reparsed = minidom.parseString(rough_string) |
|
return reparsed.toxml() |
|
|
|
def convert_df_to_xfdf(input_files:List[str], pdf_doc:Document, image_paths:List[str], output_folder:str = OUTPUT_FOLDER, document_cropboxes:List=list(), page_sizes:List[dict]=list()): |
|
''' |
|
Load in files to convert a review file into an Adobe comment file format |
|
''' |
|
output_paths = list() |
|
pdf_name = "" |
|
file_path_name = "" |
|
|
|
if isinstance(input_files, str): |
|
file_paths_list = [input_files] |
|
else: |
|
file_paths_list = input_files |
|
|
|
|
|
file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json')) |
|
|
|
for file in file_paths_list: |
|
|
|
if isinstance(file, str): |
|
file_path = file |
|
else: |
|
file_path = file.name |
|
|
|
file_path_name = get_file_name_without_type(file_path) |
|
file_path_end = detect_file_type(file_path) |
|
|
|
if file_path_end == "pdf": |
|
pdf_name = os.path.basename(file_path) |
|
|
|
if file_path_end == "csv" and "review_file" in file_path_name: |
|
|
|
if not pdf_name: |
|
pdf_name = file_path_name |
|
|
|
review_file_df = pd.read_csv(file_path) |
|
|
|
|
|
if 'text' in review_file_df.columns: review_file_df['text'] = review_file_df['text'].fillna('') |
|
if 'label' in review_file_df.columns: review_file_df['label'] = review_file_df['label'].fillna('') |
|
|
|
xfdf_content = create_xfdf(review_file_df, pdf_name, pdf_doc, image_paths, document_cropboxes, page_sizes) |
|
|
|
output_path = output_folder + file_path_name + "_adobe.xfdf" |
|
|
|
with open(output_path, 'w', encoding='utf-8') as f: |
|
f.write(xfdf_content) |
|
|
|
output_paths.append(output_path) |
|
|
|
return output_paths |
|
|
|
|
|
|
|
def convert_adobe_coords_to_image(pdf_page_width:float, pdf_page_height:float, image_width:float, image_height:float, x1:float, y1:float, x2:float, y2:float): |
|
''' |
|
Converts coordinates from Adobe PDF space to image space. |
|
|
|
Parameters: |
|
- pdf_page_width: Width of the PDF page |
|
- pdf_page_height: Height of the PDF page |
|
- image_width: Width of the source image |
|
- image_height: Height of the source image |
|
- x1, y1, x2, y2: Coordinates in Adobe PDF space |
|
|
|
Returns: |
|
- Tuple of converted coordinates (x1, y1, x2, y2) in image space |
|
''' |
|
|
|
|
|
scale_width = image_width / pdf_page_width |
|
scale_height = image_height / pdf_page_height |
|
|
|
|
|
image_x1 = x1 * scale_width |
|
image_x2 = x2 * scale_width |
|
|
|
|
|
|
|
image_y1 = (pdf_page_height - y1) * scale_height |
|
image_y2 = (pdf_page_height - y2) * scale_height |
|
|
|
|
|
if image_y1 > image_y2: |
|
image_y1, image_y2 = image_y2, image_y1 |
|
|
|
return image_x1, image_y1, image_x2, image_y2 |
|
|
|
def parse_xfdf(xfdf_path:str): |
|
''' |
|
Parse the XFDF file and extract redaction annotations. |
|
|
|
Parameters: |
|
- xfdf_path: Path to the XFDF file |
|
|
|
Returns: |
|
- List of dictionaries containing redaction information |
|
''' |
|
tree = parse(xfdf_path) |
|
root = tree.getroot() |
|
|
|
|
|
namespace = {'xfdf': 'http://ns.adobe.com/xfdf/'} |
|
|
|
redactions = list() |
|
|
|
|
|
for redact in root.findall('.//xfdf:redact', namespaces=namespace): |
|
|
|
redaction_info = { |
|
'image': '', |
|
'page': int(redact.get('page')) + 1, |
|
'xmin': float(redact.get('rect').split(',')[0]), |
|
'ymin': float(redact.get('rect').split(',')[1]), |
|
'xmax': float(redact.get('rect').split(',')[2]), |
|
'ymax': float(redact.get('rect').split(',')[3]), |
|
'label': redact.get('title'), |
|
'text': redact.get('contents'), |
|
'color': redact.get('border-color', '(0, 0, 0)') |
|
} |
|
redactions.append(redaction_info) |
|
|
|
return redactions |
|
|
|
def convert_xfdf_to_dataframe(file_paths_list:List[str], pymupdf_doc, image_paths:List[str], output_folder:str=OUTPUT_FOLDER): |
|
''' |
|
Convert redaction annotations from XFDF and associated images into a DataFrame. |
|
|
|
Parameters: |
|
- xfdf_path: Path to the XFDF file |
|
- pdf_doc: PyMuPDF document object |
|
- image_paths: List of PIL Image objects corresponding to PDF pages |
|
|
|
Returns: |
|
- DataFrame containing redaction information |
|
''' |
|
output_paths = list() |
|
xfdf_paths = list() |
|
df = pd.DataFrame() |
|
|
|
|
|
file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json')) |
|
|
|
for file in file_paths_list: |
|
|
|
if isinstance(file, str): |
|
file_path = file |
|
else: |
|
file_path = file.name |
|
|
|
file_path_name = get_file_name_without_type(file_path) |
|
file_path_end = detect_file_type(file_path) |
|
|
|
if file_path_end == "pdf": |
|
pdf_name = os.path.basename(file_path) |
|
|
|
|
|
output_paths.append(file_path) |
|
|
|
if file_path_end == "xfdf": |
|
|
|
if not pdf_name: |
|
message = "Original PDF needed to convert from .xfdf format" |
|
print(message) |
|
raise ValueError(message) |
|
xfdf_path = file |
|
|
|
file_path_name = get_file_name_without_type(xfdf_path) |
|
|
|
|
|
redactions = parse_xfdf(xfdf_path) |
|
|
|
|
|
df = pd.DataFrame(redactions) |
|
|
|
df.fillna('', inplace=True) |
|
|
|
for _, row in df.iterrows(): |
|
page_python_format = int(row["page"])-1 |
|
|
|
pymupdf_page = pymupdf_doc.load_page(page_python_format) |
|
|
|
pdf_page_height = pymupdf_page.rect.height |
|
pdf_page_width = pymupdf_page.rect.width |
|
|
|
image_path = image_paths[page_python_format] |
|
|
|
if isinstance(image_path, str): |
|
image = Image.open(image_path) |
|
|
|
image_page_width, image_page_height = image.size |
|
|
|
|
|
image_x1, image_y1, image_x2, image_y2 = convert_adobe_coords_to_image(pdf_page_width, pdf_page_height, image_page_width, image_page_height, row['xmin'], row['ymin'], row['xmax'], row['ymax']) |
|
|
|
df.loc[_, ['xmin', 'ymin', 'xmax', 'ymax']] = [image_x1, image_y1, image_x2, image_y2] |
|
|
|
|
|
df.loc[_, 'image'] = image_path |
|
|
|
out_file_path = output_folder + file_path_name + "_review_file.csv" |
|
df.to_csv(out_file_path, index=None) |
|
|
|
output_paths.append(out_file_path) |
|
|
|
return output_paths |