# Source: Hugging Face Space (status: Running)
import gradio as gr | |
from huggingface_hub import HfApi | |
import os | |
import re | |
import json | |
import torch | |
import random | |
from typing import List, Dict, Union, Tuple | |
from gliner import GLiNER | |
from datasets import load_dataset | |
# Available models for annotation.
# These identifiers are offered in the model dropdown (the first entry is the
# default selection) and the chosen one is passed to GLiNER.from_pretrained().
AVAILABLE_MODELS = [
    "BookingCare/gliner-multi-healthcare",
    "knowledgator/gliner-multitask-large-v0.5",
    "knowledgator/gliner-multitask-base-v0.5"
]
# Dataset Viewer Classes and Functions
class DynamicDataset:
    """Cursor over a list of annotation examples.

    The cursor (``current``) starts at -1, i.e. before the first example, and
    is kept inside ``[0, data_len - 1]`` by every navigation method.  Each
    example is given a ``"validated"`` flag (default ``False``) if it does not
    already carry one; the example dicts are modified in place.
    """

    def __init__(
        self, data: List[Dict[str, Union[List[Union[int, str]], bool]]]
    ) -> None:
        self.data = data
        self.data_len = len(self.data)
        self.current = -1
        for example in self.data:
            # Only fills in the flag when it is missing.
            example.setdefault("validated", False)

    def _move_to(self, index):
        """Set the cursor to *index*, clamped to the valid index range."""
        self.current = index
        last_index = self.data_len - 1
        if self.current > last_index:
            self.current = last_index
        elif self.current < 0:
            self.current = 0

    def next_example(self):
        """Advance the cursor by one example, stopping at the last one."""
        self._move_to(self.current + 1)

    def previous_example(self):
        """Move the cursor back by one example, stopping at the first one."""
        self._move_to(self.current - 1)

    def example_by_id(self, id):
        """Jump to the example at index *id* (clamped to the valid range)."""
        self._move_to(id)

    def validate(self):
        """Mark the example under the cursor as validated."""
        self.data[self.current]["validated"] = True

    def load_current_example(self):
        """Return the example dict under the cursor."""
        return self.data[self.current]
def tokenize_text(text):
    """Split *text* into a list of word and punctuation tokens.

    A run of word characters, optionally chained by single ``-`` or ``_``
    separators, forms one token; any other non-whitespace character is a
    token on its own.  Whitespace is discarded.
    """
    token_pattern = r'\w+(?:[-_]\w+)*|\S'
    return re.findall(token_pattern, text)
def join_tokens(tokens):
    """Rebuild a sentence from *tokens*.

    Tokens are separated by a single space, except that the punctuation marks
    listed below are attached directly to the text that precedes them.
    """
    attach_to_previous = {",", ".", "!", "?", ":", ";", "..."}
    sentence = ""
    for piece in tokens:
        if piece in attach_to_previous:
            # Drop the separator that was added before this punctuation mark.
            sentence = sentence.rstrip() + piece
        else:
            sentence = sentence + " " + piece
    return sentence.strip()
