%pip install wandb onnx -Uq
%pip install nbformat
Calibrating hyperparameters with weights and biases
Overview
- Goal: Learn how to use weights and biases to calibrate the hyperparameters of a DL model.
Weights & Biases is a platform for AI developers to track, visualize and manage their ML models and experiments. The coolest part is that W&B allows you to log various performance metrics during training, like training and validation loss, test set correlations, etc. Additionally, it allows you to compare different experiments or versions of your models, making it easier to identify the best-performing models and determine which hyperparameter configuration is optimal.
In this notebook, we will focus on using W&B as a tool to help calibrate the hyperparameters of the TF binding prediction model to find an optimal solution. However, we encourage you to explore other applications that W&B offers.
1. Installing W&B
First, install W&B in your environment with the following command; it should take only a couple of seconds.
Load all libraries
import torch
from torch import nn
import numpy as np
import matplotlib.pyplot as plt
import random
import os
import pandas as pd
import wandb
from scipy.stats import pearsonr
2. Login to w&b
If you don’t already have a w&b account, sign up here. Then, run the following command which will prompt you to insert your API key
wandb.login()
3. Define your regular functions
Then, define your functions, this will remain unchanged
def get_device():
    """
    Determines the device to use for PyTorch computations.
    Prioritizes Metal Performance Shaders (MPS), then CUDA, then CPU.
    Returns:
        torch.device: The selected device.
    """
    if torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Using MPS device.")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
        print("Using CUDA device.")
    else:
        device = torch.device("cpu")
        print("Using CPU device.")
    return device

# Example usage:
device = get_device()

def one_hot_encode(seq):
    """
    Given a DNA sequence, return its one-hot encoding.

    Args:
        seq (str): DNA sequence over the alphabet ACGTN.
    Returns:
        np.ndarray: float32 array of shape (len(seq), 4); 'N' maps to all zeros.
    Raises:
        ValueError: if seq contains characters outside ACGTN.
    """
    # Make sure seq has only allowed bases
    allowed = set("ACTGN")
    if not set(seq).issubset(allowed):
        invalid = set(seq) - allowed
        print(seq)
        raise ValueError(f"Sequence contains chars not in allowed DNA alphabet (ACGTN): {invalid}")
    # Dictionary returning one-hot encoding for each nucleotide
    nuc_d = {'A': [1.0, 0.0, 0.0, 0.0],
             'C': [0.0, 1.0, 0.0, 0.0],
             'G': [0.0, 0.0, 1.0, 0.0],
             'T': [0.0, 0.0, 0.0, 1.0],
             'N': [0.0, 0.0, 0.0, 0.0]}
    # Create array from nucleotide sequence
    vec = np.array([nuc_d[x] for x in seq], dtype='float32')
    return vec

def quick_split(df, split_frac=0.8, verbose=False):
    '''
    Given a df of samples, randomly split indices between
    train and test at the desired fraction.

    Args:
        df (pd.DataFrame): samples to split.
        split_frac (float): fraction of rows assigned to the train split.
        verbose (bool): unused; kept for interface compatibility.
    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: (train_df, test_df) with the
        original columns.
    '''
    cols = df.columns  # original columns, use to clean up reindexed cols
    df = df.reset_index()
    # shuffle indices
    idxs = list(range(df.shape[0]))
    random.shuffle(idxs)
    # split shuffled index list by split_frac
    split = int(len(idxs) * split_frac)
    train_idxs = idxs[:split]
    test_idxs = idxs[split:]
    # split dfs and return
    train_df = df[df.index.isin(train_idxs)]
    test_df = df[df.index.isin(test_idxs)]
    return train_df[cols], test_df[cols]

def split_sequences(sequences_df):
    """
    Split a sequences DataFrame into train / validation / test sets.

    The full data is first split 80/20 into train+val vs test, then the
    train+val portion is split 80/20 again into train vs val.
    """
    full_train_sequences, test_sequences = quick_split(sequences_df)
    train_sequences, val_sequences = quick_split(full_train_sequences)
    print("Train:", train_sequences.shape)
    print("Val:", val_sequences.shape)
    print("Test:", test_sequences.shape)
    return train_sequences, val_sequences, test_sequences

