### libraries import

In [1]:
import os

#gradio interface
import gradio as gr

from transformers import AutoModelForCausalLM,AutoTokenizer
import torch

#STT (speech to text)
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import librosa

#TTS (text to speech)
import torch
from TTS.api import TTS
from IPython.display import Audio

#json request for APIs
import requests
import json

#regular expressions
import re

#langchain and function calling
from typing import List, Literal, Union
import requests
from functools import partial
import math


#langchain, not used anymore since I had to find another way fast to stop using the endpoint, but could be interesting to reuse 
from langchain.tools.base import StructuredTool
from langchain.agents import (
    Tool,
    AgentExecutor,
    LLMSingleActionAgent,
    AgentOutputParser,
)
from langchain.schema import AgentAction, AgentFinish, OutputParserException
from langchain.prompts import StringPromptTemplate
from langchain.llms import HuggingFaceTextGenInference
from langchain.chains import LLMChain



from datetime import datetime, timedelta, timezone
from transformers import pipeline
import inspect

  from .autonotebook import tqdm as notebook_tqdm
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


In [7]:
from apis import *

### Model loading

In [2]:
#NexusRaven for function calling
# Hub identifier of the checkpoint used for function calling.
model_id = "Nexusflow/NexusRaven-13B"
tokenizer = AutoTokenizer.from_pretrained(model_id)
# NOTE(review): device_map=0 presumably pins the model to GPU index 0 and
# load_in_4bit=True requests 4-bit quantized loading — confirm against the
# installed transformers version.
modelNexus = AutoModelForCausalLM.from_pretrained(model_id, device_map=0, load_in_4bit=True)
# `pipe` is the text-generation pipeline called by the cells and functions below.
pipe = pipeline("text-generation", model=modelNexus, tokenizer = tokenizer)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Loading checkpoint shards: 100%|██████████| 3/3 [00:10<00:00,  3.45s/it]


## Function calling with NexusRaven 

In [3]:
# load api key from .env file
# weather api and tomtom api key
from dotenv import load_dotenv
load_dotenv()
# os.getenv returns None when a variable is not set; no validation is done here.
# NOTE(review): the Python name is spelled "WHEATHER_API_KEY" while the
# environment variable read is "WEATHER_API_KEY" — check other users of this
# name before renaming it.
WHEATHER_API_KEY = os.getenv("WEATHER_API_KEY")
TOMTOM_KEY = os.getenv("TOMTOM_API_KEY")

In [19]:
#FUNCTION CALLING 

##########################################################
# Step 1: Define the functions you want to articulate. ###
##########################################################

# apis.py


#############################################################
# Step 2: Let's define some utils for building the prompt ###
#############################################################


def format_functions_for_prompt(*functions):
    """Render callables as the ``OPTION:`` blocks of the function-calling prompt.

    Each function produces one block holding its name and signature between
    ``<func_start>``/``<func_end>`` and its docstring between
    ``<docstring_start>``/``<docstring_end>``.

    Args:
        *functions: Callables to describe. Each must expose ``__name__`` and
            be introspectable by ``inspect.signature``.

    Returns:
        str: The blocks joined by newlines, or ``""`` when no function is given.
    """
    formatted_functions = []
    for func in functions:
        # Only the name, signature and docstring are rendered. The source code
        # is deliberately not fetched: it was never used, and fetching it
        # fails for callables whose source is unavailable.
        function_name = func.__name__
        signature = inspect.signature(func)
        # getdoc() returns None for undocumented functions; avoid putting the
        # literal text "None" into the prompt.
        docstring = inspect.getdoc(func) or ""
        formatted_functions.append(
            f"OPTION:\n<func_start>{function_name}{signature}<func_end>\n<docstring_start>\n{docstring}\n<docstring_end>"
        )
    return "\n".join(formatted_functions)


##############################
# Step 3: Construct Prompt ###
##############################


