"""Train, evaluate and serve a TinyBERT-based financial sentiment regressor.

The model maps a piece of text to a single real-valued sentiment score.
Training targets are -1.0 (negative), 0.0 (neutral) and 1.0 (positive) for
phrase-bank sentences, plus whatever numeric scores the words/phrases CSV
carries.
"""

import os
import re
from functools import lru_cache

import pandas as pd
import spacy
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import Dataset
from nltk.corpus import stopwords
from sklearn.metrics import (
    accuracy_score,
    cohen_kappa_score,
    confusion_matrix,
    f1_score,
    mean_squared_error,
    precision_score,
    r2_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from transformers import (
    AutoConfig,
    AutoModel,
    AutoTokenizer,
    IntervalStrategy,
    Trainer,
    TrainingArguments,
)

DEFAULT_PRETRAINED_MODEL = 'huawei-noah/TinyBERT_General_4L_312D'
MODEL_WEIGHTS_FILE = "regressor_model.pt"
MAX_SEQ_LENGTH = 128
# Scores strictly above +threshold are "positive", strictly below -threshold
# are "negative", everything in between is "neutral".
SENTIMENT_THRESHOLD = 0.3

# Words that carry negation and must survive stop-word removal.
# NOTE: preprocess_texts strips apostrophes before lemmatizing, so the "n't"
# entry can only match if the lemmatizer itself produces that string.
_NEGATIONS = frozenset({
    'no', 'not', 'none', 'nobody', 'nothing', 'neither', 'nowhere', 'never',
    'hardly', 'scarcely', 'barely', "n't", "without", "unless", "nor",
})
_NON_ALPHA_RE = re.compile(r'[^a-zA-Z\s]')
_TRAILING_SCORE_RE = re.compile(r',(-?\d+\.?\d*)$')


def _position_ids(config):
    """Return a (1, max_position_embeddings) tensor of position indices."""
    return torch.arange(config.max_position_embeddings).expand((1, -1))


class TinyFinBERTRegressor(nn.Module):
    """Encoder with a single linear head that regresses a sentiment score."""

    def __init__(self, pretrained_model=DEFAULT_PRETRAINED_MODEL):
        """Build the encoder and the regression head.

        Args:
            pretrained_model: Name or path passed to
                ``AutoConfig.from_pretrained`` / ``AutoModel.from_pretrained``.

        Raises:
            ValueError: If ``pretrained_model`` is empty. The auto classes
                cannot be instantiated without a checkpoint or config, so
                there is nothing to build the encoder from.
        """
        super().__init__()
        if not pretrained_model:
            raise ValueError(
                "pretrained_model must be a model name or path; "
                "AutoConfig/AutoModel cannot be constructed without one."
            )
        self.config = AutoConfig.from_pretrained(pretrained_model)
        self.bert = AutoModel.from_pretrained(pretrained_model, config=self.config)
        self.regressor = nn.Linear(self.config.hidden_size, 1)

        # Manually register the position_ids buffer (non-persistent) to avoid
        # a missing-key error when loading a saved state dict.
        self.bert.embeddings.register_buffer(
            "position_ids",
            _position_ids(self.config),
            persistent=False,
        )

    def forward(self, input_ids=None, attention_mask=None, labels=None):
        """Score a batch of tokenized texts.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.
            labels: Optional target scores; when given, an MSE loss is computed.

        Returns:
            dict with ``'loss'`` (``None`` when ``labels`` is ``None``) and
            ``'score'``, one value per batch row.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls_output = outputs.last_hidden_state[:, 0]
        # Squeeze only the feature axis: a bare squeeze() would also drop the
        # batch axis for a batch of one and yield a 0-d tensor.
        score = self.regressor(cls_output).squeeze(-1)
        loss = None
        if labels is not None:
            targets = labels.to(score.dtype).reshape(score.shape)
            loss = F.mse_loss(score, targets)
        return {'loss': loss, 'score': score}


@lru_cache(maxsize=1)
def _load_spacy_pipeline():
    """Load the spaCy English pipeline once and reuse it across calls."""
    return spacy.load("en_core_web_sm", disable=["ner", "parser"])  # Speeds up processing


def preprocess_texts(texts, *, remove_stopwords=False):
    """Lower-case, strip non-letters and lemmatize each text.

    Args:
        texts: Iterable of strings.
        remove_stopwords: When true, drop NLTK English stop words except the
            negation words in ``_NEGATIONS``. Off by default, which matches
            the behavior the model was trained with.

    Returns:
        list[str]: One space-joined lemma string per input text.
    """
    nlp = _load_spacy_pipeline()
    # The NLTK corpus is only touched when filtering is actually requested.
    stop_words = (
        set(stopwords.words('english')) - _NEGATIONS
        if remove_stopwords
        else frozenset()
    )

    processed = []
    for text in texts:
        cleaned = _NON_ALPHA_RE.sub('', text.lower())
        lemmas = [
            token.lemma_
            for token in nlp(cleaned)
            if token.lemma_.strip() and token.lemma_ not in stop_words
        ]
        processed.append(' '.join(lemmas))
    return processed


def load_phrasebank(path):
    """Read ``sentence@label`` lines into a DataFrame.

    Lines without an ``@`` are skipped. The label is taken after the LAST
    ``@`` so sentences that themselves contain ``@`` still parse.
    ``neutral`` maps to 0.0 and ``negative`` to -1.0; any other label maps
    to 1.0.

    Args:
        path: File to read (decoded as latin1).

    Returns:
        pandas.DataFrame with columns ``text`` and ``score``.
    """
    sents, scores = [], []
    with open(path, 'r', encoding='latin1') as f:
        for line in f:
            if '@' not in line:
                continue
            sentence, label = line.strip().rsplit('@', 1)
            label = label.lower()
            if label == 'neutral':
                score = 0.0
            elif label == 'negative':
                score = -1.0
            else:
                score = 1.0
            sents.append(sentence)
            scores.append(score)
    return pd.DataFrame({'text': sents, 'score': scores})


def load_words_phrases(path):
    """Read ``text,score`` lines where the score is the trailing number.

    Only the final ``,<number>`` is treated as the score, so the text part
    may contain commas. Lines that do not end in a number are skipped.

    Args:
        path: File to read (decoded as latin1).

    Returns:
        pandas.DataFrame with columns ``text`` and ``score``.
    """
    data = []
    with open(path, 'r', encoding='latin1') as f:
        for raw_line in f:
            line = raw_line.strip()
            match = _TRAILING_SCORE_RE.search(line)
            if match:
                text = line[:match.start()].strip()
                data.append((text, float(match.group(1))))
    return pd.DataFrame(data, columns=["text", "score"])


def _regression_metrics(pred):
    """Compute MSE and R² for the Trainer's evaluation predictions."""
    return {
        "mse": mean_squared_error(pred.label_ids, pred.predictions),
        "r2": r2_score(pred.label_ids, pred.predictions),
    }


def train_model(phrase_path, words_path, save_path):
    """Fine-tune the regressor and save its weights and tokenizer.

    The phrase bank is split 80/20 (``random_state=42``); the words/phrases
    data is added to the training portion only.

    Args:
        phrase_path: Phrase-bank file for ``load_phrasebank``.
        words_path: Words/phrases file for ``load_words_phrases``.
        save_path: Directory that receives ``regressor_model.pt``, the
            tokenizer files and the Trainer's ``results`` directory.
    """
    os.makedirs(save_path, exist_ok=True)

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    phrase_df = load_phrasebank(phrase_path)
    words_df = load_words_phrases(words_path)

    phrase_df['text'] = preprocess_texts(phrase_df['text'])
    words_df['text'] = preprocess_texts(words_df['text'])

    train_phrase, test_phrase = train_test_split(phrase_df, test_size=0.2, random_state=42)
    # Reset the index so the concatenated frame has no duplicate labels.
    train_df = pd.concat([train_phrase, words_df], ignore_index=True)
    test_df = test_phrase.reset_index(drop=True)

    tokenizer = AutoTokenizer.from_pretrained(DEFAULT_PRETRAINED_MODEL)

    def tokenize(batch):
        tokens = tokenizer(
            batch["text"],
            padding='max_length',
            truncation=True,
            max_length=MAX_SEQ_LENGTH,
        )
        tokens["labels"] = batch["score"]
        return tokens

    train_dataset = Dataset.from_pandas(train_df).map(tokenize, batched=True)
    test_dataset = Dataset.from_pandas(test_df).map(tokenize, batched=True)

    # NOTE(review): newer transformers releases appear to rename
    # `evaluation_strategy` (and Trainer's `tokenizer`) — confirm against the
    # pinned version before upgrading.
    args = TrainingArguments(
        output_dir=os.path.join(save_path, "results"),
        evaluation_strategy=IntervalStrategy.EPOCH,
        save_strategy=IntervalStrategy.EPOCH,
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=64,
        num_train_epochs=5,
        weight_decay=0.01,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
    )

    model = TinyFinBERTRegressor().to(device)

    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        tokenizer=tokenizer,
        compute_metrics=_regression_metrics,
    )

    trainer.train()

    # Save the model and tokenizer
    model_to_save = model.module if hasattr(model, 'module') else model  # Handle distributed/parallel training
    torch.save(model_to_save.state_dict(), os.path.join(save_path, MODEL_WEIGHTS_FILE))
    tokenizer.save_pretrained(save_path)
    print(f"Model saved to {save_path}")


