Reality123b committed on
Commit
e41b8bc
·
verified ·
1 Parent(s): 8a72144

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +116 -79
app.py CHANGED
@@ -1,38 +1,57 @@
1
  from fastapi import FastAPI
2
  from fastapi.responses import StreamingResponse, HTMLResponse
3
  from pydantic import BaseModel
4
- from transformers import pipeline
5
  import asyncio
 
 
6
  import queue
7
  import threading
8
- import time
9
  import random
10
- import httpx
11
-
12
- class ModelInput(BaseModel):
13
- prompt: str
14
- max_new_tokens: int = 64000
15
-
16
- app = FastAPI()
17
-
18
- # Your main generation model (DeepSeek)
 
 
 
19
  generator = pipeline(
20
  "text-generation",
21
  model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
22
  device="cpu"
23
  )
24
 
25
- # The summarization instruct model
26
- summarizer = pipeline(
27
  "text-generation",
28
  model="HuggingFaceTB/SmolLM2-360M-Instruct",
29
- device="cpu",
30
- max_length=512, # keep summary short
31
- do_sample=False
32
  )
33
 
 
 
 
 
 
34
  knowledge_graph = {}
35
 
 
 
 
 
 
 
 
 
 
 
 
 
36
  async def fetch_ddg_search(query: str):
37
  url = "https://api.duckduckgo.com/"
38
  params = {
@@ -44,101 +63,115 @@ async def fetch_ddg_search(query: str):
44
  }
45
  async with httpx.AsyncClient() as client:
46
  resp = await client.get(url, params=params, timeout=15)
47
- data = resp.json()
48
- return data
49
 
50
  def clean_ddg_text(ddg_json):
51
- # Take abstract text + top related topic texts concatenated
52
  abstract = ddg_json.get("AbstractText", "")
53
  related = ddg_json.get("RelatedTopics", [])
54
  related_texts = []
55
  for item in related:
56
- if "Text" in item:
57
  related_texts.append(item["Text"])
58
- elif "Name" in item and "Topics" in item:
59
  for sub in item["Topics"]:
60
  if "Text" in sub:
61
  related_texts.append(sub["Text"])
62
- combined_text = abstract + " " + " ".join(related_texts)
63
- # Simple clean up, trim length to avoid overloading
64
- combined_text = combined_text.strip().replace("\n", " ")
65
- if len(combined_text) > 1000:
66
- combined_text = combined_text[:1000] + "..."
67
- return combined_text
68
 
69
- def summarize_text(text: str):
70
- # Run the instruct model to summarize/clean the text
71
- prompt = f"Summarize this information concisely:\n{text}\nSummary:"
72
- output = summarizer(prompt, max_length=256, do_sample=False)
73
- return output[0]["generated_text"].strip()
74
-
75
- async def generate_random_query():
76
- """Use your main model to invent a search query."""
77
  prompt = (
78
- "Generate one short, interesting, real-world question "
79
- "about technology, startups, AI, or science that someone might Google. "
80
- "Output only the query, nothing else."
81
  )
82
- output = generator(prompt, max_new_tokens=20, do_sample=True, temperature=0.9, truncation=True)
83
- query = output[0]["generated_text"].strip()
84
- # Safety: strip weird trailing punctuation
85
- return query.split("\n")[0].strip("?!. ")
 
 
 
 
 
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  async def update_knowledge_graph_periodically():
88
  while True:
89
  try:
90
- query = await generate_random_query()
91
- print(f"[KG Updater] Searching DuckDuckGo for query: {query}")
92
-
93
  ddg_data = await fetch_ddg_search(query)
94
  cleaned = clean_ddg_text(ddg_data)
95
 
96
  if not cleaned or len(cleaned) < 50:
97
- print("[KG Updater] Too little info found, retrying with new query...")
98
- continue # Skip saving, go to next loop
99
-
100
- # Summarize with instruct model
101
- loop = asyncio.get_event_loop()
102
- summary = await loop.run_in_executor(
103
- None,
104
- lambda: summarizer(
105
- f"Summarize this info concisely:\n{cleaned}",
106
- max_length=256,
107
- truncation=True,
108
- do_sample=False
109
- )[0]["generated_text"].strip()
110
- )
111
-
112
- knowledge_graph[query] = {
113
- "raw_text": cleaned,
114
- "summary": summary,
115
- "timestamp": time.time()
116
- }
117
- print(f"[KG Updater] Knowledge graph updated for query: {query}")
118
 
