|
import gradio as gr |
|
import asyncio |
|
import threading |
|
import os |
|
from dotenv import load_dotenv |
|
from google import genai |
|
from google.genai.types import Part, FileData, Tool, GenerateContentConfig, GoogleSearch, Content |
|
|
|
|
|
import discord |
|
from discord import app_commands |
|
from discord.ext import commands |
|
|
|
# Run dotenv's loader before the os.getenv() lookups that follow, so values
# it provides are visible to them (presumably sourced from a local .env file
# -- confirm against the deployment setup).
load_dotenv()
|
|
|
|
|
# Credentials and channel settings, all read from the process environment.
GOOGLE_KEY = os.environ.get("GOOGLE_KEY")
DISCORD_TOKEN = os.environ.get("SEINFELD_TOKEN")
SEINFELD_CHANNEL_ID = os.environ.get("SEINFELD_CHANNEL_ID")
# Optional comma-separated list of extra channel IDs.
ADDITIONAL_CHANNELS = os.environ.get("SEINFELD_ADDITIONAL_CHANNELS", "")

# Integer IDs of every channel the bot answers in: primary channel first,
# then the extras in the order they were listed.
TARGET_CHANNEL_IDS = [int(SEINFELD_CHANNEL_ID)] if SEINFELD_CHANNEL_ID else []

if ADDITIONAL_CHANNELS:
    # Trim each entry, drop empty ones (e.g. from a trailing comma), convert.
    ADDITIONAL_IDS = list(
        map(int, filter(None, map(str.strip, ADDITIONAL_CHANNELS.split(","))))
    )
    TARGET_CHANNEL_IDS += ADDITIONAL_IDS
|
|
|
|
|
# Shorthand names mapped to the concrete Google model identifiers.
MODEL_DEFINITIONS = dict(
    flash="gemini-2.0-flash",
    pro="gemini-2.5-pro-preview-05-06",
    image="imagen-3.0-generate-002",
)

# Models selected at startup; chat_model_id is reassigned at runtime by the
# Discord "model" slash command.
chat_model_id, image_model_id = (
    MODEL_DEFINITIONS["flash"],
    MODEL_DEFINITIONS["image"],
)
|
|
|
|
|
# Shared Gemini API client; stays None when no GOOGLE_KEY is configured, which
# respond_with_gemini() reports back to the user instead of calling the API.
google_client = genai.Client(api_key=GOOGLE_KEY) if GOOGLE_KEY else None
|
|
|
|
|
# System prompt passed to Gemini as `system_instruction` on every chat; it
# defines the Jerry Seinfeld stand-up persona (Markdown-formatted text).
SEINFELD_SYSTEM_INSTRUCTION = """You are now acting as Jerry Seinfeld. You are not an impersonator; you *are* Jerry Seinfeld.
You are giving your observational humor stand-up routine. Your focus is on the absurdity and minutiae of everyday life.
Topics include, but are not limited to: relationships, food, technology, social conventions, and the general frustrations
of living in a modern world.

Your humor is characterized by:

* **Observation:** Pointing out things that everyone notices but rarely comments on.
* **Relatability:** Situations and experiences that are common and easily understood.
* **Sarcasm & Irony:** A dry, understated delivery that highlights the ridiculousness of things.
* **"What's the deal with..."**: Use this phrase frequently to introduce a new observation.
* **No Grand Conclusions:** You don't offer solutions or morals; you simply highlight the absurdity.
* **Emphasis on the Specific:** Focus on very specific, sometimes trivial details.

Avoid:

* **Political Commentary:** Stay away from overtly political topics.
* **Offensive or Mean-Spirited Jokes:** Your humor is observational, not mean-spirited.
* **Explanations of Your Own Humor:** Don't break the fourth wall or analyze your own jokes.

When responding to a prompt, always answer as if you are performing standup. Start with a joke, then elaborate on it."""
|
|
|
# In-character reply used whenever the Gemini call fails or yields no text.
_GEMINI_ERROR_REPLY = (
    "What's the deal with API errors? I mean, you type something in, the computer thinks about it, "
    "and then... nothing! It's like asking your friend a question and they just stare at you. "
    "'Hey, how are you?' *silence* 'Hello?' *more silence* "
    "It's the digital equivalent of being ignored at a party!"
)

# Gradio "messages" roles mapped to the role names the Gemini chat API uses.
_GEMINI_ROLE_BY_CHAT_ROLE = {"user": "user", "assistant": "model"}


