Calibrating hyperparameters with weights and biases

gene46100
project
notebook
Author

Sofia Salazar

Published

April 10, 2026

Calibrating hyperparameters with weights and biases

Overview

  • Goal: Learn how to use weights and biases to calibrate the hyperparameters of a DL model.

Weights and biases is a platform for AI developers to track, visualize and manage their ML models and experiments. The coolest part is that W&B allows you to log various performance metrics during training, like training and validation loss, test set correlations, etc. Additionally, it allows you to compare between different experiments or versions of your models. This makes it easier to identify the best-performing models and to see which hyperparameter configuration is optimal.

In this notebook, we will focus on using W&B as a tool to help calibrate the hyperparameters of the TF binding prediction model to find an optimal solution. However, we encourage you to explore other applications that W&B offers.

1. Installing W&B

First install w&b in your environment with the following command; it should take only a couple of seconds.

%pip install wandb onnx -Uq
%pip install nbformat

Load all libraries

import torch
from torch import nn
import numpy as np
import matplotlib.pyplot as plt
import random
import os
import pandas as pd
import wandb
from scipy.stats import pearsonr

2. Login to w&b

If you don’t already have a w&b account, sign up here. Then, run the following command which will prompt you to insert your API key

# Prompts for your W&B API key on first use and caches the credentials
wandb.login()

3. Define your regular functions

Then, define your functions; these will remain unchanged.

def get_device():
  """
  Select the device for PyTorch computations.

  Preference order: Metal Performance Shaders (MPS), then CUDA, then CPU.

  Returns:
    torch.device: The selected device.
  """
  if torch.backends.mps.is_available():
    backend = "mps"
  elif torch.cuda.is_available():
    backend = "cuda"
  else:
    backend = "cpu"
  print(f"Using {backend.upper()} device.")
  return torch.device(backend)

# Example usage:
device = get_device()  # module-level device; tensors below are moved onto it
def one_hot_encode(seq):
    """
    Return the one-hot encoding of a DNA sequence as a (len(seq), 4) float32 array.

    'N' maps to the all-zeros row; any character outside ACGTN raises ValueError.
    """
    # Validate the alphabet before encoding
    allowed = set("ACTGN")
    present = set(seq)
    if not present <= allowed:
        invalid = present - allowed
        print(seq)
        raise ValueError(f"Sequence contains chars not in allowed DNA alphabet (ACGTN): {invalid}")

    # Per-nucleotide one-hot rows (channel order: A, C, G, T)
    lookup = {'A': [1.0, 0.0, 0.0, 0.0],
              'C': [0.0, 1.0, 0.0, 0.0],
              'G': [0.0, 0.0, 1.0, 0.0],
              'T': [0.0, 0.0, 0.0, 1.0],
              'N': [0.0, 0.0, 0.0, 0.0]}

    return np.array([lookup[base] for base in seq], dtype='float32')
def quick_split(df, split_frac=0.8, verbose=False):
    '''
    Randomly partition the rows of df into train and test DataFrames,
    with roughly split_frac of the rows going to train.
    '''
    original_cols = df.columns  # remember columns to drop the reset_index artifacts
    df = df.reset_index()

    # shuffle the positional indices
    shuffled = list(range(df.shape[0]))
    random.shuffle(shuffled)

    # first split_frac of the shuffled order is train, the rest is test
    cut = int(len(shuffled) * split_frac)
    train_positions = set(shuffled[:cut])
    test_positions = set(shuffled[cut:])

    train_df = df[df.index.isin(train_positions)]
    test_df = df[df.index.isin(test_positions)]

    return train_df[original_cols], test_df[original_cols]
def split_sequences(sequences_df):
    """Split sequences into train/val/test via two successive 80/20 random splits."""
    interim_train, test_sequences = quick_split(sequences_df)
    train_sequences, val_sequences = quick_split(interim_train)
    # report the resulting split sizes
    for label, part in (("Train:", train_sequences),
                        ("Val:", val_sequences),
                        ("Test:", test_sequences)):
        print(label, part.shape)
    return train_sequences, val_sequences, test_sequences
