# Spaces: Sleeping
import gradio as gr | |
import os | |
import json | |
import requests | |
from bs4 import BeautifulSoup | |
import networkx as nx | |
import matplotlib | |
matplotlib.use('Agg') # Use non-interactive backend | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import io | |
import base64 | |
from huggingface_hub import InferenceClient | |
import re | |
from urllib.parse import urlparse | |
import warnings | |
# Matplotlib defaults used for every figure this module renders: pin the
# font family/size/weight and turn off the "too many open figures" warning.
plt.rcParams.update({
    'font.family': ['DejaVu Sans'],
    'font.size': 10,
    'font.weight': 'normal',
    'figure.max_open_warning': 0,  # Disable figure warnings
})

# Silence UserWarnings in general, plus font-lookup and Matplotlib messages.
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings(action='ignore', message='.*Font family.*not found.*')
warnings.filterwarnings(action='ignore', message='.*Matplotlib.*')
def clean_text_for_display(text):
    """Return an ASCII-only, single-line label of at most 50 characters.

    Args:
        text: Value to sanitize. Non-string values are converted with str()
            and then sanitized like any other text.

    Returns:
        The text with non-ASCII characters removed, whitespace runs collapsed
        to single spaces, truncated to 50 characters, with no leading or
        trailing whitespace.
    """
    if not isinstance(text, str):
        text = str(text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)  # Remove non-ASCII characters
    text = re.sub(r'\s+', ' ', text).strip()  # Normalize whitespace
    # Cutting at 50 characters can leave a trailing space. Strip it so that
    # cleaning an already-cleaned value is a no-op; callers compare names that
    # were cleaned once against names that were cleaned twice.
    return text[:50].rstrip()
def fetch_content(url_or_text):
    """Return page text for an http(s) URL, or the input itself otherwise.

    Args:
        url_or_text: Either a URL to fetch content from, or direct text input

    Returns:
        For http/https URLs, the page's visible text (script and style
        elements removed, whitespace collapsed) limited to 5000 characters.
        For anything else, the input unchanged. Failures are reported as a
        string starting with "Error fetching URL: " or
        "Error processing input: " rather than raised.
    """
    try:
        # Anything without an http/https scheme is treated as direct text.
        if urlparse(url_or_text).scheme not in ('http', 'https'):
            return url_or_text

        try:
            request_headers = {
                'User-Agent': (
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                    'AppleWebKit/537.36 (KHTML, like Gecko) '
                    'Chrome/91.0.4472.124 Safari/537.36'
                )
            }
            page = requests.get(url_or_text, headers=request_headers, timeout=10)
            page.raise_for_status()

            soup = BeautifulSoup(page.content, 'html.parser')
            # Script and style bodies are not readable content.
            for tag in soup(["script", "style"]):
                tag.decompose()

            # Break the text into space-separated pieces, drop the empty
            # ones, and re-join with single spaces.
            pieces = []
            for raw_line in soup.get_text().splitlines():
                for phrase in raw_line.strip().split(" "):
                    phrase = phrase.strip()
                    if phrase:
                        pieces.append(phrase)
            return ' '.join(pieces)[:5000]  # Limit to first 5000 characters
        except Exception as e:
            return f"Error fetching URL: {str(e)}"
    except Exception as e:
        return f"Error processing input: {str(e)}"
def simple_entity_extraction(text):
    """Fallback entity extraction when AI is not available.

    Scans the first 30 whitespace-separated words and keeps title-cased words
    longer than two characters (minus a few common sentence starters) as
    CONCEPT entities. Consecutive entities are linked with "related_to"
    relationships, at most five.

    Args:
        text: Text to scan.

    Returns:
        Dictionary with "entities" (at most 10) and "relationships". If
        anything goes wrong, a single entity of type "ERROR" describing the
        failure and no relationships.
    """
    try:
        stopwords = {'The', 'This', 'That', 'When', 'Where', 'How'}
        entities = []
        seen = set()
        for word in text.split()[:30]:  # Limit to first 30 words
            token = re.sub(r'\W', '', word)
            if not token.istitle() or len(token) <= 2 or token in stopwords:
                continue
            # The display name is what ends up in the output, so validate and
            # de-duplicate on it: cleaning strips non-ASCII characters and
            # can shorten a word or leave nothing at all.
            name = clean_text_for_display(token)
            key = name.lower()
            if not name or key in seen:
                continue
            seen.add(key)
            entities.append({
                "name": name,
                "type": "CONCEPT",
                "description": "Auto-detected entity"
            })

        # Chain neighbouring entities together, max 5 relationships.
        relationships = [
            {
                "source": source["name"],
                "target": target["name"],
                "relation": "related_to",
                "description": "Sequential relationship"
            }
            for source, target in list(zip(entities, entities[1:]))[:5]
        ]
        return {"entities": entities[:10], "relationships": relationships}
    except Exception as e:
        return {
            "entities": [{"name": "Error", "type": "ERROR", "description": str(e)}],
            "relationships": []
        }
