✔️ Prerequisites¶
First of all we need to take care of a few prerequisites, most notably:
- Install the various pip modules that we will be using.
- Install some Linux-specific dependencies of our content loss.
- Initialize the Random Number Generator(s), so that our experiments can be replicated.
- Determine:
- The current working directory, as it's going to be used to reference various files, such as the dataset, our model checkpoints, etc.
- The available hardware backend. GPU utilization is preferable, as it results in reduced completion time.
(Optionally)
Mount Google Drive, where we can load our dataset from.
Determining the Current Working Directory¶
In [ ]:
Copied!
from pathlib import Path

# Resolve the notebook's working directory once; every other path
# (datasets, checkpoints, outputs) is derived from this root.
BASE_DIR = Path.cwd()
from pathlib import Path BASE_DIR = Path.cwd()
Mounting Google Drive¶
In [ ]:
Copied!
# Mount point for Google Drive when running on Colab.
GDRIVE_DIR = BASE_DIR / "drive"

try:
    from google.colab import drive

    # `drive.mount` expects a plain string path; `str()` states that intent
    # more directly than wrapping the Path in an f-string.
    drive.mount(str(GDRIVE_DIR))
except ImportError:
    # Not running on Google Colab — nothing to mount.
    pass
GDRIVE_DIR = BASE_DIR / "drive" try: from google.colab import drive drive.mount(f"{GDRIVE_DIR}") except ImportError: pass
In [ ]:
Copied!
# Lay out the directory tree, falling back to the local filesystem whenever
# Google Drive (and therefore the Thesis folder) is unavailable.
SECRETS_DIR = GDRIVE_DIR / "MyDrive" / "Secrets"

THESIS_DIR = GDRIVE_DIR / "MyDrive" / "Thesis" if GDRIVE_DIR.is_dir() else BASE_DIR

OUTPUT_DIR = THESIS_DIR / "Output"

DATASET_DIR = (THESIS_DIR if THESIS_DIR.is_dir() else BASE_DIR) / "Datasets"
SECRETS_DIR = GDRIVE_DIR / "MyDrive" / "Secrets" if GDRIVE_DIR.is_dir(): THESIS_DIR = GDRIVE_DIR / "MyDrive" / "Thesis" else: THESIS_DIR = BASE_DIR OUTPUT_DIR = THESIS_DIR / "Output" if THESIS_DIR.is_dir(): DATASET_DIR = THESIS_DIR / "Datasets" else: DATASET_DIR = BASE_DIR / "Datasets"
Configuring our Loggers¶
In [ ]:
Copied!
import os

# Honour an externally supplied verbosity; default to CRITICAL so the
# notebook stays quiet unless the user opts in.
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "CRITICAL").upper()

# Console and file handlers share the same formatter and severity. The file
# handler's `filename` is injected later by `logging_callback`.
_console_handler = {
    "level": LOGGING_LEVEL,
    "formatter": "standard",
    "class": "logging.StreamHandler",
}
_file_handler = {
    "level": LOGGING_LEVEL,
    "formatter": "standard",
    "class": "logging.FileHandler",
}