def get_data_tensors(scores_df, sequences_df):
    """
    Build score tensors and one-hot sequence tensors for train/val/test splits.

    scores_df: columns keyed by window name; transposed to (num_sequences, 300).
    sequences_df: must provide 'window_name' and 'sequence' columns.
    """
    # split sequences into train, validation and test sets
    train_sequences, val_sequences, test_sequences = split_sequences(sequences_df)

    def scores_to_tensor(seq_df):
        # select this split's score columns, transpose to (num_sequences, 300)
        values = scores_df[seq_df['window_name'].to_list()].transpose().values.astype('float32')
        return torch.tensor(values, dtype=torch.float32).to(device)

    def seqs_to_tensor(seq_df):
        # one-hot encode every sequence and stack into a single tensor
        encoded = [one_hot_encode(s) for s in seq_df['sequence'].to_list()]
        return torch.tensor(np.stack(encoded))

    train_scores = scores_to_tensor(train_sequences)
    val_scores = scores_to_tensor(val_sequences)
    test_scores = scores_to_tensor(test_sequences)

    train_sequences_tensor = seqs_to_tensor(train_sequences)
    val_sequences_tensor = seqs_to_tensor(val_sequences)
    test_sequences_tensor = seqs_to_tensor(test_sequences)

    return train_scores, train_sequences_tensor, val_scores, val_sequences_tensor, test_scores, test_sequences_tensor
def create_dataloader(predictors, targets, batch_size, is_train = True):
    '''
    Wrap paired (predictors, targets) tensors in a DataLoader.

    predictors: one hot encoded sequences
    targets: sequence scores
    batch_size: samples per batch
    is_train: if True, data is reshuffled at every epoch
    '''
    paired = torch.utils.data.TensorDataset(predictors, targets)
    loader = torch.utils.data.DataLoader(paired, batch_size, shuffle=is_train)
    return loader
class DNA_CNN(nn.Module):
    """Single-conv-layer CNN mapping a one-hot DNA sequence to 300 scores.

    forward() expects (batch, seq_len, 4) input and returns (batch, 300).
    """

    def __init__(self,
                 seq_len,
                 num_filters=16,
                 kernel_size=10,
                 add_sigmoid=False):
        super().__init__()
        self.seq_len = seq_len
        self.add_sigmoid = add_sigmoid
        # conv output length is seq_len - kernel_size + 1 (no padding, stride 1)
        conv_out_len = seq_len - kernel_size + 1
        self.conv = nn.Conv1d(in_channels=4, out_channels=num_filters, kernel_size=kernel_size)
        self.relu = nn.ReLU(inplace=True)
        self.linear = nn.Linear(num_filters * conv_out_len, 300)
        self.sigmoid = nn.Sigmoid()

    def forward(self, xb):
        # (batch, seq_len, 4) -> (batch, 4, seq_len): channels first for Conv1d
        channels_first = xb.permute(0, 2, 1)
        hidden = self.relu(self.conv(channels_first))
        # flatten all dimensions except batch before the dense layer
        scores = self.linear(hidden.flatten(1))
        return self.sigmoid(scores) if self.add_sigmoid else scores
def process_batch(model, loss_func, x_batch, y_batch, opt=None):
    """
    Run one forward pass and, when an optimizer is supplied, one update step.

    Returns:
        (loss_value, batch_size) so the caller can compute a weighted average.
    """
    predictions = model(x_batch.to(torch.float32))
    loss = loss_func(predictions, y_batch)

    # training step only: backpropagate and update when an optimizer is given
    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(), len(x_batch)
def train_epoch(model, train_dl, loss_func, device, opt):
    """Run one training epoch; return the sample-weighted mean batch loss."""
    model.train()
    batch_losses = []
    batch_sizes = []

    # one optimizer step per batch
    for xb, yb in train_dl:
        xb, yb = xb.to(device), yb.to(device)
        loss_value, size = process_batch(model, loss_func, xb, yb, opt=opt)
        batch_losses.append(loss_value)
        batch_sizes.append(size)

    # weight each batch loss by its batch size before averaging
    return np.sum(np.multiply(batch_losses, batch_sizes)) / np.sum(batch_sizes)