def extract_entities(text):
    """Extract entities and relationships with a hosted Mistral model.

    Falls back to simple_entity_extraction() when HF_TOKEN is not set, when
    the model reply contains no JSON object, or when any step raises.

    Args:
        text: Input text to analyze (only the first 1500 characters are sent)

    Returns:
        Dictionary containing entities and relationships
    """
    try:
        token = os.environ.get("HF_TOKEN")
        if not token:
            print("No HF_TOKEN found, using simple extraction")
            return simple_entity_extraction(text)

        client = InferenceClient(provider="together", api_key=token)

        # The prompt lines are kept flush-left so the text sent to the model
        # carries no source-code indentation.
        prompt = f"""
Analyze the following text and extract:
1. Named entities (people, organizations, locations, concepts)
2. Relationships between these entities
Return ONLY a valid JSON object with this structure:
{{
"entities": [
{{"name": "entity_name", "type": "PERSON", "description": "brief description"}}
],
"relationships": [
{{"source": "entity1", "target": "entity2", "relation": "relationship_type", "description": "brief description"}}
]
}}
Text to analyze: {text[:1500]}
"""
        completion = client.chat.completions.create(
            model="mistralai/Mistral-Small-24B-Instruct-2501",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1000,
            temperature=0.1,
        )
        reply = completion.choices[0].message.content

        # Take everything from the first "{" to the last "}" in the reply.
        found = re.search(r'\{.*\}', reply, re.DOTALL)
        if found is None:
            print("No valid JSON found in AI response, using fallback")
            return simple_entity_extraction(text)

        # Control characters are removed before parsing.
        raw_json = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', found.group())
        parsed = json.loads(raw_json)

        # Entity names are made display-safe; other fields are left as-is.
        for entity in parsed.get("entities", ()):
            if "name" in entity:
                entity["name"] = clean_text_for_display(entity["name"])
        return parsed
    except Exception as e:
        print(f"AI extraction failed: {e}, using fallback")
        return simple_entity_extraction(text)
def _figure_to_image(fig):
    """Render a matplotlib figure and return it as an RGB PIL image.

    Tries the canvas RGBA buffer first, then the older tostring_rgb() API,
    and finally a PNG round-trip, to cope with different matplotlib versions.
    The figure is not closed here; that is the caller's job.
    """
    from PIL import Image

    canvas = fig.canvas
    canvas.draw()
    width, height = canvas.get_width_height()
    try:
        rgba = np.frombuffer(canvas.buffer_rgba(), dtype=np.uint8)
        pixels = rgba.reshape((height, width, 4))[:, :, :3]  # RGBA -> RGB
    except AttributeError:
        try:
            rgb = np.frombuffer(canvas.tostring_rgb(), dtype=np.uint8)
            pixels = rgb.reshape((height, width, 3))
        except AttributeError:
            buf = io.BytesIO()
            fig.savefig(buf, format='png', bbox_inches='tight')
            buf.seek(0)
            return Image.open(buf).convert('RGB')
    return Image.fromarray(pixels)


def _graph_from_entities(entities_data):
    """Build the networkx graph for the given entities/relationships dict."""
    graph = nx.Graph()

    # Limit to 15 entities for better visualization.
    for entity in entities_data.get("entities", [])[:15]:
        name = clean_text_for_display(entity.get("name", "Unknown"))
        if name and name.strip():
            graph.add_node(name,
                           type=entity.get("type", "UNKNOWN"),
                           description=entity.get("description", ""))

    # Only relationships whose both endpoints are known nodes become edges.
    relationships = entities_data.get("relationships", [])
    for rel in relationships:
        source = clean_text_for_display(rel.get("source", ""))
        target = clean_text_for_display(rel.get("target", ""))
        if source in graph.nodes and target in graph.nodes:
            graph.add_edge(source, target,
                           relation=rel.get("relation", "related"),
                           description=rel.get("description", ""))

    # With no relationships supplied, chain up to five neighbouring nodes so
    # the picture is not just a set of isolated dots.
    if not relationships and graph.number_of_nodes() > 1:
        nodes = list(graph.nodes())
        for left, right in list(zip(nodes, nodes[1:]))[:5]:
            graph.add_edge(left, right, relation="related")
    return graph


