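"""Thin wrapper around the Cohere v2 chat API.

Exposes synchronous and asynchronous helpers for streaming and non-streaming
chat completions, all sharing one lazily created client.
"""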
import asyncio
import threading

import cohere

# Global client variable for lazy initialization
_client = None

def get_client(api_key):
    """Get or create Cohere client instance"""
    global _client
    if _client is None:
        _client = cohere.ClientV2(api_key)
    return _client

def send_message_stream(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Stream response from Cohere API"""
    # Get or create the Cohere client
    co = get_client(api_key)
    
    # Prepare all messages including history
    messages = [{"role": "system", "content": system_message}]
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": user_message})
    
    # Send streaming request to Cohere
    stream = co.chat_stream(
        model=model_name,
        messages=messages
    )
    
    # Accumulate the full reply so the generator can return it as its final
    # value (available to callers via "yield from" delegation)
    full_response = ""
    
    # Yield text chunks as they arrive, skipping non-content events
    for chunk in stream:
        if chunk.type == "content-delta":
            text_chunk = chunk.delta.message.content.text
            full_response += text_chunk
            yield text_chunk

    return full_response
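
# Usage sketch (illustrative, not part of this module): stream a reply to
# stdout, assuming `api_key` holds a valid Cohere API key.
#
#     for piece in send_message_stream(
#         "You are a concise assistant.", "Hello!", [], api_key
#     ):
#         print(piece, end="", flush=True)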

async def send_message_stream_async(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Async wrapper for streaming response from Cohere API"""
    def _sync_stream():
        return send_message_stream(system_message, user_message, conversation_history, api_key, model_name)
    
    # Run the synchronous generator in a worker thread and hand its chunks
    # back to the running event loop
    loop = asyncio.get_running_loop()
    
    # Bridge the worker thread to the event loop with a queue; a StopIteration
    # instance serves as the end-of-stream sentinel (optionally carrying an
    # exception raised by the worker)
    queue = asyncio.Queue()
    
    def _stream_worker():
        try:
            for chunk in _sync_stream():
                loop.call_soon_threadsafe(queue.put_nowait, chunk)
        except Exception as e:
            loop.call_soon_threadsafe(queue.put_nowait, StopIteration(e))
        else:
            loop.call_soon_threadsafe(queue.put_nowait, StopIteration())
    
    # Start the worker as a daemon thread so it cannot block interpreter exit
    thread = threading.Thread(target=_stream_worker, daemon=True)
    thread.start()
    
    # Yield chunks asynchronously
    while True:
        chunk = await queue.get()
        if isinstance(chunk, StopIteration):
            if chunk.args:
                raise chunk.args[0]
            break
        yield chunk
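
# Usage sketch (illustrative, not part of this module): consume the async
# stream from a coroutine; `api_key` is again assumed to hold a valid key.
#
#     async def demo():
#         async for piece in send_message_stream_async(
#             "You are a concise assistant.", "Hello!", [], api_key
#         ):
#             print(piece, end="", flush=True)
#
#     asyncio.run(demo())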
    

def send_message(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Non-streaming version for backward compatibility"""
    # Get or create the Cohere client
    co = get_client(api_key)
    
    # Prepare all messages including history
    messages = [{"role": "system", "content": system_message}]
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": user_message})
    
    # Send request to Cohere synchronously
    response = co.chat(
        model=model_name,
        messages=messages
    )
    
    # Extract the text of the first content block
    response_content = response.message.content[0].text
    
    return response_content

async def send_message_async(system_message, user_message, conversation_history, api_key, model_name="command-a-03-2025"):
    """Async version using asyncio.to_thread"""
    return await asyncio.to_thread(
        send_message, 
        system_message, 
        user_message, 
        conversation_history, 
        api_key, 
        model_name
    )
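

if __name__ == "__main__":
    # Minimal smoke test: a sketch, not part of the original module. Assumes
    # a valid key is exported as the COHERE_API_KEY environment variable.
    import os

    demo_reply = send_message(
        "You are a concise assistant.",
        "Reply with the single word: pong",
        [],  # empty conversation history
        os.environ["COHERE_API_KEY"],
    )
    print(demo_reply)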