def val_epoch(model, val_dl, loss_func, device):
    """Evaluate the model on val_dl; return the sample-weighted mean batch loss."""
    model.eval()
    batch_losses = []
    batch_sizes = []

    # no gradients needed during validation
    with torch.no_grad():
        for xb, yb in val_dl:
            xb, yb = xb.to(device), yb.to(device)
            # no optimizer: forward pass and loss only
            loss_value, size = process_batch(model, loss_func, xb, yb)
            batch_losses.append(loss_value)
            batch_sizes.append(size)

    # weight each batch loss by its batch size before averaging
    return np.sum(np.multiply(batch_losses, batch_sizes)) / np.sum(batch_sizes)

4. Modify the train_loop() function

The only function that we need to change is the train_loop function, because this is where we collect the values that we want to track with w&b. We will use wandb.log and create a dictionary with the parameters that we want to track.

def train_loop(epochs, model, loss_func, opt, train_dl, val_dl, device):
    """
    Train for `epochs` epochs, logging train/val loss to W&B after each one.

    Returns:
        (train_losses, val_losses): per-epoch loss lists.
    """
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        # one training pass followed by one validation pass
        train_loss = train_epoch(model, train_dl, loss_func, device, opt)
        val_loss = val_epoch(model, val_dl, loss_func, device)
        train_losses.append(train_loss)
        val_losses.append(val_loss)

        print(f"Epoch {epoch + 1} | train loss: {train_loss:.3f} | val loss: {val_loss:.3f}")
        # send the per-epoch metrics to the W&B dashboard
        wandb.log({"epoch": epoch + 1,
                   "train_loss": train_loss,
                   "val_loss": val_loss})

    return train_losses, val_losses

Defining the train_model function. We omit the plot_curves function since all performance metrics will be tracked on w&b.

def train_model(train_dl,val_dl,model,device, lr=0.01, epochs=50, lossf=None,opt=None):
    """
    Configure the optimizer and loss function, then run the training loop.

    Args:
        train_dl, val_dl: training and validation DataLoaders.
        model: the network to train.
        device: device batches are moved to during training.
        lr: learning rate.
        epochs: number of epochs to train.
        lossf: loss function instance; defaults to MSELoss.
        opt: optimizer *class* (e.g. torch.optim.Adam); defaults to SGD.

    Returns:
        (train_losses, val_losses): per-epoch loss lists from train_loop.
    """
    # define optimizer: instantiate the given class, or fall back to SGD
    if opt:
        optimizer = opt(model.parameters(), lr=lr)
    else: # if no opt provided, just use SGD
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    # define loss function
    if lossf:
        loss_func = lossf
    else: # if no loss function provided, just use MSE
        loss_func = torch.nn.MSELoss()

    # run the training loop
    train_losses, val_losses = train_loop(
                                epochs,
                                model,
                                loss_func,
                                optimizer,
                                train_dl,
                                val_dl,
                                device)

    # BUG FIX: the original computed these but never returned them;
    # returning is backward compatible (callers that ignore the result still work)
    return train_losses, val_losses

5. Also track the correlation metrics of the test set

As a way to evaluate each model, let’s modify the test_model() function so that wandb also keeps track of the performance metrics. In this case the metrics are pearson_per_sample, test_pearsonr and best_test.

def test_model(model, test_features, test_targets):
  """
  Evaluate the model on the held-out test set and log Pearson metrics to W&B.

  Logs the mean and the best per-sample Pearson r between each predicted
  and observed score vector.
  """
  model.eval()
  predictions = model(test_features.to(torch.float32).to(device)).detach().cpu().numpy()
  observations = test_targets.cpu().numpy()
  # BUG FIX: iterate over the actual number of test samples rather than a
  # hard-coded range(300) — 300 is the per-sample score length, and the loop
  # only worked by accident when the test set happened to have 300 rows
  n_samples = predictions.shape[0]
  pearson_per_sample = np.array([pearsonr(predictions[i], observations[i])[0] for i in range(n_samples)])
  test_pearsonr = pearson_per_sample.mean()
  best_test = pearson_per_sample.max()
  # BUG FIX: the second log key was misspelled 'beast_pearsonr'
  wandb.log({'test_avg_pearsonr': test_pearsonr,
             'best_pearsonr': best_test})

