<center><h1>Metric Learning</h1></center>

<center><h2><a href="https://deepcourse-epita.netlify.app/">Course link</a></h2></center>

To keep your modifications in case you want to come back later to this colab, do *File -> Save a copy in Drive*.

If you find a mistake, or know how to improve this notebook, please open an issue [here](https://gitlab.com/ey_datakalab/course_epita).

In [None]:
!rm -rf *.tgz*
!wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz
!tar zxf lfw-deepfunneled.tgz

In [None]:
%pylab inline

In [None]:
import glob
import os
import collections
import random
import copy

import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import transforms
import torchvision
from PIL import Image

We are going to code a Siamese network to do face recognition. We'll use the LFW dataset, where every folder contain one or multiple images of a celebrity:

In [None]:
os.listdir("lfw-deepfunneled")[:10], os.listdir("lfw-deepfunneled")[-10:]

We first build two subsets, one for train and one for val. Because we are doing metric learning, we don't need to have the same classes between the two sets.

In [None]:
names = sorted(os.listdir("lfw-deepfunneled"))
name_to_classid = {d: i for i, d in enumerate(names)}
classid_to_name = {v: k for k, v in name_to_classid.items()}

def build_subset(names, start_index, end_index):
  classid_to_paths = collections.defaultdict(list)
  c = 0
  for name in names[start_index:end_index]:
    class_id = name_to_classid[name]
    for image_name in os.listdir(f"lfw-deepfunneled/{name}"):
      classid_to_paths[class_id].append(f"lfw-deepfunneled/{name}/{image_name}")
      c += 1

  print(f"Number of person: {len(classid_to_paths)}")
  print(f"Number of images: {c}")

  return classid_to_paths


print("Build train...")
train_set = build_subset(names, 0, 500)
print("Build test...")
val_set = build_subset(names, 500, 1000)

Notice that most celebrities have only 1 image! We definitely cannot use them to do positive pairs, but we can still keep them for negative pairs:

In [None]:
plt.subplot(1, 2, 1)
train_occur = {classid_to_name[class_id]: len(paths) for class_id, paths in train_set.items()}
plt.hist(train_occur.values())
plt.title("Distribution of train set")
plt.xlabel("Nb images / person")

plt.subplot(1, 2, 2)
val_occur = {classid_to_name[class_id]: len(paths) for class_id, paths in val_set.items()}
plt.hist(val_occur.values())
plt.title("Distribution of val set")
plt.xlabel("Nb images / person");

Here are the most represented celebrities of our train set:

In [None]:
for name, nb in sorted(train_occur.items(), key=lambda x: x[1], reverse=True)[:20]:
  print(f"{name}: {nb} images")

We now build pairs, we will balance our dataset to have as much negative as positive pairs.

Note that we could compute the pair on the fly at each batch, but it helps the model convergence to see multiple times the same pairs.

In [None]:
def build_pairs(classid_to_paths, pair_per_class=50):
  pairs = []
  classes = set(classid_to_paths.keys())

  for class_id in classid_to_paths:
    nb = len(classid_to_paths[class_id])
    if nb == 1:
      continue

    for _ in range(min(pair_per_class, nb)):
      # pos
      pairs.append((
          random.choice(classid_to_paths[class_id]),
          random.choice(classid_to_paths[class_id]),
          1.0
      ))

      # neg
      neg_classes = classes - {class_id}
      neg_class_id = random.choice(list(neg_classes))

      pairs.append((
          random.choice(classid_to_paths[class_id]),
          random.choice(classid_to_paths[neg_class_id]),
          0.0
      ))

  return pairs


class SiameseDataset(torch.utils.data.Dataset):
  def __init__(self, classid_to_paths, transform, pair_per_class=20):
    self.transform = transform
    self.pairs = build_pairs(classid_to_paths, pair_per_class=pair_per_class)

  def __len__(self):
    return len(self.pairs)

  def __getitem__(self, index):
    p1, p2, y = self.pairs[index]

    left_img = Image.open(p1).convert('RGB')
    right_img = Image.open(p2).convert('RGB')

    left_img = self.transform(left_img)
    right_img = self.transform(right_img)

    return left_img, right_img, y

Let's build the data augmentations and the loaders. 

You can change the train augmentations and see how performance change. But beware, too much augmentation is often detrimental to the model.

In [None]:
train_transform = transforms.Compose([
    transforms.Resize((100, 100)),
    transforms.CenterCrop((80, 80)),
    transforms.ColorJitter(brightness=0.2),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor()
])

val_transform = transforms.Compose([
    transforms.Resize((100, 100)),
    transforms.CenterCrop((80, 80)),
    transforms.ToTensor()
])


train_dataset = SiameseDataset(train_set, train_transform, pair_per_class=50)
val_dataset = SiameseDataset(val_set, val_transform, pair_per_class=50)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

len(train_loader)

Always, check that the loader output seems correct:

In [None]:
x1, x2, y = next(iter(train_loader))

x1.shape, x2.shape, y

In [None]:
plt.figure(figsize=(20, 6))

for i in range(5):
  ax = plt.subplot(2, 5, i + 1)
  ax.axis('off')
  img = (x1[i].permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
  plt.imshow(img)

for i in range(5):
  ax = plt.subplot(2, 5, i + 1 + 5)
  ax.axis('off')
  img = (x2[i].permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
  plt.imshow(img)
  plt.title("Same person" if y[i] == 1.0 else "Different person")

Now, build by yourself a backbone. You're free to do it as you want, and you can use the ConvBlock I provide for simplicity.


Ideally, your number of channels should range from 16 to 128, and you should interleave Max/Avg pooling between conv blocks. You can finish the network by a few fully connected layers, and finish with an embedding size of 50.

In [None]:
class ConvBlock(nn.Module):
  def __init__(self, in_channels, out_channels, kernel_size, padding):
    super().__init__()
    self.conv = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, padding),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
    )

  def forward(self, x):
    return self.conv(x)
  

class Backbone(nn.Module):
  def __init__(self):
    super().__init__()

    # TODO

  def forward(self, x):
    # TODO
    return x


# Always check the output shape!
Backbone()(torch.randn(32, 3, 80, 80)).shape

In [None]:
# Execute this cell to see the solution, but try to do it by yourself before!
!wget https://deepcourse-epita.netlify.app/code/siamese/backbone.py
%pycat backbone.py

Now, we want a function that will do a pairwise cosine similarity.

Meaning given two tensors of shape (n, d), we compute over each row (1, d) the cosine similarity between the two parts.

Remember the cosine similarity formula is: 

$$\operatorname{cos}(x, y) = \frac{x \cdot y}{\Vert x \Vert_2 \Vert y \Vert_2}$$

The function `F.cosine_similarity` in PyTorch already does that, but try to recode it yourself! You can use it to validate your function.

In [None]:
def tensor_cosine(x, y):
  # Build a pair-wise cosine similarity without using F.cosine_similarity
  # Hint, you may want to normalize vectors with F.normalize

  # TODO
  pass
  

x = torch.randn(3, 5)
y = torch.randn(3, 5)

F.cosine_similarity(x, y), tensor_cosine(x, y)

In [None]:
# Execute this cell to see the solution, but try to do it by yourself before!
!wget https://deepcourse-epita.netlify.app/code/siamese/cos.py
%pycat cos.py

Now let's build our Siamese network! It combines the backbone and the tensor cosine.

In [None]:
class Siamese(nn.Module):
  def __init__(self):
    super().__init__()
    # TODO

  def forward(self, x1, x2):
    # TODO
    return cos


# Check output shape
Siamese()(torch.randn(5, 3, 80, 80), torch.randn(5, 3, 80, 80)).shape

In [None]:
# Execute this cell to see the solution, but try to do it by yourself before!
!wget https://deepcourse-epita.netlify.app/code/siamese/siamese.py
%pycat siamese.py

We need a **contrastive loss**. Here is the formula:

$$y (1 - \hat{y})^2 + (1 - y) \operatorname{max}^2(\hat{y} - m, 0)  $$

Notice that the ground-truth ($y = 1$ same person, $y = 0$ different person) acts as a gate between the two parts of the loss.

Now code it yourself!

In [None]:
def contrastive_loss(pred_simi, gt_simi, margin=0.20):
  # TODO
  return 0.

In [None]:
# Execute this cell to see the solution, but try to do it by yourself before!
!wget https://deepcourse-epita.netlify.app/code/siamese/cons.py
%pycat cons.py

In [None]:
def eval_model(net, loader):
  net.eval()
  acc, loss = 0., 0.
  c = 0
  for x1, x2, gt_simi in loader:
    with torch.no_grad():
      pred_simi = net(x1.cuda(), x2.cuda()).cpu()

    loss += contrastive_loss(pred_simi, gt_simi).item()
    acc += ((pred_simi > 0.5).float() == gt_simi).sum().item()
    c += len(x1)

  acc /= c
  loss /= len(loader)
  net.train()
  return round(100 * acc, 2), round(loss, 5)

Let's train. We define the accuracy according to an arbitratry threshold of 0.5.

Notice that the margin is set to 0 at first, you can try to change it to get better results. You may try to make it change gradually during training to induce some kind of *curriculum learning* also.

In [None]:
net = Siamese().cuda()
best_model, best_acc = None, 0.0

optimizer = torch.optim.Adam(net.parameters(), lr=0.0005)

val_acc, val_loss = eval_model(net, val_loader)
print(f"Random model --> Val loss: {val_loss}, val accuracy: {val_acc}")
epochs = 15
margin = 0.0

for epoch in range(epochs):
  losses = 0

  for x1, x2, y in train_loader:
    x1, x2, y = x1.cuda(), x2.cuda(), y.cuda()

    optimizer.zero_grad()

    pred_simi = net(x1, x2)
    loss = contrastive_loss(pred_simi, y, margin=margin)
    loss.backward()
    optimizer.step()

    losses += loss.item()

  print(f"Epoch {epoch}: Train loss: {round(losses / len(train_loader), 5)}")

  val_acc, val_loss = eval_model(net, val_loader)
  print(f"\tVal loss: {val_loss}, val accuracy: {val_acc}")

  if best_acc <= val_acc:
    print("\tCheckpointing!")
    best_acc = val_acc
    best_model = copy.deepcopy(net)
    

Now we want to visualize the recall of our model, let's compute all embeddings (here of the val set, but feel free to also try on the train set where results should be better).

In [None]:
names = []
embeddings = []
paths = []

for class_id, class_paths in val_set.items():
  for path in class_paths:
    paths.append(path)
    names.append(classid_to_name[class_id])

    img = Image.open(path).convert('RGB')
    img = val_transform(img)
    with torch.no_grad():
      emb = best_model.backbone(img[None].cuda())[0]

    embeddings.append(emb)

embeddings = torch.stack(embeddings)
embeddings.shape

In [None]:
for name, nb in sorted(val_occur.items(), key=lambda x: x[1], reverse=True)[:10]:
  print(f"{name} ({name_to_classid[name]}): {nb} images")

In [None]:
selected_name = "Bill_Clinton"

print(f"There are {names.count(selected_name)} images with {selected_name}'s face.")
idx = random.choice([i for i, name in enumerate(names) if name == selected_name])

similarities = tensor_cosine(embeddings[idx][None], embeddings)
closest_indexes = similarities.argsort()[-10:].flip(0)
closest_simis = similarities[closest_indexes]
print(closest_indexes)


plt.figure(figsize=(20, 6))

for i, (closest_index, closest_simi) in enumerate(zip(closest_indexes, closest_simis), start=1):
  ax = plt.subplot(2, 5, i)
  ax.axis('off')

  path, name = paths[closest_index], names[closest_index]

  img = Image.open(path).convert('RGB')
  plt.imshow(img)
  plt.title(f"{name}, simi: {round(closest_simi.item(), 2)}")



The most similar image (with similarity score of $1.0$) is obviously the image itself. 

Remark that our model isn't very good, and it happens that the correct person is not even in the top-k...

You can try to improve it by:
- using more data
- larger image size (slower!)
- bigger network
- tuning of the margin, optimizer, etc.
- more augmentation
- select hard negative
- use a triplet network

Notice that the train loss quickly overfit. Would that happen with hard negative mining?