ner-annotation / app.py
nam pham
feat: create app
090dddd
raw
history blame
26.2 kB
import gradio as gr
from huggingface_hub import HfApi
import os
import re
import json
import torch
import random
from typing import List, Dict, Union, Tuple
from gliner import GLiNER
from datasets import load_dataset
# Available models for annotation
AVAILABLE_MODELS = [
"BookingCare/gliner-multi-healthcare",
"knowledgator/gliner-multitask-large-v0.5",
"knowledgator/gliner-multitask-base-v0.5"
]
# Dataset Viewer Classes and Functions
class DynamicDataset:
def __init__(
self, data: List[Dict[str, Union[List[Union[int, str]], bool]]]
) -> None:
self.data = data
self.data_len = len(self.data)
self.current = -1
for example in self.data:
if not "validated" in example.keys():
example["validated"] = False
def next_example(self):
self.current += 1
if self.current > self.data_len-1:
self.current = self.data_len -1
elif self.current < 0:
self.current = 0
def previous_example(self):
self.current -= 1
if self.current > self.data_len-1:
self.current = self.data_len -1
elif self.current < 0:
self.current = 0
def example_by_id(self, id):
self.current = id
if self.current > self.data_len-1:
self.current = self.data_len -1
elif self.current < 0:
self.current = 0
def validate(self):
self.data[self.current]["validated"] = True
def load_current_example(self):
return self.data[self.current]
def tokenize_text(text):
"""Tokenize the input text into a list of tokens."""
return re.findall(r'\w+(?:[-_]\w+)*|\S', text)
def join_tokens(tokens):
# Joining tokens with space, but handling special characters correctly
text = ""
for token in tokens:
if token in {",", ".", "!", "?", ":", ";", "..."}:
text = text.rstrip() + token
else:
text += " " + token
return text.strip()
def prepare_for_highlight(data):
tokens = data["tokenized_text"]
ner = data["ner"]
highlighted_text = []
current_entity = None
entity_tokens = []
normal_tokens = []
for idx, token in enumerate(tokens):
# Check if the current token is the start of a new entity
if current_entity is None or idx > current_entity[1]:
if entity_tokens:
highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
entity_tokens = []
current_entity = next((entity for entity in ner if entity[0] == idx), None)
# If current token is part of an entity
if current_entity and current_entity[0] <= idx <= current_entity[1]:
if normal_tokens:
highlighted_text.append((" ".join(normal_tokens), None))
normal_tokens = []
entity_tokens.append(token + " ")
else:
if entity_tokens:
highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
entity_tokens = []
normal_tokens.append(token + " ")
# Append any remaining tokens
if entity_tokens:
highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
if normal_tokens:
highlighted_text.append((" ".join(normal_tokens), None))
# Clean up spaces before punctuation
cleaned_highlighted_text = []
for text, label in highlighted_text:
cleaned_text = re.sub(r'\s(?=[,\.!?…:;])', '', text)
cleaned_highlighted_text.append((cleaned_text, label))
return cleaned_highlighted_text
def extract_tokens_and_labels(data: List[Dict[str, Union[str, None]]]) -> Dict[str, Union[List[str], List[Tuple[int, int, str]]]]:
tokens = []
ner = []
token_start_idx = 0
for entry in data:
char = entry['token']
label = entry['class_or_confidence']
# Tokenize the current text chunk
token_list = tokenize_text(char)
# Append tokens to the main tokens list
tokens.extend(token_list)
if label:
token_end_idx = token_start_idx + len(token_list) - 1
ner.append((token_start_idx, token_end_idx, label))
token_start_idx += len(token_list)
return tokens, ner
# Global variables for dataset viewer
dynamic_dataset = None
def update_example(data):
global dynamic_dataset
tokens, ner = extract_tokens_and_labels(data)
dynamic_dataset.data[dynamic_dataset.current]["tokenized_text"] = tokens
dynamic_dataset.data[dynamic_dataset.current]["ner"] = ner
return prepare_for_highlight(dynamic_dataset.load_current_example())
def validate_example():
global dynamic_dataset
dynamic_dataset.data[dynamic_dataset.current]["validated"] = True
return [("The example was validated!", None)]
def next_example():
global dynamic_dataset
dynamic_dataset.next_example()
return prepare_for_highlight(dynamic_dataset.load_current_example()), dynamic_dataset.current
def previous_example():
global dynamic_dataset
dynamic_dataset.previous_example()
return prepare_for_highlight(dynamic_dataset.load_current_example()), dynamic_dataset.current
def save_dataset(inp):
global dynamic_dataset
with open("data/annotated_data.json", "wt") as file:
json.dump(dynamic_dataset.data, file)
return [("The validated dataset was saved as data/annotated_data.json", None)]
def load_dataset():
global dynamic_dataset
try:
with open("data/annotated_data.json", 'rt') as dataset:
ANNOTATED_DATA = json.load(dataset)
dynamic_dataset = DynamicDataset(ANNOTATED_DATA)
max_value = len(dynamic_dataset.data) - 1 if dynamic_dataset.data else 0
return prepare_for_highlight(dynamic_dataset.load_current_example()), 0, max_value
except Exception as e:
return [("Error loading dataset: " + str(e), None)], 0, 0
# Original annotation functions
def transform_data(data):
tokens = tokenize_text(data['text'])
spans = []
for entity in data['entities']:
entity_tokens = tokenize_text(entity['word'])
entity_length = len(entity_tokens)
# Find the start and end indices of each entity in the tokenized text
for i in range(len(tokens) - entity_length + 1):
if tokens[i:i + entity_length] == entity_tokens:
spans.append([i, i + entity_length - 1, entity['entity']])
break
return {"tokenized_text": tokens, "ner": spans, "validated": False}
def merge_entities(entities):
if not entities:
return []
merged = []
current = entities[0]
for next_entity in entities[1:]:
if next_entity['entity'] == current['entity'] and (next_entity['start'] == current['end'] + 1 or next_entity['start'] == current['end']):
current['word'] += ' ' + next_entity['word']
current['end'] = next_entity['end']
else:
merged.append(current)
current = next_entity
merged.append(current)
return merged
def annotate_text(model, text, labels: List[str], threshold: float, nested_ner: bool) -> Dict:
labels = [label.strip() for label in labels]
r = {
"text": text,
"entities": [
{
"entity": entity["label"],
"word": entity["text"],
"start": entity["start"],
"end": entity["end"],
"score": 0,
}
for entity in model.predict_entities(
text, labels, flat_ner=not nested_ner, threshold=threshold
)
],
}
r["entities"] = merge_entities(r["entities"])
return transform_data(r)
class AutoAnnotator:
def __init__(
self, model: str = "knowledgator/gliner-multitask-large-v0.5",
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
) -> None:
self.model = GLiNER.from_pretrained(model).to(device)
self.annotated_data = []
self.stat = {
"total": None,
"current": -1
}
def auto_annotate(
self, data: List[str], labels: List[str],
prompt: Union[str, List[str]] = None, threshold: float = 0.5, nested_ner: bool = False
) -> List[Dict]:
self.stat["total"] = len(data)
self.stat["current"] = -1 # Reset current progress
for text in data:
self.stat["current"] += 1
if isinstance(prompt, list):
prompt_text = random.choice(prompt)
else:
prompt_text = prompt
text = f"{prompt_text}\n{text}" if prompt_text else text
annotation = annotate_text(self.model, text, labels, threshold, nested_ner)
if not annotation["ner"]: # If no entities identified
annotation = {"tokenized_text": tokenize_text(text), "ner": [], "validated": False}
self.annotated_data.append(annotation)
return self.annotated_data
# Global variables
annotator = None
sentences = []
def process_uploaded_file(file_obj):
if file_obj is None:
return "Please upload a file first!"
try:
# Read the uploaded file
with open(file_obj.name, 'r', encoding='utf-8') as f:
global sentences
sentences = [line.strip() for line in f if line.strip()]
return f"Successfully loaded {len(sentences)} sentences from file!"
except Exception as e:
return f"Error reading file: {str(e)}"
def annotate(model, labels, threshold, prompt):
global annotator
try:
if not sentences:
return "Please upload a file with text first!"
labels = [label.strip() for label in labels.split(",")]
annotator = AutoAnnotator(model)
annotated_data = annotator.auto_annotate(sentences, labels, prompt, threshold)
# Save annotated data
os.makedirs("data", exist_ok=True)
with open("data/annotated_data.json", "wt") as file:
json.dump(annotated_data, file, ensure_ascii=False)
# Upload to Hugging Face Hub
api = HfApi()
api.upload_file(
path_or_fileobj="data/annotated_data.json",
path_in_repo="annotated_data.json",
repo_id="YOUR_USERNAME/YOUR_REPO_NAME", # Replace with your repo
repo_type="dataset"
)
return "Successfully annotated and saved to Hugging Face Hub!"
except Exception as e:
return f"Error during annotation: {str(e)}"
def convert_hf_dataset_to_ner_format(dataset):
"""Convert Hugging Face dataset to NER format"""
converted_data = []
for item in dataset:
# Assuming the dataset has 'tokens' and 'ner_tags' fields
# Adjust the field names based on your dataset structure
if 'tokens' in item and 'ner_tags' in item:
ner_spans = []
current_span = None
for i, (token, tag) in enumerate(zip(item['tokens'], item['ner_tags'])):
if tag != 'O': # Not Outside
if current_span is None:
current_span = [i, i, tag]
elif tag == current_span[2]:
current_span[1] = i
else:
ner_spans.append(current_span)
current_span = [i, i, tag]
elif current_span is not None:
ner_spans.append(current_span)
current_span = None
if current_span is not None:
ner_spans.append(current_span)
converted_data.append({
"tokenized_text": item['tokens'],
"ner": ner_spans,
"validated": False
})
return converted_data
def load_from_huggingface(dataset_name: str, split: str = "train"):
"""Load dataset from Hugging Face Hub"""
try:
dataset = load_dataset(dataset_name, split=split)
converted_data = convert_hf_dataset_to_ner_format(dataset)
# Save the converted data
os.makedirs("data", exist_ok=True)
with open("data/annotated_data.json", "wt") as file:
json.dump(converted_data, file, ensure_ascii=False)
return f"Successfully loaded and converted dataset: {dataset_name}"
except Exception as e:
return f"Error loading dataset: {str(e)}"
def load_from_local_file(file_path: str, file_format: str = "json"):
"""Load and convert data from local file in various formats"""
try:
if file_format == "json":
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
# If data is already in the correct format
if all("tokenized_text" in item and "ner" in item for item in data):
return data
# Convert from other JSON formats
converted_data = []
for item in data:
if "tokens" in item and "ner_tags" in item:
ner_spans = []
current_span = None
for i, (token, tag) in enumerate(zip(item["tokens"], item["ner_tags"])):
if tag != "O":
if current_span is None:
current_span = [i, i, tag]
elif tag == current_span[2]:
current_span[1] = i
else:
ner_spans.append(current_span)
current_span = [i, i, tag]
elif current_span is not None:
ner_spans.append(current_span)
current_span = None
if current_span is not None:
ner_spans.append(current_span)
converted_data.append({
"tokenized_text": item["tokens"],
"ner": ner_spans,
"validated": False
})
return converted_data
else:
raise ValueError("JSON file must contain a list of examples")
elif file_format == "conll":
converted_data = []
current_example = {"tokens": [], "ner_tags": []}
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
if line.startswith("#"):
continue
parts = line.split()
if len(parts) >= 2:
token, tag = parts[0], parts[-1]
current_example["tokens"].append(token)
current_example["ner_tags"].append(tag)
elif current_example["tokens"]:
# Convert current example
ner_spans = []
current_span = None
for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
if tag != "O":
if current_span is None:
current_span = [i, i, tag]
elif tag == current_span[2]:
current_span[1] = i
else:
ner_spans.append(current_span)
current_span = [i, i, tag]
elif current_span is not None:
ner_spans.append(current_span)
current_span = None
if current_span is not None:
ner_spans.append(current_span)
converted_data.append({
"tokenized_text": current_example["tokens"],
"ner": ner_spans,
"validated": False
})
current_example = {"tokens": [], "ner_tags": []}
# Handle last example if exists
if current_example["tokens"]:
ner_spans = []
current_span = None
for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
if tag != "O":
if current_span is None:
current_span = [i, i, tag]
elif tag == current_span[2]:
current_span[1] = i
else:
ner_spans.append(current_span)
current_span = [i, i, tag]
elif current_span is not None:
ner_spans.append(current_span)
current_span = None
if current_span is not None:
ner_spans.append(current_span)
converted_data.append({
"tokenized_text": current_example["tokens"],
"ner": ner_spans,
"validated": False
})
return converted_data
elif file_format == "txt":
# Simple text file with one sentence per line
converted_data = []
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
tokens = tokenize_text(line)
converted_data.append({
"tokenized_text": tokens,
"ner": [],
"validated": False
})
return converted_data
else:
raise ValueError(f"Unsupported file format: {file_format}")
except Exception as e:
raise Exception(f"Error loading file: {str(e)}")
def process_local_file(file_obj, file_format):
"""Process uploaded local file"""
if file_obj is None:
return "Please upload a file first!"
try:
# Load and convert the data
data = load_from_local_file(file_obj.name, file_format)
# Save the converted data
os.makedirs("data", exist_ok=True)
with open("data/annotated_data.json", "wt") as file:
json.dump(data, file, ensure_ascii=False)
return f"Successfully loaded and converted {len(data)} examples from {file_format} file!"
except Exception as e:
return f"Error processing file: {str(e)}"
# Create the main interface with tabs
with gr.Blocks() as demo:
gr.Markdown("# NER Annotation Tool")
with gr.Tabs():
with gr.TabItem("Auto Annotation"):
with gr.Row():
with gr.Column():
file_uploader = gr.File(label="Upload text file (one sentence per line)")
upload_status = gr.Textbox(label="Upload Status")
file_uploader.change(fn=process_uploaded_file, inputs=[file_uploader], outputs=[upload_status])
with gr.Column():
model = gr.Dropdown(
label="Choose the model for annotation",
choices=AVAILABLE_MODELS,
value=AVAILABLE_MODELS[0]
)
labels = gr.Textbox(
label="Labels",
placeholder="Enter comma-separated labels (e.g., PERSON,ORG,LOC)",
scale=2
)
threshold = gr.Slider(
0, 1,
value=0.3,
step=0.01,
label="Threshold",
info="Lower threshold increases entity predictions"
)
prompt = gr.Textbox(
label="Prompt",
placeholder="Enter your annotation prompt (optional)",
scale=2
)
annotate_btn = gr.Button("Annotate Data")
output_info = gr.Textbox(label="Processing Status")
annotate_btn.click(
fn=annotate,
inputs=[model, labels, threshold, prompt],
outputs=[output_info]
)
with gr.TabItem("Dataset Viewer"):
with gr.Row():
with gr.Column():
with gr.Row():
load_local_btn = gr.Button("Load Local Dataset")
load_hf_btn = gr.Button("Load from Hugging Face")
local_file = gr.File(label="Upload Local Dataset", visible=False)
file_format = gr.Dropdown(
choices=["json", "conll", "txt"],
value="json",
label="File Format",
visible=False
)
local_status = gr.Textbox(label="Local File Status", visible=False)
dataset_name = gr.Textbox(
label="Hugging Face Dataset Name",
placeholder="Enter dataset name (e.g., conll2003)",
visible=False
)
dataset_split = gr.Dropdown(
choices=["train", "validation", "test"],
value="train",
label="Dataset Split",
visible=False
)
bar = gr.Slider(minimum=0, maximum=1, step=1, label="Progress", interactive=False)
with gr.Row():
previous_btn = gr.Button("Previous example")
apply_btn = gr.Button("Apply changes")
next_btn = gr.Button("Next example")
validate_btn = gr.Button("Validate")
save_btn = gr.Button("Save validated dataset")
inp_box = gr.HighlightedText(value=None, interactive=True)
def toggle_local_inputs():
return {
local_file: gr.update(visible=True),
file_format: gr.update(visible=True),
local_status: gr.update(visible=True),
dataset_name: gr.update(visible=False),
dataset_split: gr.update(visible=False)
}
def toggle_hf_inputs():
return {
local_file: gr.update(visible=False),
file_format: gr.update(visible=False),
local_status: gr.update(visible=False),
dataset_name: gr.update(visible=True),
dataset_split: gr.update(visible=True)
}
load_local_btn.click(
fn=toggle_local_inputs,
inputs=None,
outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
)
load_hf_btn.click(
fn=toggle_hf_inputs,
inputs=None,
outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
)
def process_and_load_local(file_obj, format):
status = process_local_file(file_obj, format)
if "Successfully" in status:
return load_dataset()
return [status], 0, 0
local_file.change(
fn=process_and_load_local,
inputs=[local_file, file_format],
outputs=[inp_box, bar]
)
def load_hf_dataset(name, split):
status = load_from_huggingface(name, split)
if "Successfully" in status:
return load_dataset()
return [status], 0, 0
load_hf_btn.click(
fn=load_hf_dataset,
inputs=[dataset_name, dataset_split],
outputs=[inp_box, bar]
)
apply_btn.click(fn=update_example, inputs=inp_box, outputs=inp_box)
save_btn.click(fn=save_dataset, inputs=inp_box, outputs=inp_box)
validate_btn.click(fn=validate_example, inputs=None, outputs=inp_box)
next_btn.click(fn=next_example, inputs=None, outputs=[inp_box, bar])
previous_btn.click(fn=previous_example, inputs=None, outputs=[inp_box, bar])
demo.launch()