# `logging.config.dictConfig`-style schema routing the root logger to both handlers.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {"standard": {"format": "[%(asctime)s] %(levelname)s:%(name)s: %(message)s"}},
    "handlers": {"default": _console_handler, "file": _file_handler},
    "loggers": {"": {"handlers": ["default", "file"], "level": LOGGING_LEVEL}},
}
import os LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "CRITICAL").upper() LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": False, "formatters": {"standard": {"format": "[%(asctime)s] %(levelname)s:%(name)s: %(message)s"}}, "handlers": { "default": { "level": LOGGING_LEVEL, "formatter": "standard", "class": "logging.StreamHandler", }, "file": { "level": LOGGING_LEVEL, "formatter": "standard", "class": "logging.FileHandler", }, }, "loggers": {"": {"handlers": ["default", "file"], "level": LOGGING_LEVEL}}, }
Installing graphviz & libgraphviz-dev¶
The aforementioned packages are required by PyINSECT and more specifically its graph plotting methods.
In [ ]:
Copied!
!sudo apt-get install graphviz libgraphviz-dev 1> /dev/null
!sudo apt-get install graphviz libgraphviz-dev 1> /dev/null
Installing the required pip
modules¶
In [ ]:
Copied!
# The project is distributed as a pre-built wheel stored under the Thesis
# folder on Google Drive.
WHEEL_VERSION = "3.0.1"
# f-string instead of dated %-formatting; the resulting name is identical.
WHEEL_FILE = f"roughgan-{WHEEL_VERSION}-py3-none-any.whl"
WHEEL_PATH = THESIS_DIR / "Binaries" / WHEEL_FILE
WHEEL_VERSION = "3.0.1" WHEEL_FILE = "roughgan-%s-py3-none-any.whl" % (WHEEL_VERSION,) WHEEL_PATH = THESIS_DIR / "Binaries" / WHEEL_FILE
In [ ]:
Copied!
import os
import random
import os import random
In [ ]:
Copied!
import subprocess
import sys

# NumPy is imported here so that subsequent cells can rely on it.
import numpy as np

# Install the project wheel, unless the package is already present in the
# active environment.
frozen_requirements = subprocess.check_output([sys.executable, "-m", "pip", "freeze"]).decode()
if "roughgan" not in frozen_requirements:
    if not WHEEL_PATH.is_file():
        raise FileNotFoundError(WHEEL_PATH)
    subprocess.check_call([sys.executable, "-m", "pip", "install", WHEEL_PATH])
import subprocess import sys import numpy as np pip_freeze_output = subprocess.check_output([sys.executable, "-m", "pip", "freeze"]).decode() if "roughgan" not in pip_freeze_output: if WHEEL_PATH.is_file(): subprocess.check_call([sys.executable, "-m", "pip", "install", WHEEL_PATH]) else: raise FileNotFoundError(WHEEL_PATH)
Initializing (a.k.a Seeding
) the Random Number Generator(s)¶
In [ ]:
Copied!
import torch

# Seed every RNG in play (Python, NumPy, PyTorch CPU & GPU) so experiments
# can be replicated. Set SEED to None to opt out of deterministic behaviour.
SEED = 1234

if SEED is not None:
    np.random.seed(SEED)
    random.seed(SEED)
    torch.manual_seed(SEED)
    # Seed every visible GPU, not just the current one (a no-op without CUDA).
    torch.cuda.manual_seed_all(SEED)
    # Trade cuDNN auto-tuning for bit-wise reproducible convolutions.
    torch.backends.cudnn.deterministic = True
    # NOTE(review): setting PYTHONHASHSEED after interpreter start-up does not
    # affect the current process' hash randomization — only subprocesses.
    os.environ["PYTHONHASHSEED"] = str(SEED)
import torch SEED = 1234 if SEED is not None: np.random.seed(SEED) random.seed(SEED) torch.manual_seed(SEED) torch.cuda.manual_seed(SEED) torch.backends.cudnn.deterministic = True os.environ["PYTHONHASHSEED"] = str(SEED)
Determining available backend¶
By default, we are going to be utilizing the available CPU backend, if no GPU is available.
In [ ]:
Copied!
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
Setting up our callbacks¶
End-of-training callback¶
In [ ]:
Copied!
from datetime import datetime
from roughgan.shared.notifiers import EndOfTrainingNotifier