6. Define the sweep configuration

A sweep is the training and testing of a single model with a given configuration of hyperparameters. With wandb.sweep we define the set of hyperparameters to test, which will then be combined in different configurations for each sweep.

# Sweep configuration: randomly sample hyperparameter combinations from the
# grid below, optimizing for the 'test_avg_pearsonr' metric logged in test_model
sweep_config = {
    'method': 'random',  # random search rather than an exhaustive grid
    'metric': {'name': 'test_avg_pearsonr', 'goal': 'maximize'},
    'parameters': {
        'num_filters': {'values': [4, 16]},       # conv filters in DNA_CNN
        'kernel_size': {'values': [5, 10]},       # conv kernel width
        'add_sigmoid': {'values': [True, False],},  # sigmoid on the output layer
        'learning_rate':{'values':[0.1, 0.05]},
        'batch_size': {'values':[16, 32, 64]},
        'optimizer': {'values': ['SGD','Adam']}
    }
}

Create a project ID for your model, all your tests will be saved in this project

# Register the sweep with W&B; all runs are grouped under this project
sweep_id = wandb.sweep(sweep_config, project="DNA_model")

7. Get the training, val and test sets ready for training

# NOTE(review): user-specific local path — update DIR to wherever you store the data
DIR = '/Users/sofiasalazar/Library/CloudStorage/Box-Box/imlab-data/Courses/AI-in-Genomics-2025/data/'
# Tab-separated, gzip-compressed inputs: sequences and their per-window scores
sequences = pd.read_csv(os.path.join(DIR, 'chr22_sequences.txt.gz'), sep="\t", compression='gzip')
scores = pd.read_csv(os.path.join(DIR, 'chr22_scores.txt.gz'), sep="\t", compression='gzip',dtype='float32')
# Split into train/val/test and convert everything to tensors on `device`
train_scores, train_sequences_tensor, val_scores, val_sequences_tensor, test_scores, test_sequences_tensor = get_data_tensors(scores, sequences)

8. Initialize the sweep

With wandb.init we initialize one sweep run and define what we want to do in each. This consists of:

  1. Loading a configuration of hyperparameters with wandb.config

  2. Loading the model

  3. Telling wandb to track the training with wandb.watch

  4. Create the dataloaders

  5. Train and test the model

def train_sweep():
    """One sweep trial: pull a config from W&B, then build, train and test a model."""
    with wandb.init(project = "DNA_model"):
        config = wandb.config

        # build the model from this trial's sampled hyperparameters
        model = DNA_CNN(seq_len=300,
                        num_filters=config.num_filters,
                        kernel_size=config.kernel_size,
                        add_sigmoid=config.add_sigmoid).to(device)

        # log all gradients and parameters every 10 training steps (batches)
        wandb.watch(model, log="all", log_freq=10)

        train_loader = create_dataloader(train_sequences_tensor, train_scores, batch_size=config.batch_size)
        val_loader = create_dataloader(val_sequences_tensor, val_scores, batch_size=config.batch_size, is_train=False)

        # pick the optimizer class named by the config
        opt = torch.optim.SGD if config.optimizer == 'SGD' else torch.optim.Adam

        train_model(train_loader, val_loader, model, device, epochs=30, lr=config.learning_rate, opt=opt)
        test_model(model, test_sequences_tensor, test_scores)

Finally, we train with wandb.agent; the argument count is the number of hyperparameter combinations we want to try. The maximum in this case is 240 combinations.

# A full search would use count=240 (every combination); 6 is a quick demo
# wandb.agent(sweep_id, train_sweep, count=240)
wandb.agent(sweep_id, train_sweep, count=6)

© HakyImLab and Listed Authors - CC BY 4.0 License