from shapely.geometry import box as shapely_box, Point from collections import defaultdict, deque import math MAX_FALLBACK_DIST = 150 # pixels def map_arrows(nodes, arrows): """ Map arrows to node boxes using geometric containment with fallback to nearest box. Returns directional edges (source_id, target_id, label). Args: nodes (list): List of node dicts with 'bbox' field. arrows (list): List of arrow dicts with 'tail' and 'head' coordinates. Returns: list: List of (source, target, label) tuples. """ for node in nodes: node["shape"] = shapely_box(*node["bbox"]) node["center"] = ( (node["bbox"][0] + node["bbox"][2]) // 2, (node["bbox"][1] + node["bbox"][3]) // 2 ) edges = [] def find_nearest_node(pt): min_dist = float("inf") nearest_id = None for n in nodes: cx, cy = n["center"] dist = math.dist(pt, (cx, cy)) if dist < min_dist: min_dist = dist nearest_id = n["id"] return nearest_id, min_dist for arrow in arrows: if not isinstance(arrow, dict) or not isinstance(arrow.get("tail"), (tuple, list)) or not isinstance(arrow.get("head"), (tuple, list)): print(f"⚠️ Skipping malformed arrow: {arrow}") continue tail_pt = Point(arrow["tail"]) head_pt = Point(arrow["head"]) label = arrow.get("label", "").strip() source = next((n["id"] for n in nodes if n["shape"].buffer(10).contains(tail_pt)), None) target = next((n["id"] for n in nodes if n["shape"].buffer(10).contains(head_pt)), None) # Fallback to nearest node if not found if not source: source, dist = find_nearest_node(arrow["tail"]) if dist > MAX_FALLBACK_DIST: source = None if not target: target, dist = find_nearest_node(arrow["head"]) if dist > MAX_FALLBACK_DIST: target = None if source and target and source != target: print(f"✅ Mapped arrow from {source} → {target} [{label}]") edges.append((source, target, label)) else: print(f"⚠️ Could not map arrow endpoints to nodes: tail={arrow.get('tail')} head={arrow.get('head')}") return edges def detect_node_type(text, default_type="process"): """ Heuristically infer the node type from its text. Args: text (str): Node label. default_type (str): Fallback type. Returns: str: Inferred node type. """ text_lower = text.lower() if "start" in text_lower: return "start" if "end" in text_lower or "full" in text_lower: return "end" if "?" in text or "yes" in text_lower or "no" in text_lower: return "decision" return default_type def build_flowchart_json(nodes, arrows): """ Construct a structured flowchart JSON using basic graph traversal. Args: nodes (list): Detected node dicts. arrows (list): Detected arrow dicts. Returns: dict: JSON with 'start' and 'steps'. """ edges = map_arrows(nodes, arrows) # Build adjacency and reverse mappings graph = defaultdict(list) reverse_links = defaultdict(list) edge_labels = {} for src, tgt, label in edges: graph[src].append(tgt) reverse_links[tgt].append(src) edge_labels[(src, tgt)] = label.lower() all_node_ids = {n["id"] for n in nodes} start_candidates = [nid for nid in all_node_ids if nid not in reverse_links] flowchart = { "start": start_candidates[0] if start_candidates else None, "steps": [] } visited = set() queue = deque(start_candidates) id_to_node = {n["id"]: n for n in nodes} while queue: curr = queue.popleft() if curr in visited: continue visited.add(curr) node = id_to_node.get(curr, {}) text = node.get("text", "").strip() node_type = node.get("type") or detect_node_type(text) step = { "id": curr, "text": text, "type": node_type } parents = list(set(reverse_links[curr])) if len(parents) == 1: step["parent"] = parents[0] elif len(parents) > 1: step["parents"] = parents next_nodes = list(set(graph[curr])) if node_type == "decision" and next_nodes: step["branches"] = {} for tgt in next_nodes: label = edge_labels.get((curr, tgt), "") if "yes" in label: step["branches"]["yes"] = tgt elif "no" in label: step["branches"]["no"] = tgt else: step["branches"].setdefault("unknown", []).append(tgt) if tgt not in visited: queue.append(tgt) elif len(next_nodes) == 1: step["next"] = next_nodes[0] if next_nodes[0] not in visited: queue.append(next_nodes[0]) elif len(next_nodes) > 1: step["next"] = next_nodes for n in next_nodes: if n not in visited: queue.append(n) flowchart["steps"].append(step) return flowchart