Fitting a BERT model on social media data
Obtain and prepare the data.
The dataset we will use comes from Kaggle; you can download it here: https://www.kaggle.com/datasets/farisdurrani/sentimentsearch (CC BY 4.0 license). In my experiments, I only used the Facebook and Twitter datasets.
The following snippet takes the CSV files and saves 3 splits (training, validation, and testing) wherever you want. I recommend saving them in Google Cloud Storage.
You can run the script with:
python make_splits.py --output-dir gs://your-bucket/
import argparse

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


def make_splits(output_dir):
    # Combine the Twitter and Facebook files into a single dataframe
    df = pd.concat((
        pd.read_csv("data/farisdurrani/twitter_filtered.csv"),
        pd.read_csv("data/farisdurrani/facebook_filtered.csv"),
    ))
    # Drop rows without a sentiment score
    df = df.dropna(subset=['sentiment'], axis=0)
    # Map the sentiment score to 3 classes: 0 = negative, 1 = neutral, 2 = positive
    df['Target'] = df['sentiment'].apply(lambda x: 1 if x == 0 else np.sign(x) + 1).astype(int)
    # 80% train, 10% validation, 10% test, stratified on the target
    df_train, df_ = train_test_split(df, stratify=df['Target'], test_size=0.2)
    df_eval, df_test = train_test_split(df_, stratify=df_['Target'], test_size=0.5)

    print(f"Files will be saved in {output_dir}")
    df_train.to_csv(output_dir + "/train.csv", index=False)
    df_eval.to_csv(output_dir + "/eval.csv", index=False)
    df_test.to_csv(output_dir + "/test.csv", index=False)

    print(f"Train : ({df_train.shape}) samples")
    print(f"Val : ({df_eval.shape}) samples")
    print(f"Test : ({df_test.shape}) samples")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--output-dir')
    args, _ = parser.parse_known_args()
    make_splits(args.output_dir)
After running the script, the data should look something like this: each split contains the raw post text in a bodyText column, the original sentiment score, and the derived Target label (0 = negative, 1 = neutral, 2 = positive).
Using a small pre-trained BERT model
For our model, we will use a lightweight BERT model, BERT-Tiny. This model has already been pre-trained on a large amount of data, but not necessarily on social media data, nor for the purpose of sentiment analysis. That's why we will fine-tune it.
It contains only 2 layers with a hidden dimension of 128 units; you can see the complete list of BERT models here if you want to use a bigger one.
First, let's create a main.py file with all the necessary modules:
import argparse
import logging
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel


def train_and_evaluate(**params):
    pass  # will be updated as we go
Let's also write down our requirements in a dedicated requirements.txt file:
transformers==4.40.1
torch==2.2.2
pandas==2.0.3
scikit-learn==1.3.2
gcsfs
Now we will load the 2 components needed to train our model:
- The tokenizer, which is responsible for splitting the text inputs into tokens that BERT was trained with.
- The model itself.
You can get both from Huggingface here. You can also download them to Cloud Storage. That's what I did, which is why I load them with:
# Load pretrained tokenizers and bert model
tokenizer = BertTokenizer.from_pretrained('models/bert_uncased_L-2_H-128_A-2/vocab.txt')
model = BertModel.from_pretrained('models/bert_uncased_L-2_H-128_A-2')
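If you don't have a local copy yet, a minimal sketch to download BERT-Tiny from the Hugging Face Hub and save it locally could look like the following (the google/bert_uncased_L-2_H-128_A-2 model id and the target folder are my assumptions; adjust them to your setup):
# Illustrative one-off download of BERT-Tiny from the Hugging Face Hub.
# The model id and target folder are assumptions; the saved folder can then
# be copied to Cloud Storage.
from transformers import BertTokenizer, BertModel

tokenizer = BertTokenizer.from_pretrained("google/bert_uncased_L-2_H-128_A-2")
model = BertModel.from_pretrained("google/bert_uncased_L-2_H-128_A-2")

tokenizer.save_pretrained("models/bert_uncased_L-2_H-128_A-2")
model.save_pretrained("models/bert_uncased_L-2_H-128_A-2")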
Now let's add the following piece to our file:
class SentimentBERT(nn.Module):
    def __init__(self, bert_model):
        super().__init__()
        self.bert_module = bert_model
        self.dropout = nn.Dropout(0.1)
        self.final = nn.Linear(in_features=128, out_features=3, bias=True)

        # Uncomment the below if you only want to retrain certain layers.
        # self.bert_module.requires_grad_(False)
        # for param in self.bert_module.encoder.parameters():
        #     param.requires_grad = True

    def forward(self, inputs):
        ids, mask, token_type_ids = inputs['ids'], inputs['mask'], inputs['token_type_ids']
        # print(ids.size(), mask.size(), token_type_ids.size())
        x = self.bert_module(ids, mask, token_type_ids)
        x = self.dropout(x['pooler_output'])
        out = self.final(x)
        return out
A little break here. We have several options when it comes to reusing an existing model.
- Transfer learning: we freeze the weights of the model and use it as a "feature extractor", adding additional layers downstream. This is frequently used in computer vision, where models like VGG, Xception, etc. can be reused to train a custom model on small datasets.
- Fine-tuning: we unfreeze all or part of the model weights and retrain the model on a custom dataset. This is the preferred approach when training custom LLMs.
More details on transfer learning and fine-tuning here:
In our model, we have chosen to unfreeze the entire model, but feel free to freeze one or more layers of the pre-trained BERT module and see how it influences performance, as in the sketch below.
The key part here is adding a fully connected layer after the BERT module to "tie" it to our classification task, hence the final layer with 3 units. This allows us to reuse the pre-trained BERT weights and adapt the model to our task.
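If you want to experiment with the transfer-learning option instead, here is a rough sketch (not the setup used in the rest of this article) that freezes the whole BERT backbone and only unfreezes its last encoder layer; which layers to unfreeze is entirely up to you:
# Illustrative sketch only: freeze the BERT backbone, then unfreeze its last
# encoder layer so that only this layer and the final classification head
# are trained.
classifier = SentimentBERT(bert_model=model)
classifier.bert_module.requires_grad_(False)
for param in classifier.bert_module.encoder.layer[-1].parameters():
    param.requires_grad = True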
Creating data loaders
To create the data loaders, we will need the tokenizer loaded above. The tokenizer takes a string as input and returns several outputs, among which we can find the tokens ('input_ids' in our case).
The BERT tokenizer is a bit special and returns several outputs, but the most important one is input_ids: the tokens used to encode our sentence. They can be words or parts of words. For example, the word "looking" could be made up of 2 tokens, "look" and "##ing".
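As a quick, illustrative sanity check (the sample sentence below is made up), you can inspect what the tokenizer returns:
# Illustrative check of the tokenizer outputs on a made-up sentence.
sample = tokenizer.encode_plus(
    "Looking forward to the weekend!",
    max_length=20,
    padding="max_length",
    truncation=True,
    return_attention_mask=True,
)
print(sample["input_ids"])       # token ids, padded to max_length
print(sample["token_type_ids"])  # segment ids (all zeros for a single sentence)
print(sample["attention_mask"])  # 1 for real tokens, 0 for padding
print(tokenizer.convert_ids_to_tokens(sample["input_ids"]))  # how the sentence was split into (sub)word tokens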
Now let's create a data loading module that will handle our data sets:
class BertDataset(Dataset):
    def __init__(self, df, tokenizer, max_length=100):
        super(BertDataset, self).__init__()
        self.df = df
        self.tokenizer = tokenizer
        self.target = self.df['Target']
        self.max_length = max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Tokenize the text, padding/truncating it to max_length tokens
        x = self.df['bodyText'].values[idx]
        y = self.target.values[idx]

        inputs = self.tokenizer.encode_plus(
            x,
            padding='max_length',
            truncation=True,
            add_special_tokens=True,
            return_attention_mask=True,
            max_length=self.max_length,
        )
        ids = inputs["input_ids"]
        token_type_ids = inputs["token_type_ids"]
        mask = inputs["attention_mask"]

        x = {
            'ids': torch.tensor(ids, dtype=torch.long).to(DEVICE),
            'mask': torch.tensor(mask, dtype=torch.long).to(DEVICE),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long).to(DEVICE),
        }
        y = torch.tensor(y, dtype=torch.long).to(DEVICE)

        return x, y
Write the main script to train the model.
Let's first define two functions to manage the training and evaluation steps:
def train(epoch, model, dataloader, loss_fn, optimizer, max_steps=None):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 50
    start_time = time.time()

    for idx, (inputs, label) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(inputs)
        loss = loss_fn(predicted_label, label)
        loss.backward()
        optimizer.step()

        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)

        if idx % log_interval == 0:
            elapsed = time.time() - start_time
            print(
                "Epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f} | loss {:8.3f} ({:.3f}s)".format(
                    epoch, idx, len(dataloader), total_acc / total_count, loss.item(), elapsed
                )
            )
            total_acc, total_count = 0, 0
            start_time = time.time()

        if max_steps is not None:
            if idx == max_steps:
                return {'loss': loss.item(), 'acc': total_acc / total_count}

    return {'loss': loss.item(), 'acc': total_acc / total_count}
def evaluate(model, dataloader, loss_fn):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (inputs, label) in enumerate(dataloader):
            predicted_label = model(inputs)
            loss = loss_fn(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    return {'loss': loss.item(), 'acc': total_acc / total_count}
We are getting closer to having our main script up and running. Let's put the pieces together. We have:
- A BertDataset class to handle data loading
- A SentimentBERT model that takes our Tiny-BERT model and adds an additional layer for our custom use case
- train() and evaluate() functions to handle the training and evaluation steps
- A train_and_evaluate() function that brings it all together
We will use argparse so we can launch the script with arguments. These arguments are typically the training/evaluation/test files (so we can run the model on any dataset), the path where the model will be stored, and the training-related parameters.
import pandas as pd
import time
import torch.nn as nn
import torch
import logging
import numpy as np
import argparse

from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
logging.basicConfig(format='%(asctime)s (%(levelname)s): %(message)s', level=logging.DEBUG)
logging.getLogger().setLevel(logging.INFO)
# --- CONSTANTS ---
BERT_MODEL_NAME = 'small_bert/bert_en_uncased_L-2_H-128_A-2'
if torch.cuda.is_available():
    logging.info(f"GPU: {torch.cuda.get_device_name(0)} is available.")
    DEVICE = torch.device('cuda')
else:
    logging.info("No GPU available. Training will run on CPU.")
    DEVICE = torch.device('cpu')
# --- Data preparation and tokenization ---
class BertDataset(Dataset):
    def __init__(self, df, tokenizer, max_length=100):
        super(BertDataset, self).__init__()
        self.df = df
        self.tokenizer = tokenizer
        self.target = self.df['Target']
        self.max_length = max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Tokenize the text, padding/truncating it to max_length tokens
        x = self.df['bodyText'].values[idx]
        y = self.target.values[idx]

        inputs = self.tokenizer.encode_plus(
            x,
            padding='max_length',
            truncation=True,
            add_special_tokens=True,
            return_attention_mask=True,
            max_length=self.max_length,
        )
        ids = inputs["input_ids"]
        token_type_ids = inputs["token_type_ids"]
        mask = inputs["attention_mask"]

        x = {
            'ids': torch.tensor(ids, dtype=torch.long).to(DEVICE),
            'mask': torch.tensor(mask, dtype=torch.long).to(DEVICE),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long).to(DEVICE),
        }
        y = torch.tensor(y, dtype=torch.long).to(DEVICE)

        return x, y
# --- Model definition ---
class SentimentBERT(nn.Module):
    def __init__(self, bert_model):
        super().__init__()
        self.bert_module = bert_model
        self.dropout = nn.Dropout(0.1)
        self.final = nn.Linear(in_features=128, out_features=3, bias=True)

    def forward(self, inputs):
        ids, mask, token_type_ids = inputs['ids'], inputs['mask'], inputs['token_type_ids']
        x = self.bert_module(ids, mask, token_type_ids)
        x = self.dropout(x['pooler_output'])
        out = self.final(x)
        return out
# --- Training loop ---
def train(epoch, model, dataloader, loss_fn, optimizer, max_steps=None):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 50
    start_time = time.time()

    for idx, (inputs, label) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(inputs)
        loss = loss_fn(predicted_label, label)
        loss.backward()
        optimizer.step()

        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)

        if idx % log_interval == 0:
            elapsed = time.time() - start_time
            print(
                "Epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f} | loss {:8.3f} ({:.3f}s)".format(
                    epoch, idx, len(dataloader), total_acc / total_count, loss.item(), elapsed
                )
            )
            total_acc, total_count = 0, 0
            start_time = time.time()

        if max_steps is not None:
            if idx == max_steps:
                return {'loss': loss.item(), 'acc': total_acc / total_count}

    return {'loss': loss.item(), 'acc': total_acc / total_count}
# --- Validation loop ---
def evaluate(model, dataloader, loss_fn):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (inputs, label) in enumerate(dataloader):
            predicted_label = model(inputs)
            loss = loss_fn(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    return {'loss': loss.item(), 'acc': total_acc / total_count}
# --- Main function ---
def train_and_evaluate(**params):
    logging.info("running with the following params :")
    logging.info(params)

    # Load pretrained tokenizers and bert model
    # update the paths to whichever you are using
    tokenizer = BertTokenizer.from_pretrained('models/bert_uncased_L-2_H-128_A-2/vocab.txt')
    model = BertModel.from_pretrained('models/bert_uncased_L-2_H-128_A-2')

    # Training parameters
    epochs = int(params.get('epochs'))
    batch_size = int(params.get('batch_size'))
    learning_rate = float(params.get('learning_rate'))

    # Load the data
    df_train = pd.read_csv(params.get('training_file'))
    df_eval = pd.read_csv(params.get('validation_file'))
    df_test = pd.read_csv(params.get('testing_file'))

    # Create dataloaders
    train_ds = BertDataset(df_train, tokenizer, max_length=100)
    train_loader = DataLoader(dataset=train_ds, batch_size=batch_size, shuffle=True)
    eval_ds = BertDataset(df_eval, tokenizer, max_length=100)
    eval_loader = DataLoader(dataset=eval_ds, batch_size=batch_size)
    test_ds = BertDataset(df_test, tokenizer, max_length=100)
    test_loader = DataLoader(dataset=test_ds, batch_size=batch_size)

    # Create the model
    classifier = SentimentBERT(bert_model=model).to(DEVICE)
    total_parameters = sum((np.prod(p.size()) for p in classifier.parameters()))
    model_parameters = filter(lambda p: p.requires_grad, classifier.parameters())
    trainable_parameters = sum((np.prod(p.size()) for p in model_parameters))
    logging.info(f"Total params : {total_parameters} - Trainable : {trainable_parameters} ({trainable_parameters / total_parameters * 100}% of total)")

    # Optimizer and loss functions
    optimizer = torch.optim.Adam((p for p in classifier.parameters() if p.requires_grad), learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    # If dry run, we only do a single training step
    logging.info(f'Training model with {BERT_MODEL_NAME}')
    if params.get('dry_run'):
        logging.info("Dry run mode")
        epochs = 1
        steps_per_epoch = 1
    else:
        steps_per_epoch = None

    # Action !
    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train_metrics = train(epoch, classifier, train_loader, loss_fn=loss_fn, optimizer=optimizer, max_steps=steps_per_epoch)
        eval_metrics = evaluate(classifier, eval_loader, loss_fn=loss_fn)

        print("-" * 59)
        print(
            "End of epoch {:3d} - time: {:5.2f}s - loss: {:.4f} - accuracy: {:.4f} - valid_loss: {:.4f} - valid accuracy {:.4f} ".format(
                epoch, time.time() - epoch_start_time, train_metrics['loss'], train_metrics['acc'], eval_metrics['loss'], eval_metrics['acc']
            )
        )
        print("-" * 59)

    if params.get('dry_run'):
        # If dry run, we do not run the evaluation
        return None

    test_metrics = evaluate(classifier, test_loader, loss_fn=loss_fn)

    metrics = {
        'train': train_metrics,
        'val': eval_metrics,
        'test': test_metrics,
    }
    logging.info(metrics)

    # save model and architecture to single file
    if params.get('job_dir') is None:
        logging.warning("No job dir provided, model will not be saved")
    else:
        logging.info("Saving model to {} ".format(params.get('job_dir')))
        torch.save(classifier.state_dict(), params.get('job_dir'))

    logging.info("Bye bye")
if __name__ == '__main__':
    # Create arguments here
    parser = argparse.ArgumentParser()
    parser.add_argument('--training-file', required=True, type=str)
    parser.add_argument('--validation-file', required=True, type=str)
    parser.add_argument('--testing-file', type=str)
    parser.add_argument('--job-dir', type=str)
    parser.add_argument('--epochs', type=float, default=2)
    parser.add_argument('--batch-size', type=float, default=1024)
    parser.add_argument('--learning-rate', type=float, default=0.01)
    parser.add_argument('--dry-run', action="store_true")

    # Parse them
    args, _ = parser.parse_known_args()

    # Execute training
    train_and_evaluate(**vars(args))
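With the script complete, you can launch it the same way as the splitting script earlier; the paths and hyperparameter values below are purely illustrative:
python main.py \
    --training-file gs://your-bucket/train.csv \
    --validation-file gs://your-bucket/eval.csv \
    --testing-file gs://your-bucket/test.csv \
    --job-dir models/sentiment_bert.pt \
    --epochs 10 \
    --batch-size 64 \
    --learning-rate 0.001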
This is great, but unfortunately, this model will take a long time to train. With around 4.7 million parameters to train, one step takes around 3 seconds on a 16GB MacBook Pro with an Intel chip.
3 seconds per step can feel quite long when you have 1238 steps per epoch and 10 epochs to complete, which adds up to roughly 10 hours of training…
Without a GPU, there is no game.