import os
import re

import pandas as pd
import spacy
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import Dataset
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from transformers import (
    AutoConfig,
    AutoModel,
    AutoTokenizer,
    IntervalStrategy,
    Trainer,
    TrainingArguments,
)
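
# One-time setup assumed by preprocess_texts(): the NLTK stopword list and the
# spaCy small English model must already be installed, e.g.
#   python -m nltk.downloader stopwords
#   python -m spacy download en_core_web_sm
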
class TinyFinBERTRegressor(nn.Module):
    def __init__(self, pretrained_model='huawei-noah/TinyBERT_General_4L_312D'):
        super().__init__()
        if pretrained_model:
            self.config = AutoConfig.from_pretrained(pretrained_model)
            self.bert = AutoModel.from_pretrained(pretrained_model, config=self.config)
        else:
            # AutoConfig/AutoModel cannot be instantiated directly; build a randomly
            # initialised encoder from the TinyBERT config instead.
            self.config = AutoConfig.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
            self.bert = AutoModel.from_config(self.config)
        self.regressor = nn.Linear(self.config.hidden_size, 1)
        # Manually register the position_ids buffer to avoid a missing-key error
        # when loading state dicts saved with a different transformers version.
        self.bert.embeddings.register_buffer(
            "position_ids",
            torch.arange(512).expand((1, -1)),
            persistent=False,
        )

    def forward(self, input_ids=None, attention_mask=None, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls_output = outputs.last_hidden_state[:, 0]  # [CLS] token representation
        score = self.regressor(cls_output).squeeze(-1)  # shape: (batch_size,)
        loss = F.mse_loss(score, labels.float()) if labels is not None else None
        return {'loss': loss, 'score': score}
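
# A minimal standalone check of the regression head (illustrative sketch; assumes the
# TinyBERT checkpoint can be downloaded):
#   model = TinyFinBERTRegressor()
#   tok = AutoTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
#   batch = tok(["revenue grew strongly"], return_tensors="pt")
#   out = model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])
#   out["score"]  # unbounded sentiment score; out["loss"] is None when no labels are given
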
def preprocess_texts(texts):
    nlp = spacy.load("en_core_web_sm", disable=["ner", "parser"])  # speeds up processing
    negations = {'no', 'not', 'none', 'nobody', 'nothing', 'neither', 'nowhere', 'never',
                 'hardly', 'scarcely', 'barely', "n't", "without", "unless", "nor"}
    # Stopword list with negations kept, so negated sentiment is not stripped away.
    # (The stopword filter itself is currently disabled in the token loop below.)
    stop_words = set(stopwords.words('english')) - negations
    processed = []
    for text in texts:
        text = text.lower()
        text = re.sub(r'[^a-zA-Z\s]', '', text)  # keep letters and whitespace only
        doc = nlp(text)
        tokens = [
            token.lemma_ for token in doc
            if token.lemma_.strip()  # and token.lemma_ not in stop_words
        ]
        processed.append(' '.join(tokens))
    return processed

def load_phrasebank(path):
    with open(path, 'r', encoding='latin1') as f:
        lines = f.readlines()
    sents, scores = [], []
    for line in lines:
        if '@' in line:
            # Split on the last '@' in case the sentence itself contains one.
            s, l = line.strip().rsplit('@', 1)
            score = 0.0 if l.lower() == 'neutral' else (-1.0 if l.lower() == 'negative' else 1.0)
            sents.append(s)
            scores.append(score)
    return pd.DataFrame({'text': sents, 'score': scores})
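
# Illustrative PhraseBank-style lines (made-up sentences; the loader only expects
# "sentence@label" with a positive/neutral/negative label):
#   Sales increased clearly during the quarter .@positive
#   The company expects net sales to remain flat .@neutral
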
def load_words_phrases(path):
    with open(path, 'r', encoding='latin1') as f:
        lines = f.readlines()
    data = []
    for line in lines:
        line = line.strip()
        match = re.search(r',(-?\d+\.?\d*)$', line)
        if match:
            text = line[:match.start()].strip()
            score = float(match.group(1))
            data.append((text, score))
    return pd.DataFrame(data, columns=["text", "score"])
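
# Illustrative rows for the words/phrases file (made-up values; the loader only
# requires that each line ends in ",<numeric score>"):
#   strong boost,0.9
#   major losses this quarter,-0.75
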
def train_model(phrase_path, words_path, save_path):
    os.makedirs(save_path, exist_ok=True)

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    phrase_df = load_phrasebank(phrase_path)
    words_df = load_words_phrases(words_path)
    phrase_df['text'] = preprocess_texts(phrase_df['text'])
    words_df['text'] = preprocess_texts(words_df['text'])

    train_phrase, test_phrase = train_test_split(phrase_df, test_size=0.2, random_state=42)
    train_df = pd.concat([train_phrase, words_df])
    test_df = test_phrase.reset_index(drop=True)

    tokenizer = AutoTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')

    def tokenize(batch):
        tokens = tokenizer(batch["text"], padding='max_length', truncation=True, max_length=128)
        tokens["labels"] = batch["score"]
        return tokens

    train_dataset = Dataset.from_pandas(train_df).map(tokenize, batched=True)
    test_dataset = Dataset.from_pandas(test_df).map(tokenize, batched=True)

    args = TrainingArguments(
        output_dir=os.path.join(save_path, "results"),
        evaluation_strategy=IntervalStrategy.EPOCH,
        save_strategy=IntervalStrategy.EPOCH,
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=64,
        num_train_epochs=5,
        weight_decay=0.01,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss"
    )

    model = TinyFinBERTRegressor().to(device)
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        tokenizer=tokenizer,
        compute_metrics=lambda pred: {
            "mse": mean_squared_error(pred.label_ids, pred.predictions),
            "r2": r2_score(pred.label_ids, pred.predictions)
        }
    )
    trainer.train()

    # Save the model and tokenizer
    model_to_save = model.module if hasattr(model, 'module') else model  # handle distributed/parallel training
    torch.save(model_to_save.state_dict(), os.path.join(save_path, "regressor_model.pt"))
    tokenizer.save_pretrained(save_path)
    print(f"Model saved to {save_path}")

from sklearn.metrics import (
    mean_squared_error, r2_score,
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, cohen_kappa_score
)
from sklearn.preprocessing import label_binarize

def evaluate_model(phrase_path, model_path):
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    phrase_df = load_phrasebank(phrase_path)
    _, test_df = train_test_split(phrase_df, test_size=0.2, random_state=42)
    test_df = test_df.copy()  # avoid modifying a view of the split
    test_df['text'] = preprocess_texts(test_df['text'])

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = TinyFinBERTRegressor()
    model.load_state_dict(torch.load(os.path.join(model_path, "regressor_model.pt"), map_location=device))
    model.to(device)
    model.eval()

    y_true, y_pred, y_scores = [], [], []
    for _, row in test_df.iterrows():
        inputs = tokenizer(row["text"], return_tensors="pt", truncation=True, padding='max_length', max_length=128)
        inputs = {k: v.to(device) for k, v in inputs.items() if k != "token_type_ids"}
        with torch.no_grad():
            score = model(**inputs)["score"].item()
        y_scores.append(score)
        y_true.append(row["score"])

    # Regression metrics
    mse = mean_squared_error(y_true, y_scores)
    r2 = r2_score(y_true, y_scores)

    # Classification metrics: map continuous scores to {-1, 0, 1} with a +/-0.3 threshold
    y_pred = [1 if s > 0.3 else -1 if s < -0.3 else 0 for s in y_scores]
    y_true_classes = [int(round(s)) for s in y_true]

    acc = accuracy_score(y_true_classes, y_pred)
    prec = precision_score(y_true_classes, y_pred, average='weighted', zero_division=0)
    rec = recall_score(y_true_classes, y_pred, average='weighted')
    f1 = f1_score(y_true_classes, y_pred, average='weighted')
    kappa = cohen_kappa_score(y_true_classes, y_pred)
    cm = confusion_matrix(y_true_classes, y_pred)

    y_true_bin = label_binarize(y_true_classes, classes=[-1, 0, 1])
    y_score_bin = label_binarize(y_pred, classes=[-1, 0, 1])
    roc_auc = roc_auc_score(y_true_bin, y_score_bin, average='macro', multi_class='ovo')

    print("Sentiment Regression Metrics:")
    print(f"- MSE: {mse:.4f}")
    print(f"- R²: {r2:.4f}")
    print(f"- Accuracy: {acc:.4f}")
    print(f"- Precision: {prec:.4f}")
    print(f"- Recall: {rec:.4f}")
    print(f"- F1 Score: {f1:.4f}")
    print(f"- ROC-AUC: {roc_auc:.4f}")
    print(f"- Cohen's Kappa: {kappa:.4f}")
    print(f"- Confusion Matrix:\n{cm}")

def test(model_path):
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = TinyFinBERTRegressor()
    model.load_state_dict(torch.load(os.path.join(model_path, "regressor_model.pt"), map_location=device))
    model.to(device)
    model.eval()

    texts = [
        "The company's earnings exceeded expectations.",
        "They faced major losses this quarter.",
        "They didn't face major losses this quarter.",
        "Stock prices remained the same.",
        "boost",
        "strong boost",
        "AMD was not able to reduce losses.",
        "AMD reduced debt significantly, improves balance sheet",
        "Economic indicators point to contraction in telecom sector",
        "Company didn't have increased losses over last years."
    ]

    for text in texts:
        clean_text = preprocess_texts([text])[0]
        print(f"Original Text: {text}")
        print(f"Processed Text: {clean_text}")
        tokens = tokenizer.tokenize(clean_text)
        print(f"Tokens: {tokens}")
        inputs = tokenizer(clean_text, return_tensors="pt", truncation=True, padding='max_length', max_length=128)
        inputs = {k: v.to(device) for k, v in inputs.items() if k != "token_type_ids"}
        with torch.no_grad():
            score = model(**inputs)["score"].item()
        print(f"Predicted Sentiment Score: {score:.3f}")
        sentiment = "positive" if score > 0.3 else "negative" if score < -0.3 else "neutral"
        print(f"Sentiment: {sentiment}\n")

def init_model():
    """Initialize the model with position_ids registered, whether it is loaded from disk or created new."""
    model = TinyFinBERTRegressor()
    # Make sure position_ids is registered
    if not hasattr(model.bert.embeddings, 'position_ids'):
        model.bert.embeddings.register_buffer(
            "position_ids",
            torch.arange(512).expand((1, -1)),
            persistent=False,
        )
    return model

def create_api_model(model_path):
    """Create a model suitable for a FastAPI application."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    # Initialize model with position_ids properly registered
    model = init_model()
    model.load_state_dict(torch.load(os.path.join(model_path, "regressor_model.pt"), map_location=device))
    model.to(device)
    model.eval()
    return model, tokenizer, device
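
# A minimal sketch of how create_api_model() might be wired into a FastAPI app
# (illustrative only; the endpoint name, request schema, and model directory are
# assumptions, not part of this script):
#
#   from fastapi import FastAPI
#   from pydantic import BaseModel
#
#   app = FastAPI()
#   model, tokenizer, device = create_api_model("./saved_model")
#
#   class ScoreRequest(BaseModel):
#       text: str
#
#   @app.post("/score")
#   def score(req: ScoreRequest):
#       clean = preprocess_texts([req.text])[0]
#       inputs = tokenizer(clean, return_tensors="pt", truncation=True,
#                          padding='max_length', max_length=128)
#       inputs = {k: v.to(device) for k, v in inputs.items() if k != "token_type_ids"}
#       with torch.no_grad():
#           s = model(**inputs)["score"].item()
#       return {"score": s}
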
if __name__ == "__main__":
    model_dir = "./saved_model"
    phrase_path = "./Sentences_50Agree.txt"
    words_path = "./financial_sentiment_words_phrases_negations.csv"

    # Check for GPU availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    if not os.path.isfile(os.path.join(model_dir, "regressor_model.pt")):
        print("Training new model...")
        train_model(phrase_path, words_path, model_dir)
    else:
        print(f"Model found at {os.path.join(model_dir, 'regressor_model.pt')}")

    evaluate_model(phrase_path, model_dir)
    test(model_dir)