import gradio as gr
import os
import json
import requests
from bs4 import BeautifulSoup
import networkx as nx
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import numpy as np
import io
import base64
from huggingface_hub import InferenceClient
import re
from urllib.parse import urlparse
import warnings

# Configure matplotlib for better font handling
plt.rcParams['font.family'] = ['DejaVu Sans']
plt.rcParams['font.size'] = 10
plt.rcParams['font.weight'] = 'normal'
plt.rcParams['figure.max_open_warning'] = 0  # Disable figure warnings
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', message='.*Font family.*not found.*')
warnings.filterwarnings('ignore', message='.*Matplotlib.*')


def clean_text_for_display(text):
    """Clean text to remove characters that might cause font issues."""
    if not isinstance(text, str):
        return str(text)
    # Remove or replace problematic characters
    text = re.sub(r'[^\x00-\x7F]+', '', text)  # Remove non-ASCII characters
    text = re.sub(r'\s+', ' ', text).strip()   # Normalize whitespace
    return text[:50] if len(text) > 50 else text  # Limit length for display


def fetch_content(url_or_text):
    """Fetch content from URL or return text directly.

    Args:
        url_or_text: Either a URL to fetch content from, or direct text input

    Returns:
        Extracted text content
    """
    try:
        # Check if input looks like a URL
        parsed = urlparse(url_or_text)
        if parsed.scheme in ['http', 'https']:
            try:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
                }
                response = requests.get(url_or_text, headers=headers, timeout=10)
                response.raise_for_status()

                # Parse HTML and extract text
                soup = BeautifulSoup(response.content, 'html.parser')

                # Remove script and style elements
                for script in soup(["script", "style"]):
                    script.decompose()

                # Get text and clean it up
                text = soup.get_text()
                lines = (line.strip() for line in text.splitlines())
                chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
                text = ' '.join(chunk for chunk in chunks if chunk)

                return text[:5000]  # Limit to first 5000 characters
            except Exception as e:
                return f"Error fetching URL: {str(e)}"
        else:
            # It's direct text input
            return url_or_text
    except Exception as e:
        return f"Error processing input: {str(e)}"


def simple_entity_extraction(text):
    """Fallback entity extraction when AI is not available."""
    try:
        words = text.split()
        entities = []

        # Simple heuristic: words that are capitalized and longer than 2 characters
        seen = set()
        for word in words[:30]:  # Limit to first 30 words
            clean_word = re.sub(r'[^\w]', '', word)
            if (clean_word.istitle() and len(clean_word) > 2
                    and clean_word.lower() not in seen
                    and clean_word not in ['The', 'This', 'That', 'When', 'Where', 'How']):
                entities.append({
                    "name": clean_text_for_display(clean_word),
                    "type": "CONCEPT",
                    "description": "Auto-detected entity"
                })
                seen.add(clean_word.lower())

        # Create some basic relationships
        relationships = []
        if len(entities) > 1:
            for i in range(min(len(entities) - 1, 5)):  # Max 5 relationships
                relationships.append({
                    "source": entities[i]["name"],
                    "target": entities[i + 1]["name"],
                    "relation": "related_to",
                    "description": "Sequential relationship"
                })

        return {"entities": entities[:10], "relationships": relationships}
    except Exception as e:
        return {
            "entities": [{"name": "Error", "type": "ERROR", "description": str(e)}],
            "relationships": []
        }
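
# Illustrative sketch, not part of the original app: the fallback extractor is a
# plain capitalization heuristic, so its output depends only on the input text.
# For a short sentence it produces roughly:
#
#     result = simple_entity_extraction("Marie Curie worked in Paris with Pierre Curie.")
#     # result["entities"]      -> Marie, Curie, Paris, Pierre (all typed "CONCEPT")
#     # result["relationships"] -> Marie->Curie, Curie->Paris, Paris->Pierre ("related_to")
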
def extract_entities(text):
    """Extract entities and relationships using Mistral AI with fallback.

    Args:
        text: Input text to analyze

    Returns:
        Dictionary containing entities and relationships
    """
    try:
        # Check if HF_TOKEN is available
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            print("No HF_TOKEN found, using simple extraction")
            return simple_entity_extraction(text)

        client = InferenceClient(
            provider="together",
            api_key=hf_token,
        )

        prompt = f"""
Analyze the following text and extract:
1. Named entities (people, organizations, locations, concepts)
2. Relationships between these entities

Return ONLY a valid JSON object with this structure:
{{
  "entities": [
    {{"name": "entity_name", "type": "PERSON", "description": "brief description"}}
  ],
  "relationships": [
    {{"source": "entity1", "target": "entity2", "relation": "relationship_type", "description": "brief description"}}
  ]
}}

Text to analyze: {text[:1500]}
"""

        completion = client.chat.completions.create(
            model="mistralai/Mistral-Small-24B-Instruct-2501",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1000,
            temperature=0.1,
        )

        response_text = completion.choices[0].message.content

        # Clean and extract JSON
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if json_match:
            json_str = json_match.group()
            # Clean the JSON string
            json_str = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', json_str)  # Remove control characters
            parsed_data = json.loads(json_str)

            # Clean entity names for display
            if "entities" in parsed_data:
                for entity in parsed_data["entities"]:
                    if "name" in entity:
                        entity["name"] = clean_text_for_display(entity["name"])

            return parsed_data
        else:
            print("No valid JSON found in AI response, using fallback")
            return simple_entity_extraction(text)

    except Exception as e:
        print(f"AI extraction failed: {e}, using fallback")
        return simple_entity_extraction(text)
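
# Illustrative, non-executing sketch (not part of the original app): the JSON
# extraction in extract_entities() tolerates prose around the model reply because
# it grabs the first "{" through the last "}" before parsing. A hypothetical reply:
#
#     reply = 'Sure! {"entities": [], "relationships": []} Hope that helps.'
#     match = re.search(r'\{.*\}', reply, re.DOTALL)
#     json.loads(match.group())   # -> {"entities": [], "relationships": []}
#
# Anything that still fails json.loads() falls back to simple_entity_extraction().
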
def build_knowledge_graph(entities_data):
    """Build and visualize knowledge graph.

    Args:
        entities_data: Dictionary containing entities and relationships

    Returns:
        PIL Image object of the knowledge graph
    """
    try:
        # Create networkx graph
        G = nx.Graph()

        # Add nodes (entities)
        entities = entities_data.get("entities", [])
        for entity in entities[:15]:  # Limit to 15 entities for better visualization
            clean_name = clean_text_for_display(entity.get("name", "Unknown"))
            if clean_name and len(clean_name.strip()) > 0:
                G.add_node(clean_name,
                           type=entity.get("type", "UNKNOWN"),
                           description=entity.get("description", ""))

        # Add edges (relationships)
        relationships = entities_data.get("relationships", [])
        for rel in relationships:
            source = clean_text_for_display(rel.get("source", ""))
            target = clean_text_for_display(rel.get("target", ""))
            if source in G.nodes and target in G.nodes:
                G.add_edge(source, target,
                           relation=rel.get("relation", "related"),
                           description=rel.get("description", ""))

        # If no relationships found, create some connections between entities
        if len(relationships) == 0 and len(list(G.nodes())) > 1:
            node_list = list(G.nodes())
            for i in range(min(len(node_list) - 1, 5)):
                G.add_edge(node_list[i], node_list[i + 1], relation="related")

        # Create visualization
        fig, ax = plt.subplots(figsize=(10, 8))

        # Skip if no nodes
        if len(G.nodes()) == 0:
            ax.text(0.5, 0.5, "No entities found to visualize",
                    ha='center', va='center', fontsize=14, transform=ax.transAxes)
            ax.set_title("Knowledge Graph")
            ax.axis('off')
        else:
            # Position nodes using spring layout
            pos = nx.spring_layout(G, k=1, iterations=50)

            # Color nodes by type
            node_colors = []
            type_colors = {
                "PERSON": "#FF6B6B",
                "ORG": "#4ECDC4",
                "LOCATION": "#45B7D1",
                "CONCEPT": "#96CEB4",
                "ERROR": "#FF0000",
                "UNKNOWN": "#DDA0DD"
            }
            for node in G.nodes():
                node_type = G.nodes[node].get('type', 'UNKNOWN')
                node_colors.append(type_colors.get(node_type, "#DDA0DD"))

            # Draw the graph
            nx.draw(G, pos,
                    node_color=node_colors,
                    node_size=800,
                    font_size=8,
                    font_weight='bold',
                    with_labels=True,
                    edge_color='gray',
                    width=1.5,
                    alpha=0.8,
                    ax=ax)

            # Add title
            ax.set_title("Knowledge Graph", size=14, weight='bold')

        # Convert to PIL Image
        fig.canvas.draw()

        # Handle different matplotlib versions
        try:
            # Try newer method first
            img_array = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
            img_array = img_array.reshape(fig.canvas.get_width_height()[::-1] + (4,))
            # Convert RGBA to RGB
            img_array = img_array[:, :, :3]
        except AttributeError:
            try:
                # Fallback to older method
                img_array = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
                img_array = img_array.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            except AttributeError:
                # Final fallback - save to buffer
                buf = io.BytesIO()
                fig.savefig(buf, format='png', bbox_inches='tight')
                buf.seek(0)
                from PIL import Image
                pil_image = Image.open(buf).convert('RGB')
                plt.close(fig)
                return pil_image

        from PIL import Image
        pil_image = Image.fromarray(img_array)
        plt.close(fig)
        return pil_image

    except Exception:
        # Create simple error image
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.text(0.5, 0.5, "Error creating graph",
                ha='center', va='center', fontsize=12, transform=ax.transAxes)
        ax.set_title("Knowledge Graph Error")
        ax.axis('off')

        # Handle different matplotlib versions for error image
        try:
            # Try newer method first
            fig.canvas.draw()
            img_array = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
            img_array = img_array.reshape(fig.canvas.get_width_height()[::-1] + (4,))
            img_array = img_array[:, :, :3]  # Convert RGBA to RGB
        except AttributeError:
            try:
                # Fallback to older method
                fig.canvas.draw()
                img_array = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
                img_array = img_array.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            except AttributeError:
                # Final fallback - save to buffer
                buf = io.BytesIO()
                fig.savefig(buf, format='png', bbox_inches='tight')
                buf.seek(0)
                from PIL import Image
                pil_image = Image.open(buf).convert('RGB')
                plt.close(fig)
                return pil_image

        from PIL import Image
        pil_image = Image.fromarray(img_array)
        plt.close(fig)
        return pil_image
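
# Illustrative sketch, not part of the original app: build_knowledge_graph()
# returns a PIL image in every code path, so a quick local check could be:
#
#     data = simple_entity_extraction("Alice founded Acme Corp in Berlin.")
#     image = build_knowledge_graph(data)
#     image.save("graph.png")   # "graph.png" is an arbitrary filename for this example
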
def build_ascii_diagram(entities, relationships):
    """Create simple ASCII diagram of knowledge graph"""
    if not entities:
        return "No entities to visualize"

    diagram = "KNOWLEDGE GRAPH DIAGRAM:\n"
    diagram += "=" * 30 + "\n\n"  # Reduced line length

    # Show entities by type
    entity_types = {}
    for entity in entities:  # Already limited by caller
        etype = entity.get("type", "UNKNOWN")
        if etype not in entity_types:
            entity_types[etype] = []
        entity_types[etype].append(entity.get("name", "Unknown"))

    for etype, names in entity_types.items():
        diagram += f"{etype}:\n"  # Removed emoji for MCP compatibility
        for name in names:
            diagram += f" - {name}\n"
        diagram += "\n"

    # Show relationships
    if relationships:
        diagram += "RELATIONSHIPS:\n"  # Removed emoji for MCP compatibility
        for rel in relationships:  # Already limited by caller
            source = rel.get("source", "?")
            target = rel.get("target", "?")
            relation = rel.get("relation", "related")
            diagram += f" {source} -> {target} ({relation})\n"

    return diagram


def validate_mcp_response(response_data):
    """Validate and sanitize response for MCP compatibility"""
    try:
        # Ensure all string values are ASCII-safe
        def sanitize_strings(obj):
            if isinstance(obj, dict):
                return {k: sanitize_strings(v) for k, v in obj.items()}
            elif isinstance(obj, list):
                return [sanitize_strings(item) for item in obj]
            elif isinstance(obj, str):
                # Remove non-ASCII characters and control characters
                return re.sub(r'[^\x20-\x7E\n\r\t]', '', obj)
            else:
                return obj

        sanitized = sanitize_strings(response_data)

        # Test JSON serialization
        test_json = json.dumps(sanitized, ensure_ascii=True, separators=(',', ':'))

        # Size check
        if len(test_json) > 100000:  # 100KB hard limit
            # Drastically reduce content
            sanitized["entities"] = sanitized.get("entities", [])[:5]
            sanitized["relationships"] = sanitized.get("relationships", [])[:3]
            sanitized["diagram"] = "Knowledge graph generated (content reduced for MCP)"

        return sanitized
    except Exception as e:
        return {
            "success": False,
            "error": f"Response validation failed: {str(e)}",
            "entities": [],
            "relationships": [],
            "diagram": "Error generating diagram",
            "summary": "Analysis failed during response validation"
        }
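
# Illustrative sketch, not part of the original app: validate_mcp_response()
# strips control and non-ASCII characters from every string in the payload, e.g.:
#
#     validate_mcp_response({"entities": [{"name": "Café"}], "relationships": []})
#     # -> {"entities": [{"name": "Caf"}], "relationships": []}
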
def build_kg(url_or_text):
    """Main function to build knowledge graph from URL or text.

    Args:
        url_or_text: URL to analyze or direct text input

    Returns:
        String: Simple JSON response optimized for MCP streaming
    """
    try:
        # Quick validation
        if not url_or_text or len(url_or_text.strip()) == 0:
            return '{"error":"Please provide text or URL to analyze"}'

        # Limit input size immediately to prevent timeouts
        input_text = url_or_text[:2000] if len(url_or_text) > 2000 else url_or_text

        # Step 1: Fetch content (with timeout protection)
        try:
            content = fetch_content(input_text)
            if content.startswith("Error"):
                # json.dumps keeps the message valid JSON even if it contains quotes
                return json.dumps({"error": content})
        except Exception:
            content = input_text  # Use input directly if fetch fails

        # Limit content size for fast processing
        content = content[:1500] if len(content) > 1500 else content

        # Step 2: Quick entity extraction (simplified for speed)
        try:
            entities_data = simple_entity_extraction(content)  # Always use simple extraction for MCP
        except Exception:
            entities_data = {"entities": [], "relationships": []}

        # Step 3: Minimal response
        entities = entities_data.get("entities", [])[:5]  # Max 5 entities
        relationships = entities_data.get("relationships", [])[:3]  # Max 3 relationships

        # Create minimal ASCII summary
        diagram_parts = []
        if entities:
            diagram_parts.append("ENTITIES:")
            for entity in entities:
                name = str(entity.get("name", "Unknown"))[:20]  # Truncate names
                diagram_parts.append(f" - {name}")

        if relationships:
            diagram_parts.append("RELATIONSHIPS:")
            for rel in relationships:
                source = str(rel.get("source", ""))[:15]
                target = str(rel.get("target", ""))[:15]
                diagram_parts.append(f" {source} -> {target}")

        diagram = "\n".join(diagram_parts) if diagram_parts else "No entities found"

        # Ultra-minimal response
        response = {
            "success": True,
            "entity_count": len(entities),
            "relationship_count": len(relationships),
            "entities": [{"name": e.get("name", "")[:20], "type": e.get("type", "UNKNOWN")} for e in entities],
            "relationships": [{"source": r.get("source", "")[:15], "target": r.get("target", "")[:15]} for r in relationships],
            "diagram": diagram[:500]  # Strict limit
        }

        # Return ultra-compact JSON
        return json.dumps(response, separators=(',', ':'))[:2000]  # Hard size limit

    except Exception as e:
        # Ultra-simple error response; json.dumps escapes the message safely
        error_msg = str(e)[:100]  # Truncate error message
        return json.dumps({"success": False, "error": error_msg})


# Wrapper function with timeout protection for MCP
def mcp_safe_build_kg(url_or_text):
    """MCP-safe wrapper with timeout protection"""
    try:
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError("Function timed out")

        # Set timeout for 10 seconds
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(10)

        try:
            result = build_kg(url_or_text)
            signal.alarm(0)  # Cancel timeout
            return result
        except TimeoutError:
            return '{"success":false,"error":"Request timed out"}'
        except Exception as e:
            signal.alarm(0)  # Cancel timeout
            return json.dumps({"success": False, "error": f"Function error: {str(e)[:50]}"})

    except Exception:
        # Fallback if signal.SIGALRM is not available (Windows, etc.)
        try:
            return build_kg(url_or_text)
        except Exception as e:
            return json.dumps({"success": False, "error": f"Fallback error: {str(e)[:50]}"})
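
# Illustrative sketch, not part of the original app: the wrapper can be smoke-tested
# without Gradio; it always returns a compact JSON string, e.g.:
#
#     print(mcp_safe_build_kg("Alan Turing studied at Cambridge and worked at Bletchley Park."))
#     # e.g. {"success":true,"entity_count":5,"relationship_count":3, ... "diagram":"ENTITIES:\n - Alan\n ..."}
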
# Create Gradio interface with error handling
try:
    demo = gr.Interface(
        fn=mcp_safe_build_kg,  # Use the timeout-protected version
        inputs=gr.Textbox(
            label="Input Text or URL",
            placeholder="Enter text to analyze or paste a URL...",
            max_lines=5
        ),
        outputs=gr.Textbox(
            label="Knowledge Graph JSON",
            show_copy_button=True
        ),
        title="KG Builder - MCP Edition",
        description="Lightweight knowledge graph builder optimized for MCP servers.",
        allow_flagging="never",
        cache_examples=False
    )
except Exception as e:
    print(f"Failed to create Gradio interface: {e}")
    # Create minimal fallback; capture the message now because `e` is cleared
    # once this except block exits
    err_msg = str(e)[:100]

    def error_demo(text):
        return json.dumps({"error": f"Interface creation failed: {err_msg}"})

    demo = gr.Interface(
        fn=error_demo,
        inputs="text",
        outputs="text",
        title="KG Builder - Error Mode",
        allow_flagging="never"
    )

# Launch the demo
if __name__ == "__main__":
    print("Starting KG Builder MCP Server...")
    try:
        demo.launch(
            mcp_server=True,
            share=False,
            show_error=False,  # Reduce error verbosity for MCP
            quiet=True,        # Reduce logging to prevent SSE issues
            server_name="0.0.0.0",
            server_port=7860,
            max_threads=1,     # Limit concurrency to prevent resource issues
            show_api=False     # Disable API docs to reduce overhead
        )
    except Exception as e:
        print(f"MCP server launch failed: {e}")
        print("Trying fallback mode...")
        try:
            # Fallback without MCP
            demo.launch(
                mcp_server=False,
                share=False,
                quiet=True,
                show_error=False
            )
        except Exception as e2:
            print(f"All launch attempts failed: {e2}")
            print("Creating emergency fallback...")

            # Create absolute minimal demo
            def emergency_demo(text):
                return '{"error":"Server in emergency mode"}'

            emergency = gr.Interface(
                fn=emergency_demo,
                inputs="text",
                outputs="text",
                title="KG Builder Emergency Mode"
            )
            emergency.launch(quiet=True, share=False)
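
# Illustrative note, not part of the original app: with mcp_server=True, recent
# Gradio releases typically expose the MCP server over SSE at a path like
# /gradio_api/mcp/sse (confirm against your Gradio version's MCP docs), so a
# hypothetical client entry might look like:
#
#     {"mcpServers": {"kg-builder": {"url": "http://localhost:7860/gradio_api/mcp/sse"}}}
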