def prepare_for_highlight(data):
    """Convert one example into ``(text, label)`` segments for highlighting.

    Args:
        data: Example dict with ``"tokenized_text"`` (list of tokens) and
            ``"ner"`` (iterable of ``[start, end, label]`` spans whose token
            indices are inclusive).

    Returns:
        A list of ``(text, label)`` tuples in reading order.  ``label`` is
        ``None`` for text outside any span.  Each segment's tokens are joined
        by single spaces, the space in front of ``, . ! ? … : ;`` is removed,
        and the segment ends with one space so adjacent segments do not run
        together.

    When several spans start at the same token, the first one listed in
    ``"ner"`` is used; a span that starts inside an already active span is
    ignored.
    """
    tokens = data["tokenized_text"]
    ner = data["ner"]

    def render(buffer):
        # Previously every token carried its own trailing space AND the tokens
        # were joined with " ", which produced double spaces that the
        # single-whitespace cleanup below could not fully remove.
        text = " ".join(buffer) + " "
        return re.sub(r'\s(?=[,\.!?…:;])', '', text)

    segments = []
    active_span = None
    entity_buffer = []
    plain_buffer = []

    for idx, token in enumerate(tokens):
        # Past the end of the active span (or no span yet): close it and look
        # for a span that starts exactly at this token.
        if active_span is None or idx > active_span[1]:
            if entity_buffer:
                segments.append((render(entity_buffer), active_span[2]))
                entity_buffer = []
            active_span = next((span for span in ner if span[0] == idx), None)

        if active_span and active_span[0] <= idx <= active_span[1]:
            # Entering/continuing an entity: flush any pending plain text.
            if plain_buffer:
                segments.append((render(plain_buffer), None))
                plain_buffer = []
            entity_buffer.append(token)
        else:
            if entity_buffer:
                segments.append((render(entity_buffer), active_span[2]))
                entity_buffer = []
            plain_buffer.append(token)

    # Flush whatever is still buffered at the end of the sentence.
    if entity_buffer:
        segments.append((render(entity_buffer), active_span[2]))
    if plain_buffer:
        segments.append((render(plain_buffer), None))

    return segments
def extract_tokens_and_labels(
    data: List[Dict[str, Union[str, None]]]
) -> Tuple[List[str], List[Tuple[int, int, str]]]:
    """Rebuild tokens and entity spans from highlighted-text segments.

    Args:
        data: Segments as dicts with a ``'token'`` entry (the segment text)
            and a ``'class_or_confidence'`` entry (the segment label, falsy
            for unlabelled text).

    Returns:
        A ``(tokens, ner)`` tuple: ``tokens`` is the concatenation of every
        segment's tokens, ``ner`` is a list of ``(start, end, label)`` tuples
        with inclusive token indices, one per labelled segment.
    """
    tokens = []
    ner = []
    for entry in data:
        segment_tokens = tokenize_text(entry['token'])
        label = entry['class_or_confidence']
        # A labelled segment without any token (e.g. whitespace only) has no
        # valid span; recording it would give end = start - 1.
        if label and segment_tokens:
            start = len(tokens)
            ner.append((start, start + len(segment_tokens) - 1, label))
        tokens.extend(segment_tokens)
    return tokens, ner
# Global variables for dataset viewer
# The DynamicDataset currently shown in the viewer. It stays None until
# load_dataset() has read data/annotated_data.json and wrapped its content.
dynamic_dataset = None
def update_example(data):
    """Store the edited highlight segments back into the current example.

    Args:
        data: Highlight segments as accepted by extract_tokens_and_labels().

    Returns:
        The refreshed highlight segments of the current example, or a single
        message segment when no dataset is loaded.
    """
    # Without a loaded, non-empty dataset there is no example to update;
    # report that instead of failing on the missing dataset.
    if dynamic_dataset is None or not dynamic_dataset.data:
        return [("No dataset loaded. Load a dataset first.", None)]

    tokens, ner = extract_tokens_and_labels(data)
    example = dynamic_dataset.load_current_example()
    example["tokenized_text"] = tokens
    example["ner"] = ner
    return prepare_for_highlight(example)
def validate_example():
    """Mark the current example as validated.

    Returns:
        A single message segment: a confirmation, or a hint when no dataset
        is loaded.
    """
    # Guard against use before a dataset has been loaded.
    if dynamic_dataset is None or not dynamic_dataset.data:
        return [("No dataset loaded. Load a dataset first.", None)]

    dynamic_dataset.validate()
    return [("The example was validated!", None)]
def next_example():
    """Advance to the next example.

    Returns:
        ``(segments, index)``: the highlight segments of the new current
        example and its index.  When no dataset is loaded, a message segment
        and index 0 are returned instead.
    """
    # Guard against use before a dataset has been loaded.
    if dynamic_dataset is None or not dynamic_dataset.data:
        return [("No dataset loaded. Load a dataset first.", None)], 0

    dynamic_dataset.next_example()
    current_example = dynamic_dataset.load_current_example()
    return prepare_for_highlight(current_example), dynamic_dataset.current