def _predict_score(model, tokenizer, device, text):
    """Return the model's scalar sentiment score for one preprocessed text."""
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding='max_length',
        max_length=MAX_SEQ_LENGTH,
    )
    # forward() does not accept token_type_ids, so drop it.
    inputs = {k: v.to(device) for k, v in inputs.items() if k != "token_type_ids"}
    with torch.no_grad():
        return model(**inputs)["score"].item()


def _score_to_class(score):
    """Map a score to 1 / -1 / 0 using ``SENTIMENT_THRESHOLD``."""
    if score > SENTIMENT_THRESHOLD:
        return 1
    if score < -SENTIMENT_THRESHOLD:
        return -1
    return 0


def evaluate_model(phrase_path, model_path):
    """Print regression and classification metrics on the held-out split.

    Re-creates the same 80/20 split as ``train_model`` (``random_state=42``)
    and scores the 20% portion.

    Args:
        phrase_path: Phrase-bank file for ``load_phrasebank``.
        model_path: Directory holding ``regressor_model.pt`` and the tokenizer.
    """
    model, tokenizer, device = create_api_model(model_path)
    print(f"Using device: {device}")

    phrase_df = load_phrasebank(phrase_path)
    _, test_df = train_test_split(phrase_df, test_size=0.2, random_state=42)
    # Copy before assigning so we never write into a slice of phrase_df.
    test_df = test_df.copy()
    test_df['text'] = preprocess_texts(test_df['text'])

    y_true = list(test_df['score'])
    y_scores = [
        _predict_score(model, tokenizer, device, text)
        for text in test_df['text']
    ]

    # regression metrics
    mse = mean_squared_error(y_true, y_scores)
    r2 = r2_score(y_true, y_scores)

    y_pred = [_score_to_class(s) for s in y_scores]
    y_true_classes = [int(round(s)) for s in y_true]

    acc = accuracy_score(y_true_classes, y_pred)
    prec = precision_score(y_true_classes, y_pred, average='weighted', zero_division=0)
    rec = recall_score(y_true_classes, y_pred, average='weighted', zero_division=0)
    f1 = f1_score(y_true_classes, y_pred, average='weighted', zero_division=0)
    kappa = cohen_kappa_score(y_true_classes, y_pred)
    cm = confusion_matrix(y_true_classes, y_pred)

    # ROC-AUC is computed on the thresholded class predictions, not on the
    # raw scores.
    y_true_bin = label_binarize(y_true_classes, classes=[-1, 0, 1])
    y_score_bin = label_binarize(y_pred, classes=[-1, 0, 1])
    try:
        roc_auc = roc_auc_score(y_true_bin, y_score_bin, average='macro', multi_class='ovo')
    except ValueError:
        # Undefined when a class is absent from the evaluation split.
        roc_auc = float('nan')

    print("Sentiment Regression Metrics:")
    print(f"- MSE: {mse:.4f}")
    print(f"- R²: {r2:.4f}")
    print(f"- Accuracy: {acc:.4f}")
    print(f"- Precision: {prec:.4f}")
    print(f"- Recall: {rec:.4f}")
    print(f"- F1 Score: {f1:.4f}")
    print(f"- ROC-AUC: {roc_auc:.4f}")
    print(f"- Cohen's Kappa: {kappa:.4f}")
    print(f"- Confusion Matrix:\n{cm}")