def _draw_graph(graph, ax):
    """Draw the graph (or a "nothing to show" message) onto the given axes."""
    if graph.number_of_nodes() == 0:
        ax.text(0.5, 0.5, "No entities found to visualize",
                ha='center', va='center', fontsize=14, transform=ax.transAxes)
        ax.set_title("Knowledge Graph")
        ax.axis('off')
        return

    type_colors = {
        "PERSON": "#FF6B6B",
        "ORG": "#4ECDC4",
        "LOCATION": "#45B7D1",
        "CONCEPT": "#96CEB4",
        "ERROR": "#FF0000",
        "UNKNOWN": "#DDA0DD"
    }
    # Unrecognised types share the UNKNOWN colour.
    node_colors = [
        type_colors.get(graph.nodes[node].get('type', 'UNKNOWN'), "#DDA0DD")
        for node in graph.nodes()
    ]
    pos = nx.spring_layout(graph, k=1, iterations=50)
    nx.draw(graph, pos,
            node_color=node_colors,
            node_size=800,
            font_size=8,
            font_weight='bold',
            with_labels=True,
            edge_color='gray',
            width=1.5,
            alpha=0.8,
            ax=ax)
    ax.set_title("Knowledge Graph", size=14, weight='bold')


def build_knowledge_graph(entities_data):
    """Build and visualize knowledge graph.

    Args:
        entities_data: Dictionary containing entities and relationships

    Returns:
        PIL Image object of the knowledge graph. If building or drawing the
        graph fails, an image carrying an error message is returned instead.
    """
    try:
        graph = _graph_from_entities(entities_data)
        fig, ax = plt.subplots(figsize=(10, 8))
        try:
            _draw_graph(graph, ax)
            return _figure_to_image(fig)
        finally:
            # Close on every path, including failures while drawing or
            # converting, so figures do not accumulate in pyplot.
            plt.close(fig)
    except Exception:
        fig, ax = plt.subplots(figsize=(8, 6))
        try:
            ax.text(0.5, 0.5, "Error creating graph",
                    ha='center', va='center', fontsize=12, transform=ax.transAxes)
            ax.set_title("Knowledge Graph Error")
            ax.axis('off')
            return _figure_to_image(fig)
        finally:
            plt.close(fig)
def build_ascii_diagram(entities, relationships):
    """Render entities (grouped by type) and relationships as plain text.

    Args:
        entities: List of entity dicts; "type" and "name" are read, with
            "UNKNOWN" / "Unknown" used when missing.
        relationships: List of relationship dicts; "source", "target" and
            "relation" are read, with "?" / "?" / "related" used when missing.

    Returns:
        The diagram text, or "No entities to visualize" when there are no
        entities.
    """
    if not entities:
        return "No entities to visualize"

    lines = ["KNOWLEDGE GRAPH DIAGRAM:", "=" * 30, ""]

    # Group names under their type, keeping first-seen order.
    names_by_type = {}
    for entity in entities:  # Already limited by caller
        names_by_type.setdefault(entity.get("type", "UNKNOWN"), []).append(
            entity.get("name", "Unknown")
        )

    for kind, names in names_by_type.items():
        lines.append(f"{kind}:")
        lines.extend(f" - {name}" for name in names)
        lines.append("")

    if relationships:
        lines.append("RELATIONSHIPS:")
        for rel in relationships:  # Already limited by caller
            source = rel.get("source", "?")
            target = rel.get("target", "?")
            relation = rel.get("relation", "related")
            lines.append(f" {source} -> {target} ({relation})")

    # Every line of the diagram, including the last, ends with a newline.
    return "\n".join(lines) + "\n"