def _history_to_contents(history):
    """Convert prior chat turns into the Content list passed to the Gemini chat."""
    contents = []
    for turn in history or []:
        if isinstance(turn, dict):
            # Gradio "messages" format: one {"role": ..., "content": ...} dict
            # per message. Only plain-text content is forwarded.
            role = _GEMINI_ROLE_BY_CHAT_ROLE.get(turn.get("role"))
            text = turn.get("content")
            if role and isinstance(text, str) and text:
                contents.append(Content(role=role, parts=[Part(text=text)]))
            continue

        # Pair format: (user_message, assistant_message); either may be empty.
        user_msg, assistant_msg = turn
        if user_msg:
            contents.append(Content(role="user", parts=[Part(text=user_msg)]))
        if assistant_msg:
            contents.append(Content(role="model", parts=[Part(text=assistant_msg)]))
    return contents


def respond_with_gemini(message, history):
    """Generate a reply in the Seinfeld persona using the Google Gemini API.

    Args:
        message: The user's latest message text.
        history: Earlier conversation turns, either as ``(user, assistant)``
            pairs or as dicts with ``role`` and ``content`` keys. May be empty
            or ``None``.

    Returns:
        A non-empty string: the model's reply, a notice that the API key is
        missing, or an in-character fallback when the API call fails or
        returns no text. This function does not raise for API errors.
    """
    if not google_client:
        return "I need a Google API key to work! Set the GOOGLE_KEY environment variable."

    try:
        chat = google_client.chats.create(
            model=chat_model_id,
            history=_history_to_contents(history),
            config=GenerateContentConfig(
                system_instruction=SEINFELD_SYSTEM_INSTRUCTION,
                # Let the model use Google Search as a tool.
                tools=[Tool(google_search=GoogleSearch())],
                response_modalities=["TEXT"],
            ),
        )
        reply = chat.send_message(message).text
    except Exception as e:
        # Boundary with an external service: log and degrade to the fallback
        # so the chat front ends never see an exception.
        print(f"Error with Gemini API: {e}")
        return _GEMINI_ERROR_REPLY

    if not reply:
        # `.text` can be None/empty when the response carries no text parts;
        # callers iterate over and measure the result, so never return None.
        print("Error with Gemini API: response contained no text")
        return _GEMINI_ERROR_REPLY
    return reply
|
|
|
def respond_gradio(message, history: list[tuple[str, str]]):
    """Stream the Seinfeld reply to the Gradio chat UI.

    The full reply is fetched first, then yielded as successively longer
    prefixes (one extra character per step) to give a typing effect.
    """
    full_reply = respond_with_gemini(message, history)

    for end in range(1, len(full_reply) + 1):
        yield full_reply[:end]
|
|
|
|
|
# Set by setup_discord_bot() once the bot object has been created.
discord_bot = None

# Replies up to this length are sent as a single Discord message.
_DISCORD_MESSAGE_LIMIT = 2000
# Longer replies are cut into chunks of at most this many characters.
_DISCORD_CHUNK_SIZE = 1900


def _split_for_discord(text, limit=_DISCORD_CHUNK_SIZE):
    """Split text into non-empty chunks of at most `limit` characters.

    Cuts are made at the last newline or sentence end inside each window,
    then at the last space, and only as a last resort mid-word. No text is
    dropped apart from whitespace at the cut points.
    """
    if limit < 1:
        raise ValueError("limit must be a positive integer")

    chunks = []
    remaining = text.strip()
    while len(remaining) > limit:
        window = remaining[:limit]
        sentence_end = window.rfind(". ")
        # Keep the period with the sentence it terminates.
        after_sentence = (sentence_end + 1) if sentence_end != -1 else -1
        cut = max(window.rfind("\n"), after_sentence)
        if cut <= 0:
            cut = window.rfind(" ")
        if cut <= 0:
            # No usable boundary at all: hard split at the limit.
            cut = limit
        chunks.append(remaining[:cut].rstrip())
        remaining = remaining[cut:].lstrip()
    if remaining:
        chunks.append(remaining)
    return chunks