def test(model_path):
    """Print predictions for a fixed set of sample sentences.

    Args:
        model_path: Directory holding ``regressor_model.pt`` and the tokenizer.
    """
    model, tokenizer, device = create_api_model(model_path)
    print(f"Using device: {device}")

    texts = [
        "The company's earnings exceeded expectations.",
        "They faced major losses this quarter.",
        "They didn't face major losses this quarter.",
        "Stock prices remained the same.",
        "boost",
        "strong boost",
        "AMD was not able to reduce losses.",
        "AMD reduced debt significantly, improves balance sheet",
        "Economic indicators point to contraction in telecom sector",
        "Company didn't have increased losses over last years.",
    ]
    class_names = {1: "positive", -1: "negative", 0: "neutral"}

    for text, clean_text in zip(texts, preprocess_texts(texts)):
        print(f"Original Text: {text}")
        print(f"Processed Text: {clean_text}")

        tokens = tokenizer.tokenize(clean_text)
        print(f"Tokens: {tokens}")

        score = _predict_score(model, tokenizer, device, clean_text)
        print(f"Predicted Sentiment Score: {score:.3f}")
        sentiment = class_names[_score_to_class(score)]
        print(f"Sentiment: {sentiment}\n")


def init_model():
    """Function to properly initialize model with position_ids regardless of whether it's being loaded or created new"""
    model = TinyFinBERTRegressor()
    # Make sure position_ids is registered
    if not hasattr(model.bert.embeddings, 'position_ids'):
        model.bert.embeddings.register_buffer(
            "position_ids",
            _position_ids(model.config),
            persistent=False,
        )
    return model


def create_api_model(model_path):
    """Create a model suitable for a FastAPI application

    Args:
        model_path: Directory holding ``regressor_model.pt`` and the tokenizer.

    Returns:
        tuple: ``(model, tokenizer, device)`` with the model moved to
        ``device`` and switched to eval mode.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # Initialize model with position_ids properly registered
    model = init_model()
    state_dict = torch.load(os.path.join(model_path, MODEL_WEIGHTS_FILE), map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    return model, tokenizer, device


def main():
    """Train a model if none is saved, then evaluate it and print samples."""
    model_dir = "./saved_model"
    phrase_path = "./Sentences_50Agree.txt"
    words_path = "./financial_sentiment_words_phrases_negations.csv"

    # Check for GPU availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    weights_path = os.path.join(model_dir, MODEL_WEIGHTS_FILE)
    if not os.path.isfile(weights_path):
        print("Training new model...")
        train_model(phrase_path, words_path, model_dir)
    else:
        print(f"Model found at {weights_path}")

    evaluate_model(phrase_path, model_dir)
    test(model_dir)


if __name__ == "__main__":
    main()