def previous_example():
    """Go back to the previous example.

    Returns:
        ``(segments, index)``: the highlight segments of the new current
        example and its index.  When no dataset is loaded, a message segment
        and index 0 are returned instead.
    """
    # Guard against use before a dataset has been loaded.
    if dynamic_dataset is None or not dynamic_dataset.data:
        return [("No dataset loaded. Load a dataset first.", None)], 0

    dynamic_dataset.previous_example()
    current_example = dynamic_dataset.load_current_example()
    return prepare_for_highlight(current_example), dynamic_dataset.current
def save_dataset(inp):
    """Write every example of the loaded dataset to data/annotated_data.json.

    Args:
        inp: Value of the wired input component; not used.

    Returns:
        A single message segment describing the outcome.
    """
    if dynamic_dataset is None:
        return [("No dataset loaded. Load a dataset first.", None)]

    # The other writers in this file create the folder first; do the same so
    # saving does not depend on who wrote the file before.
    os.makedirs("data", exist_ok=True)
    with open("data/annotated_data.json", "wt") as file:
        json.dump(dynamic_dataset.data, file)
    return [("The validated dataset was saved as data/annotated_data.json", None)]
def load_dataset():
    """Load data/annotated_data.json into the viewer's global dataset.

    NOTE: this function has the same name as ``load_dataset`` imported from
    ``datasets`` at the top of the file and therefore replaces it at module
    level.

    Returns:
        ``(segments, index, max_index)``: highlight segments of the first
        example, its index (0) and the largest valid index.  On any failure a
        message segment and ``0, 0`` are returned.
    """
    global dynamic_dataset
    try:
        with open("data/annotated_data.json", 'rt') as dataset_file:
            annotated_data = json.load(dataset_file)
        dynamic_dataset = DynamicDataset(annotated_data)
        if not dynamic_dataset.data:
            return [("Error loading dataset: the dataset is empty", None)], 0, 0
        # A fresh DynamicDataset points at -1, which Python resolves to the
        # LAST example; move to the first one so the displayed example
        # matches the reported index 0.
        dynamic_dataset.example_by_id(0)
        max_value = len(dynamic_dataset.data) - 1
        first_example = dynamic_dataset.load_current_example()
        return prepare_for_highlight(first_example), dynamic_dataset.current, max_value
    except Exception as e:
        # UI boundary: report the problem in the viewer instead of raising.
        return [("Error loading dataset: " + str(e), None)], 0, 0
# Original annotation functions
def transform_data(data):
    """Convert text plus detected entities into a token-span example.

    Args:
        data: Dict with ``'text'`` and ``'entities'``.  Each entity needs
            ``'word'`` (its surface text) and ``'entity'`` (its label); an
            optional ``'start'`` is used to pick between repeated mentions.
            # assumes 'start' is the character offset of the entity in
            # data['text'] — TODO confirm against the model's output

    Returns:
        ``{"tokenized_text": tokens, "ner": spans, "validated": False}`` where
        every span is ``[first_token, last_token, label]`` (inclusive).
        Entities whose tokens cannot be found in the text are dropped.
    """
    text = data['text']
    tokens = tokenize_text(text)

    # Character offset of every token.  Tokens are separated by whitespace
    # only, so the next occurrence at or after the cursor is the token itself.
    token_starts = []
    cursor = 0
    for token in tokens:
        cursor = text.index(token, cursor)
        token_starts.append(cursor)
        cursor += len(token)

    spans = []
    for entity in data['entities']:
        entity_tokens = tokenize_text(entity['word'])
        entity_length = len(entity_tokens)
        if entity_length == 0:
            # An empty token list "matches" everywhere and would give [0, -1].
            continue

        candidates = [
            i for i in range(len(tokens) - entity_length + 1)
            if tokens[i:i + entity_length] == entity_tokens
        ]
        if not candidates:
            continue

        # Prefer the occurrence at the reported offset so repeated mentions
        # are not all mapped onto the first one; otherwise keep the first.
        char_start = entity.get('start')
        first = next(
            (i for i in candidates if token_starts[i] == char_start),
            candidates[0],
        )
        spans.append([first, first + entity_length - 1, entity['entity']])

    return {"tokenized_text": tokens, "ner": spans, "validated": False}
