sumuks's picture
Create app.py
e66fa6d verified
raw
history blame
25.1 kB
#!/usr/bin/env python3
"""Enhanced web document annotation tool with modern UI."""
import hashlib
import html
import json
import os
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from random import randint, randrange
from urllib.parse import urlsplit

import gradio as gr
from datasets import Dataset, load_dataset
from loguru import logger
def doc_hash(url: str, text: str) -> str:
    """Return the hex SHA-256 digest of ``url`` immediately followed by ``text``.

    The two values are joined without a separator and encoded with the
    default ``str.encode`` codec before hashing.
    """
    payload = "{}{}".format(url, text).encode()
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
# Document-type labels that disqualify a document from annotation.
_EXCLUDED_DOC_TYPES = frozenset({
    "Promotional/Advertisement",
    "Machine-Generated",
    "Images/Videos/Audio",
    "Truncated",
    "Spam/Ads",
    "Product Page",
    "Content Listing",
})
_TAXONOMY_VERSIONS = ("document_type_v1", "document_type_v2")
_TAXONOMY_LEVELS = ("primary", "secondary")


def filterfunc(x: dict) -> bool:
    """Decide whether a dataset row is worth showing to annotators.

    A row is rejected when its ``text`` has fewer than 100 whitespace-separated
    words, or when any primary/secondary label under ``document_type_v1`` or
    ``document_type_v2`` of ``eai_taxonomy`` is in the excluded set.

    Fields that are missing *or explicitly ``None``* are treated as empty, so
    sparse rows are filtered instead of raising.

    Args:
        x: One dataset row.

    Returns:
        ``True`` to keep the row, ``False`` to drop it.
    """
    # Very short content usually means it's not a high quality document.
    text = x.get("text") or ""
    if len(text.split()) < 100:
        return False

    taxonomy = x.get("eai_taxonomy") or {}
    for version in _TAXONOMY_VERSIONS:
        levels = taxonomy.get(version) or {}
        for level in _TAXONOMY_LEVELS:
            label = (levels.get(level) or {}).get("label")
            if label in _EXCLUDED_DOC_TYPES:
                return False
    return True
