Spaces:
Sleeping
Sleeping
from shapely.geometry import box as shapely_box, Point | |
from collections import defaultdict, deque | |
import math | |
MAX_FALLBACK_DIST = 150 # pixels | |
def map_arrows(nodes, arrows):
    """
    Map arrows to node boxes using geometric containment with fallback to nearest box.

    Each arrow endpoint is first matched against every node box padded by a
    small margin; the first node (in ``nodes`` order) whose padded box contains
    the point wins. If no box contains the point, the node whose center is
    closest is used, provided it lies within ``MAX_FALLBACK_DIST`` pixels.

    Side effect: every node dict gains a ``"shape"`` entry (shapely polygon
    built from its bbox) and a ``"center"`` entry (integer midpoint of the bbox).

    Args:
        nodes (list): List of node dicts with 'id' and 'bbox' fields. 'bbox' is
            unpacked positionally into ``shapely.geometry.box`` and indexed as
            (x0, y0, x1, y1) to compute the center.
        arrows (list): List of arrow dicts with 'tail' and 'head' coordinates
            (tuple or list) and an optional 'label'.

    Returns:
        list: List of (source, target, label) tuples. Arrows that are
        malformed, whose endpoints cannot be resolved, or whose two endpoints
        resolve to the same node are skipped (a message is printed).
    """
    # Padding applied around each node box before the containment test, so an
    # arrow tip that stops just short of the box outline still counts as a hit.
    containment_margin = 10

    for node in nodes:
        node["shape"] = shapely_box(*node["bbox"])
        node["center"] = (
            (node["bbox"][0] + node["bbox"][2]) // 2,
            (node["bbox"][1] + node["bbox"][3]) // 2,
        )

    # Buffering is comparatively expensive; do it once per node rather than
    # once per node per arrow endpoint.
    padded_nodes = [(node, node["shape"].buffer(containment_margin)) for node in nodes]

    def find_containing_node(point):
        """Return the id of the first node whose padded box contains point, else None."""
        for node, region in padded_nodes:
            if region.contains(point):
                return node["id"]
        return None

    def find_nearest_node(pt):
        """Return (node_id, distance) for the node center closest to pt."""
        min_dist = float("inf")
        nearest_id = None
        for n in nodes:
            dist = math.dist(pt, n["center"])
            if dist < min_dist:
                min_dist = dist
                nearest_id = n["id"]
        return nearest_id, min_dist

    def resolve_endpoint(coords):
        """Resolve raw endpoint coordinates to a node id, or None if nothing is close enough."""
        node_id = find_containing_node(Point(coords))
        # Compare against None explicitly: a node id of 0 is valid but falsy.
        if node_id is not None:
            return node_id
        node_id, dist = find_nearest_node(coords)
        if dist > MAX_FALLBACK_DIST:
            return None
        return node_id

    edges = []
    for arrow in arrows:
        if (
            not isinstance(arrow, dict)
            or not isinstance(arrow.get("tail"), (tuple, list))
            or not isinstance(arrow.get("head"), (tuple, list))
        ):
            print(f"⚠️ Skipping malformed arrow: {arrow}")
            continue

        # Tolerate a missing, None, or non-string label.
        label = str(arrow.get("label") or "").strip()

        source = resolve_endpoint(arrow["tail"])
        target = resolve_endpoint(arrow["head"])

        if source is not None and target is not None and source != target:
            print(f"✅ Mapped arrow from {source} → {target} [{label}]")
            edges.append((source, target, label))
        else:
            print(f"⚠️ Could not map arrow endpoints to nodes: tail={arrow.get('tail')} head={arrow.get('head')}")
    return edges
def detect_node_type(text, default_type="process"):
    """
    Heuristically infer the node type from its text.

    Keywords are matched as whole words (case-insensitive), so labels such as
    "Send notification" or "Know your customer" are not misread as "end" or
    "decision" merely because they contain "end" or "no" as a substring.

    Rules, checked in order:
        1. contains the word "start"            -> "start"
        2. contains the word "end" or "full"    -> "end"
        3. contains "?" or the word "yes"/"no"  -> "decision"
        4. otherwise                            -> ``default_type``

    Args:
        text (str): Node label. ``None`` or an empty string yields ``default_type``.
        default_type (str): Fallback type.

    Returns:
        str: Inferred node type.
    """
    if not text:
        return default_type
    text = str(text)

    # Split on anything that is not alphanumeric so punctuation glued to a
    # keyword ("End.", "Yes/No") does not prevent a match.
    words = set(
        "".join(ch if ch.isalnum() else " " for ch in text.lower()).split()
    )

    if "start" in words:
        return "start"
    # NOTE(review): "full" mapping to "end" is kept from the original heuristic;
    # presumably domain-specific — confirm it is still wanted.
    if words & {"end", "full"}:
        return "end"
    if "?" in text or words & {"yes", "no"}:
        return "decision"
    return default_type