# End-of-training notification hook. It is only wired up when the Google
# Drive "Secrets" folder (holding the notifier credentials) is available;
# otherwise `training_callback` stays None and the flow skips it.
training_callback = None
if SECRETS_DIR.is_dir():
    notifier = EndOfTrainingNotifier.from_json(SECRETS_DIR / "credentials.json")
    # NOTE(review): naive local time, formatted once at definition time —
    # presumably used only as a human-readable run identifier.
    timestamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")

    def training_callback(log_file=None, **context):
        """Notify the maintainer that a training run has finished.

        `context` must provide `dataset`, `generator`, `discriminator`,
        `elapsed_time` and `succeeded` (a KeyError is raised otherwise).
        """
        notifier(
            ("Vasilis Sioros", "billsioros97@gmail.com"),
            log_file=log_file,
            dataset=context["dataset"],
            generator=context["generator"],
            discriminator=context["discriminator"],
            elapsed_time=context["elapsed_time"],
            succeeded=context["succeeded"],
            identifier=timestamp,
        )
from datetime import datetime from roughgan.shared.notifiers import EndOfTrainingNotifier training_callback = None if SECRETS_DIR.is_dir(): notifier = EndOfTrainingNotifier.from_json(SECRETS_DIR / "credentials.json") timestamp = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f") def training_callback(log_file=None, **context): notifier( ("Vasilis Sioros", "billsioros97@gmail.com"), log_file=log_file, dataset=context["dataset"], generator=context["generator"], discriminator=context["discriminator"], elapsed_time=context["elapsed_time"], succeeded=context["succeeded"], identifier=timestamp, )
Logging initialization callback¶
In [ ]:
Copied!
def logging_callback(config, logging_dir):
    """Point the file handler of `config` at a log file inside `logging_dir`.

    The file is named after the handler's severity (e.g. "critical.log").
    Returns the mutated config so it can be used fluently.
    """
    file_handler = config.handlers.file
    level = file_handler.level.lower()
    file_handler.filename = logging_dir / f"{level}.log"
    return config
def logging_callback(config, logging_dir): level = config.handlers.file.level.lower() config.handlers.file.filename = logging_dir / f"{level}.log" return config
🙃 A naive-approach¶
Defining the Generator and the Discriminator instantiation callbacks¶
In [ ]:
Copied!
from roughgan.models import PerceptronGenerator

def get_generator():
    """Instantiate a fresh perceptron-based generator on the selected device."""
    return PerceptronGenerator.from_device(device)
from roughgan.models import PerceptronGenerator def get_generator(): return PerceptronGenerator.from_device(device)
In [ ]:
Copied!
from roughgan.models import PerceptronDiscriminator

def get_discriminator(generator):
    """Instantiate a perceptron discriminator sized to match `generator`."""
    return PerceptronDiscriminator.from_generator(generator)
from roughgan.models import PerceptronDiscriminator def get_discriminator(generator): return PerceptronDiscriminator.from_generator(generator)
Training¶
In [ ]:
Copied!
from torch.nn import BCELoss

# Binary cross-entropy is the standard adversarial (real vs. fake) criterion.
criterion = BCELoss().to(device)
from torch.nn import BCELoss criterion = BCELoss().to(device)
In [ ]:
Copied!
import functools

from torch.optim import Adam

from roughgan.content.loss import NGramGraphContentLoss
from roughgan.data.loaders import load_multiple_datasets_from_pt
from roughgan.data.transforms import To, View
from roughgan.training.epoch import per_epoch
from roughgan.training.flow import TrainingFlow