def merge_entities(entities):
    """Merge consecutive entities that share a label and touch each other.

    Two neighbouring entities are merged when they carry the same
    ``'entity'`` label and the second one's ``'start'`` equals the first
    one's ``'end'`` or ``'end' + 1``.  The merged entity keeps the first
    entity's fields, joins both ``'word'`` values with a space and takes the
    second entity's ``'end'``.

    Args:
        entities: List of entity dicts in text order.

    Returns:
        A new list of new dicts; the input dicts are left untouched.
    """
    if not entities:
        return []

    merged = []
    # Work on copies so the caller's dicts are never modified.
    current = dict(entities[0])
    for next_entity in entities[1:]:
        same_label = next_entity['entity'] == current['entity']
        touching = next_entity['start'] in (current['end'] + 1, current['end'])
        if same_label and touching:
            current['word'] += ' ' + next_entity['word']
            current['end'] = next_entity['end']
        else:
            merged.append(current)
            current = dict(next_entity)
    merged.append(current)
    return merged
def annotate_text(model, text, labels: List[str], threshold: float, nested_ner: bool) -> Dict:
    """Run *model* on *text* and return the result as a token-span example.

    Args:
        model: Object providing ``predict_entities(text, labels, flat_ner=...,
            threshold=...)`` whose results are indexable by ``"label"``,
            ``"text"``, ``"start"`` and ``"end"``.
        text: Text to annotate.
        labels: Entity labels to look for; surrounding whitespace is removed.
        threshold: Threshold forwarded to the model.
        nested_ner: When true, ``flat_ner=False`` is passed to the model.

    Returns:
        The dict produced by transform_data() for the merged predictions.
    """
    clean_labels = [label.strip() for label in labels]
    predictions = model.predict_entities(
        text, clean_labels, flat_ner=not nested_ner, threshold=threshold
    )

    entities = []
    for prediction in predictions:
        entities.append({
            "entity": prediction["label"],
            "word": prediction["text"],
            "start": prediction["start"],
            "end": prediction["end"],
            "score": 0,
        })

    result = {"text": text, "entities": merge_entities(entities)}
    return transform_data(result)
class AutoAnnotator:
    """Annotates a batch of texts with a GLiNER model.

    Results accumulate in ``annotated_data``; ``stat`` exposes the batch size
    (``"total"``) and the index of the text being processed (``"current"``).
    """

    def __init__(
        self, model: str = "knowledgator/gliner-multitask-large-v0.5",
        device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
    ) -> None:
        # Load the pretrained model and move it to the requested device.
        self.model = GLiNER.from_pretrained(model).to(device)
        self.annotated_data = []
        self.stat = {"total": None, "current": -1}

    def auto_annotate(
        self, data: List[str], labels: List[str],
        prompt: Union[str, List[str]] = None, threshold: float = 0.5, nested_ner: bool = False
    ) -> List[Dict]:
        """Annotate every text in *data* and return all collected examples.

        Args:
            data: Texts to annotate.
            labels: Entity labels to look for.
            prompt: Optional text put on its own line before each input; when
                a list is given, one entry is picked at random per text.
            threshold: Threshold forwarded to the model.
            nested_ner: Forwarded to annotate_text().

        Returns:
            ``self.annotated_data`` with one example appended per text.
        """
        self.stat["total"] = len(data)
        self.stat["current"] = -1  # Reset current progress

        for raw_text in data:
            self.stat["current"] += 1

            chosen_prompt = random.choice(prompt) if isinstance(prompt, list) else prompt
            text = f"{chosen_prompt}\n{raw_text}" if chosen_prompt else raw_text

            annotation = annotate_text(self.model, text, labels, threshold, nested_ner)
            if not annotation["ner"]:  # If no entities identified
                annotation = {
                    "tokenized_text": tokenize_text(text),
                    "ner": [],
                    "validated": False,
                }
            self.annotated_data.append(annotation)

        return self.annotated_data