def get_data_tensors(scores_df, sequences_df):
    """
    Build train/val/test score tensors and one-hot sequence tensors.

    Args:
        scores_df (pd.DataFrame): per-window scores, one column per window name.
        sequences_df (pd.DataFrame): must have 'window_name' and 'sequence' columns.
    Returns:
        tuple: (train_scores, train_sequences_tensor,
                val_scores, val_sequences_tensor,
                test_scores, test_sequences_tensor)
        Score tensors are moved to the global `device`; sequence tensors stay on CPU.
    """
    # split sequences in train, validation and test sets
    train_sequences, val_sequences, test_sequences = split_sequences(sequences_df)
    # get scores for each set of sequences
    train_scores = scores_df[train_sequences['window_name'].to_list()].transpose().values.astype('float32')  # shape is (num_sequences, 300)
    val_scores = scores_df[val_sequences['window_name'].to_list()].transpose().values.astype('float32')
    test_scores = scores_df[test_sequences['window_name'].to_list()].transpose().values.astype('float32')
    train_scores = torch.tensor(train_scores, dtype=torch.float32).to(device)
    val_scores = torch.tensor(val_scores, dtype=torch.float32).to(device)
    test_scores = torch.tensor(test_scores, dtype=torch.float32).to(device)
    # get one hot encoded sequences for each set
    train_one_hot = [one_hot_encode(seq) for seq in train_sequences['sequence'].to_list()]
    train_sequences_tensor = torch.tensor(np.stack(train_one_hot))
    val_one_hot = [one_hot_encode(seq) for seq in val_sequences['sequence'].to_list()]
    val_sequences_tensor = torch.tensor(np.stack(val_one_hot))
    test_one_hot = [one_hot_encode(seq) for seq in test_sequences['sequence'].to_list()]
    test_sequences_tensor = torch.tensor(np.stack(test_one_hot))
    return train_scores, train_sequences_tensor, val_scores, val_sequences_tensor, test_scores, test_sequences_tensor

def create_dataloader(predictors, targets, batch_size, is_train=True):
    '''
    Wrap predictor/target tensors in a DataLoader.

    Args:
        predictors: one hot encoded sequences
        targets: sequence scores
        batch_size: number of samples per batch
        is_train: if True, data is reshuffled at every epoch
    '''
    dataset = torch.utils.data.TensorDataset(predictors, targets)
    return torch.utils.data.DataLoader(dataset, batch_size, shuffle=is_train)

class DNA_CNN(nn.Module):
    """
    Simple 1D-CNN over one-hot DNA: Conv1d -> ReLU -> Linear producing a
    300-dim score vector per sequence, with an optional sigmoid output.
    """
    def __init__(self,
                 seq_len,
                 num_filters=16,
                 kernel_size=10,
                 add_sigmoid=False):
        super().__init__()
        self.seq_len = seq_len
        self.add_sigmoid = add_sigmoid
        # Define layers individually
        self.conv = nn.Conv1d(in_channels=4, out_channels=num_filters, kernel_size=kernel_size)
        self.relu = nn.ReLU(inplace=True)
        # conv output length is seq_len - kernel_size + 1 (no padding)
        self.linear = nn.Linear(num_filters * (seq_len - kernel_size + 1), 300)
        self.sigmoid = nn.Sigmoid()

    def forward(self, xb):
        # reshape view to batch_size x 4channel x seq_len
        # permute to put channel in correct order
        xb = xb.permute(0, 2, 1)  # (batch_size, 300, 4) to (batch_size, 4, 300)
        # Apply layers step by step
        x = self.conv(xb)
        x = self.relu(x)
        x = x.flatten(1)  # flatten all dimensions except batch
        out = self.linear(x)
        if self.add_sigmoid:
            out = self.sigmoid(out)
        return out

def process_batch(model, loss_func, x_batch, y_batch, opt=None):
    """
    Forward one batch and compute the loss; if an optimizer is given,
    also take a gradient step.

    Returns:
        tuple[float, int]: (loss value, batch size).
    """
    xb_out = model(x_batch.to(torch.float32))
    loss = loss_func(xb_out, y_batch)
    if opt is not None:  # backpropagate if train step (optimizer given)
        loss.backward()
        opt.step()
        opt.zero_grad()
    return loss.item(), len(x_batch)