class DocLoader:
__slots__ = ("queue", "k", "counts", "processed", "total_docs", "_dataset")
def __init__(self, processed: set[str], k: int = 20):
self.queue = deque()
self.k = k
self.counts = defaultdict(int)
self.processed = processed
self.total_docs = 0
self._dataset = None
self._load()
def _load(self):
ds = load_dataset("sumuks/essential-web-v1.0-sample-10M", split="train")
logger.info(f"Loaded {len(ds)} documents")
ds = ds.filter(filterfunc)
logger.info(f"Filtered to {len(ds)} documents")
self._dataset = {}
for idx, doc in enumerate(ds):
doc_key = doc.get("id", idx)
doc_with_key = dict(doc)
doc_with_key["_dataset_key"] = doc_key
self._dataset[doc_key] = doc_with_key
for doc_id, doc in self._dataset.items():
url = doc.get("metadata", {}).get("url", doc.get("url", ""))
h = doc_hash(url, doc.get("text", ""))
if h in self.processed:
continue
if cat := doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label"):
min_count = min(self.counts.values(), default=0)
if self.counts[cat] <= min_count or randrange(self.k) == 0:
self.queue.append(doc)
self.counts[cat] += 1
self.total_docs = len(self.queue)
logger.info(f"Loaded {self.total_docs} documents")
def next(self) -> dict | None:
return self.queue.popleft() if self.queue else None
def get_by_id(self, doc_id: str | int) -> dict | None:
# Handle both string and int lookups
result = self._dataset.get(doc_id)
if result is None and isinstance(doc_id, str) and doc_id.isdigit():
result = self._dataset.get(int(doc_id))
elif result is None and isinstance(doc_id, int):
result = self._dataset.get(str(doc_id))
return result
@property
def remaining(self) -> int:
return len(self.queue)
@dataclass(slots=True)
class AnnotationStore:
path: Path
session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
buffer: list[dict] = field(default_factory=list)
threshold: int = field(default_factory=lambda: randint(20, 30))
processed: set[str] = field(default_factory=set)
annotations: list[dict] = field(default_factory=list)
session_stats: dict = field(default_factory=lambda: {
"total": 0,
"selected": 0,
"discarded": 0,
"start_time": datetime.now(timezone.utc),
"decisions": []
})
def __post_init__(self):
self.path.parent.mkdir(parents=True, exist_ok=True)
if self.path.exists():
for line in self.path.read_text().splitlines():
if rec := self._parse_line(line):
self.processed.add(rec["hash"])
self.annotations.append(rec)
# Initialize session stats for loaded annotations
if "decision" in rec:
decision = rec["decision"]
if decision not in self.session_stats:
self.session_stats[decision] = 0
def _parse_line(self, line: str) -> dict | None:
try:
return json.loads(line)
except:
return None
def add(self, doc_hash: str, decision: str, doc_id: str | int):
if doc_hash in self.processed:
return
rec = {
"hash": doc_hash,
"decision": decision,
"session": self.session_id,
"id": doc_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self.path.open("a").write(json.dumps(rec) + "\n")
self.processed.add(doc_hash)
self.buffer.append(rec)
self.annotations.append(rec)
self.session_stats["total"] += 1
if decision not in self.session_stats:
self.session_stats[decision] = 0
self.session_stats[decision] += 1
self.session_stats["decisions"].append((datetime.now(timezone.utc), decision))
if len(self.buffer) >= self.threshold:
self.flush()
def flush(self):
if not self.buffer or not (token := os.getenv("HF_TOKEN")):
self.buffer.clear()
self.threshold = randint(20, 30)
return
try:
Dataset.from_list(self.buffer).push_to_hub(
"yourbench/essential-web-annotations",
token=token
)
logger.info(f"Pushed {len(self.buffer)} annotations")
self.buffer.clear()
except Exception as e:
logger.error(f"Push failed: {e}")
finally:
self.threshold = randint(20, 30)
def get_rate(self) -> float:
if not self.session_stats["decisions"]:
return 0.0
elapsed = (datetime.now(timezone.utc) - self.session_stats["start_time"]).total_seconds()
return (self.session_stats["total"] / elapsed * 3600) if elapsed > 0 else 0.0
def get_filtered(self, decision: str | None = None) -> list[dict]:
if decision is None or decision == "all":
return self.annotations
return [a for a in self.annotations if a.get("decision") == decision]
# Number of annotations after which the annotate tab reports the session done.
SESSION_LIMIT = 50

# Shared application state, created once at import time.
store = AnnotationStore(Path("data/annotations.jsonl"))
loader = DocLoader(store.processed)
current = loader.next()  # document currently shown in the annotate tab

# Viewer state
viewer_state = dict(annotations=[], index=0, filter="all")
def format_stats() -> str:
    """Render the session counters and annotation rate as an HTML stats strip."""
    session = store.session_stats
    hourly = store.get_rate()
    n_selected = session.get('selected', 0)
    n_discarded = session.get('discarded', 0)
    n_total = session['total']
    markup = f"""
<div class="stats-container">
<div class="stat-item">
<div class="stat-value">{n_total}</div>
<div class="stat-label">Total Annotated</div>
</div>
<div class="stat-item">
<div class="stat-value">{n_selected}</div>
<div class="stat-label">Selected</div>
</div>
<div class="stat-item">
<div class="stat-value">{n_discarded}</div>
<div class="stat-label">Discarded</div>
</div>
<div class="stat-item">
<div class="stat-value">{hourly:.0f}/hr</div>
<div class="stat-label">Annotation Rate</div>
</div>
</div>
"""
    return markup
def format_progress() -> tuple[str, float]:
    """Render the session progress bar.

    Returns:
        A pair ``(html, fraction)`` where ``fraction`` is the number of
        annotations made this session divided by ``SESSION_LIMIT`` (0 when the
        limit is not positive).
    """
    done = store.session_stats["total"]
    limit = SESSION_LIMIT
    if limit > 0:
        fraction = done / limit
    else:
        fraction = 0
    pct = fraction * 100
    markup = f"""
<div class="progress-container">
<div class="progress-header">
<span class="progress-title">Session Progress</span>
<span class="progress-numbers">{done:,} / {limit:,}</span>
</div>
<div class="progress-bar-bg">
<div class="progress-bar-fill" style="width: {pct:.1f}%"></div>
</div>
<div class="progress-percentage">{pct:.1f}% Complete</div>
</div>
"""
    return markup, fraction
def format_document_info(doc: dict, annotation: dict | None = None) -> str:
if not doc:
return ""
meta = doc.get("metadata", {})
url = meta.get("url", doc.get("url", ""))
domain = url.split('/')[2] if url and '/' in url else "Unknown"
cat = doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label", "Uncategorized")
word_count = len(doc.get("text", "").split())
annotation_info = ""
if annotation:
timestamp = datetime.fromisoformat(annotation["timestamp"].replace("Z", "+00:00"))
decision_color = "#667eea" if annotation["decision"] == "selected" else "#f5576c"
annotation_info = f"""
<div class="annotation-info" style="border-left: 4px solid {decision_color};">
<span class="annotation-decision" style="color: {decision_color};">
{"βœ…" if annotation["decision"] == "selected" else "❌"} {annotation["decision"].title()}
</span>
<span class="annotation-time">πŸ“… {timestamp.strftime("%Y-%m-%d %H:%M:%S")}</span>
</div>
"""
return f"""
<div class="doc-info">
{annotation_info}
<div class="doc-meta">
<span class="doc-domain">πŸ“Œ {domain}</span>
<span class="doc-category">🏷️ {cat}</span>
<span class="doc-words">πŸ“ {word_count:,} words</span>
</div>
<a href="{url}" target="_blank" class="doc-url">{url}</a>
</div>
"""
def choose(decision: str):
    """Store ``decision`` for the current document and advance to the next one.

    Returns the seven values for the annotate tab's outputs: document header
    HTML, document text, updates for both buttons, stats HTML, progress HTML
    and the progress fraction.  The "done" state is returned instead when no
    document is loaded, the session limit is reached or the queue is empty.
    """
    global current
    if not current:
        return done_state()

    meta = current.get("metadata", {})
    url = meta.get("url", current.get("url", ""))
    digest = doc_hash(url, current.get("text", ""))
    key = current.get("_dataset_key", current.get("id", ""))
    store.add(digest, decision, key)

    limit_reached = store.session_stats["total"] >= SESSION_LIMIT
    if limit_reached:
        return done_state()

    current = loader.next()
    if not current:
        return done_state()

    bar_html, bar_fraction = format_progress()
    header = format_document_info(current)
    body = current.get("text", "")
    enable_select = gr.update(interactive=True)
    enable_discard = gr.update(interactive=True)
    return (
        header,
        body,
        enable_select,
        enable_discard,
        format_stats(),
        bar_html,
        bar_fraction,
    )
def done_state():
    """Build the annotate-tab outputs shown once annotation has stopped.

    The banner says "Session Complete" when the session limit was reached and
    "All documents annotated" otherwise.  Both buttons are disabled and the
    hidden progress number is forced to 1.0.

    Returns:
        The same seven-value tuple shape that ``choose`` returns.
    """
    # Only the rendered bar is used here; the numeric progress is pinned to 1.0.
    progress_html, _ = format_progress()
    if store.session_stats["total"] >= SESSION_LIMIT:
        message = "🎉 Session Complete!"
        description = f"Great job! You've completed your session of {SESSION_LIMIT} documents."
    else:
        message = "🎉 All documents annotated!"
        description = "Great job! You've completed all available documents."
    return (
        f"<div class='done-message'>{message}</div>",
        description,
        gr.update(interactive=False),
        gr.update(interactive=False),
        format_stats(),
        progress_html,
        1.0
    )
def update_viewer_filter(filter_value: str):
    """Apply a decision filter to the viewer and show its first annotation."""
    viewer_state.update(filter=filter_value, index=0)
    # Get filtered annotations
    matches = store.get_filtered(filter_value)
    viewer_state["annotations"] = matches
    # Log for debugging
    logger.info(f"Filter: {filter_value}, Found {len(viewer_state['annotations'])} annotations")
    return update_viewer_display()
def navigate_viewer(direction: int):
    """Move the viewer index by ``direction``, wrapping around at both ends."""
    entries = viewer_state["annotations"]
    if entries:
        viewer_state["index"] = (viewer_state["index"] + direction) % len(entries)
    return update_viewer_display()
def update_viewer_display():
    """Build the viewer-tab outputs for the annotation at the current index.

    Returns:
        A five-value tuple: header HTML, document text, ``"i / n"`` counter
        text, and updates for the previous/next buttons.  When there are no
        annotations an empty state is returned; when the annotated document
        cannot be found in the dataset the raw annotation is shown instead.
    """
    annotations = viewer_state["annotations"]
    if not annotations:
        return (
            "<div class='viewer-empty'>No annotations to display</div>",
            "",
            "0 / 0",
            gr.update(interactive=False),
            gr.update(interactive=False)
        )

    total = len(annotations)
    # Clamp so a stale index can never point outside the current list.
    idx = min(max(viewer_state["index"], 0), total - 1)
    viewer_state["index"] = idx
    annotation = annotations[idx]

    counter = f"{idx + 1} / {total}"
    nav_updates = (
        gr.update(interactive=idx > 0),
        gr.update(interactive=idx < total - 1),
    )

    # Records loaded from disk are not guaranteed to carry an "id".
    doc_id = annotation.get("id")
    doc = loader.get_by_id(doc_id)
    if not doc:
        # Log the issue for debugging
        logger.warning(f"Document not found for ID: {doc_id} (type: {type(doc_id)})")
        return (
            "<div class='viewer-error'>Document not found in dataset</div>",
            f"Annotation details: {json.dumps(annotation, indent=2)}",
            counter,
            *nav_updates,
        )
    return (
        format_document_info(doc, annotation),
        doc.get("text", ""),
        counter,
        *nav_updates,
    )
def build() -> gr.Blocks:
    """Assemble the Gradio UI: an "Annotate" tab and a "Viewer" tab.

    The annotate tab shows the current document with Select/Discard buttons
    (also reachable through the ``1``/``2`` keyboard shortcuts injected via
    ``head``).  The viewer tab browses stored annotations with a decision
    filter and previous/next navigation.

    Returns:
        The configured, not yet launched, ``gr.Blocks`` app.
    """
    css = """
.stats-container {
display: flex;
gap: 15px;
margin: 10px 0;
flex-wrap: nowrap;
justify-content: space-between;
}
.stat-item {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 12px;
padding: 15px;
flex: 1;
min-width: 100px;
text-align: center;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
transition: transform 0.2s;
}
.stat-item:hover {
transform: translateY(-2px);
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: white;
margin-bottom: 3px;
}
.stat-label {
font-size: 12px;
color: rgba(255, 255, 255, 0.9);
}
.progress-container {
background: #f8f9fa;
border-radius: 12px;
padding: 15px;
margin: 10px 0;
}
.progress-header {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
font-weight: 600;
}
.progress-bar-bg {
background: #e9ecef;
height: 20px;
border-radius: 10px;
overflow: hidden;
margin-bottom: 10px;
}
.progress-bar-fill {
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
height: 100%;
transition: width 0.3s ease;
}
.progress-percentage {
text-align: center;
color: #6c757d;
font-size: 14px;
}
.doc-info {
background: #f8f9fa;
border-radius: 12px;
padding: 15px;
margin-bottom: 10px;
}
.doc-meta {
display: flex;
gap: 20px;
margin-bottom: 10px;
flex-wrap: wrap;
}
.doc-meta span {
font-size: 14px;
color: #495057;
}
.doc-url {
font-size: 14px;
color: #667eea;
text-decoration: none;
word-break: break-all;
}
.doc-url:hover {
text-decoration: underline;
}
.done-message {
font-size: 32px;
text-align: center;
padding: 40px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 12px;
font-weight: bold;
}
.annotation-info {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
padding-left: 10px;
}
.annotation-decision {
font-weight: 600;
}
.annotation-time {
color: #6c757d;
font-size: 12px;
}
.viewer-empty, .viewer-error {
text-align: center;
padding: 40px;
color: #6c757d;
font-size: 18px;
}
.viewer-nav {
display: flex;
justify-content: center;
align-items: center;
gap: 20px;
margin: 10px 0;
}
.viewer-counter {
font-weight: 600;
color: #495057;
}
#select {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border: none;
font-size: 18px;
padding: 12px 24px;
}
#discard {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
border: none;
font-size: 18px;
padding: 12px 24px;
}
.dark .stat-item {
background: linear-gradient(135deg, #434343 0%, #000000 100%);
}
.dark .progress-container, .dark .doc-info {
background: #1a1a1a;
}
.dark .progress-bar-bg {
background: #2a2a2a;
}
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.05); }
100% { transform: scale(1); }
}
"""
    # Keyboard shortcuts: "1" clicks Select and "2" clicks Discard, unless the
    # key event originates from a form control.
    shortcut_js = """
<script>
function handleKeyboardShortcuts(e) {
var target = e.target || e.srcElement;
switch (target.tagName.toLowerCase()) {
case "input":
case "textarea":
case "select":
case "button":
return;
default:
if (e.code === "Digit1" || e.key === "1") {
var selectBtn = document.getElementById("select");
if (selectBtn && !selectBtn.disabled) {
selectBtn.click();
e.preventDefault();
}
}
else if (e.code === "Digit2" || e.key === "2") {
var discardBtn = document.getElementById("discard");
if (discardBtn && !discardBtn.disabled) {
discardBtn.click();
e.preventDefault();
}
}
}
}
document.addEventListener('keyup', handleKeyboardShortcuts, false);
document.addEventListener('keydown', function(e) {
if ((e.code === "Digit1" || e.key === "1") && document.getElementById("select") && !document.getElementById("select").disabled) {
document.getElementById("select").style.transform = "scale(0.95)";
}
if ((e.code === "Digit2" || e.key === "2") && document.getElementById("discard") && !document.getElementById("discard").disabled) {
document.getElementById("discard").style.transform = "scale(0.95)";
}
});
document.addEventListener('keyup', function(e) {
if (e.code === "Digit1" || e.key === "1") {
var btn = document.getElementById("select");
if (btn) btn.style.transform = "scale(1)";
}
if (e.code === "Digit2" || e.key === "2") {
var btn = document.getElementById("discard");
if (btn) btn.style.transform = "scale(1)";
}
});
</script>
"""
    with gr.Blocks(
        title="Essential Web Annotation",
        theme=gr.themes.Default(),
        css=css,
        head=shortcut_js
    ) as demo:
        gr.Markdown("# 🚀 Essential Web Annotation Tool")
        with gr.Tabs():
            with gr.Tab("Annotate"):
                gr.Markdown("""
## 📋 Document Quality Assessment
Your task is to evaluate documents for **high-quality, valuable content** that provides generalizable information.
### ✅ **Select High-Quality Documents:**
Examples include:
- **Technical blogs** with detailed explanations
- **Scientific papers** and research articles
- **Information-rich discussions** with insights
- **Educational content** with actionable knowledge
- **Professional documentation** and guides
### ❌ **Discard Low-Quality Documents:**
- Content with minimal informational value
### 🎯 **Quick Assessment Tips:**
- High-quality documents are usually immediately recognizable to a human.
- Use the **Viewer** tab to browse examples of selected documents
- Trust your judgment on content value and depth
### ⌨️ **Keyboard Shortcuts:**
| Key | Action |
|-----|--------|
| **`1`** | ✅ Select document |
| **`2`** | ❌ Discard document |
""")
                progress_html, progress_num = format_progress()
                progress_display = gr.HTML(progress_html)
                stats_display = gr.HTML(format_stats())
                # Initial content is whatever document was dequeued at import.
                if current:
                    doc_info_html = format_document_info(current)
                    text_val = current.get("text", "")
                else:
                    doc_info_html = "<div class='doc-info'>No documents loaded.</div>"
                    text_val = ""
                doc_info = gr.HTML(doc_info_html)
                with gr.Column(variant="panel"):
                    text_display = gr.Textbox(
                        text_val,
                        label="📄 Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True
                    )
                with gr.Row():
                    btn_sel = gr.Button(
                        "✅ Select (1)",
                        elem_id="select",
                        variant="primary",
                        interactive=bool(current),
                        size="lg"
                    )
                    btn_dis = gr.Button(
                        "❌ Discard (2)",
                        elem_id="discard",
                        variant="stop",
                        interactive=bool(current),
                        size="lg"
                    )
                progress_bar = gr.Number(value=progress_num, visible=False)
                # Order must match the tuples returned by choose()/done_state().
                outputs = [doc_info, text_display, btn_sel, btn_dis, stats_display, progress_display, progress_bar]
                btn_sel.click(lambda: choose("selected"), outputs=outputs)
                btn_dis.click(lambda: choose("discarded"), outputs=outputs)
            with gr.Tab("Viewer"):
                gr.Markdown("### 📚 Browse Annotated Documents")
                with gr.Row():
                    filter_dropdown = gr.Radio(
                        choices=["all", "selected", "discarded"],
                        value="all",
                        label="Filter",
                        interactive=True
                    )
                viewer_info = gr.HTML()
                with gr.Column(variant="panel"):
                    viewer_text = gr.Textbox(
                        label="📄 Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True
                    )
                with gr.Row():
                    prev_btn = gr.Button("← Previous", size="lg")
                    viewer_counter = gr.HTML("<div class='viewer-counter'>0 / 0</div>")
                    next_btn = gr.Button("Next →", size="lg")
                # Order must match the tuple returned by update_viewer_display().
                viewer_outputs = [viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
                # Initialize viewer
                filter_dropdown.change(
                    update_viewer_filter,
                    inputs=[filter_dropdown],
                    outputs=viewer_outputs
                )
                prev_btn.click(
                    lambda: navigate_viewer(-1),
                    outputs=viewer_outputs
                )
                next_btn.click(
                    lambda: navigate_viewer(1),
                    outputs=viewer_outputs
                )
                # Load initial viewer state
                demo.load(
                    lambda: update_viewer_filter("all"),
                    outputs=viewer_outputs
                )
        # NOTE(review): <script> tags inserted through gr.HTML may not be
        # executed by the browser - confirm this pulse animation ever runs.
        gr.HTML("""
<script>
const observer = new MutationObserver(() => {
document.querySelectorAll('.stat-item').forEach(item => {
item.style.animation = 'pulse 0.3s ease-out';
});
});
observer.observe(document.body, { childList: true, subtree: true });
</script>
""")
    return demo
if __name__ == "__main__":
    # Build the UI and start the Gradio server when executed as a script.
    app = build()
    app.launch()