# Global variables
# The AutoAnnotator created by the most recent annotate() call.
annotator = None
# Non-empty, stripped lines of the last uploaded text file; filled by
# process_uploaded_file() and read by annotate().
sentences = []
def process_uploaded_file(file_obj):
    """Read an uploaded text file into the global ``sentences`` list.

    Args:
        file_obj: Uploaded file object exposing the file path as ``.name``,
            or ``None`` when nothing was uploaded.

    Returns:
        A status message.  On success ``sentences`` holds every non-blank
        line of the file with surrounding whitespace removed.
    """
    global sentences

    if file_obj is None:
        return "Please upload a file first!"

    try:
        # Read the uploaded file
        with open(file_obj.name, 'r', encoding='utf-8') as handle:
            stripped_lines = (raw_line.strip() for raw_line in handle)
            sentences = [line for line in stripped_lines if line]
        return f"Successfully loaded {len(sentences)} sentences from file!"
    except Exception as e:
        return f"Error reading file: {str(e)}"
def annotate(model, labels, threshold, prompt):
    """Annotate the uploaded sentences, save them and upload the result.

    Args:
        model: Model identifier passed to AutoAnnotator.
        labels: Comma-separated entity labels.
        threshold: Threshold forwarded to the annotator.
        prompt: Optional prompt forwarded to the annotator.

    Returns:
        A status message.  The annotations are written to
        data/annotated_data.json before the upload is attempted, so a failed
        upload is reported separately from a failed annotation.
    """
    global annotator
    try:
        if not sentences:
            return "Please upload a file with text first!"

        # Ignore blank entries such as the one produced by a trailing comma.
        labels = [label.strip() for label in labels.split(",") if label.strip()]
        if not labels:
            return "Please enter at least one label!"

        annotator = AutoAnnotator(model)
        annotated_data = annotator.auto_annotate(sentences, labels, prompt, threshold)

        # Save annotated data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(annotated_data, file, ensure_ascii=False)
    except Exception as e:
        return f"Error during annotation: {str(e)}"

    # Upload to Hugging Face Hub.  Kept apart from the annotation step so an
    # upload problem does not hide that the local file was written.
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj="data/annotated_data.json",
            path_in_repo="annotated_data.json",
            repo_id="YOUR_USERNAME/YOUR_REPO_NAME",  # Replace with your repo
            repo_type="dataset"
        )
    except Exception as e:
        return (
            "Annotated data saved to data/annotated_data.json, "
            f"but upload to Hugging Face Hub failed: {str(e)}"
        )
    return "Successfully annotated and saved to Hugging Face Hub!"
def _class_label_names(dataset):
    """Return the tag names declared for 'ner_tags', or None if unavailable."""
    try:
        names = dataset.features["ner_tags"].feature.names
    except (AttributeError, KeyError, TypeError):
        return None
    return list(names) if names else None


def _split_bio_tag(tag):
    """Split 'B-XXX' / 'I-XXX' into (prefix, 'XXX'); other tags give (None, tag)."""
    if isinstance(tag, str) and len(tag) > 2 and tag[1] == "-" and tag[0] in "BI":
        return tag[0], tag[2:]
    return None, tag