def validate_mcp_response(response_data):
    """Validate and sanitize response for MCP compatibility.

    Every string value nested in dicts and lists is reduced to printable
    ASCII plus newline, carriage return and tab (dict keys are left alone).
    The result must be JSON-serializable; if its compact JSON form exceeds
    100000 characters, entities are cut to 5, relationships to 3, and the
    diagram is replaced by a short placeholder.

    Args:
        response_data: The response structure to sanitize.

    Returns:
        The sanitized copy, or a failure dict (success=False with an error
        message) if sanitizing, serializing or shrinking raised.
    """
    def _scrub(value):
        if isinstance(value, str):
            return re.sub(r'[^\x20-\x7E\n\r\t]', '', value)
        if isinstance(value, list):
            return [_scrub(element) for element in value]
        if isinstance(value, dict):
            return {key: _scrub(inner) for key, inner in value.items()}
        return value

    try:
        cleaned = _scrub(response_data)

        # Serializing doubles as the "is this valid JSON data" check.
        encoded = json.dumps(cleaned, ensure_ascii=True, separators=(',', ':'))

        if len(encoded) > 100000:  # 100KB hard limit
            cleaned["entities"] = cleaned.get("entities", [])[:5]
            cleaned["relationships"] = cleaned.get("relationships", [])[:3]
            cleaned["diagram"] = "Knowledge graph generated (content reduced for MCP)"
        return cleaned
    except Exception as e:
        return {
            "success": False,
            "error": f"Response validation failed: {str(e)}",
            "entities": [],
            "relationships": [],
            "diagram": "Error generating diagram",
            "summary": "Analysis failed during response validation"
        }
def build_kg(url_or_text):
    """Main function to build knowledge graph from URL or text.

    Always uses the simple (non-AI) extraction and returns a deliberately
    small payload: at most 5 entities and 3 relationships.

    Args:
        url_or_text: URL to analyze or direct text input

    Returns:
        String: compact JSON. On success it has "success", "entity_count",
        "relationship_count", "entities", "relationships" and "diagram".
        On failure it has an "error" message.
    """
    try:
        # Quick validation
        if not url_or_text or len(url_or_text.strip()) == 0:
            return '{"error":"Please provide text or URL to analyze"}'

        # Limit input size immediately to prevent timeouts
        input_text = url_or_text[:2000]

        # Step 1: Fetch content
        try:
            content = fetch_content(input_text)
        except Exception:
            content = input_text  # Use input directly if fetch fails
        else:
            # fetch_content reports failures as strings with these prefixes
            # and returns direct text unchanged. Requiring the result to
            # differ from the input keeps ordinary text that merely begins
            # with "Error" from being reported as a fetch failure.
            fetch_failed = content != input_text and content.startswith(
                ("Error fetching URL: ", "Error processing input: ")
            )
            if fetch_failed:
                # json.dumps escapes quotes/backslashes in the message.
                return json.dumps({"error": content}, separators=(',', ':'))

        # Limit content size for fast processing
        content = content[:1500]

        # Step 2: Quick entity extraction (always the simple one for MCP)
        try:
            entities_data = simple_entity_extraction(content)
        except Exception:
            entities_data = {"entities": [], "relationships": []}

        # Step 3: Minimal response
        entities = entities_data.get("entities", [])[:5]  # Max 5 entities
        relationships = entities_data.get("relationships", [])[:3]  # Max 3 relationships

        # Create minimal ASCII summary
        diagram_parts = []
        if entities:
            diagram_parts.append("ENTITIES:")
            for entity in entities:
                name = str(entity.get("name", "Unknown"))[:20]  # Truncate names
                diagram_parts.append(f" - {name}")
        if relationships:
            diagram_parts.append("RELATIONSHIPS:")
            for rel in relationships:
                source = str(rel.get("source", ""))[:15]
                target = str(rel.get("target", ""))[:15]
                diagram_parts.append(f" {source} -> {target}")
        diagram = "\n".join(diagram_parts) if diagram_parts else "No entities found"

        response = {
            "success": True,
            "entity_count": len(entities),
            "relationship_count": len(relationships),
            "entities": [
                {"name": str(e.get("name", ""))[:20], "type": e.get("type", "UNKNOWN")}
                for e in entities
            ],
            "relationships": [
                {"source": str(r.get("source", ""))[:15], "target": str(r.get("target", ""))[:15]}
                for r in relationships
            ],
            "diagram": diagram[:500]  # Strict limit
        }
        # The per-field limits above keep the payload well under 2000
        # characters; the slice is only a last-resort size guard.
        return json.dumps(response, separators=(',', ':'))[:2000]
    except Exception as e:
        # Serialize rather than interpolate so the error text cannot break
        # the JSON.
        return json.dumps(
            {"success": False, "error": str(e)[:100]}, separators=(',', ':')
        )
