from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response, Query
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import RequestValidationError
import asyncio
from typing import Optional, List
from pydantic import BaseModel, ValidationError
import pandas as pd
import numpy as np
import os
from filesplit.merge import Merge
import tensorflow as tf
import string
import re
import json
import csv
import tiktoken
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras
from keras_nlp.layers import TransformerEncoder
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import plot_model

api = FastAPI()

dataPath = "data"
imagePath = "images"
# ==== Keras section ====
# Characters stripped during standardization: all punctuation plus the inverted
# question mark, except the square brackets used by the [start]/[end] markers.
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    # Lowercase, normalize "à" to "a", then strip punctuation.
    lowercase = tf.strings.lower(input_string)
    lowercase = tf.strings.regex_replace(lowercase, "[à]", "a")
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")
def load_vocab(file_path):
    # One token per line; drop the empty entry left by the file's final newline.
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read().split('\n')[:-1]
def decode_sequence_rnn(input_sentence, src, tgt):
    # Greedy token-by-token decoding with the RNN model selected in trad_rnn().
    global translation_model
    vocab_size = 15000
    sequence_length = 50
    source_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+src+".txt"),
    )
    target_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length + 1,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+tgt+".txt"),
    )
    tgt_vocab = target_vectorization.get_vocabulary()
    tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
    max_decoded_sentence_length = 50
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = translation_model.predict(
            [tokenized_input_sentence, tokenized_target_sentence], verbose=0)
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = tgt_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    # Strip the leading "[start] " (8 chars) and trailing " [end]" (6 chars).
    return decoded_sentence[8:-6]
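# Hedged usage sketch (assumes load_rnn() below has already merged and loaded
# the split model files, and that `translation_model` has been set):
#   translation_model = rnn_en_fr
#   decode_sequence_rnn("How are you?", "en", "fr")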
# ==== End of Keras section ====

# ==== Transformer section ====
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        # Self-attention over the target sequence, then cross-attention
        # over the encoder outputs.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config
    def get_causal_attention_mask(self, inputs):
        # Build a (batch, seq_len, seq_len) lower-triangular mask so that each
        # position can only attend to itself and earlier positions.
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)
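    # Illustration (an assumption, not from the source): for sequence_length=3,
    # each batch element gets the mask
    #   [[1, 0, 0],
    #    [1, 1, 0],
    #    [1, 1, 1]]
    # i.e. query position i may attend only to key positions j <= i.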
    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # Combine the padding mask with the causal mask.
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        # Sum learned token embeddings with learned position embeddings.
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        # Token id 0 is padding.
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config
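# Hedged sketch of how these custom layers fit into the saved translation
# models (the actual graph lives in the .h5 files loaded below; the dimensions
# here are placeholders, not values from the source):
#   x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
#   x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
#   outputs = layers.Dense(vocab_size, activation="softmax")(x)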
def decode_sequence_transf(input_sentence, src, tgt):
    # Greedy decoding with the Transformer model selected in trad_transformer().
    global translation_model
    vocab_size = 15000
    sequence_length = 30
    source_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+src+".txt"),
    )
    target_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length + 1,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+tgt+".txt"),
    )
    tgt_vocab = target_vectorization.get_vocabulary()
    tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
    # Cap decoding at sequence_length: the model emits exactly sequence_length
    # positions, so indexing predictions[0, i, :] beyond that would fail.
    max_decoded_sentence_length = sequence_length
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input stays sequence_length long.
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = translation_model(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = tgt_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    # Strip the leading "[start] " and trailing " [end]" markers.
    return decoded_sentence[8:-6]
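# Hedged usage sketch (mirrors what trad_transformer() below does):
#   translation_model = transformer_fr_en
#   decode_sequence_transf("Bonjour le monde", "fr", "en")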
# ==== End of Transformer section ====
def load_rnn():
    # Reassemble the split .h5 files (split presumably to stay under repository
    # file-size limits), then load and compile both translation directions.
    Merge(dataPath+"/rnn_en-fr_split", dataPath, "seq2seq_rnn-model-en-fr.h5").merge(cleanup=False)
    Merge(dataPath+"/rnn_fr-en_split", dataPath, "seq2seq_rnn-model-fr-en.h5").merge(cleanup=False)
    rnn_en_fr = keras.models.load_model(dataPath+"/seq2seq_rnn-model-en-fr.h5")
    rnn_fr_en = keras.models.load_model(dataPath+"/seq2seq_rnn-model-fr-en.h5")
    rnn_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    rnn_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return rnn_en_fr, rnn_fr_en
def load_transformer():
    custom_objects = {"TransformerDecoder": TransformerDecoder, "PositionalEmbedding": PositionalEmbedding}
    with keras.saving.custom_object_scope(custom_objects):
        transformer_en_fr = keras.models.load_model(dataPath+"/transformer-model-en-fr.h5")
        transformer_fr_en = keras.models.load_model(dataPath+"/transformer-model-fr-en.h5")
    # Reassemble the split weight files and apply them. NOTE: the load_weights
    # calls are an assumption; the merged .weights.h5 files are not referenced
    # anywhere else in this file.
    Merge(dataPath+"/transf_en-fr_weight_split", dataPath, "transformer-model-en-fr.weights.h5").merge(cleanup=False)
    Merge(dataPath+"/transf_fr-en_weight_split", dataPath, "transformer-model-fr-en.weights.h5").merge(cleanup=False)
    transformer_en_fr.load_weights(dataPath+"/transformer-model-en-fr.weights.h5")
    transformer_fr_en.load_weights(dataPath+"/transformer-model-fr-en.weights.h5")
    transformer_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    transformer_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return transformer_en_fr, transformer_fr_en
rnn_en_fr, rnn_fr_en = load_rnn()
transformer_en_fr, transformer_fr_en = load_transformer()
# ==== Language identifier ====

def encode_text(textes):
    global tokenizer
    max_length = 250
    # tiktoken returns variable-length id lists; pad/truncate to a fixed width.
    sequences = tokenizer.encode_batch(textes)
    return pad_sequences(sequences, maxlen=max_length, padding='post')
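# Hedged example: encode_text(["Hello world"]) yields a (1, 250) int array of
# cl100k_base token ids, zero-padded on the right.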
def read_list_lan():
    # Read the list of supported language codes (a single CSV row).
    with open(dataPath+'/multilingue/lan_code.csv', 'r') as fichier_csv:
        reader = csv.reader(fichier_csv)
        lan_code = next(reader)
        return lan_code
def init_dl_identifier():
    global tokenizer, dl_model, label_encoder, lan_to_language, lan_identified
    tokenizer = tiktoken.get_encoding("cl100k_base")
    # Read the JSON mapping from language code to full language name.
    with open(dataPath+'/multilingue/lan_to_language.json', 'r') as fichier:
        lan_to_language = json.load(fichier)
    label_encoder = LabelEncoder()
    list_lan = read_list_lan()
    lan_identified = [lan_to_language[l] for l in list_lan]
    label_encoder.fit(list_lan)
    # Reassemble the split files and load the language-identification model.
    Merge(dataPath+"/dl_id_lang_split", dataPath, "dl_tiktoken_id_language_model.h5").merge(cleanup=False)
    dl_model = keras.models.load_model(dataPath+"/dl_tiktoken_id_language_model.h5")
def lang_id_dl(sentences):
    global dl_model, label_encoder, lan_to_language
    if 'dl_model' not in globals():
        init_dl_identifier()
    predictions = dl_model.predict(encode_text(sentences))
    # Decode the predictions back into language codes.
    predicted_labels_encoded = np.argmax(predictions, axis=1)
    predicted_languages = label_encoder.classes_[predicted_labels_encoded]
    if len(sentences) == 1:
        # A single sentence is mapped to its full language name.
        return lan_to_language[predicted_languages[0]]
    # A batch is returned as the raw predicted codes.
    return list(predicted_languages)
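# Hedged usage example (the actual codes and names depend on lan_code.csv and
# lan_to_language.json, so these values are illustrative):
#   lang_id_dl(["Bonjour tout le monde"])   -> e.g. "French"
#   lang_id_dl(["Hello there", "Bonjour"])  -> e.g. ["en", "fr"]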
# ==== Endpoints ====

def check_api():
    # Health check: reload every model to verify the API is functional.
    load_rnn()
    load_transformer()
    init_dl_identifier()
    return {'message': "The API is up and running"}
async def trad_rnn(lang_tgt: str,
                   texte: str):
    # Translate `texte` with the RNN model; lang_tgt selects the direction.
    global translation_model
    if 'translation_model' not in globals():
        load_rnn()
        load_transformer()
    if lang_tgt == 'en':
        translation_model = rnn_fr_en
        return decode_sequence_rnn(texte, "fr", "en")
    else:
        translation_model = rnn_en_fr
        return decode_sequence_rnn(texte, "en", "fr")
async def trad_transformer(lang_tgt: str,
                           texte: str):
    # Translate `texte` with the Transformer model; lang_tgt selects the direction.
    global translation_model
    if 'translation_model' not in globals():
        load_rnn()
        load_transformer()
    if lang_tgt == 'en':
        translation_model = transformer_fr_en
        return decode_sequence_transf(texte, "fr", "en")
    else:
        translation_model = transformer_en_fr
        return decode_sequence_transf(texte, "en", "fr")
def affiche_modele(model_type: str,
                   lang_tgt: Optional[str] = None):
    # Render the requested model's architecture as a PNG and return it.
    global translation_model, dl_model
    if model_type == "lang_id":
        model_to_display = dl_model
    elif model_type == "rnn":
        if lang_tgt == 'en':
            model_to_display = rnn_fr_en
        else:
            model_to_display = rnn_en_fr
    else:
        if lang_tgt == 'en':
            model_to_display = transformer_fr_en
        else:
            model_to_display = transformer_en_fr
    plot_model(model_to_display, show_shapes=True, show_layer_names=True,
               show_layer_activations=True, rankdir='TB',
               to_file=imagePath+'/model_plot.png')
    with open(imagePath+'/model_plot.png', "rb") as image_file:
        # Read the image bytes.
        image_data = image_file.read()
    # Return the image as an HTTP response with the appropriate content type.
    return Response(content=image_data, media_type="image/png")
async def language_id_dl(sentence: List[str] = Query(..., min_length=1)):
    return lang_id_dl(sentence)

def languages_identified():
    # List the full names of the languages the identifier can recognize.
    global lan_identified
    if 'lan_identified' not in globals():
        init_dl_identifier()
    return lan_identified
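# The FastAPI instance `api` defined above is never bound to these handlers in
# this file. A minimal wiring sketch (the route paths are assumptions, not
# taken from the original source):
#   api.get("/")(check_api)
#   api.get("/trad_rnn")(trad_rnn)
#   api.get("/trad_transformer")(trad_transformer)
#   api.get("/affiche_modele")(affiche_modele)
#   api.get("/language_id_dl")(language_id_dl)
#   api.get("/languages_identified")(languages_identified)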