import re
import math
import asyncio

import requests
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import sentencepiece as spm
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = [
    "https://insect5386.github.io",
    "https://insect5386.github.io/insect5386"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

sp = spm.SentencePieceProcessor()
sp.load("kolig_unigram.model")

# NOTE: the special-token strings were lost in the original source (empty
# piece_to_id("") calls). The names below are assumptions inferred from the
# fallback IDs 0-3; adjust them to match the pieces in kolig_unigram.model.
pad_id = sp.piece_to_id("<pad>")
if pad_id == -1:
    pad_id = 0
start_id = sp.piece_to_id("<start>")
if start_id == -1:
    start_id = 1
end_id = sp.piece_to_id("<end>")
if end_id == -1:
    end_id = 2
unk_id = sp.piece_to_id("<unk>")
if unk_id == -1:
    unk_id = 3

vocab_size = sp.get_piece_size()
max_len = 100

def text_to_ids(text):
    return sp.encode(text, out_type=int)

def ids_to_text(ids):
    return sp.decode(ids)

class RotaryPositionalEmbedding(layers.Layer):
    def __init__(self, dim):
        super().__init__()
        inv_freq = 1.0 / (10000 ** (np.arange(0, dim, 2) / dim))
        self.inv_freq = tf.constant(inv_freq, dtype=tf.float32)

    def call(self, x):
        # x: [batch, heads, seq_len, head_dim]
        batch, heads, seq_len, depth = tf.unstack(tf.shape(x))
        t = tf.range(seq_len, dtype=tf.float32)
        freqs = tf.einsum('i,j->ij', t, self.inv_freq)
        emb_sin = tf.sin(freqs)
        emb_cos = tf.cos(freqs)
        emb_cos = tf.reshape(emb_cos, [1, 1, seq_len, -1])
        emb_sin = tf.reshape(emb_sin, [1, 1, seq_len, -1])
        # Rotate each (even, odd) feature pair by a position-dependent angle.
        x1 = x[..., ::2]
        x2 = x[..., 1::2]
        x_rotated = tf.stack([
            x1 * emb_cos - x2 * emb_sin,
            x1 * emb_sin + x2 * emb_cos
        ], axis=-1)
        x_rotated = tf.reshape(x_rotated, tf.shape(x))
        return x_rotated

class SwiGLU(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.proj = tf.keras.layers.Dense(d_ff * 2)
        self.out = tf.keras.layers.Dense(d_model)

    def call(self, x):
        x_proj = self.proj(x)
        # Half of the projection is the value path, the other half gates it.
        x_val, x_gate = tf.split(x_proj, 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))

class GPTBlock(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff, num_heads=8, dropout_rate=0.1, adapter_dim=64):
        super().__init__()
        self.ln1 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model // num_heads)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.adapter_down = tf.keras.layers.Dense(adapter_dim, activation='gelu')
        self.adapter_up = tf.keras.layers.Dense(d_model)
        self.ln2 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.ffn = SwiGLU(d_model, d_ff)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.rope = RotaryPositionalEmbedding(d_model // num_heads)

    def call(self, x, training=False):
        x_norm = self.ln1(x)
        b = tf.shape(x_norm)[0]
        s = tf.shape(x_norm)[1]
        h = self.mha.num_heads
        d = x_norm.shape[-1] // h
        # Split into heads, apply RoPE, then merge back for MultiHeadAttention.
        qkv = tf.reshape(x_norm, [b, s, h, d])
        qkv = tf.transpose(qkv, [0, 2, 1, 3])
        q = self.rope(qkv)
        k = self.rope(qkv)
        q = tf.reshape(tf.transpose(q, [0, 2, 1, 3]), [b, s, h * d])
        k = tf.reshape(tf.transpose(k, [0, 2, 1, 3]), [b, s, h * d])
        attn_out = self.mha(query=q, value=x_norm, key=k, use_causal_mask=True, training=training)
        attn_out = self.dropout1(attn_out, training=training)
        # Bottleneck adapter applied to the attention output.
        adapter_out = self.adapter_up(self.adapter_down(attn_out))
        attn_out = attn_out + adapter_out
        x = x + attn_out
        ffn_out = self.ffn(self.ln2(x))
        x = x + self.dropout2(ffn_out, training=training)
        return x
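# Illustrative aside (not part of the original code, never executed at import
# time): RoPE rotates each (even, odd) feature pair by an angle proportional
# to position, so it preserves both tensor shape and per-position vector
# norms. A minimal sanity sketch, assuming the [batch, heads, seq, head_dim]
# layout used above:
def _rope_sanity_check():
    rope = RotaryPositionalEmbedding(dim=32)
    x = tf.random.normal([2, 8, 10, 32])  # [batch, heads, seq, head_dim]
    y = rope(x)
    assert y.shape == x.shape  # rotation never changes the shape
    # Pairwise rotations are orthogonal, so norms match up to float error.
    np.testing.assert_allclose(
        tf.norm(x, axis=-1).numpy(), tf.norm(y, axis=-1).numpy(), rtol=1e-4
    )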
class InteractGPT(tf.keras.Model):
    def __init__(self, vocab_size, seq_len, d_model, d_ff, n_layers, num_heads=8, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = tf.keras.layers.Embedding(vocab_size, d_model)
        self.blocks = [GPTBlock(d_model, d_ff, num_heads, dropout_rate) for _ in range(n_layers)]
        self.ln_f = tf.keras.layers.LayerNormalization(epsilon=1e-5)

    def call(self, x, training=False):
        x = self.token_embedding(x)
        for block in self.blocks:
            x = block(x, training=training)
        x = self.ln_f(x)
        # Weight tying: reuse the input embedding matrix as the output head.
        logits = tf.matmul(x, self.token_embedding.embeddings, transpose_b=True)
        return logits

model = InteractGPT(vocab_size=vocab_size, seq_len=max_len, d_model=256, d_ff=1024, n_layers=6)

dummy_input = tf.zeros((1, max_len), dtype=tf.int32)  # batch 1, sequence length max_len
_ = model(dummy_input)  # builds the model so the weights can be loaded
model.load_weights("InteractGPT.weights.h5")
print("Model weights loaded!")

def generate_text_typical(model, prompt, max_len=100, max_gen=98,
                          temperature=0.50, min_len=20,
                          repetition_penalty=1.2, typical_p=0.80):
    def typical_filtering(logits, typical_p):
        probs = np.exp(logits - np.max(logits))
        probs /= probs.sum()
        log_probs = np.log(probs + 1e-9)
        entropy = -np.sum(probs * log_probs)
        # Distance of each token's information content from the entropy;
        # keep the most "typical" tokens until typical_p mass is covered.
        shifted = np.abs(-log_probs - entropy)
        sorted_idx = np.argsort(shifted)
        sorted_probs = probs[sorted_idx]
        cum_probs = np.cumsum(sorted_probs)
        cutoff = np.searchsorted(cum_probs, typical_p) + 1
        final_idx = sorted_idx[:cutoff]
        final_probs = probs[final_idx]
        final_probs /= final_probs.sum()
        return final_idx, final_probs

    # NOTE: the prompt template lost its special tokens in the original
    # source; wrapping with the assumed <start>/<end> markers is a guess.
    model_input = text_to_ids(f"<start> {prompt} <end>")
    model_input = model_input[:max_len]
    generated = list(model_input)

    for step in range(max_gen):
        pad_len = max(0, max_len - len(generated))
        input_padded = np.pad(generated, (0, pad_len), constant_values=pad_id)
        input_tensor = tf.convert_to_tensor([input_padded])
        logits = model(input_tensor, training=False)
        next_logits = logits[0, len(generated) - 1].numpy()

        # Repetition penalty: damp logits of tokens already generated.
        for t in set(generated):
            count = generated.count(t)
            next_logits[t] /= (repetition_penalty ** count)

        # Discourage early termination.
        if len(generated) < min_len:
            next_logits[end_id] -= 5.0
            next_logits[pad_id] -= 10.0

        # Temperature scaling.
        next_logits = next_logits / temperature

        # Typical sampling.
        final_idx, final_probs = typical_filtering(next_logits, typical_p=typical_p)
        sampled = np.random.choice(final_idx, p=final_probs)
        generated.append(int(sampled))

        decoded = sp.decode(generated)
        for t in ["<start>", "<end>", "<pad>"]:  # assumed special tokens, see above
            decoded = decoded.replace(t, "")
        decoded = decoded.strip()

        if len(generated) >= min_len and (sampled == end_id or decoded.endswith(('.', '!', '?'))):
            yield decoded
            break
    else:
        # The original yielded nothing when max_gen was reached without an
        # end condition; yield the partial text so callers always get output.
        yield decoded
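# Illustrative aside (assumed helper, not part of the original code): typical
# sampling keeps the tokens whose information content -log p is closest to
# the distribution's entropy, then renormalizes over that set. A toy run of
# the same arithmetic as typical_filtering above:
def _typical_demo(typical_p=0.8):
    probs = np.array([0.82, 0.11, 0.04, 0.02, 0.01])
    entropy = -np.sum(probs * np.log(probs))
    surprise = -np.log(probs)                       # information content
    order = np.argsort(np.abs(surprise - entropy))  # most typical first
    kept = order[: np.searchsorted(np.cumsum(probs[order]), typical_p) + 1]
    print("kept token indices:", kept)  # smallest 'typical' set with >= 80% mass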
def is_valid_response(response):
    if len(response.strip()) < 2:
        return False
    if re.search(r'[ㄱ-ㅎㅏ-ㅣ]{3,}', response):  # runs of bare jamo
        return False
    if len(response.split()) < 2:
        return False
    if response.count(' ') < 2:
        return False
    if any(tok in response.lower() for tok in ['hello', 'this', 'ㅋㅋ']):
        return False
    return True

def extract_main_query(text):
    sentences = re.split(r'[.?!]\s*', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    if not sentences:
        return text
    last = sentences[-1]
    last = re.sub(r'[^가-힣a-zA-Z0-9 ]', '', last)
    # Strip common Korean particles from the ends of words.
    particles = ['이', '가', '은', '는', '을', '를', '의', '에서', '에게', '한테', '보다']
    for p in particles:
        last = re.sub(rf'\b(\w+){p}\b', r'\1', last)
    return last.strip()

def get_wikipedia_summary(query):
    cleaned_query = extract_main_query(query)
    url = f"https://ko.wikipedia.org/api/rest_v1/page/summary/{cleaned_query}"
    res = requests.get(url)
    if res.status_code == 200:
        return res.json().get("extract", "요약 정보를 찾을 수 없습니다.")
    else:
        return "위키백과에서 정보를 가져올 수 없습니다."

def simple_intent_classifier(text):
    text = text.lower()
    greet_keywords = ["안녕", "반가워", "이름", "누구", "소개", "어디서 왔", "정체", "몇 살", "너 뭐야"]
    info_keywords = ["설명", "정보", "무엇", "뭐야", "어디", "누구", "왜", "어떻게", "종류", "개념"]
    math_keywords = ["더하기", "빼기", "곱하기", "나누기", "루트", "제곱", "+", "-", "*", "/", "=", "^", "√", "계산", "몇이야", "얼마야"]
    if any(kw in text for kw in greet_keywords):
        return "인사"
    elif any(kw in text for kw in info_keywords):
        return "정보질문"
    elif any(kw in text for kw in math_keywords):
        return "수학질문"
    else:
        return "일상대화"

def parse_math_question(text):
    # "제곱" (squared) must map to **2, not *2 as in the original.
    text = (text.replace("곱하기", "*").replace("더하기", "+")
                .replace("빼기", "-").replace("나누기", "/").replace("제곱", "**2"))
    text = re.sub(r'루트\s*(\d+)', r'math.sqrt(\1)', text)
    try:
        # WARNING: eval on user input is unsafe outside a trusted demo.
        result = eval(text)
        return f"정답은 {result}입니다."
    except Exception:
        return "계산할 수 없는 수식이에요. 다시 한번 확인해 주세요!"

# Full response routing
def respond(input_text):
    intent = simple_intent_classifier(input_text)

    if "이름" in input_text:
        return "제 이름은 InteractGPT입니다."

    if "누구" in input_text:
        return "저는 InteractGPT이라고 해요."

    if intent == "수학질문":
        return parse_math_question(input_text)

    if intent == "인사":
        return "반가워요! 무엇을 도와드릴까요?"

    if intent == "정보질문":
        keyword = re.sub(r"(에 대해|에 대한|에 대해서)?\s*(설명해줘|알려줘|뭐야|개념|정의|정보)?", "", input_text).strip()
        if not keyword:
            return "어떤 주제에 대해 궁금한가요?"
        summary = get_wikipedia_summary(keyword)
        return f"{summary}\n다른 궁금한 점 있으신가요?"

    # Small talk: the original called generate_text_typical without the model
    # argument; pass it and join the generator into a single string.
    return "".join(generate_text_typical(model, input_text))

# NOTE: split_response was referenced below but never defined in the original
# source. A minimal sketch, assuming the intent is fixed-size streaming chunks:
def split_response(text, chunk_size=20):
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

async def async_generator_wrapper(prompt: str):
    gen = generate_text_typical(model, prompt)
    for text_piece in gen:
        yield text_piece
        await asyncio.sleep(0.1)

@app.get("/generate")
async def generate(request: Request):
    prompt = request.query_params.get("prompt", "안녕하세요")
    response_text = respond(prompt)

    async def stream_response():
        for chunk in split_response(response_text):
            yield chunk
            await asyncio.sleep(0.1)

    return StreamingResponse(stream_response(), media_type="text/plain")
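# A minimal local launcher (assumption: this module is saved as main.py and
# uvicorn is installed; the original source does not show how the app is run):
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example query once the server is up (the --get/--data-urlencode pair
# URL-encodes the Korean prompt):
#   curl --get --data-urlencode "prompt=안녕" http://localhost:8000/generate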