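# NOTE(review): the text "Spaces: Sleeping Sleeping" that stood here looks like
# web-page residue from where this file was copied; it is not Python code.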
import gradio as gr | |
import google.generativeai as genai | |
from datetime import datetime | |
from dataclasses import dataclass | |
from typing import List, Dict, Optional, Tuple | |
import requests | |
import json | |
import os | |
from dotenv import load_dotenv | |
@dataclass
class Source:
    """Represents a source used for fact-checking."""

    # Address of the page the content was fetched from.
    url: str
    # Page title; shown as the link text in the formatted report.
    title: str
    # Extracted page text that is handed to the model for evidence analysis.
    content: str
    # Weight multiplied into each evidence item's relevance when scoring.
    reputation_score: float
@dataclass
class FactCheckResult:
    """Represents the result of a fact check."""

    # The claim text that was checked.
    claim: str
    # Human-readable verdict label (e.g. "Likely True", "Inconclusive").
    verdict: str
    # Confidence in the verdict; 0.0 when no evidence was found.
    confidence_score: float
    # Timestamp of the analysis, formatted as "%Y-%m-%d %H:%M:%S".
    analysis_date: str
    # Sources whose content was analysed (quoted forward reference so the
    # annotation does not need ``Source`` to be resolvable at definition time).
    sources: List["Source"]
    # Evidence items classified as supporting the claim.
    evidence: List[Dict]
    # Evidence items classified as contradicting the claim.
    contradictions: List[Dict]
    # Short textual summary of how the verdict was reached.
    explanation: str
class GeminiFactChecker:
    """Fact-check a claim using web search, a page reader and a Gemini model.

    ``check_fact`` drives the pipeline:

    1. ``_search_sources`` asks the Google Custom Search JSON API for URLs.
    2. ``_fetch_webpage_content`` downloads each page through the Jina reader.
    3. ``_analyze_evidence`` asks the model for supporting / contradicting
       evidence found in each page.
    4. ``_score_evidence`` weighs that evidence into a verdict.
    """

    # Seconds to wait on any outbound HTTP request before giving up.
    _HTTP_TIMEOUT = 10

    def __init__(self):
        """Configure the Gemini client and read API settings from the environment.

        Raises:
            ValueError: if ``GOOGLE_API_KEY`` is not set.
        """
        api_key = os.getenv("GOOGLE_API_KEY")
        if not api_key:
            raise ValueError("GOOGLE_API_KEY environment variable is required")
        genai.configure(api_key=api_key)
        # Sampling settings applied to every request made through self.model.
        generation_config = genai.types.GenerationConfig(
            temperature=0.1,
            top_p=0.8,
            top_k=40,
        )
        self.model = genai.GenerativeModel(
            model_name='gemini-1.5-pro',
            generation_config=generation_config
        )
        self.search_api_key = os.getenv("SEARCH_API_KEY")
        self.search_engine_id = os.getenv("SEARCH_ENGINE_ID")
        self.jinai_api_key = os.getenv("JINA_AI_API_KEY")
        self.jinai_reader_url = "https://r.jina.ai/"

    def _search_sources(self, claim: str, num_sources: int = 3) -> List[str]:
        """Return result URLs for *claim* from the Google Custom Search JSON API.

        Args:
            claim: Text sent as the search query.
            num_sources: Number of results to request; clamped to 1..10, the
                range the API accepts for ``num``.

        Returns:
            The ``link`` of every returned item, or an empty list when the
            request fails or produces no items.
        """
        try:
            params = {
                'key': self.search_api_key,
                'cx': self.search_engine_id,
                'q': claim,
                'num': max(1, min(int(num_sources), 10)),
            }
            # A timeout keeps a stalled search from hanging the whole check.
            response = requests.get(
                "https://www.googleapis.com/customsearch/v1",
                params=params,
                timeout=self._HTTP_TIMEOUT,
            )
            response.raise_for_status()
            search_results = response.json()
            return [item['link'] for item in search_results.get('items', [])]
        except Exception as e:
            # Best-effort boundary: any failure is reported and treated as "no results".
            print(f"Error searching sources: {str(e)}")
            return []

    def _fetch_webpage_content(self, url: str) -> Optional[dict]:
        """Fetch *url* through the Jina reader and return its extracted text.

        Args:
            url: Address of the page to read.

        Returns:
            A dict with ``content`` (at most 5000 characters), ``title`` and
            the raw ``data`` payload, or ``None`` when the request fails or
            the response carries no ``data`` object.
        """
        try:
            headers = {'Accept': 'application/json'}
            # Only send credentials when a key is configured; otherwise the
            # header would literally read "Bearer None".
            if self.jinai_api_key:
                headers['Authorization'] = f'Bearer {self.jinai_api_key}'
            # The base URL already ends with "/"; strip it so the target URL
            # is appended after exactly one slash.
            reader_url = f"{self.jinai_reader_url.rstrip('/')}/{url}"
            response = requests.get(reader_url,
                                    headers=headers,
                                    timeout=self._HTTP_TIMEOUT)
            response.raise_for_status()
            payload = response.json().get('data')
            if not payload or not isinstance(payload, dict):
                return None
            return {
                # ``or ''`` also covers an explicit null in the payload.
                "content": (payload.get('content') or '')[:5000],
                "title": payload.get('title') or '',
                "data": payload
            }
        except Exception as e:
            # Best-effort boundary: a page that cannot be read is skipped.
            print(f"Error fetching {url}: {str(e)}")
            return None

    @staticmethod
    def _parse_evidence_json(raw_text: str) -> List[Dict]:
        """Parse the model's reply into a list of evidence dicts.

        Accepts a bare JSON array, an array wrapped in a Markdown code fence
        (with or without a ``json`` tag), an array surrounded by extra prose,
        or a single JSON object (wrapped into a one-element list). Items that
        are not JSON objects are dropped.

        Raises:
            ValueError: if no JSON value can be decoded from *raw_text*.
        """
        cleaned = raw_text.strip()
        if cleaned.startswith('```'):
            cleaned = cleaned[3:]
            if cleaned[:4].lower() == 'json':
                cleaned = cleaned[4:]
            cleaned = cleaned.rstrip()
            if cleaned.endswith('```'):
                cleaned = cleaned[:-3]
        try:
            parsed = json.loads(cleaned)
        except ValueError:
            # Fall back to the outermost [...] span when the reply has text
            # around the array.
            start, end = cleaned.find('['), cleaned.rfind(']')
            if start == -1 or end <= start:
                raise
            parsed = json.loads(cleaned[start:end + 1])
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            return []
        return [item for item in parsed if isinstance(item, dict)]

    def _analyze_evidence(self, claim: str, sources: List["Source"]) -> List[Dict]:
        """Ask the model for evidence about *claim* in each source.

        Args:
            claim: The claim being checked.
            sources: Sources whose ``title`` and first 2000 characters of
                ``content`` are placed in the prompt.

        Returns:
            All evidence dicts from all sources. Each one is tagged with
            ``source_score`` (the source's reputation score) and, when the
            model did not supply it, ``source`` (the source title). A source
            whose analysis fails contributes nothing.
        """
        all_evidence = []
        for source in sources:
            prompt = f"""
            Analyze this content and return evidence as JSON array:
            CLAIM: "{claim}"
            SOURCE TITLE: {source.title}
            CONTENT: {source.content[:2000]}
            Return array of evidence objects with properties:
            - text: exact quote or clear paraphrase
            - type: "supporting" or "contradicting"
            - relevance: number 0.0 to 1.0
            - source: source title
            """
            try:
                response = self.model.generate_content(prompt)
                if not response.text:
                    continue
                evidence_list = self._parse_evidence_json(response.text)
                for evidence in evidence_list:
                    evidence["source_score"] = source.reputation_score
                    # The report prints e["source"]; make sure it exists.
                    evidence.setdefault("source", source.title)
                all_evidence.extend(evidence_list)
            except Exception as e:
                # One unreadable reply must not discard the other sources.
                print(f"Error analyzing source {source.url}: {str(e)}")
                continue
        return all_evidence

    @staticmethod
    def _score_evidence(evidence: List[Dict]) -> Tuple[str, float, str, List[Dict], List[Dict]]:
        """Turn evidence items into a verdict.

        Each item's weight is ``relevance * source_score`` (relevance clamped
        to 0..1; missing or non-numeric values fall back to 0.5 and 1).

        Returns:
            ``(verdict, confidence, explanation, supporting, contradicting)``.
            The verdict is "Insufficient evidence" with confidence 0.0 when
            there is no evidence or its total weight is zero.
        """
        def as_float(value, default):
            try:
                number = float(value)
            except (TypeError, ValueError):
                return default
            # NaN is the only float that differs from itself.
            return default if number != number else number

        def weight(item):
            relevance = min(max(as_float(item.get("relevance", 0.5), 0.5), 0.0), 1.0)
            return relevance * as_float(item.get("source_score", 1), 1.0)

        def kind(item):
            # .get avoids a KeyError when the model omitted "type".
            return str(item.get("type", "")).strip().lower()

        supporting = [e for e in evidence if kind(e) == "supporting"]
        contradicting = [e for e in evidence if kind(e) == "contradicting"]
        if not evidence:
            return ("Insufficient evidence", 0.0,
                    "No evidence found from analyzed sources.",
                    supporting, contradicting)

        total_support = sum(weight(e) for e in supporting)
        total_contradiction = sum(weight(e) for e in contradicting)
        total = total_support + total_contradiction
        if total <= 0:
            # Without any weight there is no ratio to compute; reporting a
            # ratio of 0 here would read as a fully confident "Likely False".
            return ("Insufficient evidence", 0.0,
                    "No usable supporting or contradicting evidence found from analyzed sources.",
                    supporting, contradicting)

        support_ratio = total_support / total
        confidence = max(support_ratio, 1 - support_ratio)
        if support_ratio > 0.6:
            verdict = "Likely True" if confidence >= 0.7 else "Somewhat True"
        elif support_ratio < 0.4:
            verdict = "Likely False" if confidence >= 0.7 else "Somewhat False"
        else:
            verdict = "Inconclusive"
        explanation = f"Based on {len(supporting)} supporting and {len(contradicting)} contradicting pieces of evidence."
        return verdict, confidence, explanation, supporting, contradicting

    def check_fact(self, claim: str, num_sources: int = 3) -> Optional["FactCheckResult"]:
        """Run the full fact-check pipeline for *claim*.

        Args:
            claim: The statement to verify.
            num_sources: How many search results to fetch and analyse.

        Returns:
            A ``FactCheckResult``, or ``None`` when the search returns no
            URLs, no page could be fetched, or an unexpected error occurs.
        """
        try:
            urls = self._search_sources(claim, num_sources)
            if not urls:
                return None
            sources = []
            for url in urls:
                content_dict = self._fetch_webpage_content(url)
                if not content_dict:
                    continue
                sources.append(Source(
                    url=url,
                    # The fetcher returns '' for a missing title, so use
                    # ``or`` to fall back to the URL.
                    title=content_dict.get("title") or url,
                    content=content_dict["content"],
                    reputation_score=0.8  # Default score
                ))
            if not sources:
                return None
            evidence = self._analyze_evidence(claim, sources)
            verdict, confidence, explanation, supporting, contradicting = (
                self._score_evidence(evidence)
            )
            return FactCheckResult(
                claim=claim,
                verdict=verdict,
                confidence_score=confidence,
                analysis_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                sources=sources,
                evidence=supporting,
                contradictions=contradicting,
                explanation=explanation
            )
        except Exception as e:
            # Top-level boundary: callers treat None as "check failed".
            print(f"Error during fact checking: {str(e)}")
            return None
def format_fact_check_report(result: "FactCheckResult") -> str:
    """Render a fact-check result as a Markdown report.

    Args:
        result: Object exposing ``claim``, ``verdict``, ``confidence_score``,
            ``explanation``, ``sources``, ``evidence`` and ``contradictions``.

    Returns:
        Markdown text with the claim, verdict, summary counts, the list of
        sources and up to three supporting and three contradicting items.
        Evidence dicts missing ``text`` or ``source`` are rendered with an
        empty text / "Unknown source" instead of raising ``KeyError``.
    """
    def bullet(item: dict) -> str:
        # Evidence dicts come from model output, so keys may be absent.
        return f"- {item.get('text', '')} (Source: {item.get('source', 'Unknown source')})"

    lines = [
        "# Fact Check Report",
        "## Claim",
        f'"{result.claim}"',
        f"## Verdict: {result.verdict}",
        f"Confidence Score: {result.confidence_score:.2f}",
        "## Explanation",
        f"{result.explanation}",
        "## Analysis Summary",
        f"- Number of sources analyzed: {len(result.sources)}",
        f"- Supporting evidence found: {len(result.evidence)}",
        f"- Contradicting points found: {len(result.contradictions)}",
        "## Sources Analyzed",
    ]
    lines.extend(
        f"- [{source.title}]({source.url}) (Credibility: {source.reputation_score:.2f})"
        for source in result.sources
    )
    if result.evidence:
        # The empty entry produces the blank line that precedes the heading.
        lines += ["", "### Supporting Evidence:"]
        lines.extend(bullet(e) for e in result.evidence[:3])
    if result.contradictions:
        lines += ["", "### Contradicting Points:"]
        lines.extend(bullet(c) for c in result.contradictions[:3])
    return "\n".join(lines) + "\n"
def main():
    """Build the Gradio interface for the fact checker and launch it.

    Loads environment variables from a ``.env`` file, creates one
    ``GeminiFactChecker`` shared by all requests, wires the "Check Claim"
    button to a generator callback and starts the app.
    """
    load_dotenv()
    fact_checker = GeminiFactChecker()
    with gr.Blocks() as demo:
        gr.Markdown("# AI-Powered Fact Checker")
        gr.Markdown("Enter a claim to check its veracity against multiple sources.")
        with gr.Row():
            with gr.Column():
                claim = gr.Textbox(
                    label="Claim to Check",
                    placeholder="Enter the claim you want to verify...",
                    lines=3
                )
                num_sources = gr.Slider(
                    label="Number of Sources to Check",
                    minimum=1,
                    maximum=5,
                    value=3,
                    step=1
                )
                check_button = gr.Button("Check Claim", variant="primary")
            with gr.Column():
                status = gr.Markdown("Ready to check claims...")
                report = gr.Markdown()

        def check_fact_wrapper(claim: str, num_sources: int):
            """Yield ``(status, report)`` pairs for the two Markdown outputs.

            The first yield shows a progress message while the check runs;
            the second carries the final status and report (or error text).
            """
            # Nothing to search for: tell the user instead of calling the APIs.
            if not claim or not claim.strip():
                yield "❌ Error occurred", "Please enter a claim to check."
                return
            status_value = "🔍 Searching and analyzing sources..."
            yield status_value, ""
            try:
                # The slider may deliver a float, hence the int() conversion.
                result = fact_checker.check_fact(claim, int(num_sources))
                if result:
                    status_value = "✅ Analysis complete!"
                    report_value = format_fact_check_report(result)
                else:
                    status_value = "❌ Error occurred"
                    report_value = "Error occurred during fact checking."
            except Exception as e:
                status_value = "❌ Error occurred"
                report_value = f"Error: {str(e)}"
            yield status_value, report_value

        check_button.click(
            fn=check_fact_wrapper,
            inputs=[claim, num_sources],
            outputs=[status, report],
            show_progress=True
        )
    demo.launch()
# Start the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()