def train_epoch(model, train_dl, loss_func, device, opt):
    """Run one training epoch and return the size-weighted mean batch loss."""
    model.train()
    tl = []  # train losses
    ns = []  # batch sizes, n
    # loop through batches
    for x_batch, y_batch in train_dl:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        t, n = process_batch(model, loss_func, x_batch, y_batch, opt=opt)
        # collect train loss and batch sizes
        tl.append(t)
        ns.append(n)
    # average the losses over all batches
    train_loss = np.sum(np.multiply(tl, ns)) / np.sum(ns)
    return train_loss

def val_epoch(model, val_dl, loss_func, device):
    """Run one validation epoch (no grad) and return the weighted mean loss."""
    # Set model to Evaluation mode
    model.eval()
    with torch.no_grad():
        vl = []  # val losses
        ns = []  # batch sizes, n
        # loop through validation DataLoader
        for x_batch, y_batch in val_dl:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            v, n = process_batch(model, loss_func, x_batch, y_batch)
            # collect val loss and batch sizes
            vl.append(v)
            ns.append(n)
        # average the losses over all batches
        val_loss = np.sum(np.multiply(vl, ns)) / np.sum(ns)
    return val_loss

# 4. Modify the train_loop() function
The only function that we need to change is the train_loop function, because this is where we recover the parameters that we want to track with w&b. We will use wandb.log and create a dictionary with the parameters that we want to track.
def train_loop(epochs, model, loss_func, opt, train_dl, val_dl, device):
    """
    Full training loop: one train + one validation epoch per iteration,
    logging per-epoch losses to W&B.

    Returns:
        tuple[list[float], list[float]]: (train_losses, val_losses), one
        entry per epoch.
    """
    # keep track of losses
    train_losses = []
    val_losses = []
    # loop through epochs
    for epoch in range(epochs):
        # take a training step
        train_loss = train_epoch(model, train_dl, loss_func, device, opt)
        train_losses.append(train_loss)
        # take a validation step
        val_loss = val_epoch(model, val_dl, loss_func, device)
        val_losses.append(val_loss)
        print(f"Epoch {epoch + 1} | train loss: {train_loss:.3f} | val loss: {val_loss:.3f}")
        # send this epoch's metrics to W&B so every run's curves are tracked
        wandb.log({"epoch": epoch + 1,
                   "train_loss": train_loss,
                   "val_loss": val_loss})
    return train_losses, val_losses

# Defining the train_model function. We omit the plot_curves function since all performance metrics will be tracked on w&b.
def train_model(train_dl, val_dl, model, device, lr=0.01, epochs=50, lossf=None, opt=None):
    """
    Configure optimizer and loss, then run the training loop.

    Args:
        train_dl, val_dl: training / validation DataLoaders.
        model: the nn.Module to train.
        device: torch.device used inside the epoch loops.
        lr (float): learning rate.
        epochs (int): number of epochs.
        lossf: loss function instance (defaults to MSELoss).
        opt: optimizer CLASS, e.g. torch.optim.Adam (defaults to SGD).
    Returns:
        tuple[list[float], list[float]]: per-epoch (train_losses, val_losses).
    """
    # define optimizer
    if opt:
        optimizer = opt(model.parameters(), lr=lr)
    else:  # if no opt provided, just use SGD
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    # define loss function
    if lossf:
        loss_func = lossf
    else:  # if no loss function provided, just use MSE
        loss_func = torch.nn.MSELoss()
    # run the training loop
    train_losses, val_losses = train_loop(
        epochs,
        model,
        loss_func,
        optimizer,
        train_dl,
        val_dl,
        device)
    # FIX: surface the loss history to callers (the original discarded it);
    # existing callers that ignore the return value are unaffected.
    return train_losses, val_losses

# 5. Also track the correlation metrics of the test set
As a way to evaluate each model, let’s modify the test_model() function so that wandb also keeps track of the performance metrics. In this case the metrics are pearson_per_sample, test_pearson_r and best_test.
def test_model(model, test_features, test_targets):
    """
    Evaluate the model on the test set and log Pearson correlations to W&B.

    Args:
        model: trained model (moved to the global `device` by the caller).
        test_features: one-hot sequence tensor, shape (num_test_sequences, seq_len, 4).
        test_targets: score tensor, shape (num_test_sequences, 300).
    Logs:
        test_avg_pearsonr: mean per-sample Pearson r.
        best_pearsonr: best per-sample Pearson r.
    """
    model.eval()
    predictions = model(test_features.to(torch.float32).to(device)).detach().cpu().numpy()
    observations = test_targets.cpu().numpy()
    # BUG FIX: iterate over every test sample instead of a hard-coded 300
    # (300 is the per-sample score length, not the number of test samples).
    pearson_per_sample = np.array([pearsonr(predictions[i], observations[i])[0]
                                   for i in range(len(predictions))])
    test_pearsonr = pearson_per_sample.mean()
    best_test = pearson_per_sample.max()
    # BUG FIX: log key typo 'beast_pearsonr' -> 'best_pearsonr'
    wandb.log({'test_avg_pearsonr': test_pearsonr,
               'best_pearsonr': best_test})

