import os
import pandas as pd
import streamlit as st
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset
from sklearn.model_selection import train_test_split
import requests
from io import BytesIO
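
# Streamlit app: load HUPD patent metadata, tokenize invention titles, fine-tune
# a BERT sequence classifier with the Hugging Face Trainer, and inspect the
# saved model artifacts.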
|
|
@st.cache_data
def load_data():
    # Download the HUPD metadata feather file and read it into a DataFrame.
    # st.cache_data keeps the result across Streamlit reruns.
    url = "https://huggingface.co/datasets/HUPD/hupd/resolve/main/hupd_metadata_2022-02-22.feather"
    response = requests.get(url)
    data = BytesIO(response.content)
    df = pd.read_feather(data)
    return df
|
|
def load_tokenizer_and_model(model_name, num_labels):
    # Load a pretrained tokenizer and a model with a fresh classification head.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
    return tokenizer, model
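
# Note: loading a base checkpoint into AutoModelForSequenceClassification logs a
# warning that the classifier-head weights are newly initialized; that is expected
# here, since the head is fine-tuned from scratch below.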
|
|
def prepare_data(df, tokenizer):
    # Restrict the metadata to applications filed in January 2016.
    df['filing_date'] = pd.to_datetime(df['filing_date'])
    jan_2016_df = df[df['filing_date'].dt.to_period('M') == '2016-01']

    # Use the first five (sorted) unique patent numbers as the label set.
    unique_labels = jan_2016_df['patent_number'].astype('category').cat.categories[:5]
    jan_2016_df = jan_2016_df[jan_2016_df['patent_number'].isin(unique_labels)].copy()

    # Map each patent number to an integer class id.
    label_mapping = {label: idx for idx, label in enumerate(unique_labels)}
    jan_2016_df['label'] = jan_2016_df['patent_number'].map(label_mapping)

    texts = jan_2016_df['invention_title'].tolist()
    labels = jan_2016_df['label'].tolist()
    num_labels = len(unique_labels)

    def tokenize_function(texts):
        return tokenizer(texts, padding=True, truncation=True, return_tensors="pt", max_length=512)

    tokenized_data = tokenize_function(texts)

    # Convert tensors to plain lists so Dataset.from_dict can store them.
    dataset_dict = {
        'input_ids': [x.tolist() for x in tokenized_data['input_ids']],
        'attention_mask': [x.tolist() for x in tokenized_data['attention_mask']],
        'labels': labels
    }

    dataset = Dataset.from_dict(dataset_dict)

    return dataset, num_labels
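
# The returned Dataset carries exactly the columns the HF Trainer consumes for
# sequence classification: input_ids, attention_mask, and labels.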
|
|
def main():
    st.title("Patent Classification with Fine-Tuned BERT")

    model_dir = './finetuned_model'

    df = load_data()

    # Preview the raw metadata (filtering to January 2016 happens in prepare_data).
    st.subheader("HUPD Metadata Preview")
    st.write(df.head())

    model_name = "bert-base-uncased"
    tokenizer, model = load_tokenizer_and_model(model_name, num_labels=5)
    dataset, num_labels = prepare_data(df, tokenizer)

    # Reload the model if the prepared data yields a different label count.
    if num_labels != 5:
        model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)

    # Hold out 20% of the examples for evaluation.
    train_data, eval_data = train_test_split(
        list(zip(dataset['input_ids'], dataset['attention_mask'], dataset['labels'])),
        test_size=0.2,
        random_state=42,
    )
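    # Alternative (not used here): dataset.train_test_split(test_size=0.2, seed=42)
    # from the datasets library returns a DatasetDict and avoids round-tripping
    # through Python lists; the zip-based split above is kept as written.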
|
    def create_dataset(data):
        # Rebuild a Dataset from the zipped (input_ids, attention_mask, label) tuples.
        return Dataset.from_dict({
            'input_ids': [item[0] for item in data],
            'attention_mask': [item[1] for item in data],
            'labels': [item[2] for item in data]
        })

    train_dataset = create_dataset(train_data)
    eval_dataset = create_dataset(eval_data)

    # Show a small preview: first 5 rows, first 10 tokens of each sequence.
    st.subheader("Training Data")
    train_df = pd.DataFrame({
        'input_ids': [ids[:10] for ids in train_dataset['input_ids'][:5]],
        'attention_mask': [mask[:10] for mask in train_dataset['attention_mask'][:5]],
        'labels': train_dataset['labels'][:5]
    })
    st.write(train_df)
|
    training_args = TrainingArguments(
        output_dir=model_dir,
        evaluation_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=3,
        weight_decay=0.01,
    )
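    # Note: newer transformers releases rename `evaluation_strategy` to
    # `eval_strategy`; if TrainingArguments rejects the keyword above, switch
    # to the new name.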
|
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        tokenizer=tokenizer
    )
|
st.subheader("Training the Model") |
|
if st.button('Train Model'): |
|
with st.spinner('Training in progress...'): |
|
trainer.train() |
|
model.save_pretrained(model_dir) |
|
tokenizer.save_pretrained(model_dir) |
|
st.success("Model training complete and saved.") |
|
st.subheader("Pretrained Model") |
|
if st.button('Show Pretrained Model'): |
|
if os.path.exists(model_dir): |
|
|
|
st.write(f"Model name: `{model_name}`") |
|
|
|
|
|
json_files = [f for f in os.listdir(model_dir) if f.endswith('.json')] |
|
if json_files: |
|
st.write("Available `.json` files:") |
|
for file in json_files: |
|
file_path = os.path.join(model_dir, file) |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
file_content = f.read() |
|
st.write(f"[{file}](data:file/{file})") |
|
st.text(file_content) |
|
else: |
|
st.write("No `.json` files found in `./finetuned_model` directory.") |
|
else: |
|
st.write("Directory `./finetuned_model` does not exist.") |
|
|
if __name__ == "__main__":
    main()
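
# Usage (assuming this script is saved as app.py; adjust the filename as needed):
#   streamlit run app.py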