# Wrapper function with timeout protection for MCP
def mcp_safe_build_kg(url_or_text):
    """MCP-safe wrapper around build_kg with a 10 second SIGALRM timeout.

    When the alarm cannot be installed (signal.signal raises, e.g. where
    SIGALRM does not exist), build_kg is called directly without a timeout.

    Args:
        url_or_text: URL to analyze or direct text input

    Returns:
        The JSON string from build_kg, or a compact JSON error object.
    """
    def error_json(message):
        # Serialize instead of interpolating so quotes in the message cannot
        # produce invalid JSON.
        return json.dumps({"success": False, "error": message}, separators=(',', ':'))

    try:
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError("Function timed out")

        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    except Exception:
        # Fallback if signal not available (Windows, etc.)
        try:
            return build_kg(url_or_text)
        except Exception as e:
            return error_json(f"Fallback error: {str(e)[:50]}")

    try:
        signal.alarm(10)  # Set timeout for 10 seconds
        return build_kg(url_or_text)
    except TimeoutError:
        return '{"success":false,"error":"Request timed out"}'
    except Exception as e:
        return error_json(f"Function error: {str(e)[:50]}")
    finally:
        # Always cancel the pending alarm and put back whatever handler was
        # installed before, so this call leaves no process-wide state behind.
        signal.alarm(0)
        signal.signal(
            signal.SIGALRM,
            signal.SIG_DFL if previous_handler is None else previous_handler,
        )
# Create Gradio interface with error handling
try:
    demo = gr.Interface(
        fn=mcp_safe_build_kg,  # Use the timeout-protected version
        inputs=gr.Textbox(
            label="Input Text or URL",
            placeholder="Enter text to analyze or paste a URL...",
            max_lines=5
        ),
        outputs=gr.Textbox(
            label="Knowledge Graph JSON",
            show_copy_button=True
        ),
        title="KG Builder - MCP Edition",
        description="Lightweight knowledge graph builder optimized for MCP servers.",
        allow_flagging="never",
        cache_examples=False
    )
except Exception as e:
    print(f"Failed to create Gradio interface: {e}")

    # Build the error payload now: Python unbinds `e` when this except block
    # ends, so a handler that read `e` later would raise NameError.
    # json.dumps also escapes any quotes in the message.
    _interface_error_json = json.dumps(
        {"error": f"Interface creation failed: {str(e)[:100]}"},
        separators=(',', ':'),
    )

    # Create minimal fallback
    def error_demo(text):
        """Return the stored interface-creation error for any input."""
        return _interface_error_json

    demo = gr.Interface(
        fn=error_demo,
        inputs="text",
        outputs="text",
        title="KG Builder - Error Mode",
        allow_flagging="never"
    )
# Launch the demo
if __name__ == "__main__":
    print("Starting KG Builder MCP Server...")

    # First choice: MCP server with reduced verbosity and a single worker.
    mcp_launch_options = {
        "mcp_server": True,
        "share": False,
        "show_error": False,  # Reduce error verbosity for MCP
        "quiet": True,  # Reduce logging to prevent SSE issues
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "max_threads": 1,  # Limit concurrency to prevent resource issues
        "show_api": False,  # Disable API docs to reduce overhead
    }
    # Second choice: the same app without MCP.
    plain_launch_options = {
        "mcp_server": False,
        "share": False,
        "quiet": True,
        "show_error": False,
    }

    try:
        demo.launch(**mcp_launch_options)
    except Exception as mcp_error:
        print(f"MCP server launch failed: {mcp_error}")
        print("Trying fallback mode...")
        try:
            demo.launch(**plain_launch_options)
        except Exception as fallback_error:
            print(f"All launch attempts failed: {fallback_error}")
            print("Creating emergency fallback...")

            # Last resort: a bare interface that only reports emergency mode.
            def emergency_demo(text):
                return '{"error":"Server in emergency mode"}'

            emergency = gr.Interface(
                fn=emergency_demo,
                inputs="text",
                outputs="text",
                title="KG Builder Emergency Mode"
            )
            emergency.launch(quiet=True, share=False)