def _tags_to_spans(tokens, tags, tag_names=None):
    """Collapse per-token tags into inclusive [start, end, label] spans."""
    spans = []
    current_span = None
    for i, (_token, tag) in enumerate(zip(tokens, tags)):
        # Integer class ids are translated to their names when known.
        if tag_names is not None and isinstance(tag, int) and 0 <= tag < len(tag_names):
            tag = tag_names[tag]

        if tag == 'O':  # Outside any entity
            if current_span is not None:
                spans.append(current_span)
                current_span = None
            continue

        prefix, label = _split_bio_tag(tag)
        # "B-" always opens a new span; anything else extends the running
        # span when the label is unchanged.
        if current_span is not None and prefix != "B" and label == current_span[2]:
            current_span[1] = i
        else:
            if current_span is not None:
                spans.append(current_span)
            current_span = [i, i, label]

    if current_span is not None:
        spans.append(current_span)
    return spans


def convert_hf_dataset_to_ner_format(dataset):
    """Convert token/tag records to the viewer's NER format.

    Args:
        dataset: Iterable of records.  Records that have both ``'tokens'``
            and ``'ner_tags'`` are converted, all others are skipped.  When
            the object exposes ``features["ner_tags"].feature.names``,
            integer tags are mapped to those names first.

    Returns:
        A list of ``{"tokenized_text", "ner", "validated"}`` dicts.  ``"ner"``
        holds ``[start, end, label]`` spans with inclusive token indices.
        Tags equal to ``'O'`` are outside any span, ``B-``/``I-`` prefixes are
        stripped from labels, and consecutive tokens with the same label form
        one span unless a ``B-`` tag starts a new one.
    """
    tag_names = _class_label_names(dataset)
    converted_data = []
    for item in dataset:
        # Assuming the dataset has 'tokens' and 'ner_tags' fields
        if 'tokens' in item and 'ner_tags' in item:
            converted_data.append({
                "tokenized_text": item['tokens'],
                "ner": _tags_to_spans(item['tokens'], item['ner_tags'], tag_names),
                "validated": False
            })
    return converted_data
def load_from_huggingface(dataset_name: str, split: str = "train"):
    """Download a Hugging Face dataset and store it in the viewer's format.

    Args:
        dataset_name: Dataset identifier on the Hugging Face Hub.
        split: Split to load.

    Returns:
        A status message.  On success the converted examples have been
        written to data/annotated_data.json.
    """
    try:
        # This file defines its own zero-argument load_dataset(), which
        # replaces the name imported from `datasets` at module level; import
        # the Hub loader under a distinct alias so the right function runs.
        from datasets import load_dataset as hf_load_dataset

        dataset = hf_load_dataset(dataset_name, split=split)
        converted_data = convert_hf_dataset_to_ner_format(dataset)

        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(converted_data, file, ensure_ascii=False)
        return f"Successfully loaded and converted dataset: {dataset_name}"
    except Exception as e:
        return f"Error loading dataset: {str(e)}"
def _read_conll_examples(file_path):
    """Read a CoNLL-style file into ``{"tokens", "ner_tags"}`` records."""
    examples = []
    tokens, tags = [], []
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # A blank line ends the current sentence.
                if tokens:
                    examples.append({"tokens": tokens, "ner_tags": tags})
                    tokens, tags = [], []
                continue
            # Skip comments and CoNLL document separators, which are not
            # part of any sentence.
            if line.startswith("#") or line.startswith("-DOCSTART-"):
                continue
            parts = line.split()
            if len(parts) >= 2:
                # First column is the token, last column is its tag.
                tokens.append(parts[0])
                tags.append(parts[-1])
    # Handle last example if the file does not end with a blank line.
    if tokens:
        examples.append({"tokens": tokens, "ner_tags": tags})
    return examples


