import gradio as gr
import tempfile
import imageio
import torch
import time
import os
import openai
from transformers import pipeline
from diffusers import DiffusionPipeline

# ---------- Secret Key Setup ----------
openai.api_key = os.getenv("OPENAI_KEY")  # ✅ Hugging Face Space Secret
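# NOTE: this app uses the legacy openai.ChatCompletion interface, which
# requires the pre-1.0 openai package (openai<1.0).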

# ---------- Config ----------
AVAILABLE_MODELS = {
    "Codette Fine-Tuned (v9)": "ft:gpt-4.1-2025-04-14:raiffs-bits:codette-final:BO907H7Z",
    "GPT-2 (small, fast)": "gpt2",
    "Falcon (TII UAE)": "tiiuae/falcon-7b-instruct",
    "Mistral (OpenAccess)": "mistralai/Mistral-7B-v0.1"
}

device = "cuda" if torch.cuda.is_available() else "cpu"
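# In-memory state: cached text pipelines, per-session chat logs, and throttle timestamps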
text_model_cache = {}
chat_memory = {}
MAX_PROMPTS_PER_SESSION = 5
THROTTLE_SECONDS = 30
last_usage_time = {}

# ---------- Load Image Generator ----------
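# If a diffusion model fails to load, the matching UI checkbox is simply
# disabled below instead of crashing the app.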
try:
    image_generator = DiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        safety_checker=None,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32
    )
    image_generator.to(device)
    image_enabled = True
except Exception as e:
    print(f"[Image Model Load Error]: {e}")
    image_generator = None
    image_enabled = False

# ---------- Load Video Generator ----------
try:
    video_pipeline = DiffusionPipeline.from_pretrained(
        "damo-vilab/text-to-video-ms-1.7b",
        safety_checker=None,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32
    )
    video_pipeline.to(device)
    video_enabled = True
except Exception as e:
    print(f"[Video Model Load Error]: {e}")
    video_pipeline = None
    video_enabled = False

# ---------- Rate-Limited Terminal ----------
def codette_terminal_limited(prompt, model_name, generate_image, generate_video, session_id, batch_size, video_steps, fps):
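    """Stream a Codette reply for one prompt.

    Yields (chat_log_text, images, video_path) tuples so Gradio can update
    the terminal, gallery, and video components as generation proceeds.
    """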
    if session_id not in chat_memory:
        chat_memory[session_id] = []

    if prompt.strip().lower() in ["exit", "quit"]:
        chat_memory[session_id] = []
        yield "🧠 Codette signing off... Session reset.", None, None
        return

    if model_name == "Codette Fine-Tuned (v9)":
        count = sum(1 for line in chat_memory[session_id] if line.startswith("🖋️ You >"))
        if count >= MAX_PROMPTS_PER_SESSION:
            yield f"[🛑 Limit] Max {MAX_PROMPTS_PER_SESSION} prompts per session.", None, None
            return
        now = time.time()
        if now - last_usage_time.get(session_id, 0) < THROTTLE_SECONDS:
            wait = int(THROTTLE_SECONDS - (now - last_usage_time[session_id]))
            yield f"[⏳ Wait] Try again in {wait} sec.", None, None
            return
        last_usage_time[session_id] = now

        try:
            response = openai.ChatCompletion.create(
                model=AVAILABLE_MODELS[model_name],
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=256
            )
            output = response.choices[0].message.content.strip()
        except Exception as e:
            yield f"[OpenAI error]: {e}", None, None
            return
    else:
        if model_name not in text_model_cache:
            try:
                text_model_cache[model_name] = pipeline(
                    "text-generation",
                    model=AVAILABLE_MODELS[model_name],
                    device=0 if device == "cuda" else -1
                )
            except Exception as e:
                yield f"[Text model error]: {e}", None, None
                return
        try:
            result = text_model_cache[model_name](
                prompt, max_length=100, do_sample=True, num_return_sequences=1
            )
            output = result[0]["generated_text"].strip()
        except Exception as e:
            yield f"[Generation error]: {e}", None, None
            return

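    # Replay the reply one character at a time to mimic a live terminal.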
    response_so_far = ""
    for char in output:
        response_so_far += char
        temp_log = chat_memory[session_id][:]
        temp_log.append(f"🖋️ You > {prompt}")
        temp_log.append(f"🧠 Codette > {response_so_far}")
        yield "\n".join(temp_log[-10:]), None, None
        time.sleep(0.01)

    chat_memory[session_id].append(f"🖋️ You > {prompt}")
    chat_memory[session_id].append(f"🧠 Codette > {output}")

    imgs, vid = None, None
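    # Optional media generation; errors are appended to the session log so
    # they surface in the final yield.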
    if generate_image and image_enabled:
        try:
            result = image_generator(prompt, num_images_per_prompt=batch_size)
            imgs = result.images
        except Exception as e:
            chat_memory[session_id].append(f"[Image error]: {e}")

    if generate_video and video_enabled:
        try:
            result = video_pipeline(prompt, num_inference_steps=video_steps)
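            # NOTE: depending on the diffusers version, the output may be
            # batched, in which case result.frames[0] holds the frame list.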
            frames = result.frames
            temp_video_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
            imageio.mimsave(temp_video_path, frames, fps=fps)
            vid = temp_video_path
        except Exception as e:
            chat_memory[session_id].append(f"[Video error]: {e}")

    yield "\n".join(chat_memory[session_id][-10:]), imgs, vid

# ---------- Gradio UI ----------
with gr.Blocks(title="🧬 Codette Terminal – Streamed AI Chat") as demo:
    gr.Markdown("## 🧬 Codette Terminal (Chat + Image + Video + Rate-Limited Access)")
    gr.Markdown("Type a prompt, select a model, and generate responses, images, or video. Type `exit` to reset.")

    with gr.Row():
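        # NOTE: session_id is a fixed hidden value, so all visitors currently
        # share one chat/rate-limit session; swap in a per-user token for
        # real isolation.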
        session_id = gr.Textbox(value="session_default", visible=False)
        model_dropdown = gr.Dropdown(choices=list(AVAILABLE_MODELS.keys()), value="GPT-2 (small, fast)", label="Language Model")

    with gr.Row():
        generate_image_toggle = gr.Checkbox(label="Generate Image(s)?", value=False, interactive=image_enabled)
        generate_video_toggle = gr.Checkbox(label="Generate Video?", value=False, interactive=video_enabled)

    with gr.Row():
        batch_size_slider = gr.Slider(label="Number of Images", minimum=1, maximum=4, step=1, value=1)
        video_steps_slider = gr.Slider(label="Video Inference Steps", minimum=10, maximum=100, step=10, value=50)
        fps_slider = gr.Slider(label="Video FPS", minimum=4, maximum=24, step=2, value=8)

    user_input = gr.Textbox(label="Your Prompt", placeholder="e.g. A robot dreaming on Mars", lines=1)
    output_text = gr.Textbox(label="Codette Output", lines=15, interactive=False)
    output_image = gr.Gallery(label="Generated Image(s)", columns=2)
    output_video = gr.Video(label="Generated Video")

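    # Wire the prompt box to the streaming generator; each yield updates all
    # three output components.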
    user_input.submit(
        codette_terminal_limited,
        inputs=[
            user_input, model_dropdown, generate_image_toggle, generate_video_toggle,
            session_id, batch_size_slider, video_steps_slider, fps_slider
        ],
        outputs=[output_text, output_image, output_video]
    )

# ---------- Launch ----------
if __name__ == "__main__":
    demo.launch(mcp_server=True)  # ✅ tool-ready