def construct_prompt(user_query: str, context: str = ""):
    """Build the prompt sent to the function-calling model.

    The prompt lists the available API functions (rendered by
    ``format_functions_for_prompt``), then the context, then the user query,
    wrapped in the ``<human>: ... <human_end>`` markers.

    Args:
        user_query: The user's question.
        context: Extra context text placed before the query. Defaults to an
            empty string so the function can be called with the query alone.

    Returns:
        str: The complete prompt.
    """
    # NOTE(review): these five functions are not defined in this notebook;
    # presumably they come from `from apis import *` — confirm.
    formatted_prompt = format_functions_for_prompt(get_weather, find_points_of_interest, find_route, get_forecast, search_along_route)
    formatted_prompt += f'\n\nContext : {context}'
    formatted_prompt += f"\n\nUser Query: Question: {user_query}\n"

    prompt = (
        "<human>:\n"
        + formatted_prompt
        + "Please pick a function from the above options that best answers the user query and fill in the appropriate arguments.<human_end>"
    )
    return prompt

#######################################
# Step 4: Execute the function call ###
#######################################


def execute_function_call(model_output):
    """Extract the call from the model's "Initial Answer:" line and evaluate it.

    Everything else in the generation (the "Thought" and "Reflection" parts)
    is ignored.

    Args:
        model_output: Sequence whose first item is a mapping with a
            ``"generated_text"`` string, as indexed below.

    Returns:
        The value produced by evaluating the call, or an error-message string
        when no call can be located or when evaluation raises.
    """
    marker = "Initial Answer:"
    lines = model_output[0]["generated_text"].strip().split("\n")

    # Locate the answer by its marker instead of assuming it is the second
    # line: the position shifts when the "Thought" spans several lines or is
    # missing altogether.
    function_call = None
    for line in lines:
        if marker in line:
            function_call = line.split(marker, 1)[1].strip()
            break

    if function_call is None:
        # No marker: fall back to the historical position (second line), and
        # report the problem as a string instead of raising IndexError.
        if len(lines) < 2:
            return "No function call found in the model output."
        function_call = lines[1].strip()

    try:
        # SECURITY NOTE: eval() executes whatever text the model generated,
        # with full access to this module's globals. That is only acceptable
        # for trusted, local experimentation; restrict it to an allow-list of
        # API functions before exposing this to untrusted users.
        return eval(function_call)
    except Exception as e:
        return str(e)


In [20]:
def get_cuda_usage():
    """Return the memory currently allocated on "cuda:0" in megabytes, rounded to 2 decimals."""
    allocated_bytes = torch.cuda.memory_allocated("cuda:0")
    # convert bytes to megabytes
    allocated_megabytes = allocated_bytes / 1024 / 1024
    return round(allocated_megabytes, 2)

In [21]:
# Build a sample prompt (empty context) to try out the function-calling model.
prompt = construct_prompt("What restaurants are there on the road from Luxembourg Gare, which coordinates are lat 49.5999681, lon 6.1342493, to Thionville?", "")


In [22]:
# Show the exact prompt text that will be sent to the model.
print(prompt)

<human>:
OPTION:
<func_start>get_weather(city_name: str = '', **kwargs)<func_end>
<docstring_start>
Returns the CURRENT weather in a specified city.
Args:
city_name (string) : Required. The name of the city.
<docstring_end>
OPTION:
<func_start>find_points_of_interest(lat='0', lon='0', city='', type_of_poi='restaurant', **kwargs)<func_end>
<docstring_start>
Return some of the closest points of interest for a specific location and type of point of interest. The more parameters there are, the more precise.
:param lat (string):  latitude
:param lon (string):  longitude
:param city (string): Required. city
:param type_of_poi (string): Required. type of point of interest depending on what the user wants to do.
<docstring_end>
OPTION:
<func_start>find_route(lat_depart='0', lon_depart='0', city_depart='', address_destination='', depart_time='', **kwargs)<func_end>
<docstring_start>
Return the distance and the estimated time to go to a specific destination from the current place, at a specified

