Venkat V
updated with fixes to all modules
152df72
import math
import re
from collections import defaultdict, deque

from shapely.geometry import box as shapely_box, Point
# Maximum distance, in pixels, between an arrow endpoint and a node center for
# which the nearest-node fallback in map_arrows still accepts the match.
MAX_FALLBACK_DIST = 150 # pixels
def map_arrows(nodes, arrows):
    """
    Map arrows to node boxes using geometric containment with fallback to nearest box.

    Each arrow endpoint is first matched against every node's bounding box
    grown by 10 pixels. If no box contains the endpoint, the node whose
    center is closest is used instead, provided it lies within
    MAX_FALLBACK_DIST. Arrows whose endpoints cannot both be resolved, or
    that resolve to the same node, are reported and skipped.

    Side effects: every node dict gains a 'shape' (shapely box) and a
    'center' (integer midpoint of the bbox) entry, and a progress line is
    printed for each arrow.

    Args:
        nodes (list): List of node dicts with 'id' and 'bbox' fields, where
            'bbox' is (minx, miny, maxx, maxy).
        arrows (list): List of arrow dicts with 'tail' and 'head' coordinates
            and an optional 'label'.

    Returns:
        list: List of (source, target, label) tuples.
    """
    for node in nodes:
        node["shape"] = shapely_box(*node["bbox"])
        node["center"] = (
            (node["bbox"][0] + node["bbox"][2]) // 2,
            (node["bbox"][1] + node["bbox"][3]) // 2,
        )

    # Grow each box once up front; buffering is comparatively expensive and
    # the result does not depend on the arrow being tested.
    padded_shapes = [(n["id"], n["shape"].buffer(10)) for n in nodes]

    def find_nearest_node(pt):
        """Return (node_id, distance) of the node center closest to pt."""
        min_dist = float("inf")
        nearest_id = None
        for n in nodes:
            dist = math.dist(pt, n["center"])
            if dist < min_dist:
                min_dist = dist
                nearest_id = n["id"]
        return nearest_id, min_dist

    def resolve_endpoint(coords, point):
        """Return the id of the node an endpoint belongs to, or None."""
        node_id = next(
            (nid for nid, region in padded_shapes if region.contains(point)),
            None,
        )
        # Compare against None explicitly: ids such as 0 or "" are valid
        # node ids and must not be mistaken for "no match".
        if node_id is None:
            node_id, dist = find_nearest_node(coords)
            if dist > MAX_FALLBACK_DIST:
                node_id = None
        return node_id

    edges = []
    for arrow in arrows:
        if (
            not isinstance(arrow, dict)
            or not isinstance(arrow.get("tail"), (tuple, list))
            or not isinstance(arrow.get("head"), (tuple, list))
        ):
            print(f"⚠️ Skipping malformed arrow: {arrow}")
            continue

        tail_pt = Point(arrow["tail"])
        head_pt = Point(arrow["head"])
        # A label that is present but None is treated like a missing one.
        label = str(arrow.get("label") or "").strip()

        source = resolve_endpoint(arrow["tail"], tail_pt)
        target = resolve_endpoint(arrow["head"], head_pt)

        if source is not None and target is not None and source != target:
            print(f"✅ Mapped arrow from {source} -> {target} [{label}]")
            edges.append((source, target, label))
        else:
            print(f"⚠️ Could not map arrow endpoints to nodes: tail={arrow.get('tail')} head={arrow.get('head')}")
    return edges
def detect_node_type(text, default_type="process"):
    """
    Heuristically infer the node type from its text.

    Keywords are matched as whole words (case-insensitively), so that
    labels such as "Send notification" or "Update node" are not mistaken
    for "end" or "no". Rules are checked in this order:

    * the word "start"            -> "start"
    * the word "end" or "full"    -> "end"
    * a "?" anywhere, or the word "yes" or "no" -> "decision"
    * anything else               -> default_type

    Args:
        text (str): Node label. None or an empty string yields default_type.
        default_type (str): Fallback type.

    Returns:
        str: Inferred node type.
    """
    text = text or ""
    # Tokenise on non-word characters so punctuation ("No.", "Yes/No") does
    # not hide a keyword and keywords embedded in longer words do not match.
    words = set(re.findall(r"\w+", text.lower()))
    if "start" in words:
        return "start"
    if words & {"end", "full"}:
        return "end"
    if "?" in text or words & {"yes", "no"}:
        return "decision"
    return default_type
