import json
import os
import re

import modal
import openai
import requests
import textstat
from bs4 import BeautifulSoup

modal_image = modal.Image.debian_slim().pip_install(
    "requests",
    "beautifulsoup4",
    "textstat",
    "lxml",
    "openai",
)

app = modal.App(name="sitegeist-ai-app")
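
# The remote functions below read OPENAI_API_KEY from a Modal secret named
# "openai-secret" (attached via secrets=[modal.Secret.from_name("openai-secret")]).
# If that secret does not exist yet, it can be created with something like:
#   modal secret create openai-secret OPENAI_API_KEY=sk-...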


def query_llm(prompt_text: str, expected_json_structure: dict):
    """
    Calls the OpenAI API to get structured JSON responses.

    Args:
        prompt_text (str): The prompt to send to OpenAI.
        expected_json_structure (dict): Dictionary whose keys guide the output format.

    Returns:
        dict: Parsed JSON response from OpenAI, or a dict with "error" and
        "status" keys if the API call or JSON parsing fails.
    """
    try:
        client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

        # Tell the model which top-level keys the JSON object should contain.
        expected_keys = list(expected_json_structure.keys())
        system_prompt = (
            "You are a helpful assistant designed to output JSON. "
            "Based on the user's content, provide a JSON object with the following keys: "
            f"{', '.join(expected_keys)}. "
            "Ensure your response is valid JSON format only."
        )

        print(f"--- OpenAI API CALLED ---\nPrompt: {prompt_text[:200]}...\nExpected JSON keys: {expected_keys}")

        response = client.chat.completions.create(
            model="gpt-4-turbo-preview",
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt_text},
            ],
            temperature=0.3,
        )

        result_json_str = response.choices[0].message.content
        print(f"OpenAI Response: {result_json_str[:200]}...")

        return json.loads(result_json_str)

    except openai.APIError as e:
        error_msg = f"OpenAI API error: {str(e)}"
        print(f"ERROR: {error_msg}")

        # If the model rejects JSON mode, retry without response_format and pull
        # the first JSON object out of the plain-text reply.
        if "response_format" in str(e) and "not supported" in str(e):
            print("Falling back to non-JSON mode...")
            try:
                fallback_response = client.chat.completions.create(
                    model="gpt-4-turbo-preview",
                    messages=[
                        {"role": "system", "content": system_prompt + " Make sure to respond with valid JSON only."},
                        {"role": "user", "content": prompt_text},
                    ],
                    temperature=0.3,
                )

                fallback_result_str = fallback_response.choices[0].message.content
                print(f"Fallback Response: {fallback_result_str[:200]}...")

                json_match = re.search(r'\{.*\}', fallback_result_str, re.DOTALL)
                if json_match:
                    return json.loads(json_match.group())
                return json.loads(fallback_result_str)

            except Exception as fallback_error:
                return {"error": f"Fallback also failed: {str(fallback_error)}", "status": "fallback_failed"}

        return {"error": error_msg, "status": "api_error"}

    except json.JSONDecodeError as e:
        error_msg = f"Failed to parse JSON response: {str(e)}"
        print(f"ERROR: {error_msg}")
        return {"error": error_msg, "status": "json_parse_error", "raw_response": result_json_str}

    except Exception as e:
        error_msg = f"Unexpected error in OpenAI call: {str(e)}"
        print(f"ERROR: {error_msg}")
        return {"error": error_msg, "status": "unexpected_error"}
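
# Illustrative usage (the prompt text here is a placeholder, not part of the pipeline):
#   query_llm("Summarize this text: ...", {"summary": ""})
# is expected to return a dict such as {"summary": "..."} on success, or a dict
# carrying "error" and "status" keys when the API call or JSON parsing fails.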


@app.function(image=modal_image, secrets=[modal.Secret.from_name("openai-secret")])
def deep_analyze_url(url: str):
    """
    Performs a deep marketing and content analysis on a single URL.
    """
    print(f"Deep analyzing URL: {url}")
    scraped_data = {}
    text_content = ""

    try:
        response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'lxml')

        # Prefer <article> or <main> for the main text; fall back to the whole body.
        main_content_area = soup.find('article') or soup.find('main') or soup.body
        if main_content_area:
            text_content = main_content_area.get_text(separator=' ', strip=True)
        else:
            text_content = soup.get_text(separator=' ', strip=True)

        scraped_data["meta_title"] = soup.find('title').get_text(strip=True) if soup.find('title') else "Not found"
        meta_desc_tag = soup.find('meta', attrs={'name': 'description'})
        scraped_data["meta_description"] = meta_desc_tag['content'] if meta_desc_tag and 'content' in meta_desc_tag.attrs else "Not found"

        # Rough internal/external link counts based on href values.
        all_links = [a['href'] for a in soup.find_all('a', href=True)]
        scraped_data["internal_links"] = len([link for link in all_links if url in link or link.startswith('/')])
        scraped_data["external_links"] = len([link for link in all_links if url not in link and link.startswith('http')])

    except requests.exceptions.RequestException as e:
        return {"url": url, "status": "failed", "error": f"Scraping failed: {str(e)}"}
    except Exception as e:
        return {"url": url, "status": "failed", "error": f"Error during scraping/parsing: {str(e)}"}

    if not text_content:
        return {"url": url, "status": "failed", "error": "Could not extract text content."}

    try:
        word_count = textstat.lexicon_count(text_content)
        sentence_count = textstat.sentence_count(text_content)
        readability_metrics = {
            "flesch_reading_ease": textstat.flesch_reading_ease(text_content),
            "flesch_kincaid_grade": textstat.flesch_kincaid_grade(text_content),
            "estimated_reading_time_minutes": round(word_count / 200, 2) if word_count > 0 else 0,
            "word_count": word_count,
            "sentence_count": sentence_count,
        }
    except Exception as e:
        readability_metrics = {"error": f"Readability analysis failed: {str(e)}"}

    # NOTE: the full extracted text is sent to the model; very long pages may
    # exceed the model's context window.
    llm_prompt_for_deep_analysis = f"""
    Analyze the following web content from {url}. Extract the requested information and provide it in a JSON format.

    Content: "{text_content}"

    Please analyze this content and provide:
    - primary_keywords: List of 3-5 main keywords/topics
    - lsi_keywords: List of related semantic keywords (5-8 keywords)
    - sentiment: Object with "score" (Positive/Negative/Neutral) and "confidence" (0-1)
    - emotional_tone: List of emotional descriptors (2-4 items)
    - cta_analysis: Object with "has_cta" (boolean) and "cta_text" (string or null)
    - brand_mentions: List of brand names mentioned in the content
    """

    expected_llm_structure_deep = {
        "primary_keywords": [], "lsi_keywords": [], "sentiment": {},
        "emotional_tone": [], "cta_analysis": {}, "brand_mentions": []
    }
    llm_driven_analysis_result = query_llm(llm_prompt_for_deep_analysis, expected_llm_structure_deep)

    # The analysis payload is the same either way; only the status differs
    # depending on whether the LLM step returned an error.
    analysis_payload = {
        "readability_metrics": readability_metrics,
        "seo_metrics": {
            "meta_title": scraped_data.get("meta_title"),
            "meta_description": scraped_data.get("meta_description"),
            "internal_links": scraped_data.get("internal_links"),
            "external_links": scraped_data.get("external_links"),
        },
        "llm_driven_analysis": llm_driven_analysis_result,
    }
    status = "partial_success" if "error" in llm_driven_analysis_result else "success"
    return {"url": url, "status": status, "analysis": analysis_payload}
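
# Illustrative call (e.g. from the local entrypoint below; the URL is a placeholder):
#   result = deep_analyze_url.remote("https://example.com/post")
# On success, result["analysis"] holds "readability_metrics", "seo_metrics", and
# "llm_driven_analysis"; on a scraping failure the dict carries "status": "failed".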


@app.function(image=modal_image, secrets=[modal.Secret.from_name("openai-secret")])
def scrape_and_analyze(url: str, analysis_prompt_for_swarm: str):
    """
    Helper function for swarm analysis: scrapes and performs a *simple* analysis on one URL.
    """
    print(f"Swarm - analyzing single URL: {url}")
    try:
        response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'lxml')
        main_content_area = soup.find('article') or soup.find('main') or soup.body
        text_content = main_content_area.get_text(separator=' ', strip=True) if main_content_area else soup.get_text(separator=' ', strip=True)

        if not text_content:
            return {"url": url, "status": "failed", "error": "No text content found"}

        # Only the first 1000 characters are sent for this quick swarm pass.
        llm_prompt = f"""
        Content from {url}: {text_content[:1000]}

        {analysis_prompt_for_swarm}

        Please provide a concise summary of the main topic and key points from this content.
        """
        summary_result = query_llm(llm_prompt, {"summary": ""})
        return {"url": url, "status": "success", "analysis": summary_result}

    except requests.exceptions.RequestException as e:
        return {"url": url, "status": "failed", "error": f"Scraping failed: {str(e)}"}
    except Exception as e:
        return {"url": url, "status": "failed", "error": f"Processing error: {str(e)}"}


@app.function(image=modal_image, secrets=[modal.Secret.from_name("openai-secret")], timeout=60000)
def swarm_analyze_urls(urls: list[str], analysis_prompt: str):
    """
    Scrapes and analyzes a list of URLs in parallel for swarm mode.
    """
    print(f"Swarm analyzing {len(urls)} URLs. Prompt: {analysis_prompt}")
    individual_results = []

    # Fan the per-URL work out across containers; the same analysis prompt is
    # passed to every invocation via kwargs.
    for result in scrape_and_analyze.map(urls, kwargs={"analysis_prompt_for_swarm": analysis_prompt}):
        individual_results.append(result)

    successful_summaries = [
        res["analysis"]["summary"]
        for res in individual_results
        if res["status"] == "success" and "analysis" in res and "summary" in res["analysis"]
    ]

    if not successful_summaries:
        return {
            "overall_summary": "No successful analyses to aggregate.",
            "top_themes": [],
            "individual_results": individual_results
        }

    aggregation_prompt = f"""
    Synthesize these summaries into a comprehensive overview and identify the top themes:

    Summaries: {'. '.join(successful_summaries)}

    Please provide:
    - aggregated_summary: A comprehensive overview synthesizing all summaries
    - top_themes: List of 3-5 main themes that emerge across all content
    """
    aggregated_llm_result = query_llm(aggregation_prompt, {"aggregated_summary": "", "top_themes": []})

    return {
        "overall_summary": aggregated_llm_result.get("aggregated_summary"),
        "top_themes": aggregated_llm_result.get("top_themes"),
        "individual_results": individual_results
    }


@app.local_entrypoint()
def main():
    print("--- Testing deep_analyze_url ---")
    test_url_deep = "https://modal.com/docs/guide"
    deep_result = deep_analyze_url.remote(test_url_deep)
    print(json.dumps(deep_result, indent=2))

    print("\n--- Testing swarm_analyze_urls ---")
    test_urls_swarm = [
        "https://modal.com/blog",
        "https://gantry.io/blog",
        "http://example.com/nonexistentpage"  # presumably included to exercise the failure path
    ]
    swarm_result = swarm_analyze_urls.remote(test_urls_swarm, "Provide a brief summary of the main topic.")
    print(json.dumps(swarm_result, indent=2))
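
# To try this end to end (assuming the Modal CLI is installed, you are logged in,
# and the "openai-secret" Modal secret exists with OPENAI_API_KEY set), running
# something like:
#   modal run sitegeist_ai.py
# should execute main() locally while the analysis functions run on Modal.
# The filename above is illustrative.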