pc-ai-data-analyst-dup / extract_results.py
dolphinium
dynamic core selection according to agentic api's output.
dae6a10
raw
history blame
2.29 kB
import requests
import json
import yaml
import re
def _parse_mappings(mapping_str: str) -> dict:
"""Parses the field mapping string into a dictionary."""
mappings = {}
if not mapping_str:
return mappings
# Example line: "Mapping 'company_territory' to 'owner_company_territory' under 'compound'."
pattern = re.compile(r"Mapping '([^']*)' to '([^']*)'")
for line in mapping_str.split('\n'):
match = pattern.search(line)
if match:
original_field, mapped_field = match.groups()
mappings[original_field] = mapped_field
return mappings
def get_search_list_params(query, k=20):
"""
Connects to the external API, parses the stream, and returns the core name,
search fields, and field mappings.
Returns tuple: (search_fields, search_name, field_mappings)
"""
url = "https://aitest.ebalina.com/stream"
response = requests.post(
url,
headers={'Content-Type': 'application/json'},
json={"query": query, "k": k},
stream=True
)
search_fields = []
search_name = ""
field_mappings_str = ""
for line in response.iter_lines():
if line and line.startswith(b'data: '):
try:
line_str = line.decode('utf-8')[6:]
if not line_str or line_str.isspace():
continue
data = json.loads(line_str)
log_title = data.get('log_title')
if log_title == 'Search List Result':
content = data.get('content', '')
if content:
yaml_data = yaml.safe_load(content)
print("DEBUG:", yaml_data)
# This is the dynamic core name
search_name = yaml_data.get('search_name', '')
search_fields = yaml_data.get('search_fields', [])
elif log_title == 'Field Mapping Outputs':
field_mappings_str = data.get('content', '')
except (json.JSONDecodeError, yaml.YAMLError, AttributeError):
continue
field_mappings = _parse_mappings(field_mappings_str)
return search_fields, search_name, field_mappings