#!/usr/bin/env python3
"""Enhanced web document annotation tool with modern UI."""

import hashlib
import json
import os
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from random import randint, randrange
from urllib.parse import urlparse

import gradio as gr
from datasets import Dataset, load_dataset
from loguru import logger


def doc_hash(url: str, text: str) -> str:
    """Stable content hash used to deduplicate documents across sessions."""
    return hashlib.sha256(f"{url}{text}".encode()).hexdigest()


def filterfunc(x: dict) -> bool:
    # Very short content usually means the document is not high quality.
    if len(x.get("text", "").split()) < 100:
        return False
    excluded = {
        "Promotional/Advertisement",
        "Machine-Generated",
        "Images/Videos/Audio",
        "Truncated",
        "Spam/Ads",
        "Product Page",
        "Content Listing",
    }
    for version in ["document_type_v1", "document_type_v2"]:
        for level in ["primary", "secondary"]:
            if label := x.get("eai_taxonomy", {}).get(version, {}).get(level, {}).get("label"):
                if label in excluded:
                    return False
    return True


class DocLoader:
    __slots__ = ("queue", "k", "counts", "processed", "total_docs", "_dataset")

    def __init__(self, processed: set[str], k: int = 20):
        self.queue = deque()
        self.k = k
        self.counts = defaultdict(int)
        self.processed = processed
        self.total_docs = 0
        self._dataset = None
        self._load()

    def _load(self):
        ds = load_dataset("sumuks/essential-web-v1.0-sample-10M", split="train")
        logger.info(f"Loaded {len(ds)} documents")
        ds = ds.filter(filterfunc)
        logger.info(f"Filtered to {len(ds)} documents")

        # Keep every filtered document addressable by its dataset key so the
        # Viewer tab can look annotated documents back up later.
        self._dataset = {}
        for idx, doc in enumerate(ds):
            doc_key = doc.get("id", idx)
            doc_with_key = dict(doc)
            doc_with_key["_dataset_key"] = doc_key
            self._dataset[doc_key] = doc_with_key

        for doc_id, doc in self._dataset.items():
            url = doc.get("metadata", {}).get("url", doc.get("url", ""))
            h = doc_hash(url, doc.get("text", ""))
            if h in self.processed:
                continue
            if cat := doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label"):
                # Admit documents from under-represented categories unconditionally;
                # subsample the rest with probability 1/k to keep the queue balanced.
                min_count = min(self.counts.values(), default=0)
                if self.counts[cat] <= min_count or randrange(self.k) == 0:
                    self.queue.append(doc)
                    self.counts[cat] += 1

        self.total_docs = len(self.queue)
        logger.info(f"Queued {self.total_docs} documents for annotation")

    def next(self) -> dict | None:
        return self.queue.popleft() if self.queue else None

    def get_by_id(self, doc_id: str | int) -> dict | None:
        # Annotation records may store the key as str or int, so try both.
        result = self._dataset.get(doc_id)
        if result is None and isinstance(doc_id, str) and doc_id.isdigit():
            result = self._dataset.get(int(doc_id))
        elif result is None and isinstance(doc_id, int):
            result = self._dataset.get(str(doc_id))
        return result

    @property
    def remaining(self) -> int:
        return len(self.queue)
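

# Illustration of the category-balancing rule in DocLoader._load (hypothetical numbers,
# not executed): with counts = {"News": 3, "Blog": 1} and the default k = 20,
#   - a "Blog" document is always admitted (its count equals the current minimum),
#   - a "News" document is admitted only when randrange(20) == 0 (a 1-in-20 chance),
# so under-represented categories catch up while frequent ones are subsampled.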


@dataclass(slots=True)
class AnnotationStore:
    path: Path
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    buffer: list[dict] = field(default_factory=list)
    threshold: int = field(default_factory=lambda: randint(20, 30))
    processed: set[str] = field(default_factory=set)
    annotations: list[dict] = field(default_factory=list)
    session_stats: dict = field(default_factory=lambda: {
        "total": 0,
        "selected": 0,
        "discarded": 0,
        "start_time": datetime.now(timezone.utc),
        "decisions": [],
    })

    def __post_init__(self):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if self.path.exists():
            for line in self.path.read_text().splitlines():
                if rec := self._parse_line(line):
                    self.processed.add(rec["hash"])
                    self.annotations.append(rec)
                    # Make sure every decision value seen on disk has a counter,
                    # even if it is not one of the defaults above.
                    if "decision" in rec:
                        decision = rec["decision"]
                        if decision not in self.session_stats:
                            self.session_stats[decision] = 0

    def _parse_line(self, line: str) -> dict | None:
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            return None

    def add(self, content_hash: str, decision: str, doc_id: str | int):
        if content_hash in self.processed:
            return
        rec = {
            "hash": content_hash,
            "decision": decision,
            "session": self.session_id,
            "id": doc_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        with self.path.open("a") as f:
            f.write(json.dumps(rec) + "\n")
        self.processed.add(content_hash)
        self.buffer.append(rec)
        self.annotations.append(rec)
        self.session_stats["total"] += 1
        if decision not in self.session_stats:
            self.session_stats[decision] = 0
        self.session_stats[decision] += 1
        self.session_stats["decisions"].append((datetime.now(timezone.utc), decision))
        if len(self.buffer) >= self.threshold:
            self.flush()

    def flush(self):
        # Without an HF_TOKEN the buffer is simply dropped; annotations are still on disk.
        if not self.buffer or not (token := os.getenv("HF_TOKEN")):
            self.buffer.clear()
            self.threshold = randint(20, 30)
            return
        try:
            Dataset.from_list(self.buffer).push_to_hub(
                "yourbench/essential-web-annotations", token=token
            )
            logger.info(f"Pushed {len(self.buffer)} annotations")
            self.buffer.clear()
        except Exception as e:
            logger.error(f"Push failed: {e}")
        finally:
            self.threshold = randint(20, 30)

    def get_rate(self) -> float:
        if not self.session_stats["decisions"]:
            return 0.0
        elapsed = (datetime.now(timezone.utc) - self.session_stats["start_time"]).total_seconds()
        return (self.session_stats["total"] / elapsed * 3600) if elapsed > 0 else 0.0

    def get_filtered(self, decision: str | None = None) -> list[dict]:
        if decision is None or decision == "all":
            return self.annotations
        return [a for a in self.annotations if a.get("decision") == decision]
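

# Each appended line in data/annotations.jsonl is one JSON record of the shape written by
# AnnotationStore.add; the values below are illustrative only:
#   {"hash": "3fc9b6...", "decision": "selected", "session": "a1b2c3d4-...",
#    "id": 12345, "timestamp": "2024-01-01T12:00:00+00:00"}
# When the in-memory buffer reaches its randomized threshold (20-30 records), flush()
# pushes the buffered records to the Hub, provided HF_TOKEN is set.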


SESSION_LIMIT = 50

store = AnnotationStore(Path("data/annotations.jsonl"))
loader = DocLoader(store.processed)
current = loader.next()

# Viewer state
viewer_state = {"annotations": [], "index": 0, "filter": "all"}


def format_stats() -> str:
    stats = store.session_stats
    rate = store.get_rate()
    selected_count = stats.get("selected", 0)
    discarded_count = stats.get("discarded", 0)
    return f"""
    <div class="stats-container">
        <div class="stat-item">
            <div class="stat-value">{stats['total']}</div>
            <div class="stat-label">Total Annotated</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{selected_count}</div>
            <div class="stat-label">Selected</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{discarded_count}</div>
            <div class="stat-label">Discarded</div>
        </div>
        <div class="stat-item">
            <div class="stat-value">{rate:.0f}/hr</div>
            <div class="stat-label">Annotation Rate</div>
        </div>
    </div>
    """


def format_progress() -> tuple[str, float]:
    session_completed = store.session_stats["total"]
    session_total = SESSION_LIMIT
    progress = (session_completed / session_total) if session_total > 0 else 0
    percentage = progress * 100
    return (
        f"""
        <div class="progress-container">
            <div class="progress-header">
                <span>Session Progress</span>
                <span>{session_completed:,} / {session_total:,}</span>
            </div>
            <div class="progress-bar-bg">
                <div class="progress-bar-fill" style="width: {percentage}%"></div>
            </div>
            <div class="progress-percentage">{percentage:.1f}% Complete</div>
        </div>
        """,
        progress,
    )


def format_document_info(doc: dict, annotation: dict | None = None) -> str:
    if not doc:
        return ""
    meta = doc.get("metadata", {})
    url = meta.get("url", doc.get("url", ""))
    # urlparse is safer than splitting on "/" when the URL lacks a scheme.
    domain = urlparse(url).netloc or "Unknown"
    cat = doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label", "Uncategorized")
    word_count = len(doc.get("text", "").split())

    annotation_info = ""
    if annotation:
        timestamp = datetime.fromisoformat(annotation["timestamp"].replace("Z", "+00:00"))
        decision_color = "#667eea" if annotation["decision"] == "selected" else "#f5576c"
        annotation_info = f"""
        <div class="annotation-info">
            <span class="annotation-decision" style="color: {decision_color}">
                {"✅" if annotation["decision"] == "selected" else "❌"} {annotation["decision"].title()}
            </span>
            <span class="annotation-time">📅 {timestamp.strftime("%Y-%m-%d %H:%M:%S")}</span>
        </div>
        """

    return f"""
    <div class="doc-info">
        {annotation_info}
        <div class="doc-meta">
            <span>📌 {domain}</span>
            <span>🏷️ {cat}</span>
            <span>📝 {word_count:,} words</span>
        </div>
        <a class="doc-url" href="{url}" target="_blank">{url}</a>
    </div>
    """


def choose(decision: str):
    global current
    if not current:
        return done_state()

    url = current.get("metadata", {}).get("url", current.get("url", ""))
    h = doc_hash(url, current.get("text", ""))
    doc_id = current.get("_dataset_key", current.get("id", ""))
    store.add(h, decision, doc_id)

    if store.session_stats["total"] >= SESSION_LIMIT:
        return done_state()

    current = loader.next()
    if not current:
        return done_state()

    progress_html, progress_num = format_progress()
    return (
        format_document_info(current),
        current.get("text", ""),
        gr.update(interactive=True),
        gr.update(interactive=True),
        format_stats(),
        progress_html,
        progress_num,
    )
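
# choose() and done_state() return a 7-tuple that lines up positionally with the
# `outputs` list wired to the buttons in build():
#   (doc_info, text_display, btn_sel, btn_dis, stats_display, progress_display, progress_bar)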


def done_state():
    progress_html, progress_num = format_progress()
    if store.session_stats["total"] >= SESSION_LIMIT:
        message = "🎉 Session Complete!"
        description = f"Great job! You've completed your session of {SESSION_LIMIT} documents."
    else:
        message = "🎉 All documents annotated!"
        description = "Great job! You've completed all available documents."
    return (
        f'<div class="done-message">{message}</div>',
        description,
        gr.update(interactive=False),
        gr.update(interactive=False),
        format_stats(),
        progress_html,
        1.0,
    )


def update_viewer_filter(filter_value: str):
    viewer_state["filter"] = filter_value
    viewer_state["index"] = 0
    viewer_state["annotations"] = store.get_filtered(filter_value)
    logger.info(f"Filter: {filter_value}, Found {len(viewer_state['annotations'])} annotations")
    return update_viewer_display()


def navigate_viewer(direction: int):
    if not viewer_state["annotations"]:
        return update_viewer_display()
    # Wrap around at either end; the prev/next buttons are disabled at the ends anyway,
    # so this mainly guards against stale indices.
    viewer_state["index"] = (viewer_state["index"] + direction) % len(viewer_state["annotations"])
    return update_viewer_display()


def update_viewer_display():
    if not viewer_state["annotations"]:
        return (
            '<div class="viewer-empty">No annotations to display</div>',
            "",
            "0 / 0",
            gr.update(interactive=False),
            gr.update(interactive=False),
        )

    idx = viewer_state["index"]
    annotation = viewer_state["annotations"][idx]
    doc = loader.get_by_id(annotation["id"])

    if not doc:
        logger.warning(f"Document not found for ID: {annotation['id']} (type: {type(annotation['id'])})")
        return (
            '<div class="viewer-error">Document not found in dataset</div>',
            f"Annotation details: {json.dumps(annotation, indent=2)}",
            f"{idx + 1} / {len(viewer_state['annotations'])}",
            gr.update(interactive=idx > 0),
            gr.update(interactive=idx < len(viewer_state["annotations"]) - 1),
        )

    return (
        format_document_info(doc, annotation),
        doc.get("text", ""),
        f"{idx + 1} / {len(viewer_state['annotations'])}",
        gr.update(interactive=idx > 0),
        gr.update(interactive=idx < len(viewer_state["annotations"]) - 1),
    )


def build() -> gr.Blocks:
    css = """
    .stats-container { display: flex; gap: 15px; margin: 10px 0; flex-wrap: nowrap; justify-content: space-between; }
    .stat-item { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 12px; padding: 15px; flex: 1; min-width: 100px; text-align: center; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); transition: transform 0.2s; }
    .stat-item:hover { transform: translateY(-2px); }
    .stat-value { font-size: 24px; font-weight: bold; color: white; margin-bottom: 3px; }
    .stat-label { font-size: 12px; color: rgba(255, 255, 255, 0.9); }
    .progress-container { background: #f8f9fa; border-radius: 12px; padding: 15px; margin: 10px 0; }
    .progress-header { display: flex; justify-content: space-between; margin-bottom: 10px; font-weight: 600; }
    .progress-bar-bg { background: #e9ecef; height: 20px; border-radius: 10px; overflow: hidden; margin-bottom: 10px; }
    .progress-bar-fill { background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); height: 100%; transition: width 0.3s ease; }
    .progress-percentage { text-align: center; color: #6c757d; font-size: 14px; }
    .doc-info { background: #f8f9fa; border-radius: 12px; padding: 15px; margin-bottom: 10px; }
    .doc-meta { display: flex; gap: 20px; margin-bottom: 10px; flex-wrap: wrap; }
    .doc-meta span { font-size: 14px; color: #495057; }
    .doc-url { font-size: 14px; color: #667eea; text-decoration: none; word-break: break-all; }
    .doc-url:hover { text-decoration: underline; }
    .done-message { font-size: 32px; text-align: center; padding: 40px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 12px; font-weight: bold; }
    .annotation-info { display: flex; justify-content: space-between; margin-bottom: 10px; padding-left: 10px; }
    .annotation-decision { font-weight: 600; }
    .annotation-time { color: #6c757d; font-size: 12px; }
    .viewer-empty, .viewer-error { text-align: center; padding: 40px; color: #6c757d; font-size: 18px; }
    .viewer-nav { display: flex; justify-content: center; align-items: center; gap: 20px; margin: 10px 0; }
    .viewer-counter { font-weight: 600; color: #495057; }
    #select { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border: none; font-size: 18px; padding: 12px 24px; }
    #discard { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); border: none; font-size: 18px; padding: 12px 24px; }
    .dark .stat-item { background: linear-gradient(135deg, #434343 0%, #000000 100%); }
    .dark .progress-container, .dark .doc-info { background: #1a1a1a; }
    .dark .progress-bar-bg { background: #2a2a2a; }
    @keyframes pulse { 0% { transform: scale(1); } 50% { transform: scale(1.05); } 100% { transform: scale(1); } }
    """

    # Keyboard shortcuts: 1 = select, 2 = discard (minimal handler assumed from the
    # button elem_ids and the shortcuts listed in the Annotate tab).
    shortcut_js = """
    <script>
    document.addEventListener("keydown", (event) => {
        if (["INPUT", "TEXTAREA"].includes(document.activeElement.tagName)) return;
        if (event.key === "1") (document.querySelector("#select button") || document.getElementById("select"))?.click();
        if (event.key === "2") (document.querySelector("#discard button") || document.getElementById("discard"))?.click();
    });
    </script>
    """

    with gr.Blocks(
        title="Essential Web Annotation",
        theme=gr.themes.Default(),
        css=css,
        head=shortcut_js,
    ) as demo:
        gr.Markdown("# 🚀 Essential Web Annotation Tool")

        with gr.Tabs():
            with gr.Tab("Annotate"):
                gr.Markdown("""
                ## 📋 Document Quality Assessment

                Your task is to evaluate documents for **high-quality, valuable content** that provides generalizable information.

                ### ✅ **Select High-Quality Documents:**
                Examples include:
                - **Technical blogs** with detailed explanations
                - **Scientific papers** and research articles
                - **Information-rich discussions** with insights
                - **Educational content** with actionable knowledge
                - **Professional documentation** and guides

                ### ❌ **Discard Low-Quality Documents:**
                - Content with minimal informational value

                ### 🎯 **Quick Assessment Tips:**
                - High-quality documents are usually immediately recognizable to a human.
                - Use the **Viewer** tab to browse examples of selected documents
                - Trust your judgment on content value and depth

                ### ⌨️ **Keyboard Shortcuts:**
                | Key | Action |
                |-----|--------|
                | **`1`** | ✅ Select document |
                | **`2`** | ❌ Discard document |
                """)

                progress_html, progress_num = format_progress()
                progress_display = gr.HTML(progress_html)
                stats_display = gr.HTML(format_stats())

                if current:
                    doc_info_html = format_document_info(current)
                    text_val = current.get("text", "")
                else:
                    doc_info_html = '<div class="viewer-empty">No documents loaded.</div>'
                    text_val = ""
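
                # `current` was popped from the loader at module import time; if the queue
                # was empty, the empty-state HTML above is shown and the buttons below
                # start disabled (interactive=bool(current)).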
                doc_info = gr.HTML(doc_info_html)

                with gr.Column(variant="panel"):
                    text_display = gr.Textbox(
                        text_val,
                        label="📄 Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True,
                    )

                with gr.Row():
                    btn_sel = gr.Button(
                        "✅ Select (1)",
                        elem_id="select",
                        variant="primary",
                        interactive=bool(current),
                        size="lg",
                    )
                    btn_dis = gr.Button(
                        "❌ Discard (2)",
                        elem_id="discard",
                        variant="stop",
                        interactive=bool(current),
                        size="lg",
                    )

                progress_bar = gr.Number(value=progress_num, visible=False)

                outputs = [doc_info, text_display, btn_sel, btn_dis, stats_display, progress_display, progress_bar]
                btn_sel.click(lambda: choose("selected"), outputs=outputs)
                btn_dis.click(lambda: choose("discarded"), outputs=outputs)

            with gr.Tab("Viewer"):
                gr.Markdown("### 📚 Browse Annotated Documents")

                with gr.Row():
                    filter_dropdown = gr.Radio(
                        choices=["all", "selected", "discarded"],
                        value="all",
                        label="Filter",
                        interactive=True,
                    )

                viewer_info = gr.HTML()

                with gr.Column(variant="panel"):
                    viewer_text = gr.Textbox(
                        label="📄 Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True,
                    )

                with gr.Row():
                    prev_btn = gr.Button("← Previous", size="lg")
                    viewer_counter = gr.HTML('<div class="viewer-counter">0 / 0</div>')
                    next_btn = gr.Button("Next →", size="lg")

                # Wire up the viewer controls
                filter_dropdown.change(
                    update_viewer_filter,
                    inputs=[filter_dropdown],
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn],
                )
                prev_btn.click(
                    lambda: navigate_viewer(-1),
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn],
                )
                next_btn.click(
                    lambda: navigate_viewer(1),
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn],
                )

                # Load initial viewer state
                demo.load(
                    lambda: update_viewer_filter("all"),
                    outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn],
                )

        gr.HTML("")

    return demo


if __name__ == "__main__":
    build().launch()