# 6. Define the sweep configuration
A sweep is the training and testing of a single model with a given configuration of hyperparameters. With wandb.sweep we define the set of hyperparameters to test, which will then be combined in different configurations each sweep.
# Sweep definition: randomly sample hyperparameter combinations and
# maximize the mean test-set Pearson correlation logged by test_model().
sweep_config = {
    'method': 'random',
    'metric': {'name': 'test_avg_pearsonr', 'goal': 'maximize'},
    'parameters': {
        'num_filters': {'values': [4, 16]},
        'kernel_size': {'values': [5, 10]},
        'add_sigmoid': {'values': [True, False]},
        'learning_rate': {'values': [0.1, 0.05]},
        'batch_size': {'values': [16, 32, 64]},
        'optimizer': {'values': ['SGD', 'Adam']}
    }
}

# Create a project ID for your model, all your tests will be saved in this project
sweep_id = wandb.sweep(sweep_config, project="DNA_model")

# 7. Get the training, val and test sets ready for training
# NOTE(review): hard-coded, user-specific data directory — update for your machine.
DIR = '/Users/sofiasalazar/Library/CloudStorage/Box-Box/imlab-data/Courses/AI-in-Genomics-2025/data/'
# Tab-separated, gzip-compressed inputs: sequences (window_name, sequence)
# and per-window scores (one column per window name).
sequences = pd.read_csv(os.path.join(DIR, 'chr22_sequences.txt.gz'), sep="\t", compression='gzip')
scores = pd.read_csv(os.path.join(DIR, 'chr22_scores.txt.gz'), sep="\t", compression='gzip', dtype='float32')

# Build the train/val/test tensors once, up front; the sweep reuses them.
train_scores, train_sequences_tensor, val_scores, val_sequences_tensor, test_scores, test_sequences_tensor = get_data_tensors(scores, sequences)

# 8. Initialize the sweep
With wandb.init we initialize one sweep and define what we want to do in each. This consists of:
Loading a configuration of hyperparameters with wandb.config
Loading the model
Telling wandb to track the training with wandb.watch
Creating the dataloaders
Training and testing the model
def train_sweep():
    """
    One sweep run: pull a hyperparameter configuration from W&B, build the
    model and dataloaders from it, then train and evaluate while W&B tracks
    gradients, parameters and metrics.
    Uses the module-level tensors built by get_data_tensors().
    """
    with wandb.init(project="DNA_model"):
        config = wandb.config
        model = DNA_CNN(seq_len=300,
                        num_filters=config.num_filters,
                        kernel_size=config.kernel_size,
                        add_sigmoid=config.add_sigmoid).to(device)
        # log all: logs all gradients and parameters, every log_freq number of training steps (batches)
        wandb.watch(model, log="all", log_freq=10)
        train_loader = create_dataloader(train_sequences_tensor, train_scores, batch_size=config.batch_size)
        val_loader = create_dataloader(val_sequences_tensor, val_scores, batch_size=config.batch_size, is_train=False)
        # map the sweep's optimizer name onto the torch optimizer class
        if config.optimizer == 'SGD':
            opt = torch.optim.SGD
        else:
            opt = torch.optim.Adam
        train_model(train_loader, val_loader, model, device, epochs=30, lr=config.learning_rate, opt=opt)
        test_model(model, test_sequences_tensor, test_scores)

# Finally, we train with wandb.agent, the argument count is the number of combinations of hyperparameters I want to try. The maximum in my case is 240 combinations
# wandb.agent(sweep_id, train_sweep, count=240)
# Launch the sweep agent: runs train_sweep() `count` times, each with a new
# hyperparameter configuration sampled by the sweep controller.
wandb.agent(sweep_id, train_sweep, count=6)