from fastapi import FastAPI
from fastapi.responses import StreamingResponse, HTMLResponse
from pydantic import BaseModel
from transformers import pipeline, TextStreamer
import asyncio
import httpx
import time
import queue
import threading
import re
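
# How often (in seconds) the background task refreshes the knowledge graph,
# and how many entries are kept before the oldest one is evicted.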
UPDATE_INTERVAL = 60
MAX_KG_SIZE = 50
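
# Main text-generation model, loaded on CPU.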
generator = pipeline(
    "text-generation",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    device="cpu"
)
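
# Smaller instruct model used for generating search queries and summaries.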
query_generator = pipeline(
    "text-generation",
    model="HuggingFaceTB/SmolLM2-360M-Instruct",
    device="cpu"
)
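
# Reuse the query model for summarization rather than loading a third model.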
summarizer = query_generator
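
# In-memory knowledge store: maps a search query to its raw text, summary,
# and timestamp.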
knowledge_graph = {}

app = FastAPI()
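

# Request schema for /generate/stream; note the large default for
# max_new_tokens, so CPU generations can run very long.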
class ModelInput(BaseModel):
    prompt: str
    max_new_tokens: int = 64000


async def fetch_ddg_search(query: str):
    """Query the DuckDuckGo Instant Answer API and return the parsed JSON."""
    url = "https://api.duckduckgo.com/"
    params = {
        "q": query,
        "format": "json",
        "no_redirect": "1",
        "no_html": "1",
        "skip_disambig": "1"
    }
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, params=params, timeout=15)
        resp.raise_for_status()
        return resp.json()


def clean_ddg_text(ddg_json):
    """Flatten the DDG abstract and related-topic texts into one short string."""
    abstract = ddg_json.get("AbstractText", "")
    related = ddg_json.get("RelatedTopics", [])
    related_texts = []
    for item in related:
        if isinstance(item, dict) and "Text" in item:
            related_texts.append(item["Text"])
        elif isinstance(item, dict) and "Topics" in item:
            # Some related topics are grouped under a "Topics" sub-list.
            for sub in item["Topics"]:
                if "Text" in sub:
                    related_texts.append(sub["Text"])
    combined = (abstract + " " + " ".join(related_texts)).strip()
    combined = re.sub(r"\s+", " ", combined)
    if len(combined) > 1000:
        combined = combined[:1000] + "..."
    return combined


def generate_dynamic_query():
    """Ask the small instruct model for a fresh search query."""
    prompt = (
        "Generate a short, specific search query about technology, startups, AI, or science. "
        "Be creative, realistic, and output only the query with no extra words."
    )
    output = query_generator(
        prompt,
        max_new_tokens=32,
        truncation=True,
        do_sample=True,
        temperature=0.9,
        return_full_text=False  # return only the completion, not the echoed prompt
    )
    query = output[0]["generated_text"].strip().split("\n")[0]
    return query


def summarize_text(text: str):
    """Summarize text with the shared instruct model."""
    prompt = f"Summarize this concisely:\n{text}\nSummary:"
    output = summarizer(
        prompt,
        max_new_tokens=256,
        truncation=True,
        do_sample=False,
        return_full_text=False  # strip the prompt from the returned text
    )
    return output[0]["generated_text"].strip()


def inject_relevant_kg(prompt: str):
    """Find a relevant KG entry and inject its summary into the prompt."""
    if not knowledge_graph:
        return prompt
    best_match = None
    for key, node in knowledge_graph.items():
        # Crude keyword match: any word of the stored query appears in the prompt.
        if any(word.lower() in prompt.lower() for word in key.split()):
            best_match = node
            break
    if best_match:
        return f"{prompt}\n\nRelevant knowledge from memory:\n{best_match['summary']}"
    return prompt


async def update_knowledge_graph_periodically():
    """Background task: search, summarize, and store one KG entry per cycle."""
    while True:
        try:
            # Model inference is blocking; run it off the event loop thread.
            query = await asyncio.to_thread(generate_dynamic_query)
            print(f"[KG Updater] Searching DDG for query: {query}")
            ddg_data = await fetch_ddg_search(query)
            cleaned = clean_ddg_text(ddg_data)

            if not cleaned or len(cleaned) < 50:
                print("[KG Updater] Too little info found, retrying next cycle...")
            else:
                summary = await asyncio.to_thread(summarize_text, cleaned)
                knowledge_graph[query] = {
                    "raw_text": cleaned,
                    "summary": summary,
                    "timestamp": time.time()
                }
                # Evict the oldest entry once the cap is exceeded.
                if len(knowledge_graph) > MAX_KG_SIZE:
                    oldest_key = min(knowledge_graph, key=lambda k: knowledge_graph[k]["timestamp"])
                    del knowledge_graph[oldest_key]
                print(f"[KG Updater] Knowledge graph updated for query: {query}")

        except Exception as e:
            print(f"[KG Updater] Error: {e}")

        await asyncio.sleep(UPDATE_INTERVAL)


@app.on_event("startup")
async def startup_event():
    # Launch the background updater when the server starts.
    asyncio.create_task(update_knowledge_graph_periodically())


@app.post("/generate/stream")
async def generate_stream(input: ModelInput):
    q = queue.Queue()

    def run_generation():
        try:
            streamer = TextStreamer(generator.tokenizer, skip_prompt=True)
            # TextStreamer.put receives raw token-id tensors; on_finalized_text
            # is the hook that receives decoded text, so override that instead.
            streamer.on_finalized_text = lambda text, stream_end=False: q.put(text)

            enriched_prompt = inject_relevant_kg(input.prompt)
            generator(
                enriched_prompt,
                max_new_tokens=input.max_new_tokens,
                do_sample=False,
                streamer=streamer
            )
        except Exception as e:
            q.put(f"[ERROR] {e}")
        finally:
            q.put(None)  # sentinel: generation finished

    thread = threading.Thread(target=run_generation, daemon=True)
    thread.start()

    async def event_generator():
        loop = asyncio.get_running_loop()
        while True:
            # q.get blocks, so poll it from the default thread pool.
            token = await loop.run_in_executor(None, q.get)
            if token is None:
                break
            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")


@app.get("/knowledge")
async def get_knowledge():
    # Expose the current knowledge graph for inspection.
    return knowledge_graph


@app.get("/", response_class=HTMLResponse)
async def root():
    return """
    <!DOCTYPE html>
    <html>
    <head><title>Xylaria Cognitive Worker</title></head>
    <body>
      <h2>Xylaria Cognitive Worker</h2>
      <textarea id="prompt" rows="4" cols="60">Explain how AI startups secure funding</textarea><br/>
      <button onclick="startStreaming()">Generate</button>
      <pre id="output" style="white-space: pre-wrap; background:#eee; padding:10px; border-radius:5px; max-height:400px; overflow:auto;"></pre>
      <h3>Knowledge Graph</h3>
      <pre id="kg" style="background:#ddd; padding:10px; max-height:300px; overflow:auto;"></pre>

      <script>
        async function startStreaming() {
          const prompt = document.getElementById("prompt").value;
          const output = document.getElementById("output");
          output.textContent = "";
          const response = await fetch("/generate/stream", {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({ prompt: prompt, max_new_tokens: 64000 })
          });
          const reader = response.body.getReader();
          const decoder = new TextDecoder();
          while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            const chunk = decoder.decode(value, { stream: true });
            output.textContent += chunk;
            output.scrollTop = output.scrollHeight;
          }
        }
        async function fetchKG() {
          const kgPre = document.getElementById("kg");
          const res = await fetch("/knowledge");
          const data = await res.json();
          kgPre.textContent = JSON.stringify(data, null, 2);
        }
        setInterval(fetchKG, 10000);
        window.onload = fetchKG;
      </script>
    </body>
    </html>
    """
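

# Example usage, assuming this module is saved as app.py and uvicorn is
# installed:
#   uvicorn app:app --host 0.0.0.0 --port 8000
# Then stream a completion from another terminal (-N disables curl buffering):
#   curl -N -X POST http://localhost:8000/generate/stream \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "Explain how AI startups secure funding", "max_new_tokens": 256}'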