def load_from_local_file(file_path: str, file_format: str = "json"):
    """Load a local file and convert it to the viewer's NER format.

    Args:
        file_path: Path of the file to read (UTF-8).
        file_format: ``"json"`` (a list of examples, either already in the
            viewer's format or with ``tokens``/``ner_tags`` fields),
            ``"conll"`` (one token per line, tag in the last column, blank
            line between sentences) or ``"txt"`` (one sentence per line,
            loaded without entities).

    Returns:
        A list of ``{"tokenized_text", "ner", "validated"}`` dicts.

    Raises:
        Exception: With the message ``"Error loading file: <reason>"`` for
            any failure; the original exception is attached as the cause.
    """
    try:
        if file_format == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                raise ValueError("JSON file must contain a list of examples")
            # If data is already in the correct format, return it untouched.
            if all("tokenized_text" in item and "ner" in item for item in data):
                return data
            # Otherwise convert token/tag records with the shared converter.
            return convert_hf_dataset_to_ner_format(data)

        if file_format == "conll":
            return convert_hf_dataset_to_ner_format(_read_conll_examples(file_path))

        if file_format == "txt":
            # Simple text file with one sentence per line
            converted_data = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if line:
                        converted_data.append({
                            "tokenized_text": tokenize_text(line),
                            "ner": [],
                            "validated": False
                        })
            return converted_data

        raise ValueError(f"Unsupported file format: {file_format}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}") from e
def process_local_file(file_obj, file_format):
    """Convert an uploaded dataset file and store it for the viewer.

    Args:
        file_obj: Uploaded file object exposing the file path as ``.name``,
            or ``None`` when nothing was uploaded.
        file_format: Format name forwarded to load_from_local_file().

    Returns:
        A status message.  On success the converted examples have been
        written to data/annotated_data.json.
    """
    if file_obj is None:
        return "Please upload a file first!"

    try:
        # Load and convert the data
        examples = load_from_local_file(file_obj.name, file_format)

        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as out_file:
            json.dump(examples, out_file, ensure_ascii=False)

        example_count = len(examples)
        return f"Successfully loaded and converted {example_count} examples from {file_format} file!"
    except Exception as e:
        return f"Error processing file: {str(e)}"