In [23]:
# Run the function-calling pipeline on the sample prompt without sampling.
# NOTE(review): max_new_tokens is 3000 here but 300 in the other pipe(...)
# calls of this notebook — confirm which value is intended.
model_output = pipe(
    prompt, do_sample=False, max_new_tokens=3000, return_full_text=False
)

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


In [24]:
# Display the generated text of the first (only) result.
print(model_output[0]["generated_text"])

 
 Thought: The purpose of the def search_along_route(latitude_depart, longitude_depart, city_destination, type_of_poi) is to return some of the closest points of interest along the route from the depart point, specified by its coordinates and a city destination.
Initial Answer: search_along_route(49.5999681, 6.1342493, 'Thionville','restaurant')
Reflection: The search_along_route function takes in four arguments: latitude_depart, longitude_depart, city_destination, and type_of_poi.

The user has asked what restaurants are there on the road from Luxembourg Gare, which coordinates are lat 49.5999681, lon 6.1342493, to Thionville.

The call provided is search_along_route(49.5999681, 6.1342493, 'Thionville','restaurant').

The call can be improved because the function requires the latitude and longitude of the depart point, which are not provided.

The correct call would be search_along_route(latitude_depart = 49.5999681, longitude_depart = 6.1342493, city_destination = 'Thionville', type

In [8]:
# might be deleted
# Compute a Simple equation
# NOTE(review): the comment above does not describe this cell. The code below
# re-runs the restaurant query and prints get_cuda_usage() (MB allocated on
# cuda:0) before and after building the prompt.
print(f"before everything: {get_cuda_usage()}")
prompt = construct_prompt("What restaurants are there on the road from Luxembourg Gare, which coordinates are lat 49.5999681, lon 6.1342493, to Thionville?", "")
print(f"after creating prompt: {get_cuda_usage()}")
model_output = pipe(
    prompt, do_sample=False, max_new_tokens=300, return_full_text=False
    )
print(model_output[0]["generated_text"])
#execute_function_call(pipe(construct_prompt("Is it raining in Belval, ?"), do_sample=False, max_new_tokens=300, return_full_text=False))

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


before everything: 7802.67
after creating prompt: 7802.67




 
 Thought: The purpose of the def search_along_route(latitude_depart, longitude_depart, city_destination, type_of_poi) is to return some of the closest points of interest along the route from the depart point, specified by its coordinates and a city destination.
Initial Answer: search_along_route(49.5999681, 6.1342493, 'Thionville','restaurant')
Reflection: The search_along_route function takes in four arguments: latitude_depart, longitude_depart, city_destination, and type_of_poi.

The user has asked what restaurants are there on the road from Luxembourg Gare, which coordinates are lat 49.5999681, lon 6.1342493, to Thionville.

The call provided is search_along_route(49.5999681, 6.1342493, 'Thionville','restaurant').

The call can be improved because the function requires the latitude and longitude of the depart point, as well as the city destination. The call provided only provides the latitude and longitude of the depart point, and the city destination.

The correct call would be


In [12]:
# Execute the call proposed by the model and report GPU memory around it.
print(f"creating the pipe of model output: {get_cuda_usage()}")
result = execute_function_call(model_output)
print(f"after execute function call: {get_cuda_usage()}")
# Drop the reference to the generation result, then collect garbage and empty
# the CUDA cache; the last print reports the allocated memory afterwards.
del model_output
import gc         # garbage collect library
gc.collect()
torch.cuda.empty_cache() 
print(f"after garbage collect and empty_cache: {get_cuda_usage()}")
#print("Model Output:", model_output)
# print("Execution Result:", result)

creating the pipe of model output: 13736.26
49.3579272
after execute function call: 13736.26
after garbage collect and empty_cache: 13736.26


## Functions to process the answer and the question

In [13]:
#generation of text with Stable beluga 
def gen(p, maxlen=15, sample=True):
    """Generate a continuation of prompt ``p`` and return the decoded sequences.

    Uses the module-level ``tokr`` (tokenizer) and ``model`` objects.
    NOTE(review): neither is defined in the cells visible in this notebook —
    they must be loaded before this function is called.

    Args:
        p: Prompt text.
        maxlen: Passed to ``generate`` as ``max_new_tokens``.
        sample: Passed to ``generate`` as ``do_sample``.

    Returns:
        The result of ``tokr.batch_decode`` on the generated ids.
    """
    encoded_prompt = tokr(p, return_tensors="pt")
    generated_ids = model.generate(
        **encoded_prompt.to("cuda"),
        max_new_tokens=maxlen,
        do_sample=sample,
    )
    # Bring the ids back to the CPU before decoding.
    return tokr.batch_decode(generated_ids.to('cpu'))

#to have a prompt corresponding to the specific format required by the fine-tuned model Stable Beluga
def mk_prompt(
    user,
    syst="### System:\nYou are a useful AI assistant in a car, that follows instructions extremely well. Help as much as you can. Answer questions concisely and do not mention what you base your reply on.\n\n",
):
    """Wrap a user message in the System / User / Assistant prompt layout.

    Args:
        user: Text placed after the ``### User:`` header.
        syst: System section placed verbatim at the start of the prompt.

    Returns:
        str: ``syst`` followed by the user turn and an open assistant header.
    """
    layout = "{}### User: {}\n\n### Assistant:\n"
    return layout.format(syst, user)

In [14]:
def car_answer_only(complete_answer, general_context):
    """Pull the assistant's reply out of the raw generation and append it to the context.

    The pattern looks for ``Assistant:`` followed by a literal backslash and
    ``n`` (two characters, not a newline), then captures up to the last
    ``</s>`` or the last sentence-ending punctuation mark followed by
    whitespace or end of text.

    Args:
        complete_answer: Raw generated text to search.
        general_context: Context to which the extracted answer is appended.

    Returns:
        tuple: ``(model_answer, general_context)``. ``model_answer`` is a
        fixed error message when the pattern does not match.
    """
    answer_regex = re.compile(r"Assistant:\\n(.*)(</s>|[.!?](\s|$))", re.DOTALL)
    found = answer_regex.search(complete_answer)

    model_answer = (
        found.group(1)
        if found
        else "There has been an error with the generated response."
    )

    general_context += model_answer
    return (model_answer, general_context)
#print(model_answer)

In [15]:
def FnAnswer(general_context, ques, place, time, delete_history, state):
    """Answer one user turn by chaining the function-calling model and the chat model.

    Steps: initialise or reset the conversation state; build a context from
    the history, the previous question, the car position and the time; ask the
    function-calling pipeline (``pipe``) for a call and execute it; pass the
    call result and the question to the second model through ``gen``; extract
    the reply, detect its language and update the state.

    Args:
        general_context: Initial context text; stored as the first history
            entry whenever a fresh state is created.
        ques: The user question, either a string or a sequence whose first
            item is the question.
        place: Label of one of the predefined places known to ``definePlace``.
        time: Current time, interpolated as-is into the context.
        delete_history: ``"Yes"`` discards the existing state before answering.
        state: Dict with keys ``'context'``, ``'number'`` and
            ``'last_question'``, or ``None`` on the first call.

    Returns:
        tuple: ``(model_answer, state['context'], state, language)``.

    Raises:
        ValueError: If ``place`` is not one of the predefined places.
    """
    import gc  # garbage collect library, used to release the first model's output

    # Start from a clean state on request or on the very first call.
    if delete_history == "Yes":
        state = None
    if state is None:
        state = {
            'context': [general_context],
            'number': 0,
            'last_question': "",
        }

    # Non-string input (e.g. the list of transcriptions passed by `transcript`)
    # carries the question as its first item. isinstance() also treats str
    # subclasses as strings, which the former type() comparison did not.
    if not isinstance(ques, str):
        ques = ques[0]

    # Resolve the label to (name, latitude, longitude). definePlace returns
    # None for an unknown label; fail with a clear message instead of an
    # opaque "'NoneType' object is not subscriptable".
    place_info = definePlace(place)
    if place_info is None:
        raise ValueError(f"Unknown place: {place!r}")
    place = place_info

    formatted_context = '\n'.join(state['context'])

    #updated at every question
    general_context = f"""
    Recent conversation history: '{formatted_context}' (If empty, this indicates the beginning of the conversation).

    Previous question from the user: '{state['last_question']}' (This may or may not be related to the current question).

    User information: The user is inside a car in {place[0]}, with latitude {place[1]} and longitude {place[2]}. The user is mobile and can drive to different destinations. It is currently {time}

    """
    #first llm call (function calling model, NexusRaven)
    model_output = pipe(construct_prompt(ques, general_context), do_sample=False, max_new_tokens=300, return_full_text=False)
    # `call` holds the result of executing the call proposed by the model
    # (or an error-message string when that failed).
    call = execute_function_call(model_output)
    print(call)
    # Release the first model's output before running the second model.
    del model_output
    gc.collect()
    torch.cuda.empty_cache()

    #updated at every question
    general_context += f'This information might be of help, use if it seems relevant, and ignore if not relevant to reply to the user: "{call}". '

    #question formatted for the StableBeluga llm (second llm), using the output of the first llm as context in general_context
    question=f"""Reply to the user and answer any question with the help of the provided context.

    ## Context

    {general_context} .

    ## Question

    {ques}"""

    complete_answer = str(gen(mk_prompt(question), 100)) #answer generation with StableBeluga (2nd llm)

    # Only the extracted answer is needed; the context returned alongside it
    # was never used afterwards.
    model_answer, _ = car_answer_only(complete_answer, general_context)

    #detect the language of the answer, to modify the text-to-speech consequently
    language = pipe_language(model_answer, top_k=1, truncation=True)[0]['label']

    state['last_question'] = ques #add the current question as 'last question' for the next question's context

    state['number'] = state['number'] + 1  #adds 1 to the number of interactions with the car

    state['context'].append(str(state['number']) + '. User question: '+ ques + ', Model answer: ' + model_answer) #modifies the context

    # Keep at most 5 history entries: once a 6th is added, drop the oldest.
    if len(state['context']) > 5:
        state['context'] = state['context'][1:]

    return model_answer, state['context'], state, language

In [16]:
def transcript(general_context, link_to_audio, voice, place, time, delete_history, state):
    """Speech-to-speech turn: transcribe the recording, answer it, synthesise the reply.

    The audio file is transcribed with the module-level ``processor`` and
    ``modelw``, the transcription is answered by ``FnAnswer``, and the answer
    is written to ``output.wav`` with the module-level ``tts`` object, using
    ``Audio_Files/{voice}.wav`` as the speaker reference.

    Returns:
        tuple: ``("output.wav", state['context'], state)`` where ``state`` is
        the one returned by ``FnAnswer``.
    """
    # Load the recording at 16 kHz, the sampling rate required downstream
    # (per the original author's note).
    waveform, sample_rate = librosa.load(link_to_audio, sr=16000)

    # Speech-to-text.
    features = processor(waveform, sample_rate, return_tensors="pt").input_features
    token_ids = modelw.generate(features)
    transcription = processor.batch_decode(token_ids, skip_special_tokens=True)

    # FnAnswer returns (answer, context, state, language).
    fn_result = FnAnswer(general_context, transcription, place, time, delete_history, state)
    state = fn_result[2]
    detected_language = fn_result[3]

    print("langue " + detected_language)

    # Text-to-speech, written to a fixed output file.
    output_file = "output.wav"
    tts.tts_to_file(
        text=str(fn_result[0]),
        file_path=output_file,
        speaker_wav=f'Audio_Files/{voice}.wav',
        language=detected_language,
        emotion="angry",
    )

    return output_file, state['context'], state

In [17]:
def definePlace(place):
    """Map a location label to ``(display name, latitude, longitude)``.

    Latitude and longitude are returned as strings. Returns ``None`` when the
    label is not one of the predefined places.
    """
    known_places = (
        ('Luxembourg Gare, Luxembourg', ('Luxembourg Gare', '49.5999681', '6.1342493')),
        ('Kirchberg Campus, Kirchberg', ('Kirchberg Campus, Luxembourg', '49.62571206478235', '6.160082636815114')),
        ('Belval Campus, Belval', ('Belval-Université, Esch-sur-Alzette', '49.499531', '5.9462903')),
        ('Eiffel Tower, Paris', ('Eiffel Tower, Paris', '48.8582599', '2.2945006')),
        ('Thionville, France', ('Thionville, France', '49.357927', '6.167587')),
    )
    for label, location in known_places:
        if place == label:
            return location
    return None

## Interfaces (text and audio)

In [18]:
#INTERFACE WITH ONLY TEXT

# Generate options for hours (00-23) 
hour_options = [f"{i:02d}:00:00" for i in range(24)]

model_answer= ''
general_context= ''
# Define the initial state with some initial context.
# (general_context is the empty string here, so this prints a blank line and
# initial_context below is also the empty string.)
print(general_context)
initial_state = {'context': general_context}
initial_context= initial_state['context']
# Create the Gradio interface.
# The inputs are listed in the order of FnAnswer's parameters:
# general_context, ques, place, time, delete_history, state.
# NOTE(review): FnAnswer returns four values (answer, context, state,
# language) but only three output components are declared below — confirm how
# the installed Gradio version handles the extra value.
iface = gr.Interface(
    fn=FnAnswer,
    inputs=[
        gr.Textbox(value=initial_context, visible=False),
        gr.Textbox(lines=2, placeholder="Type your message here..."),
        gr.Radio(choices=['Luxembourg Gare, Luxembourg', 'Kirchberg Campus, Kirchberg', 'Belval Campus, Belval', 'Eiffel Tower, Paris', 'Thionville, France'], label='Choose a location for your car', value= 'Kirchberg Campus, Kirchberg', show_label=True),
        gr.Dropdown(choices=hour_options, label="What time is it?", value = "08:00:00"),
        gr.Radio(["Yes", "No"], label="Delete the conversation history?", value = 'No'),
        gr.State()  # This will keep track of the context state across interactions.
    ],
    outputs=[
        gr.Textbox(),
        gr.Textbox(visible=False),
        gr.State()
    ]
)
# Close any interface that is still open before launching this one.
gr.close_all()
# Launch the interface.
# NOTE(review): server_name="0.0.0.0" together with share=True makes the app
# reachable beyond this machine — confirm this exposure is intended.
iface.launch(debug=True, share=True, server_name="0.0.0.0", server_port=7860)
#contextual=gr.Textbox(value=general_context, visible=False)
#demo = gr.Interface(fn=FnAnswer, inputs=[contextual,"text"], outputs=["text", contextual])

#demo.launch()


Running on local URL:  http://0.0.0.0:7860

Could not create share link. Please check your internet connection or our status page: https://status.gradio.app.


2024/03/15 19:02:04 [W] [service.go:132] login to server failed: dial tcp 44.237.78.176:7000: i/o timeout


Keyboard interruption in main thread... closing server.
Killing tunnel 0.0.0.0:7860 <> None




In [69]:
#INTERFACE WITH AUDIO TO AUDIO

#to be able to use the microphone on chrome, you will have to go to chrome://flags/#unsafely-treat-insecure-origin-as-secure and enter http://10.186.115.21:7860/ 
#in "Insecure origins treated as secure", enable it and relaunch chrome

#example question: 
# what's the weather like outside?
# What's the closest restaurant from here?



# Generate options for hours (00-23) 
hour_options = [f"{i:02d}:00:00" for i in range(24)]

model_answer= ''
general_context= ''
# Define the initial state with some initial context.
# (general_context is the empty string here, so initial_context is empty too.)
print(general_context)
initial_state = {'context': general_context}
initial_context= initial_state['context']
# Create the Gradio interface.
# The inputs are listed in the order of transcript's parameters:
# general_context, link_to_audio, voice, place, time, delete_history, state.
# The three outputs match transcript's return value: (audio path, context, state).
iface = gr.Interface(
    fn=transcript,
    inputs=[
        gr.Textbox(value=initial_context, visible=False),
        gr.Audio( type='filepath', label = 'input audio'),
        gr.Radio(choices=['Donald Trump', 'Eddie Murphy'], label='Choose a voice', value= 'Donald Trump', show_label=True),  # Radio button for voice selection
        gr.Radio(choices=['Luxembourg Gare, Luxembourg', 'Kirchberg Campus, Kirchberg', 'Belval Campus, Belval', 'Eiffel Tower, Paris', 'Thionville, France'], label='Choose a location for your car', value= 'Kirchberg Campus, Kirchberg', show_label=True),
        gr.Dropdown(choices=hour_options, label="What time is it?", value = "08:00:00"),
        gr.Radio(["Yes", "No"], label="Delete the conversation history?", value = 'No'),
        gr.State()  # This will keep track of the context state across interactions.
    ],
    outputs=[
        gr.Audio(label = 'output audio'),
        gr.Textbox(visible=False),
        gr.State()
    ]
)
#close all interfaces open to make the port available
gr.close_all()
# Launch the interface.
# NOTE(review): server_name="0.0.0.0" with share=True exposes the app beyond
# this machine, and ssl_verify=False is passed as well — confirm both are
# intended outside of a local demo.
iface.launch(debug=True, share=True, server_name="0.0.0.0", server_port=7860, ssl_verify=False)


Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Closing server running on port: 7860
Running on local URL:  http://0.0.0.0:7860


KeyboardInterrupt: 

## Other possible APIs to use

In [80]:

def search_nearby(lat, lon, city, key, radius=10000, max_results=7):
    """Describe points of interest around a coordinate via TomTom's nearbySearch endpoint.

    :param lat: latitude of the search centre
    :param lon: longitude of the search centre
    :param city: label used in the generated sentences ("... meters far from
        {city}"); it is not sent to the API
    :param key: TomTom API key
    :param radius: value of the ``radius`` query parameter (default 10000)
    :param max_results: maximum number of sentences returned (default 7)
    :return: one sentence per point of interest, joined by ". "; an empty
        string when the response contains no results
    :raises requests.HTTPError: when the API answers with an error status
        (e.g. a missing or invalid key), instead of failing later with a
        JSON decoding error
    """
    # Let requests build and URL-encode the query string, and never wait
    # indefinitely for the remote service.
    response = requests.get(
        'https://api.tomtom.com/search/2/nearbySearch/.json',
        params={'key': key, 'lat': lat, 'lon': lon, 'radius': radius, 'limit': 50},
        timeout=10,
    )
    response.raise_for_status()

    results = []
    # Only the first `max_results` entries of the response are described.
    for result in response.json().get('results', [])[:max_results]:
        results.append(f"The {' '.join(result['poi']['categories'])} {result['poi']['name']} is {int(result['dist'])} meters far from {city}")

    return ". ".join(results)


# Example call with fixed coordinates and the TomTom key loaded earlier.
# NOTE(review): the recorded output of this cell is a JSONDecodeError, i.e. the
# response body was not JSON when it was last run — check TOMTOM_KEY.
print(search_nearby('49.625892805337514', '6.160417066963513', 'your location', TOMTOM_KEY))

JSONDecodeError: Expecting value: line 1 column 1 (char 0)