import os, asyncio, time, logging, streamlit as st from google.adk.agents import Agent from google.adk.sessions import InMemorySessionService from google.adk.runners import Runner from google.adk.tools import google_search from google.genai import types from dotenv import load_dotenv load_dotenv(); logging.basicConfig(level=logging.ERROR) MODEL,APP_NAME,USER_ID = "gemini-2.0-flash-exp", "search_assistant_app", "streamlit_user_search" agent = Agent(name="search_assistant", model=MODEL, instruction="You are a helpful assistant Answer user questions using Google Search when needed.", description="An assistant that can search the web.", tools=[google_search]) @st.cache_resource def init_adk(): ss = InMemorySessionService(); r = Runner(agent=agent, app_name=APP_NAME, session_service=ss) sid = st.session_state.get("adk_session_id") or f"session_{int(time.time())}_{os.urandom(4).hex()}"; st.session_state["adk_session_id"] = sid try: ss.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=sid, state={}) except: ss.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=sid, state={}) return r, sid async def run_adk(r, sid, q): try: async for e in r.run_async(user_id=USER_ID, session_id=sid, new_message=types.Content(role="user", parts=[types.Part(text=q)])): if e.is_final_response() and e.content and e.content.parts: return e.content.parts[0].text except Exception as err: return f"Error: {err}" return "[No response]" def main(): st.set_page_config(page_title="Search", page_icon="🔍", layout="wide"); st.markdown( f'