# Create the main interface with tabs
# NOTE(review): the container nesting below was reconstructed from a source
# whose indentation was lost — confirm the layout against the running app.
with gr.Blocks() as demo:
    gr.Markdown("# NER Annotation Tool")

    with gr.Tabs():
        # ---- Tab 1: run a model over an uploaded text file ----
        with gr.TabItem("Auto Annotation"):
            with gr.Row():
                with gr.Column():
                    file_uploader = gr.File(label="Upload text file (one sentence per line)")
                    upload_status = gr.Textbox(label="Upload Status")
                    file_uploader.change(
                        fn=process_uploaded_file,
                        inputs=[file_uploader],
                        outputs=[upload_status]
                    )
                with gr.Column():
                    model = gr.Dropdown(
                        label="Choose the model for annotation",
                        choices=AVAILABLE_MODELS,
                        value=AVAILABLE_MODELS[0]
                    )
                    labels = gr.Textbox(
                        label="Labels",
                        placeholder="Enter comma-separated labels (e.g., PERSON,ORG,LOC)",
                        scale=2
                    )
                    threshold = gr.Slider(
                        0, 1,
                        value=0.3,
                        step=0.01,
                        label="Threshold",
                        info="Lower threshold increases entity predictions"
                    )
                    prompt = gr.Textbox(
                        label="Prompt",
                        placeholder="Enter your annotation prompt (optional)",
                        scale=2
                    )
            annotate_btn = gr.Button("Annotate Data")
            output_info = gr.Textbox(label="Processing Status")
            annotate_btn.click(
                fn=annotate,
                inputs=[model, labels, threshold, prompt],
                outputs=[output_info]
            )

        # ---- Tab 2: browse, edit and validate the annotated examples ----
        with gr.TabItem("Dataset Viewer"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        load_local_btn = gr.Button("Load Local Dataset")
                        load_hf_btn = gr.Button("Load from Hugging Face")
                    local_file = gr.File(label="Upload Local Dataset", visible=False)
                    file_format = gr.Dropdown(
                        choices=["json", "conll", "txt"],
                        value="json",
                        label="File Format",
                        visible=False
                    )
                    local_status = gr.Textbox(label="Local File Status", visible=False)
                    dataset_name = gr.Textbox(
                        label="Hugging Face Dataset Name",
                        placeholder="Enter dataset name (e.g., conll2003)",
                        visible=False
                    )
                    dataset_split = gr.Dropdown(
                        choices=["train", "validation", "test"],
                        value="train",
                        label="Dataset Split",
                        visible=False
                    )
            bar = gr.Slider(minimum=0, maximum=1, step=1, label="Progress", interactive=False)
            with gr.Row():
                previous_btn = gr.Button("Previous example")
                apply_btn = gr.Button("Apply changes")
                next_btn = gr.Button("Next example")
                validate_btn = gr.Button("Validate")
                save_btn = gr.Button("Save validated dataset")
            inp_box = gr.HighlightedText(value=None, interactive=True)

            def toggle_local_inputs():
                """Show the local-file inputs and hide the Hugging Face ones."""
                return {
                    local_file: gr.update(visible=True),
                    file_format: gr.update(visible=True),
                    local_status: gr.update(visible=True),
                    dataset_name: gr.update(visible=False),
                    dataset_split: gr.update(visible=False)
                }

            def toggle_hf_inputs():
                """Show the Hugging Face inputs and hide the local-file ones."""
                return {
                    local_file: gr.update(visible=False),
                    file_format: gr.update(visible=False),
                    local_status: gr.update(visible=False),
                    dataset_name: gr.update(visible=True),
                    dataset_split: gr.update(visible=True)
                }

            load_local_btn.click(
                fn=toggle_local_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
            )
            load_hf_btn.click(
                fn=toggle_hf_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
            )

            def _load_into_viewer():
                """Load the stored dataset; return values for (inp_box, bar)."""
                # load_dataset() yields three values (segments, index, max
                # index) but only two components are wired; fold index and
                # maximum into one slider update so the range is applied.
                highlighted, current, max_value = load_dataset()
                return highlighted, gr.update(value=current, maximum=max_value)

            def process_and_load_local(file_obj, format):
                """Convert the uploaded file, then show its first example."""
                status = process_local_file(file_obj, format)
                if "Successfully" in status:
                    highlighted, bar_update = _load_into_viewer()
                    return highlighted, bar_update, status
                # HighlightedText expects (text, label) pairs.
                return [(status, None)], gr.update(value=0), status

            local_file.change(
                fn=process_and_load_local,
                inputs=[local_file, file_format],
                outputs=[inp_box, bar, local_status]
            )

            def load_hf_dataset(name, split):
                """Download the named dataset, then show its first example."""
                # The same button also reveals the name field, so the first
                # click arrives with an empty name; ask for one instead of
                # attempting a download.
                if not name or not name.strip():
                    return (
                        [("Enter a Hugging Face dataset name, then click the button again.", None)],
                        gr.update(value=0),
                    )
                status = load_from_huggingface(name.strip(), split)
                if "Successfully" in status:
                    return _load_into_viewer()
                return [(status, None)], gr.update(value=0)

            load_hf_btn.click(
                fn=load_hf_dataset,
                inputs=[dataset_name, dataset_split],
                outputs=[inp_box, bar]
            )

            # NOTE(review): save/validate write their confirmation into
            # inp_box; pressing "Apply changes" afterwards would store that
            # message as the current example — consider a separate status box.
            apply_btn.click(fn=update_example, inputs=inp_box, outputs=inp_box)
            save_btn.click(fn=save_dataset, inputs=inp_box, outputs=inp_box)
            validate_btn.click(fn=validate_example, inputs=None, outputs=inp_box)
            next_btn.click(fn=next_example, inputs=None, outputs=[inp_box, bar])
            previous_btn.click(fn=previous_example, inputs=None, outputs=[inp_box, bar])

demo.launch()