def build_flowchart_json(nodes, arrows):
    """
    Construct a structured flowchart JSON using basic graph traversal.

    Arrows are resolved to directed edges with ``map_arrows``; nodes are then
    visited breadth-first starting from every node that has no incoming edge.
    Nodes not reachable from such a root (for example, members of a cycle) are
    visited afterwards in ``nodes`` order so that no node is dropped.

    Each step dict has 'id', 'text' and 'type', plus where applicable:
        - 'parent' (single id) or 'parents' (list) for incoming edges;
        - 'branches' for decision nodes: 'yes' / 'no' map to one target id
          each, and any other (or surplus) targets are collected in the
          'unknown' list;
        - 'next' (single id or list of ids) for non-decision nodes.

    Args:
        nodes (list): Detected node dicts ('id' required; 'text' and 'type'
            optional). Also passed to ``map_arrows``.
        arrows (list): Detected arrow dicts.

    Returns:
        dict: JSON with 'start' (first node without incoming edges, or None
        if every node has one) and 'steps' (list of step dicts in visit order).
    """
    edges = map_arrows(nodes, arrows)

    # Build adjacency and reverse mappings
    graph = defaultdict(list)
    reverse_links = defaultdict(list)
    edge_labels = {}
    for src, tgt, label in edges:
        graph[src].append(tgt)
        reverse_links[tgt].append(src)
        label = label.lower()
        # Do not let an unlabeled duplicate edge erase an earlier label.
        if label or (src, tgt) not in edge_labels:
            edge_labels[(src, tgt)] = label

    id_to_node = {n["id"]: n for n in nodes}
    # Keep ids in input order so the chosen start and traversal are deterministic.
    node_ids = list(id_to_node)
    start_candidates = [nid for nid in node_ids if nid not in reverse_links]

    flowchart = {
        "start": start_candidates[0] if start_candidates else None,
        "steps": []
    }

    def unique(items):
        """Drop duplicates while preserving first-seen order."""
        return list(dict.fromkeys(items))

    def build_step(curr):
        """Build the step dict for curr and return it with its successor ids."""
        node = id_to_node.get(curr, {})
        text = str(node.get("text") or "").strip()
        node_type = node.get("type") or detect_node_type(text)
        step = {
            "id": curr,
            "text": text,
            "type": node_type
        }

        # .get() avoids inserting empty entries into the defaultdicts.
        parents = unique(reverse_links.get(curr, []))
        if len(parents) == 1:
            step["parent"] = parents[0]
        elif len(parents) > 1:
            step["parents"] = parents

        next_nodes = unique(graph.get(curr, []))
        if node_type == "decision" and next_nodes:
            branches = {}
            for tgt in next_nodes:
                label = edge_labels.get((curr, tgt), "")
                if "yes" in label:
                    key = "yes"
                elif "no" in label:
                    key = "no"
                else:
                    key = None
                if key is not None and key not in branches:
                    branches[key] = tgt
                else:
                    # Unlabeled targets, and any second yes/no target that would
                    # otherwise overwrite the first, are kept here.
                    branches.setdefault("unknown", []).append(tgt)
            step["branches"] = branches
        elif len(next_nodes) == 1:
            step["next"] = next_nodes[0]
        elif len(next_nodes) > 1:
            step["next"] = next_nodes
        return step, next_nodes

    visited = set()
    queue = deque(start_candidates)

    def drain_queue():
        """Breadth-first visit of everything currently reachable from the queue."""
        while queue:
            curr = queue.popleft()
            if curr in visited:
                continue
            visited.add(curr)
            step, next_nodes = build_step(curr)
            flowchart["steps"].append(step)
            for nxt in next_nodes:
                if nxt not in visited:
                    queue.append(nxt)

    drain_queue()

    # Anything still unvisited has incoming edges but is unreachable from a
    # root (e.g. part of a cycle); visit it so it is not lost from the output.
    for nid in node_ids:
        if nid not in visited:
            queue.append(nid)
            drain_queue()

    return flowchart