{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Generative modelling in deep learning"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Generative modelling in machine learning can aim at achieving different goals.\n",
"\n",
"The first, obvious one is that a generative model can be used to generate more data, to be used afterwards by another algorithm. While a generative model cannot create more information to solve the issue of having too small datasets, it could be used to solve anonymity questions. Typically, sharing a generative model trained on private data could allow the exploitation of the statistical property of this data without sharing the data itself (which can be protected by privacy matters for example).\n",
"\n",
"Another goal is to use generative modelling to better understand the data at hand. This is based on the hypothesis that a model that successfully learned to generate (and generalize) a dataset should have internally learned some efficient and compressed representation of the information contained in the data. In this case, analysing a posteriori the learned representation may give us insights on the data itself.\n",
"\n",
"The notion of a generative model however needs to be more formally specified, in order to work with. What does it mean for the model to generate data that \"looks like\" the original dataset? A mathematical formulation of that is necessary, in order to define a training objective that can be used efficiently. Having some expert rate the quality of all generated datapoints one by one is definitely not an option.\n",
"\n",
"Thus, modelling our data and models as probability distributions comes to the rescue. If we consider our data as coming from some underlying probability distribution, that we will name $p_D$, our goal is thus to train our model to represent another probability distribution, which we will name $p_\\theta$, that should be some good approximation of $p_D$. Given that we only know $p_D$ through some set of realisations from it (the dataset), we can never hope to learn it exactly."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q1: Can you name some metrics that can be used to compare two given distributions $p_D$ and $p_\\theta$?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Most comparison methods can be separated into two kinds: those that compare the density of the distributions ($p_\\theta(x)$ vs $p_D(x)$), and those that compare the values sampled from them. These two kinds of approaches have different behavior and trade-offs."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q2: Given we want to use them as an optimisation objective, what are the caveats to keep in mind about these two kinds?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this work, we will focus on two of the most widely used generative models based on deep neural networks: Generative Adversarial Networks (GANs) and Variational AutoEncoders (VAEs), in order to compare them and understand their strengths and weaknesses."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generative Adversarial Networks"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"GANs structure is based on modelling the distribution $p_\\theta$ as a learned deterministic function applied to a standard noise. Sampling from it is thus done as follows: first, some noise is sampled from a standard N-dimensional Gaussian distribution: $\\epsilon \\sim \\mathcal{N}(0;I)$, and then the output is computed as a deterministic function $x = f_\\theta(\\epsilon)$. The function $f_\\theta$ is implemented as a neural network, $\\theta$ representing its learned parameters."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q3: What is, a priori, the impact of the choice of N, the dimension of the input noise $\\epsilon$?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By construction, this generator structure only allows sampling the distribution $p_\\theta$, and does not allow the computation of the density $p_\\theta(x)$ (at least not without strong assumptions on $f_\\theta$). Such a model thus needs a comparison method based on samples to be trained.\n",
"\n",
"The smart idea of GANs is to instead use another neural network to model the objective. Another neural network is introduced: a classifier (that we call the discriminator) which is trained to distinguish examples from the dataset from examples generated by $p_\\theta$. The reasoning is as follows:\n",
"\n",
"The discriminator $D$ is trained using a classic classifier loss between the two classes defined as the samples generated by either $p_D$ or $p_\\theta$. This way $D(x)$ can be interpreted as the probability that $x$ came from the real dataset:\n",
"\n",
"$$ \\mathcal{L}_D = \\mathbb{E}_{p_D} \\left[ -\\log D(x) \\right] + \\mathbb{E}_{p_\\theta} \\left[ -\\log \\left(1-D(x)\\right) \\right] $$\n",
"\n",
"From that, it can be shown that for the generator fixed, the optimal discriminator is given by $D(x) = \\frac{p_D(x)}{p_\\theta(x) + p_D(x)}$, and when reached its loss takes a specific value:\n",
"\n",
"$$ \\mathcal{L}_D = 2 \\left( \\log 2 - JSD(p_\\theta \\| p_D) \\right) $$\n",
", where JSD is the JS divergence used to measure the similarity of two distributions. \n",
"$$JSD(p_\\theta \\| p_D) = \\frac{1}{2} \\mathbb{E}_{p_D} \\left[ log(\\frac{2p_D}{p_D+p_\\theta}) \\right] + \\frac{1}{2} \\mathbb{E}_{p_\\theta} \\left[ log(\\frac{2p_\\theta}{p_D+p_\\theta}) \\right]$$\n",
"So, training the generator network to *maximize* the same loss would, assuming the discriminator is always trained to optimality, minimize the Jensen-Shannon Divergence between $p_\\theta$ and $p_D$, and thus bring $p_\\theta$ closer to $p_D$."
]
},
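{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of where the optimal discriminator quoted above comes from (a standard pointwise argument, assuming that both distributions admit densities and that $D$ can represent any function with values in $(0, 1)$), the discriminator loss can be written as a single integral and minimized separately for each $x$:\n",
"\n",
"$$ \\mathcal{L}_D = - \\int \\left[ p_D(x) \\log D(x) + p_\\theta(x) \\log \\left(1 - D(x)\\right) \\right] dx $$\n",
"\n",
"For a fixed $x$, write $a = p_D(x)$, $b = p_\\theta(x)$ and $d = D(x)$. The integrand $-(a \\log d + b \\log(1-d))$ is convex in $d$, and setting its derivative to zero gives:\n",
"\n",
"$$ -\\frac{a}{d} + \\frac{b}{1-d} = 0 \\quad \\Longrightarrow \\quad d = \\frac{a}{a+b} = \\frac{p_D(x)}{p_D(x) + p_\\theta(x)} $$\n",
"\n",
"Substituting this value back into $\\mathcal{L}_D$ and factoring a $\\log 2$ out of each expectation yields the expression in terms of the JSD."
]
},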
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q4: Can you anticipate a caveat of using the JSD as a training objective for the generator?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Having the generator trained to maximize $\\mathcal{L}_D$ is equivalent to setting its training loss to $ \\mathcal{L}_G = \\mathbb{E}_{p_\\theta} \\log(1-D(x)) $."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q5: This loss only gives feedback to the generator on samples it generated, what problem may this cause?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will now work on implementing a GAN on a simple toy problem, to get a feeling of its behavior and test our theoretical insights. For this we will use the `pytorch` library.\n",
"\n",
"While a real problem would be generating images for example (each datapoint $x$ would then be a different image), this is a kind of task that easily requires intensive CPU/GPU power, and image datasets are difficult to visualize from a geometric point of view (even small images contains hundreds of pixels, and nobody can visualize points in a 100-dimensional space). So instead we will focus on points in the plane: each datapoint $x$ will actually be a couple of numbers $(x1, x2)$, and our target dataset will be a 2D two-moons shape with some noise."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import math\n",
"from sklearn import datasets\n",
"import numpy as np\n",
"\n",
"# Our dataset is mathematically defined, we can generate batches on the fly and enjoy\n",
"# an infinite-size dataset\n",
"def generate_batch(batchlen):\n",
" \"\"\"This function generates a batch of length 'batchlen' from the dataset\n",
" \"\"\"\n",
" data = datasets.make_moons(n_samples=batchlen, noise=0.05)[0].astype(np.float32)\n",
" return torch.from_numpy(data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's plot a large batch, to see what the dataset looks like."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import matplotlib.pyplot as plt\n",
"\n",
"batch = generate_batch(5000)\n",
"\n",
"plt.scatter(batch[:,0], batch[:,1], s=2.0)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We now need to define our two neural networks, the generator and the discriminator. The generator will take as input a value $z$ sampled from a Gaussian prior, and output a value $x$ (thus a couple $(x_1,x_2)$). The discriminator takes as input a value $x$, and is a binary classifier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"\n",
"# Choose a value for the prior dimension\n",
"PRIOR_N = 2\n",
"\n",
"# Define the generator\n",
"class Generator(nn.Module):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.fc1 = nn.Linear(PRIOR_N, 2)\n",
" \n",
" def __call__(self, z):\n",
" return self.fc1(z)\n",
" \n",
" def generate(self, batchlen):\n",
" z = torch.normal(torch.zeros(batchlen, PRIOR_N), 1.0)\n",
" return self.__call__(z)\n",
" \n",
"\n",
"# Define the discriminator\n",
"class Discriminator(nn.Module):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.fc1 = nn.Linear(2, 1)\n",
" \n",
" def __call__(self, x):\n",
" return self.fc1(x)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With these classes in shape, now is only needed the training loop. To stick with the mathematical GAN framework, we should train the discriminator until convergence between each training step of the generator. This is not practical for two reasons: first it takes a lot of time, and second if the discriminator is too good, it will generate vanishing gradients to the generator (as seen in **Q4**).\n",
"\n",
"We will thus train the discriminator a fixed number of times between each training iteration of the generator."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Number of times to train the discriminator between two generator steps\n",
"TRAIN_RATIO = 1\n",
"# Total number of training iterations for the generator\n",
"N_ITER = 20001\n",
"# Batch size to use\n",
"BATCHLEN = 128\n",
"\n",
"generator = Generator()\n",
"optim_gen = torch.optim.Adam(generator.parameters(), lr=0.001, betas=(0.5,0.9))\n",
"discriminator = Discriminator()\n",
"optim_disc = torch.optim.Adam(discriminator.parameters(), lr=0.001, betas=(0.5,0.9))\n",
"\n",
"for i in range(N_ITER):\n",
" # train the discriminator\n",
" for _ in range(TRAIN_RATIO):\n",
" discriminator.zero_grad()\n",
" real_batch = generate_batch(BATCHLEN)\n",
" fake_batch = generator.generate(BATCHLEN)\n",
" #\n",
" # == COMPUTE THE DISCRIMINATOR LOSS HERE ==\n",
" #\n",
" disc_loss = 0\n",
" disc_loss.backward()\n",
" optim_disc.step()\n",
" # train the generator\n",
" generator.zero_grad()\n",
" fake_batch = generator.generate(BATCHLEN)\n",
" #\n",
" # == COMPUTE THE GENERATOR LOSS HERE\n",
" #\n",
" gen_loss = 0\n",
" gen_loss.backward()\n",
" optim_gen.step()\n",
" if i%1000 == 0:\n",
" print('step {}: discriminator: {:.3e}, generator: {:.3e}'.format(i, float(disc_loss), float(gen_loss)))\n",
" # plot the result\n",
" real_batch = generate_batch(1024)\n",
" fake_batch = generator.generate(1024).detach()\n",
" plt.scatter(real_batch[:,0], real_batch[:,1], s=2.0, label='real data')\n",
" plt.scatter(fake_batch[:,0], fake_batch[:,1], s=2.0, label='fake data')\n",
" plt.legend()\n",
" plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Complete the previous code and train your model.\n",
"\n",
"Depending on your choice of parameters, the training may not go well at all, with the generator completely collapsing quickly at the beginning of the training. It has been observed by the litterature that the generator's loss $\\mathcal{L}_G = \\mathbb{E}_{p_\\theta} \\log(1-D(x))$ is often to blame."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q6: Why could we anticipate that this loss could cause this?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This issue is solved by replacing the generator loss by an alternative loss: $\\mathcal{L}_G = \\mathbb{E}_{p_\\theta} [ -\\log D(x) ]$ to avoid gradient vanishing."
]
},
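{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get a feel for the difference between the two generator losses, here is a minimal sketch that compares their gradients with respect to the discriminator's output. It assumes that the discriminator returns a raw score $a$ with $D(x) = \\mathrm{sigmoid}(a)$; the score values are hypothetical and only chosen to include cases where the discriminator confidently rejects the generated samples."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import torch.nn.functional as F\n",
"\n",
"# Hypothetical discriminator scores for generated samples:\n",
"# very negative means confidently classified as fake.\n",
"logits = torch.tensor([-8.0, -4.0, 0.0, 4.0], requires_grad=True)\n",
"\n",
"# Original loss: log(1 - D(x)) with D = sigmoid(a), i.e. logsigmoid(-a)\n",
"saturating = F.logsigmoid(-logits).sum()\n",
"(grad_sat,) = torch.autograd.grad(saturating, logits)\n",
"\n",
"# Alternative loss: -log D(x) = -logsigmoid(a)\n",
"non_saturating = (-F.logsigmoid(logits)).sum()\n",
"(grad_ns,) = torch.autograd.grad(non_saturating, logits)\n",
"\n",
"print('gradient of log(1 - D):', grad_sat)\n",
"print('gradient of -log D:    ', grad_ns)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With these values, the gradient of the original loss is close to zero for the most negative scores, while the alternative loss keeps a gradient of magnitude close to one there."
]
},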
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q7: Inspect the impact of these different factors:**\n",
"\n",
"- depth / width of the generator network\n",
"- depth / width of the discriminator network\n",
"- impact of `TRAIN_RATIO`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For further readings on GANs, you can see the following papers:\n",
"\n",
"- Generative Adversarial Networks *(Goodfellow et al.)*: [arXiv:1406.2661](https://arxiv.org/abs/1406.2661)\n",
"- Unsupervised Representation Learning with Deep Convolutional Generative Adversarial Networks *(Radford et al.)*: [arXiv:1511.06434](https://arxiv.org/abs/1511.06434)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Variational AutoEncoders"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another well-known approach to generative modelling is embodied by Variational AutoEncoders (VAEs). While the generative model itself and the procedure to sample it is similar to GANs, the way it is trained is not.\n",
"\n",
"The main goal of VAEs is to optimize the likelihood of the real data according to the generative model. In other words, maximize $\\mathbb{E}_{p_D} \\left[\\log p_\\theta(x) \\right ]$, which is equivalent to minimizing $D_{KL}(p_D \\| p_\\theta)$.\n",
"$$D_{KL}(p_D \\| p_\\theta) = \\mathbb{E}_{p_D}\\left[ log(\\frac{p_D}{p_\\theta}) \\right]$$"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q8: Prove this equivalence.**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"However, the classes of distributions for which $\\log p_\\theta(x)$ can be analytically computed and optimized is very restricted, and not suitable for real world problems. The main idea of the VAE is thus to introduce a latent variable $z$ and decompose the distribution as: $p_\\theta(x, z) = p_\\theta(x | z) p(z)$. Here $p(z)$ is some fixed prior and $p_\\theta(x | z)$ is a simple distribution whose parameters are the output of a neural network.\n",
"\n",
"For example, you could have $p(z)$ be a standard $\\mathcal{N}(0;1)$ and $p_\\theta(x | z)$ be defined as a gaussian $\\mathcal{N}(\\mu_\\theta(z); \\sigma_\\theta(z))$ where $\\mu_\\theta(z)$ and $\\sigma_\\theta(z)$ are created by the neural network you will train. In this case, the resulting distribution $p_\\theta(x) = \\int_z p_\\theta(x|z)p(z)dz$ is an infinite mixture of Gaussians, which is a much more expressive class of distributions.\n",
"\n",
"Now, this cannot stop here, as we are not able to analitically compute the density $p_\\theta(x)$. The second main idea of the VAE is to introduce another, auxilliary distribution: $q_\\phi(z | x)$, which will be modelled by a neural network similarly to $p_\\theta(x | z)$. Introducing it allows us to create a lower bound for $\\log p_\\theta(x)$:\n",
"\n",
"$$\\log p_\\theta(x) = \\mathbb{E}_{z \\sim q_\\phi} \\log p_\\theta(x) = \\mathbb{E}_{z \\sim q_\\phi} \\left[ \\log p_\\theta(x) \\frac {q_\\phi(z|x)}{q_\\phi(z|x)} \\right]$$\n",
"\n",
"Following Bayes theorem, $p_\\theta(x) p_\\theta(z|x) = p_\\theta(x, z) = p_\\theta(x|z) p(z)$, so we get:\n",
"\n",
"$$\\log p_\\theta(x) = \\mathbb{E}_{z \\sim q_\\phi} \\left[ \\log \\frac{p_\\theta(x|z) p(z)}{p_\\theta(z|x)} \\frac {q_\\phi(z|x)}{q_\\phi(z|x)} \\right]$$\n",
"\n",
"Re-organizing the terms:\n",
"\n",
"$$\\log p_\\theta(x) = \\mathbb{E}_{z \\sim q_\\phi} \\log \\frac{q_\\phi(z|x)}{p_\\theta(z|x)} + \\mathbb{E}_{z \\sim q_\\phi} \\log \\frac{p(z)}{q_\\phi(z|x)} + \\mathbb{E}_{z \\sim q_\\phi} \\log p_\\theta(x | z)$$\n",
"\n",
"This can be re-expressed like so:\n",
"\n",
"$$\\log p_\\theta(x) = D_{KL}(q_\\phi(z | x) \\| p_\\theta(z | x)) - D_{KL}(q_\\phi(z | x) \\| p(z)) + \\mathbb{E}_{z \\sim q_\\phi} \\log p_\\theta(x|z)$$\n",
"\n",
"The 3 terms of this equality can be interpreted like so:\n",
"\n",
"- the first term measures how much $q_\\phi(z | x)$ is similar to $p_\\theta(z | x)$, or in other words is a good inverse of $p_\\theta(x | z)$\n",
"- the second term measures how similar $q_\\phi(z|x)$ is from the latent prior $p(z)$\n",
"- the third term is linked to how likely $p_\\theta$ is to yield the given $x$ when $z$ is sampled from $q_\\phi(z | x)$ rather than $p(z)$\n",
"\n",
"It is interesting to note that the first term, being a KL-divergence is always positive. As such the combination of the last two terms forms a lower bound of $\\log p_\\theta(x)$ which *can* be computed and used as a training objective. This bound is called the *Evidence Lower-Bound (ELBO)*. Simply flipping its sign can make it into a loss that can be minimized by gradient descent:\n",
"\n",
"$$ \\mathcal{L}_{ELBO} = D_{KL}(q_\\phi(z | x) \\| p(z)) + \\mathbb{E}_{z \\sim q_\\phi} [ - \\log p_\\theta(x|z) ]$$\n",
"\n",
"From this formulation comes the parallel with auto-encoders that give the VAE its name: $q_\\phi(z | x)$ can be seen as a *probabilistic encoder* from the data $x$ to the latent space $z$, and $p_\\theta(x | z)$ can be seen as a *probabilistic decoder* from the latent space $z$ to the data $x$. In this case the second term of $\\mathcal{L}_{ELBO}$ is the loss measuring the reconstruction quality of the auto-encoder, and the first term can be seens as a regularization of the latent space."
]
},
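{
"cell_type": "markdown",
"metadata": {},
"source": [
"The decomposition above can be checked numerically on a model small enough that every term is computable exactly. The sketch below is an illustration under stated assumptions, not part of the VAE itself: the latent variable is discrete with three states, and the arrays `prior`, `lik` and `q` hold hypothetical values for $p(z)$, $p_\\theta(x|z)$ (for one fixed $x$) and $q_\\phi(z|x)$."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"# Toy discrete model; every number below is a hypothetical value chosen for illustration.\n",
"prior = np.array([0.5, 0.3, 0.2])    # p(z) over three latent states\n",
"lik = np.array([0.10, 0.60, 0.25])   # p(x | z) for one fixed observation x\n",
"q = np.array([0.2, 0.5, 0.3])        # an arbitrary approximate posterior q(z | x)\n",
"\n",
"evidence = np.sum(lik * prior)       # p(x) = sum_z p(x | z) p(z)\n",
"posterior = lik * prior / evidence   # exact posterior p(z | x)\n",
"\n",
"\n",
"def kl(a, b):\n",
"    # KL divergence between two discrete distributions with full support\n",
"    return np.sum(a * np.log(a / b))\n",
"\n",
"\n",
"# ELBO = -KL(q || p(z)) + E_q[log p(x | z)]\n",
"elbo = -kl(q, prior) + np.sum(q * np.log(lik))\n",
"# gap between log p(x) and the ELBO: KL(q || p(z | x))\n",
"gap = kl(q, posterior)\n",
"\n",
"print(np.log(evidence), elbo + gap)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Whatever valid distribution is chosen for `q`, the two printed numbers agree, and the gap between the ELBO and $\\log p_\\theta(x)$ vanishes exactly when `q` equals `posterior`."
]
},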
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![VAE](vae-gaussian.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q9: We can see that $p(z)$ is never sampled during the training process, how can that be a problem?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A typical choice to represent $q_\\phi(z | x)$ is to use a diagonal Gaussian distribution $\\mathcal{N}(\\mu_\\phi(x); Diag(\\sigma_\\phi^2(x)))$, which makes the KL-divergence term of $\\mathcal{L}_{ELBO}$ analytically computable."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q10: Assuming $p(z)$ is a $\\mathcal{N}(0; Id)$ gaussian, what is the value of $D_{KL}(q_\\phi(z | x) \\| p(z))$?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will also model $p_\\theta(x | z)$ as a diagonal Gaussian $\\mathcal{N}(\\mu_\\theta(z); Diag(\\sigma_\\theta^2(z)))$.\n",
"\n",
"\n",
"**Note:** For the following, be careful about the difference between $\\mu_\\phi, \\sigma_\\phi$ which define the Gaussian distribution of the *encoder* $q_\\phi$ and $\\mu_\\theta, \\sigma_\\theta$ which define the Gaussian distribution of the *decoder* $p_\\theta$."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q11: What is the expression of $-\\log p_\\theta(x | z)$ for given $x$ and $z$?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will build and train a VAE using the same dataset as previously, in order to compare its behavior to GANs. For numerical stability, we will interpret the output of the encoder and decoder networks as $(\\mu, \\log\\sigma^2)$, rather than $(\\mu, \\sigma)$."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Choose a value for the latent dimension\n",
"LATENT_N = 10\n",
"\n",
"# Define the generator\n",
"class Encoder(nn.Module):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.fc_mu = nn.Linear(2, LATENT_N)\n",
" self.fc_logvar = nn.Linear(2, LATENT_N)\n",
" \n",
" # encode a datapoint. This should return a couple of tensors (mu, logvar) representing\n",
" # the parameters of the Gaussian q_\\phi(z | x)\n",
" def __call__(self, x):\n",
" mu = self.fc_mu(x)\n",
" logvar = self.fc_logvar(x)\n",
" return (mu, logvar)\n",
" \n",
"\n",
"# Define the discriminator\n",
"class Decoder(nn.Module):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.fc_mu = nn.Linear(LATENT_N, 2)\n",
" self.fc_logvar = nn.Linear(LATENT_N, 2)\n",
" \n",
" # decode a datapoint. This should return a couple of tensors (mu, logvar) representing\n",
" # the parameters of the Gaussian p_\\theta(z | x)\n",
" def __call__(self, z):\n",
" mu = self.fc_mu(z)\n",
" logvar = self.fc_logvar(z)\n",
" return (mu, logvar)\n",
"\n",
" def generate(self, batchlen):\n",
" z = torch.normal(torch.zeros(batchlen, LATENT_N), 1.0)\n",
" (mu, logvar) = self.__call__(z)\n",
" return torch.normal(mu, torch.exp(0.5*logvar))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From this, the parameters of both networks are trained conjointly using the same loss $\\mathcal{L}_{ELBO}$. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"# Total number of training iterations for the VAE\n",
"N_ITER = 40001\n",
"# Batch size to use\n",
"BATCHLEN = 128\n",
"\n",
"encoder = Encoder()\n",
"optim_enc = torch.optim.Adam(encoder.parameters(), lr=0.001, betas=(0.5,0.9))\n",
"decoder = Decoder()\n",
"optim_dec = torch.optim.Adam(decoder.parameters(), lr=0.001, betas=(0.5,0.9))\n",
"\n",
"for i in range(N_ITER):\n",
" encoder.zero_grad()\n",
" decoder.zero_grad()\n",
" \n",
" x = generate_batch(BATCHLEN)\n",
" \n",
" enc_mu, enc_logvar = encoder(x)\n",
" #\n",
" # COMPUTE THE KL PART OF THE LOSS HERE\n",
" #\n",
" loss_kl = 0\n",
" #\n",
" # SAMPLE z FROM q(z|x) HERE\n",
" #\n",
" z = 0\n",
" \n",
" dec_mu, dec_logvar = decoder(z)\n",
" #\n",
" # COMPUTE THE RECONSTRUCTION PART OF THE LOSS HERE\n",
" #\n",
" loss_rec = 0\n",
" \n",
" (loss_kl + loss_rec).backward()\n",
" optim_enc.step()\n",
" optim_dec.step()\n",
" if i%1000 == 0:\n",
" print('step {}: KL: {:.3e}, rec: {:.3e}'.format(i, float(loss_kl), float(loss_rec)))\n",
" # plot the result\n",
" real_batch = generate_batch(1024)\n",
" rec_batch = torch.normal(dec_mu, torch.exp(0.5*dec_logvar)).detach()\n",
" fake_batch = decoder.generate(1024).detach()\n",
" plt.scatter(real_batch[:,0], real_batch[:,1], s=2.0, label='real data')\n",
" plt.scatter(rec_batch[:,0], rec_batch[:,1], s=2.0, label='rec data')\n",
" plt.scatter(fake_batch[:,0], fake_batch[:,1], s=2.0, label='fake data')\n",
" plt.legend()\n",
" plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q12: Try hardcoding $\\sigma_\\theta(z)$ to some small value (like 1E-4) rather than allowing the decoder to learn it. What does it change?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q13: How do the power of encoder and decoder affect the overall training of the VAE?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Normalizing flows"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another approach to generative modelling is given by Normalizing flows. The idea is to learn a mapping $f$ from the data distribution $p_D$ (defined over a space $\\mathcal{X}$) to a known distribution (typically, a normal distribution) from which we know how to sample. Two key points are to be noted:\n",
"1. The mapping needs to be bijective (i.e., the network needs to be invertible).\n",
"2. The exact likelihood of a data point should be easily computable.\n",
"\n",
"To tackle point 2, let us introduce some notations.\n",
"Let us denote by $p_\\mathcal{N} = \\mathcal{N}(0,1)$ the normal distribution from which we sample $z$. We obtain points $x$ by computing $x = f^{-1}(z)$, and we hope that the distribution of points obtained this way will match more or less the dataset distribution $p_D$ of real examples.\n",
"We denote by $p_G$ this obtained distribution, that is, the image of $p_\\mathcal{N}$ through $f^{-1}$: $p_G$ is a probability distribution defined on $\\mathcal{X}$, and our goal will be to optimize the mapping $f$ so that $p_G$ is close to $p_D$. If you are not familiar with images of distributions, the definition is here: https://en.wikipedia.org/wiki/Pushforward_measure ; it can be defined as the measure that satisfies this change of variables: $\\int_{z\\sim p_\\mathcal{N}} g(f^{-1}(z)) dz = \\int_{x\\sim p_{G}} g(x) dx$ for all functions $g$. Another notation for this is $\\int_{z} g(f^{-1}(z)) \\;dp_\\mathcal{N}(z) = \\int_{x} g(x) \\;dp_{G}(x)$. \n",
"We will build on the following identity:\n",
"$$\n",
"\\log(p_X(x))=\\log(p_Z(z))+\\log \\det J_f\n",
"$$\n",
"where $J_f = \\frac{df(x)}{dx}$ is the Jacobian matrix of the function $f$."
]
},
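{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check of this change-of-variables identity (not a proof), the sketch below uses a hypothetical one-dimensional affine map $f(x) = ax + b$, for which the density of $x = f^{-1}(z)$ is known in closed form. The values of `a` and `b` are arbitrary choices made for the illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import math\n",
"\n",
"import torch\n",
"from torch import distributions\n",
"\n",
"# Hypothetical 1D affine flow f(x) = a * x + b, mapping data x to latent z\n",
"a, b = 2.0, -1.0\n",
"base = distributions.Normal(0.0, 1.0)  # latent distribution of z\n",
"\n",
"x = torch.linspace(-2.0, 2.0, 9)\n",
"# change of variables: log p(x) = log p_z(f(x)) + log |det J_f|, with J_f = a here\n",
"log_p_flow = base.log_prob(a * x + b) + math.log(abs(a))\n",
"\n",
"# closed form: if z ~ N(0, 1) then x = (z - b) / a ~ N(-b / a, 1 / |a|)\n",
"log_p_exact = distributions.Normal(-b / a, 1.0 / abs(a)).log_prob(x)\n",
"\n",
"print((log_p_flow - log_p_exact).abs().max())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The printed value is the largest absolute difference between the two log-densities on the grid, which should be zero up to floating-point error. Dropping the $\\log |a|$ term makes the two curves differ by a constant, which is the role played by the Jacobian term in the training loss."
]
},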
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q14: Justify the above equality**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let us consider, as an optimization criterion, the Kullback-Leibler divergence $KL(p_D||p_G)$.\n",
"\n",
"**Q15: Express this loss as a (very simple) function of the quantity above.**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A challenging task is then to find a network, such that f is:\n",
"* invertible\n",
"* $\\det J_f$ is tractable\n",
"* $f$ is general enough\n",
"\n",
"A common architecture is the so called real-valued non-volume preserving (real-NVP), which is made of coupling layers.\n",
"\n",
"The idea of coupling layers is to cut the input $x$ in two (let say keeping only the $d < D$ first dimensions on one hand and the $D-d$ other dimensions on the other hand, with $D$ the dimension of the input space $\\mathcal{X}$). This gives inputs $x_{1:d}$ and $x_{d+1:D}$. Then the forward pass is defined as:\n",
"* $y_{1:d}=x_{1:d}$\n",
"* $y_{d+1:D}=x_{d+1:D}\\odot \\exp(s(x_{1:d})) + t(x_{1:d})$\n",
"\n",
"where $s,t:\\mathbb{R}^d\\to\\mathbb{R}^{D-d}$ and $\\odot$ is the coefficient-wise multiplication of two vectors of the same size.\n",
"\n",
"Those equations can be resumed in the following diagram:\n",
"![real NVP diagram](realNVPforward.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q16: Show that the defined mapping is indeed invertible**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q17: Show that the determinant of the Jacobian of such a mapping is $\\exp(\\sum_j s(x_{1:d})_j)$**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q18: Fill the Coupling layer structure below by defining suitable functions sfun and tfun (using a final tanh in sfun is highly recommended) and the inverse pass**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"from torch import nn\n",
"from torch.nn import functional as F\n",
"\n",
"\n",
"class CouplingLayer(nn.Module):\n",
" def __init__(self, input_size, output_size, hidden_size, mask):\n",
" super().__init__()\n",
" ### define here variables to use in f and s ###\n",
" \n",
" ###\n",
" self.mask = mask #we use the mask variable to make the cut in input\n",
"\n",
" def sfun(self, x_m):\n",
" return x_m\n",
" \n",
" def tfun(self, x_m):\n",
" return x_m\n",
" \n",
" def forward(self, x):\n",
" x_m = x * self.mask\n",
" s_out = self.sfun(x_m) # multiplicative block\n",
" t_out = self.tfun(x_m) # additive block\n",
" y = x_m + (1-self.mask)*(x*torch.exp(s_out)+t_out)\n",
" log_det_jacobian = s_out.sum(dim=1) # CRITICAL\n",
" return y, log_det_jacobian\n",
"\n",
" def inverse(self, y):\n",
" return y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class RealNVP(nn.Module):\n",
" def __init__(self, input_size, output_size, hidden_size, mask, n_layers=6):\n",
" super().__init__()\n",
" assert n_layers >= 2, \"num of coupling layers should be greater or equal to 2\"\n",
"\n",
" self.modules = []\n",
" self.modules.append(CouplingLayer(input_size, output_size, hidden_size, mask))\n",
" for _ in range(n_layers-2):\n",
" mask = 1 - mask\n",
" self.modules.append(CouplingLayer(input_size, output_size, hidden_size, mask))\n",
" self.modules.append(CouplingLayer(input_size, output_size, hidden_size, 1 - mask))\n",
" self.module_list = nn.ModuleList(self.modules)\n",
"\n",
" def forward(self, x):\n",
" ldj_sum = 0 # sum of log determinant of jacobian\n",
" for module in self.module_list:\n",
" x, ldj = module(x)\n",
" ldj_sum += ldj\n",
" return x, ldj_sum\n",
"\n",
" def inverse(self, z):\n",
" for module in reversed(self.module_list):\n",
" z = module.inverse(z)\n",
" return z"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"import torch\n",
"\n",
"\n",
"def train(epoch, model, optimizer, prior_z, train_loader, log_interval=50):\n",
" model.train()\n",
" train_loss = 0\n",
" for batch_idx, x_inputs in enumerate(train_loader):\n",
" optimizer.zero_grad()\n",
" z_outputs, log_det_j_sum = model.forward(x_inputs) # train with the forward\n",
" loss = -(prior_z.log_prob(z_outputs)+log_det_j_sum).mean() # CRITICAL\n",
" loss.backward()\n",
" cur_loss = loss.item()\n",
" train_loss += cur_loss\n",
" optimizer.step()\n",
" if batch_idx % log_interval == 0:\n",
" print(\"Train Epoch: {} [{}/{} ({:.0f}%)]\\tLoss: {:.6f}\".format(\n",
" epoch, batch_idx * len(x_inputs), len(train_loader.dataset),\n",
" 100. * batch_idx / len(train_loader), cur_loss / len(x_inputs)))\n",
"\n",
" average_train_loss = train_loss / len(train_loader.dataset)\n",
" print(f\"====> Epoch: {epoch} Average train loss: {average_train_loss:.4f}\")\n",
"\n",
"\n",
"def test(epoch, model, prior_z, test_loader):\n",
" model.eval()\n",
" test_loss = 0\n",
" x_all = np.array([[]]).reshape(0, 2)\n",
" z_all = np.array([[]]).reshape(0, 2)\n",
" with torch.no_grad():\n",
" for batch_idx, x_inputs in enumerate(test_loader):\n",
" z_outputs, log_det_j_sum = model.forward(x_inputs)\n",
" cur_loss = -(prior_z.log_prob(z_outputs)+log_det_j_sum).mean().item()\n",
" test_loss += cur_loss\n",
" x_all = np.concatenate((x_all, x_inputs.numpy()))\n",
" z_all = np.concatenate((z_all, z_outputs.numpy()))\n",
"\n",
" subfig_plot(1, x_all, -2, 3, -1, 1.5, \"Input: x ~ p(x)\", \"b\")\n",
" subfig_plot(2, z_all, -3, 3, -3, 3, \"Output: z = f(x)\", \"b\")\n",
"\n",
" test_loss /= len(test_loader.dataset)\n",
" print(f\"====> Test loss: {test_loss:.4f}\")\n",
"\n",
"\n",
"def sample(epoch, model, prior_z, save_plt_interval=5):\n",
" model.eval()\n",
" with torch.no_grad():\n",
" z_inputs = prior_z.sample((1000,))\n",
" x_outputs = model.inverse(z_inputs) # generation with the inverse\n",
" z_inputs = z_inputs.numpy()\n",
" x_outputs = x_outputs.numpy()\n",
"\n",
" subfig_plot(3, z_inputs, -3, 3, -3, 3, \"Input: z ~ p(z)\", \"r\")\n",
" subfig_plot(4, x_outputs, -2, 3, -1, 1.5, \"Output: x = g(z) (g: inverse of f)\", \"r\")\n",
"\n",
" if epoch % save_plt_interval == 0:\n",
" if not os.path.exists(\"results\"):\n",
" os.makedirs(\"results\")\n",
" plt.savefig(\"results/\"+\"result_\"+str(epoch)+\".png\")\n",
"\n",
"\n",
"def subfig_plot(location, data, x_start, x_end, y_start, y_end, title, color):\n",
" if location == 1:\n",
" plt.clf()\n",
" plt.subplot(2, 2, location)\n",
" plt.scatter(data[:, 0], data[:, 1], c=color, s=1)\n",
" plt.xlim(x_start, x_end)\n",
" plt.ylim(y_start, y_end)\n",
" plt.title(title)\n",
" plt.pause(1e-2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# globals\n",
"BATCH_SIZE = 128\n",
"LOG_INTERVAL = 50\n",
"N_EPOCHS = 10\n",
"INPUT_SIZE = 2\n",
"OUTPUT_SIZE = 2\n",
"HIDDEN_SIZE = 256\n",
"SAVE_PLT_INTERVAL = 5\n",
"N_COUPLING_LAYERS = 8"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# data loading\n",
"train_data = datasets.make_moons(n_samples=50000, noise=.05)[0].astype(np.float32)\n",
"test_data = datasets.make_moons(n_samples=1000, noise=.05)[0].astype(np.float32)\n",
"device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
"loader_kwargs = {\"num_workers\": 1, \"pin_memory\": True} if device == \"cuda\" else {}\n",
"train_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, **loader_kwargs)\n",
"test_loader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True, **loader_kwargs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from torch import distributions # optim, \n",
"# model construction\n",
"mask = torch.from_numpy(np.array([0, 1]).astype(np.float32))\n",
"model = RealNVP(INPUT_SIZE, OUTPUT_SIZE, HIDDEN_SIZE, mask, N_COUPLING_LAYERS)\n",
"optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)\n",
"prior_z = distributions.MultivariateNormal(torch.zeros(2), torch.eye(2))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from timeit import default_timer as timer\n",
"# run\n",
"start = timer()\n",
"for epoch in range(1, N_EPOCHS + 1):\n",
" train(epoch, model, optimizer, prior_z, train_loader)\n",
" test(epoch, model, prior_z, test_loader)\n",
" sample(epoch, model, prior_z)\n",
"f\"{timer() - start:.02f}s\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q19: Explain why Normalizing flows do not fit well when there are many clusters in data**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Q20: As a conclusion, how would you compare the advantages and shortcomings of GANs, VAEs and Normalizing flows?**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> (Write your answer here)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}