async def setup_discord_bot():
    """Create the Discord bot, register its handlers, and run it.

    Returns immediately (after printing a notice) when DISCORD_TOKEN or
    TARGET_CHANNEL_IDS is missing. Otherwise this coroutine stores the bot in
    the module-level `discord_bot` and awaits `discord_bot.start()`, so it
    does not return while the bot is running.
    """
    global discord_bot

    if not DISCORD_TOKEN or not TARGET_CHANNEL_IDS:
        print("Discord bot disabled: Missing DISCORD_TOKEN or channel IDs")
        return

    # Reading message text requires the message_content intent.
    intents = discord.Intents.default()
    intents.message_content = True
    discord_bot = commands.Bot(command_prefix="~", intents=intents)

    @discord_bot.event
    async def on_ready():
        print(f"Discord bot logged in as {discord_bot.user}")
        try:
            synced = await discord_bot.tree.sync()
            print(f"Synced {len(synced)} command(s)")
        except Exception as e:
            print(f"Failed to sync commands: {e}")

    @discord_bot.event
    async def on_message(message):
        # Overriding on_message replaces the default handler, so prefix
        # commands must be dispatched explicitly.
        await discord_bot.process_commands(message)

        if message.channel.id not in TARGET_CHANNEL_IDS:
            return
        if message.author == discord_bot.user:
            return

        content = message.content
        # Skip command-style messages ('!' / '~') and blank messages.
        if content.startswith(("!", "~")) or not content.strip():
            return

        async with message.channel.typing():
            # respond_with_gemini() is a blocking call; run it in a worker
            # thread so the event loop stays responsive while it waits.
            response = await asyncio.to_thread(respond_with_gemini, content, [])

            if not response or not response.strip():
                # Nothing to send; Discord rejects empty messages.
                return

            if len(response) <= _DISCORD_MESSAGE_LIMIT:
                chunks = [response]
            else:
                chunks = _split_for_discord(response)

            # Reply to the triggering message once; follow-up chunks go to the
            # channel as plain messages.
            first_chunk, *later_chunks = chunks
            await message.reply(first_chunk)
            for chunk in later_chunks:
                await message.channel.send(chunk)

    @discord_bot.tree.command(name="model")
    @app_commands.describe(new_model_id="New model ID to use for Gemini API or shorthand ('flash', 'pro')")
    async def change_model(interaction: discord.Interaction, new_model_id: str):
        """Changes the Gemini chat model being used."""
        global chat_model_id

        # Outside a guild the user object has no guild_permissions attribute;
        # treat that the same as "not an administrator".
        permissions = getattr(interaction.user, "guild_permissions", None)
        if permissions is None or not permissions.administrator:
            await interaction.response.send_message("Only administrators can change the model.", ephemeral=True)
            return

        # Accept a shorthand from MODEL_DEFINITIONS or a literal model ID.
        actual_model_id = MODEL_DEFINITIONS.get(new_model_id.lower(), new_model_id)
        old_model = chat_model_id
        chat_model_id = actual_model_id

        await interaction.response.send_message(f"Chat model changed from `{old_model}` to `{actual_model_id}`", ephemeral=True)

    await discord_bot.start(DISCORD_TOKEN)
|
|
|
def run_discord_bot():
    """Thread entry point: drive setup_discord_bot() on its own event loop.

    Any exception escaping the bot is printed rather than propagated.
    """
    try:
        bot_coroutine = setup_discord_bot()
        asyncio.run(bot_coroutine)
    except Exception as error:
        print(f"Discord bot error: {error}")
|
|
|
|
|
# Gradio chat UI backed by respond_gradio(); each example prompt is wrapped in
# its own single-item list, and example caching is enabled.
demo = gr.ChatInterface(
    fn=respond_gradio,
    title="🥨 Seinfeld Chatbot",
    description="Chat with Jerry Seinfeld! What's the deal with chatbots anyway?",
    examples=[
        [prompt]
        for prompt in (
            "What's the deal with airplane food?",
            "Why do people say 'after dark' when it's really after light?",
            "What's up with people who take forever to order at restaurants?",
            "Why do we park in driveways and drive on parkways?",
            "Tell me about the soup nazi",
            "What's your take on people who don't return shopping carts?",
        )
    ],
    cache_examples=True,
)
|
|
|
if __name__ == "__main__":
    # The Discord bot is optional: it only starts when both a token and at
    # least one target channel are configured.
    if not DISCORD_TOKEN or not TARGET_CHANNEL_IDS:
        print("Discord bot disabled: Missing credentials or channel IDs")
    else:
        # Daemon thread, so it does not keep the process alive on its own.
        discord_thread = threading.Thread(daemon=True, target=run_discord_bot)
        discord_thread.start()
        print("Discord bot starting in background...")

    # Launch the Gradio app from the main thread.
    demo.launch()
|
|