def build_flowchart_json(nodes, arrows):
    """
    Construct a structured flowchart JSON using breadth-first graph traversal.

    Arrows are first resolved to directed edges with map_arrows. Nodes
    without incoming edges seed the traversal, in the order they appear in
    `nodes`. If every node has an incoming edge (the chart is a loop), the
    first node typed "start" is used as the seed, or else the first node.
    Nodes that are still unvisited afterwards (cycles unreachable from any
    seed) are traversed in input order so that every node produces a step.

    Each step is a dict with:
        'id', 'text', 'type'  - always present; 'type' is the node's own
                                'type' field or the result of
                                detect_node_type on its text.
        'parent' / 'parents'  - single predecessor id, or list of several.
        'branches'            - decision nodes with outgoing edges only:
                                {'yes': id, 'no': id, 'unknown': [ids]},
                                chosen from the lower-cased edge label.
        'next'                - other nodes: successor id, or list of ids.

    Args:
        nodes (list): Detected node dicts.
        arrows (list): Detected arrow dicts.

    Returns:
        dict: JSON with 'start' (id of the first seed, or None when there
        are no nodes) and 'steps'.
    """
    edges = map_arrows(nodes, arrows)

    # Build adjacency and reverse mappings
    graph = defaultdict(list)
    reverse_links = defaultdict(list)
    edge_labels = {}
    for src, tgt, label in edges:
        graph[src].append(tgt)
        reverse_links[tgt].append(src)
        edge_labels[(src, tgt)] = label.lower()

    # A dict keeps the input order of node ids, which makes the chosen start
    # node and the order of steps reproducible (a set would not).
    id_to_node = {n["id"]: n for n in nodes}

    def describe(node):
        """Return (stripped text, resolved type) for a node dict."""
        text = (node.get("text") or "").strip()
        return text, node.get("type") or detect_node_type(text)

    start_candidates = [nid for nid in id_to_node if nid not in reverse_links]
    if start_candidates:
        seeds = start_candidates
    elif id_to_node:
        # Every node has a predecessor, i.e. the chart loops back on itself.
        first_id = next(iter(id_to_node))
        seeds = [
            next(
                (nid for nid, node in id_to_node.items()
                 if describe(node)[1] == "start"),
                first_id,
            )
        ]
    else:
        seeds = []

    flowchart = {
        "start": seeds[0] if seeds else None,
        "steps": [],
    }
    visited = set()

    def traverse(initial_ids):
        """Breadth-first walk from initial_ids, appending one step per node."""
        queue = deque(initial_ids)
        while queue:
            curr = queue.popleft()
            if curr in visited:
                continue
            visited.add(curr)

            text, node_type = describe(id_to_node.get(curr, {}))
            step = {
                "id": curr,
                "text": text,
                "type": node_type,
            }

            # dict.fromkeys removes duplicate edges while keeping edge order.
            parents = list(dict.fromkeys(reverse_links.get(curr, ())))
            if len(parents) == 1:
                step["parent"] = parents[0]
            elif len(parents) > 1:
                step["parents"] = parents

            next_nodes = list(dict.fromkeys(graph.get(curr, ())))
            if node_type == "decision" and next_nodes:
                branches = {}
                for tgt in next_nodes:
                    label = edge_labels.get((curr, tgt), "")
                    if "yes" in label:
                        branches["yes"] = tgt
                    elif "no" in label:
                        branches["no"] = tgt
                    else:
                        branches.setdefault("unknown", []).append(tgt)
                step["branches"] = branches
            elif len(next_nodes) == 1:
                step["next"] = next_nodes[0]
            elif next_nodes:
                step["next"] = next_nodes

            queue.extend(tgt for tgt in next_nodes if tgt not in visited)
            flowchart["steps"].append(step)

    traverse(seeds)
    # Pick up nodes that sit in cycles not reachable from any seed.
    for node_id in id_to_node:
        if node_id not in visited:
            traverse([node_id])

    return flowchart