119
  except Exception as e:
120
  print(f"[KG Updater] Error: {e}")
121
 
122
- await asyncio.sleep(60) # Run forever, every 60s
123
-
124
 
125
  @app.on_event("startup")
126
  async def startup_event():
127
  asyncio.create_task(update_knowledge_graph_periodically())
128
 
129
- # Manual streaming endpoint (kept as-is)
 
 
130
  @app.post("/generate/stream")
131
  async def generate_stream(input: ModelInput):
132
  q = queue.Queue()
 
133
  def run_generation():
134
  try:
135
- streamer = pipeline("text-generation", model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B", device="cpu").tokenizer
136
  streamer = TextStreamer(generator.tokenizer, skip_prompt=True)
137
  def enqueue_token(token):
138
  q.put(token)
139
  streamer.put = enqueue_token
 
 
140
  generator(
141
- input.prompt,
142
  max_new_tokens=input.max_new_tokens,
143
  do_sample=False,
144
  streamer=streamer
@@ -147,6 +180,7 @@ async def generate_stream(input: ModelInput):
147
  q.put(f"[ERROR] {e}")
148
  finally:
149
  q.put(None)
 
150
  thread = threading.Thread(target=run_generation)
151
  thread.start()
152
 
@@ -157,23 +191,28 @@ async def generate_stream(input: ModelInput):
157
  if token is None:
158
  break
159
  yield token
 
160
  return StreamingResponse(event_generator(), media_type="text/plain")
161
 
162
- # Endpoint to get KG
 
 
163
  @app.get("/knowledge")
164
  async def get_knowledge():
165
  return knowledge_graph
166
 
167
- # Basic client page to test streaming
 
 
168
  @app.get("/", response_class=HTMLResponse)
169
  async def root():
170
  return """
171
  <!DOCTYPE html>
172
  <html>
173
- <head><title>Streaming Text Generation Client</title></head>
174
  <body>
175
- <h2>Streaming Text Generation Demo</h2>
176
- <textarea id="prompt" rows="4" cols="60">Write me a poem about tech startup struggles</textarea><br/>
177
  <button onclick="startStreaming()">Generate</button>
178
  <pre id="output" style="white-space: pre-wrap; background:#eee; padding:10px; border-radius:5px; max-height:400px; overflow:auto;"></pre>
179
  <h3>Knowledge Graph</h3>
@@ -199,15 +238,13 @@ async def root():
199
  output.scrollTop = output.scrollHeight;
200
  }
201
  }
202
-
203
  async function fetchKG() {
204
  const kgPre = document.getElementById("kg");
205
  const res = await fetch("/knowledge");
206
  const data = await res.json();
207
  kgPre.textContent = JSON.stringify(data, null, 2);
208
  }
209
-
210
- setInterval(fetchKG, 10000); // update KG display every 10s
211
  window.onload = fetchKG;
212
  </script>
213
  </body>
 
1
  from fastapi import FastAPI
2
  from fastapi.responses import StreamingResponse, HTMLResponse
3
  from pydantic import BaseModel
4
+ from transformers import pipeline, TextStreamer
5
  import asyncio
6
+ import httpx
7
+ import time
8
  import queue
9
  import threading
 
10
  import random
11
+ import re
12
+
13
# =========================
# CONFIG
# =========================
UPDATE_INTERVAL = 60   # seconds between knowledge-graph refresh cycles
MAX_KG_SIZE = 50       # cap on stored KG nodes to avoid memory bloat

# =========================
# MODELS
# =========================
# Primary generation model used to answer user prompts.
generator = pipeline(
    "text-generation",
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    device="cpu"
)

# Lightweight instruct model used for inventing search queries.
query_generator = pipeline(
    "text-generation",
    model="HuggingFaceTB/SmolLM2-360M-Instruct",
    device="cpu"
)

# The same instruct model doubles as the summarizer for now.
summarizer = query_generator
37
+
38
# =========================
# KNOWLEDGE GRAPH
# =========================
# In-memory store mapping a search query string to a node dict with
# "raw_text", "summary", and "timestamp" keys (written by the updater task).
knowledge_graph = {}

# =========================
# FASTAPI
# =========================
app = FastAPI()
47
+
48
class ModelInput(BaseModel):
    """Request body for the /generate/stream endpoint."""
    # User prompt fed to the generator (relevant KG context may be appended).
    prompt: str
    # Upper bound on generated tokens; default is intentionally very large.
    max_new_tokens: int = 64000
51
+
52
+ # =========================
53
+ # UTILS
54
+ # =========================
55
  async def fetch_ddg_search(query: str):
56
  url = "https://api.duckduckgo.com/"
57
  params = {
 
63
  }
64
  async with httpx.AsyncClient() as client:
65
  resp = await client.get(url, params=params, timeout=15)
66
+ return resp.json()
 
67
 
68
def clean_ddg_text(ddg_json):
    """Flatten a DuckDuckGo Instant Answer JSON payload into one text string.

    Concatenates the abstract with the text of every related topic
    (including nested topic groups), collapses whitespace, and truncates
    the result to ~1000 characters so downstream models are not overloaded.
    Returns "" when the payload contains no usable text.
    """
    abstract = ddg_json.get("AbstractText", "")
    related = ddg_json.get("RelatedTopics", [])
    related_texts = []
    for item in related:
        if isinstance(item, dict) and "Text" in item:
            related_texts.append(item["Text"])
        elif isinstance(item, dict) and "Topics" in item:
            for sub in item["Topics"]:
                # Guard nested entries the same way as top-level ones:
                # a non-dict entry would raise TypeError on `"Text" in sub`.
                if isinstance(sub, dict) and "Text" in sub:
                    related_texts.append(sub["Text"])
    combined = (abstract + " " + " ".join(related_texts)).strip()
    combined = re.sub(r"\s+", " ", combined)  # collapse whitespace runs
    if len(combined) > 1000:
        combined = combined[:1000] + "..."
    return combined
 
84
 
85
def generate_dynamic_query():
    """Ask the instruct model to invent one short web-search query.

    Returns the first line of the model's completion. The HF text-generation
    pipeline echoes the prompt at the start of `generated_text` by default,
    so the prompt prefix is stripped first — otherwise the returned "query"
    would be the instruction text itself.
    """
    prompt = (
        "Generate a short, specific search query about technology, startups, AI, or science. "
        "Be creative, realistic, and output only the query with no extra words."
    )
    output = query_generator(
        prompt,
        max_new_tokens=32,
        truncation=True,
        do_sample=True,
        temperature=0.9
    )
    text = output[0]["generated_text"]
    # Drop the echoed prompt so only the model's completion remains.
    if text.startswith(prompt):
        text = text[len(prompt):]
    query = text.strip().split("\n")[0]
    return query
99
 
100
def summarize_text(text: str):
    """Summarize *text* with the instruct model and return the summary string.

    The HF text-generation pipeline echoes the prompt at the start of
    `generated_text` by default, so the prompt prefix is stripped first —
    otherwise the returned "summary" would begin with the instruction and
    the original text.
    """
    prompt = f"Summarize this concisely:\n{text}\nSummary:"
    output = summarizer(
        prompt,
        max_new_tokens=256,
        truncation=True,
        do_sample=False
    )
    generated = output[0]["generated_text"]
    # Keep only the completion after the echoed prompt.
    if generated.startswith(prompt):
        generated = generated[len(prompt):]
    return generated.strip()
109
+
110
def inject_relevant_kg(prompt: str):
    """Append the first matching knowledge-graph summary to *prompt*.

    A KG node matches when any word of its stored query key appears
    (case-insensitively) as a substring of the prompt. Returns the prompt
    unchanged when the graph is empty or nothing matches.
    """
    if not knowledge_graph:
        return prompt
    prompt_lower = prompt.lower()  # hoisted: invariant across the loop
    best_match = None
    for key, node in knowledge_graph.items():
        if any(word.lower() in prompt_lower for word in key.split()):
            best_match = node
            break
    if best_match:
        return f"{prompt}\n\nRelevant knowledge from memory:\n{best_match['summary']}"
    return prompt
122
+
123
# =========================
# BACKGROUND TASK
# =========================
async def update_knowledge_graph_periodically():
    """Forever loop: invent a query, search DDG, summarize, store in the KG.

    Runs one cycle every UPDATE_INTERVAL seconds. The pipeline calls are
    CPU-bound, so they are pushed to a worker thread with asyncio.to_thread
    — running them inline would block the event loop and stall every other
    request for the duration of the inference.
    """
    while True:
        try:
            # Blocking model inference -> worker thread, not the event loop.
            query = await asyncio.to_thread(generate_dynamic_query)
            print(f"[KG Updater] Searching DDG for query: {query}")

            ddg_data = await fetch_ddg_search(query)
            cleaned = clean_ddg_text(ddg_data)

            if not cleaned or len(cleaned) < 50:
                print("[KG Updater] Too little info found, retrying next cycle...")
            else:
                # Summarization is also CPU-bound inference; offload it too.
                summary = await asyncio.to_thread(summarize_text, cleaned)
                knowledge_graph[query] = {
                    "raw_text": cleaned,
                    "summary": summary,
                    "timestamp": time.time()
                }
                # Evict the oldest node once the size cap is exceeded.
                if len(knowledge_graph) > MAX_KG_SIZE:
                    oldest_key = min(knowledge_graph, key=lambda k: knowledge_graph[k]['timestamp'])
                    del knowledge_graph[oldest_key]
                print(f"[KG Updater] Knowledge graph updated for query: {query}")

        except Exception as e:
            print(f"[KG Updater] Error: {e}")

        await asyncio.sleep(UPDATE_INTERVAL)
 
153
 
154
@app.on_event("startup")
async def startup_event():
    """Launch the background knowledge-graph updater when the app starts."""
    # Keep a reference on app.state: the event loop holds only a weak
    # reference to tasks, so an unreferenced task can be garbage-collected
    # mid-execution and silently disappear.
    app.state.kg_updater_task = asyncio.create_task(update_knowledge_graph_periodically())
157
 
158
+ # =========================
159
+ # STREAMING ENDPOINT
160
+ # =========================
161
  @app.post("/generate/stream")
162
  async def generate_stream(input: ModelInput):
163
  q = queue.Queue()
164
+
165
  def run_generation():
166
  try:
 
167
  streamer = TextStreamer(generator.tokenizer, skip_prompt=True)
168
  def enqueue_token(token):
169
  q.put(token)
170
  streamer.put = enqueue_token
171
+
172
+ enriched_prompt = inject_relevant_kg(input.prompt)
173
  generator(
174
+ enriched_prompt,
175
  max_new_tokens=input.max_new_tokens,
176
  do_sample=False,
177
  streamer=streamer
 
180
  q.put(f"[ERROR] {e}")
181
  finally:
182
  q.put(None)
183
+
184
  thread = threading.Thread(target=run_generation)
185
  thread.start()
186
 
 
191
  if token is None:
192
  break
193
  yield token
194
+
195
  return StreamingResponse(event_generator(), media_type="text/plain")
196
 
197
# =========================
# VIEW KG
# =========================
@app.get("/knowledge")
async def get_knowledge():
    """Return a snapshot of the in-memory knowledge graph."""
    # Shallow copy: decouples the response from subsequent mutation by the
    # background updater task, which adds and evicts nodes concurrently.
    return dict(knowledge_graph)
203
 
204
+ # =========================
205
+ # TEST CLIENT PAGE
206
+ # =========================
207
  @app.get("/", response_class=HTMLResponse)
208
  async def root():
209
  return """
210
  <!DOCTYPE html>
211
  <html>
212
+ <head><title>Xylaria Cognitive Worker</title></head>
213
  <body>
214
+ <h2>Xylaria Cognitive Worker</h2>
215
+ <textarea id="prompt" rows="4" cols="60">Explain how AI startups secure funding</textarea><br/>
216
  <button onclick="startStreaming()">Generate</button>
217
  <pre id="output" style="white-space: pre-wrap; background:#eee; padding:10px; border-radius:5px; max-height:400px; overflow:auto;"></pre>
218
  <h3>Knowledge Graph</h3>
 
238
  output.scrollTop = output.scrollHeight;
239
  }
240
  }
 
241
  async function fetchKG() {
242
  const kgPre = document.getElementById("kg");
243
  const res = await fetch("/knowledge");
244
  const data = await res.json();
245
  kgPre.textContent = JSON.stringify(data, null, 2);
246
  }
247
+ setInterval(fetchKG, 10000);
 
248
  window.onload = fetchKG;
249
  </script>
250
  </body>