# Declarative description of the whole experiment: logging, training
# hyper-parameters, content loss, data loading and result plotting/animation.
training_flow = TrainingFlow(
    output_dir=OUTPUT_DIR,
    logging={"config": LOGGING_CONFIG, "callback": logging_callback},
    training={
        "manager": {
            "benchmark": True,
            # Uncomment if you want to enable checkpointing
            # "checkpoint": {"multiple": True},
            "train_epoch": per_epoch,
            "log_every_n": 10,
            "criterion": {"instance": criterion},
            "n_epochs": 10,
            # 80% of the data is used for training, the rest for evaluation.
            "train_ratio": 0.8,
            "optimizer": {
                "type": Adam,
                "params": {"lr": 0.1, "weight_decay": 0},
            },
            "dataloader": {
                "batch_size": 256,
                "shuffle": True,
                "num_workers": 0,
            },
        },
        # May contain None (when no notifier credentials were available).
        "callbacks": [
            training_callback,
        ],
    },
    content_loss={
        "type": NGramGraphContentLoss,
        # Uncomment if you want to enable checkpointing
        # "cache": "n_gram_graph_content_loss.pkl",
    },
    data={
        "loader": functools.partial(
            load_multiple_datasets_from_pt,
            DATASET_DIR,
            # Move tensors to the chosen device and reshape them to 1x128x128.
            transforms=[To(device), View(1, 128, 128)],
            limit=(2, 10),
        )
    },
    animation={
        "indices": [
            0,
        ],
        "save_path": "perceptron_per_epoch_animation.mp4",
    },
    plot={
        "grayscale": {"limit": 10, "save_path_fmt": "grayscale/%s_%02d.png"},
        "surface": {"limit": 10, "save_path_fmt": "surface/%s_%02d.png"},
        "against": {"save_path_fmt": "against_%s.png"},
    },
    suppress_exceptions=False,
)
import functools from torch.optim import Adam from roughgan.content.loss import NGramGraphContentLoss from roughgan.data.loaders import load_multiple_datasets_from_pt from roughgan.data.transforms import To, View from roughgan.training.epoch import per_epoch from roughgan.training.flow import TrainingFlow training_flow = TrainingFlow( output_dir=OUTPUT_DIR, logging={"config": LOGGING_CONFIG, "callback": logging_callback}, training={ "manager": { "benchmark": True, # Uncomment if you want to enable checkpointing # "checkpoint": {"multiple": True}, "train_epoch": per_epoch, "log_every_n": 10, "criterion": {"instance": criterion}, "n_epochs": 10, "train_ratio": 0.8, "optimizer": { "type": Adam, "params": {"lr": 0.1, "weight_decay": 0}, }, "dataloader": { "batch_size": 256, "shuffle": True, "num_workers": 0, }, }, "callbacks": [ training_callback, ], }, content_loss={ "type": NGramGraphContentLoss, # Uncomment if you want to enable checkpointing # "cache": "n_gram_graph_content_loss.pkl", }, data={ "loader": functools.partial( load_multiple_datasets_from_pt, DATASET_DIR, transforms=[To(device), View(1, 128, 128)], limit=(2, 10), ) }, animation={ "indices": [ 0, ], "save_path": "perceptron_per_epoch_animation.mp4", }, plot={ "grayscale": {"limit": 10, "save_path_fmt": "grayscale/%s_%02d.png"}, "surface": {"limit": 10, "save_path_fmt": "surface/%s_%02d.png"}, "against": {"save_path_fmt": "against_%s.png"}, }, suppress_exceptions=False, )
In [ ]:
Copied!
training_flow(get_generator, get_discriminator)
training_flow(get_generator, get_discriminator)
😎 A CNN based approach¶
Instantiating the Generator and the Discriminator Networks¶
In [ ]:
Copied!
from roughgan.models import CNNGenerator

def get_generator():
    """Instantiate a fresh CNN-based generator on the selected device."""
    return CNNGenerator.from_device(device)
from roughgan.models import CNNGenerator def get_generator(): return CNNGenerator.from_device(device)
In [ ]:
Copied!
from roughgan.models import CNNDiscriminator

def get_discriminator(generator):
    """Instantiate a CNN discriminator sized to match `generator`."""
    return CNNDiscriminator.from_generator(generator)
from roughgan.models import CNNDiscriminator def get_discriminator(generator): return CNNDiscriminator.from_generator(generator)
Training¶
In [ ]:
Copied!
from torch.nn import BCELoss

# Same adversarial criterion as the perceptron experiment, re-created so this
# section can be run independently.
criterion = BCELoss().to(device)
from torch.nn import BCELoss criterion = BCELoss().to(device)
In [ ]:
Copied!
import functools

from torch.optim import Adam

from roughgan.data.transforms import To, View
from roughgan.training.epoch import per_epoch

# Same experiment layout as the perceptron flow; only the optimizer
# hyper-parameters and the animation file name differ.
training_flow = TrainingFlow(
    output_dir=OUTPUT_DIR,
    logging={"config": LOGGING_CONFIG, "callback": logging_callback},
    training={
        "manager": {
            "benchmark": True,
            # Uncomment if you want to enable checkpointing
            # "checkpoint": {"multiple": True},
            "train_epoch": per_epoch,
            "log_every_n": 10,
            "criterion": {"instance": criterion},
            "n_epochs": 10,
            # 80% of the data is used for training, the rest for evaluation.
            "train_ratio": 0.8,
            "optimizer": {
                "type": Adam,
                "params": {"lr": 0.0002, "betas": (0.5, 0.999)},
            },
            "dataloader": {
                "batch_size": 256,
                "shuffle": True,
                "num_workers": 0,
            },
        },
        # May contain None (when no notifier credentials were available).
        "callbacks": [
            training_callback,
        ],
    },
    content_loss={
        "type": NGramGraphContentLoss,
        # Uncomment if you want to enable checkpointing
        # "cache": "n_gram_graph_content_loss.pkl",
    },
    data={
        "loader": functools.partial(
            load_multiple_datasets_from_pt,
            DATASET_DIR,
            # Move tensors to the chosen device and reshape them to 1x128x128.
            transforms=[To(device), View(1, 128, 128)],
            limit=(2, 10),
        )
    },
    animation={
        "indices": [
            0,
        ],
        "save_path": "cnn_per_epoch_animation.mp4",
    },
    plot={
        "grayscale": {"limit": 10, "save_path_fmt": "grayscale/%s_%02d.png"},
        "surface": {"limit": 10, "save_path_fmt": "surface/%s_%02d.png"},
        "against": {"save_path_fmt": "against_%s.png"},
    },
    suppress_exceptions=False,
)
import functools from torch.optim import Adam from roughgan.data.transforms import To, View from roughgan.training.epoch import per_epoch training_flow = TrainingFlow( output_dir=OUTPUT_DIR, logging={"config": LOGGING_CONFIG, "callback": logging_callback}, training={ "manager": { "benchmark": True, # Uncomment if you want to enable checkpointing # "checkpoint": {"multiple": True}, "train_epoch": per_epoch, "log_every_n": 10, "criterion": {"instance": criterion}, "n_epochs": 10, "train_ratio": 0.8, "optimizer": { "type": Adam, "params": {"lr": 0.0002, "betas": (0.5, 0.999)}, }, "dataloader": { "batch_size": 256, "shuffle": True, "num_workers": 0, }, }, "callbacks": [ training_callback, ], }, content_loss={ "type": NGramGraphContentLoss, # Uncomment if you want to enable checkpointing # "cache": "n_gram_graph_content_loss.pkl", }, data={ "loader": functools.partial( load_multiple_datasets_from_pt, DATASET_DIR, transforms=[To(device), View(1, 128, 128)], limit=(2, 10), ) }, animation={ "indices": [ 0, ], "save_path": "cnn_per_epoch_animation.mp4", }, plot={ "grayscale": {"limit": 10, "save_path_fmt": "grayscale/%s_%02d.png"}, "surface": {"limit": 10, "save_path_fmt": "surface/%s_%02d.png"}, "against": {"save_path_fmt": "against_%s.png"}, }, suppress_exceptions=False, )
In [ ]:
Copied!
training_flow(get_generator, get_discriminator)
training_flow(get_generator, get_discriminator)
👋 Dismounting Google Drive and persisting any changes made¶
In [ ]:
Copied!
# Flush pending writes and unmount Google Drive so the run's outputs are
# persisted. Outside Colab the import fails and this is a no-op.
try:
    from google.colab import drive
    drive.flush_and_unmount()
except ImportError:
    pass
try: from google.colab import drive drive.flush_and_unmount() except ImportError: pass