Source code for aitoolbox.torchtrain.callbacks.basic

import os
import numpy as np

from aitoolbox.torchtrain.callbacks.abstract import AbstractCallback, AbstractExperimentCallback
from aitoolbox.utils import util
from aitoolbox.cloud.AWS.simple_email_service import SESSender
from aitoolbox.cloud.AWS.results_save import BaseResultsSaver
from aitoolbox.cloud.GoogleCloud.results_save import BaseResultsGoogleStorageSaver
from aitoolbox.cloud import s3_available_options, gcs_available_options


[docs]class ListRegisteredCallbacks(AbstractCallback): def __init__(self): """List all the callbacks which are used in the current TrainLoop """ AbstractCallback.__init__(self, 'Print the list of registered callbacks')
[docs] def on_train_begin(self): self.train_loop_obj.callbacks_handler.print_registered_callback_names()
[docs]class EarlyStopping(AbstractCallback): def __init__(self, monitor='val_loss', min_delta=0., patience=0): """Early stopping of the training if the performance stops improving Args: monitor (str): performance measure that is tracked to decide if performance is improving during training min_delta (float): by how much the performance has to improve to still keep training the model patience (int): how many epochs the early stopper waits after the performance stopped improving """ # execution_order=99 makes sure that any performance calculation callbacks are executed before and the most # recent results can already be found in the train_history AbstractCallback.__init__(self, 'EarlyStopping', execution_order=99) self.monitor = monitor self.min_delta = min_delta self.patience = patience self.patience_count = self.patience self.best_performance = None self.best_epoch = 0
[docs] def on_epoch_end(self): history_data = self.train_loop_obj.train_history[self.monitor] current_performance = history_data[-1] if self.best_performance is None: self.best_performance = current_performance self.best_epoch = self.train_loop_obj.epoch else: if 'loss' in self.monitor.lower() or 'error' in self.monitor.lower(): if current_performance < self.best_performance - self.min_delta: self.best_performance = current_performance self.best_epoch = self.train_loop_obj.epoch self.patience_count = self.patience else: self.patience_count -= 1 else: if current_performance > self.best_performance + self.min_delta: self.best_performance = current_performance self.best_epoch = self.train_loop_obj.epoch self.patience_count = self.patience else: self.patience_count -= 1 if self.patience_count < 0: self.train_loop_obj.early_stop = True print(f'Early stopping at epoch: {self.train_loop_obj.epoch}. Best recorded epoch: {self.best_epoch}.')
[docs]class ThresholdEarlyStopping(AbstractCallback): def __init__(self, monitor, threshold, patience=0): """Early stopping of the training if the performance doesn't reach the specified threshold Args: monitor (str): performance measure that is tracked to decide if performance reached the desired threshold threshold (float): performance threshold that needs to be exceeded in order to continue training patience (int): how many epochs the early stopper waits for the tracked performance to reach the desired threshold """ AbstractCallback.__init__(self, 'Threshold early stopping', execution_order=99) self.monitor = monitor self.threshold = threshold self.patience = patience self.patience_count = self.patience
[docs] def on_epoch_end(self): history_data = self.train_loop_obj.train_history[self.monitor] current_performance = history_data[-1] if 'loss' in self.monitor.lower() or 'error' in self.monitor.lower(): if current_performance > self.threshold: self.patience_count -= 1 else: self.patience_count = self.patience else: if current_performance < self.threshold: self.patience_count -= 1 else: self.patience_count = self.patience if self.patience_count < 0: self.train_loop_obj.early_stop = True print('Threshold performance not reached. ' f'Early stopping at epoch: {self.train_loop_obj.epoch}.')
[docs]class TerminateOnNaN(AbstractCallback): def __init__(self, monitor='loss'): """Terminate training if NaNs are predicted, thus metrics are NaN Args: monitor (str): performance measure that is tracked to decide if performance is improving during training """ AbstractCallback.__init__(self, 'TerminateOnNaN', execution_order=98) self.monitor = monitor
[docs] def on_epoch_end(self): last_measure = self.train_loop_obj.train_history[self.monitor][-1] if last_measure is not None: if np.isnan(last_measure) or np.isinf(last_measure): self.train_loop_obj.early_stop = True print(f'Terminating on {self.monitor} = {last_measure} at epoch: {self.train_loop_obj.epoch}.')
[docs]class AllPredictionsSame(AbstractCallback): def __init__(self, value=0., stop_training=False, verbose=True): """Checks if all the predicted values are the same Useful for example when dealing with extremely unbalanced classes. Args: value (float): all predictions are the same as this value stop_training (bool): if all predictions match the specified value, should the training be (early) stopped verbose (bool): output messages """ AbstractCallback.__init__(self, 'All predictions have the same value') self.value = value self.stop_training = stop_training self.verbose = verbose
[docs] def on_epoch_end(self): predictions, _, _ = self.train_loop_obj.predict_on_validation_set() all_values_same = all(el == self.value for el in predictions) if all_values_same: if self.verbose: print(f'All the predicted values are of the same value: {self.value}') if self.stop_training: print('Executing early stopping') self.train_loop_obj.early_stop = True
[docs]class EmailNotification(AbstractCallback): def __init__(self, sender_name, sender_email, recipient_email, project_name=None, experiment_name=None, aws_region='eu-west-1'): """Notify user via email about the training progression Args: sender_name (str): Name of the email sender sender_email (str): Email of the email sender recipient_email (str): Email where the email will be sent project_name (str or None): root name of the project experiment_name (str or None): name of the particular experiment aws_region (str): AWS SES region """ AbstractCallback.__init__(self, 'Send email to notify about the state of training', execution_order=98, device_idx_execution=0) self.project_name = project_name self.experiment_name = experiment_name self.ses_sender = None self.sender_name = sender_name self.sender_email = sender_email self.recipient_email = recipient_email self.aws_region = aws_region
[docs] def on_epoch_end(self): subject = f"End of epoch {self.train_loop_obj.epoch} report: {self.project_name}: {self.experiment_name}" performance_list = self.get_metric_list_html() plot_msgs = self.message_service.read_messages('ModelTrainHistoryPlot_results_file_local_paths') plots_file_paths = plot_msgs if plot_msgs is not None else [] file_writer_msgs = self.message_service.read_messages('ModelTrainHistoryFileWriter_results_file_local_paths') plots_file_paths += file_writer_msgs if file_writer_msgs is not None else [] plots_file_paths = util.flatten_list_of_lists(plots_file_paths) body_text = f"""<h2>End of epoch {self.train_loop_obj.epoch}</h2> {performance_list} """ self.ses_sender.send_email(subject, body_text, plots_file_paths)
[docs] def on_train_end(self): subject = f"End of training: {self.project_name}: {self.experiment_name}" performance_list = self.get_metric_list_html() hyperparams = self.get_hyperparams_html() plot_msgs = self.message_service.read_messages('ModelTrainHistoryPlot_results_file_local_paths') plots_file_paths = plot_msgs if plot_msgs is not None else [] file_writer_msgs = self.message_service.read_messages('ModelTrainHistoryFileWriter_results_file_local_paths') plots_file_paths += file_writer_msgs if file_writer_msgs is not None else [] plots_file_paths = util.flatten_list_of_lists(plots_file_paths) body_text = f"""<h2>End of training at epoch {self.train_loop_obj.epoch}</h2> {performance_list} <h3>Used hyper parameters:</h3> {hyperparams} """ self.ses_sender.send_email(subject, body_text, plots_file_paths)
[docs] def get_metric_list_html(self): """Generate performance metrics list HTML Returns: str: HTML doc """ performance_list = '<ul>' + \ '\n'.join([f'<li><p>{metric_name}: {hist[-1]}</p></li>' for metric_name, hist in self.train_loop_obj.train_history.items() if len(hist) > 0]) + \ '</ul>' return performance_list
[docs] def get_hyperparams_html(self): """Generate hyperparameters list HTML Returns: str: HTML doc """ hyperparams = '<ul>' + \ '\n'.join([f'<li><p>{param_name}: {val}</p></li>' for param_name, val in self.train_loop_obj.hyperparams.items()]) + \ '</ul>' \ if hasattr(self.train_loop_obj, 'hyperparams') else 'Not given' return hyperparams
[docs] def on_train_loop_registration(self): try: if self.project_name is None: self.project_name = self.train_loop_obj.project_name if self.experiment_name is None: self.experiment_name = self.train_loop_obj.experiment_name except AttributeError: raise AttributeError('Currently used TrainLoop does not support automatic project folder structure ' 'creation. Project name, etc. thus can not be automatically deduced. Please provide' 'it in the callback parameters instead of currently used None values.') self.ses_sender = SESSender(self.sender_name, self.sender_email, self.recipient_email, self.aws_region)
[docs]class LogUpload(AbstractExperimentCallback): def __init__(self, log_file_path='~/project/training.log', fail_if_cloud_missing=True, project_name=None, experiment_name=None, local_model_result_folder_path=None, cloud_save_mode=None, bucket_name=None, cloud_dir_prefix=None): """Upload logging file to the cloud storage Uploading happens after each epoch and at the end of the training process. Args: log_file_path (str): path to the local logging file fail_if_cloud_missing (bool): should throw the exception if cloud saving is not available project_name (str or None): root name of the project experiment_name (str or None): name of the particular experiment local_model_result_folder_path (str or None): root local path where project folder will be created cloud_save_mode (str or None): Storage destination selector. For AWS S3: 's3' / 'aws_s3' / 'aws' For Google Cloud Storage: 'gcs' / 'google_storage' / 'google storage' Everything else results just in local storage to disk bucket_name (str): name of the bucket in the cloud storage cloud_dir_prefix (str): path to the folder inside the bucket where the experiments are going to be saved """ AbstractExperimentCallback.__init__(self, "Upload Tmux logging file to cloud", project_name, experiment_name, local_model_result_folder_path, cloud_save_mode, bucket_name, cloud_dir_prefix, execution_order=1500, device_idx_execution=0) self.log_file_path = os.path.expanduser(log_file_path) self.log_filename = os.path.basename(self.log_file_path) self.fail_if_cloud_missing = fail_if_cloud_missing self.cloud_saver = None
[docs] def on_train_loop_registration(self): self.try_infer_experiment_details(infer_cloud_details=True) if self.cloud_save_mode in s3_available_options: self.cloud_saver = BaseResultsSaver(bucket_name=self.bucket_name, cloud_dir_prefix=self.cloud_dir_prefix) elif self.cloud_save_mode in gcs_available_options: self.cloud_saver = BaseResultsGoogleStorageSaver(bucket_name=self.bucket_name, cloud_dir_prefix=self.cloud_dir_prefix) else: if self.fail_if_cloud_missing: raise ValueError("Cloud saving not supported. Produced logs can potentially get los in the case of " "instance termination.") print("Cloud saving not supported. Produced logs can potentially get lost.")
[docs] def on_epoch_end(self): self.upload_log_file()
[docs] def on_train_end(self): self.upload_log_file()
[docs] def upload_log_file(self): experiment_results_cloud_path = \ self.cloud_saver.create_experiment_cloud_storage_folder_structure(self.project_name, self.experiment_name, self.train_loop_obj.experiment_timestamp) experiment_cloud_path = os.path.dirname(experiment_results_cloud_path) self.cloud_saver.save_file(local_file_path=self.log_file_path, cloud_file_path=os.path.join(experiment_cloud_path, self.log_filename))
[docs]class DataSubsetTestRun(AbstractCallback): def __init__(self, num_train_batches=1, num_val_batches=0, num_test_batches=0): """Subset the provided data loaders to execute neural net only on a small dataset subset This is especially useful when first developing the neural architectures and debugging them. Subsetting the full dataset helps with fast development iterations. Args: num_train_batches (int): number of the training data batches that are kept in the training dataset num_val_batches (int): number of the validation data batches that are kept in the validation dataset num_test_batches (int): number of the test data batches that are kept in the test dataset """ AbstractCallback.__init__(self, 'Run model on a small subset of the training dataset') self.num_train_batches = num_train_batches self.num_val_batches = num_val_batches self.num_test_batches = num_test_batches
[docs] def on_train_begin(self): self.train_loop_obj.train_loader = self.subset_data_loader(self.train_loop_obj.train_loader, self.num_train_batches) if self.num_val_batches > 0: self.train_loop_obj.validation_loader = self.subset_data_loader(self.train_loop_obj.validation_loader, self.num_val_batches) if self.num_test_batches > 0: self.train_loop_obj.test_loader = self.subset_data_loader(self.train_loop_obj.test_loader, self.num_test_batches)
[docs] @staticmethod def subset_data_loader(data_loader, num_batches): sub_set = [] for i, batch in enumerate(data_loader): sub_set.append(batch) if i == num_batches - 1: break return sub_set
[docs] def on_train_loop_registration(self): if self.num_val_batches > 0 and self.train_loop_obj.validation_loader is None: raise ValueError("Validation loader was not provided to the TrainLoop, can't subset it") if self.num_test_batches > 0 and self.train_loop_obj.test_loader is None: raise ValueError("Test loader was not provided to the TrainLoop, can't subset it")
[docs]class FunctionOnTrainLoop(AbstractCallback): def __init__(self, fn_to_execute, tl_registration=False, epoch_begin=False, epoch_end=False, train_begin=False, train_end=False, batch_begin=False, batch_end=False, after_gradient_update=False, after_optimizer_step=False, execution_order=0, device_idx_execution=None): """Execute given function as a callback in the TrainLoop Args: fn_to_execute (function): function logic to be executed at the desired point of the TrainLoop. The function should take a single input as an argument which is the reference to the encapsulating TrainLoop object (self.train_loop_obj). tl_registration (bool): should execute on TrainLoop registration epoch_begin (bool): should execute at the beginning of the epoch epoch_end (bool): should execute at the end of the epoch train_begin (bool): should execute at the beginning of the training train_end (bool): should execute at the end of the training batch_begin (bool): should execute at the beginning of the batch batch_end (bool): should execute at the end of the batch after_gradient_update (bool): should execute after the gradient update after_optimizer_step (bool): should execute after the optimizer step execution_order (int): order of the callback execution. If all the used callbacks have the orders set to 0, then the callbacks are executed in the order they were registered. """ AbstractCallback.__init__(self, 'Execute given function as a callback', execution_order, device_idx_execution) self.fn_to_execute = fn_to_execute self.tl_registration = tl_registration self.epoch_begin = epoch_begin self.epoch_end = epoch_end self.train_begin = train_begin self.train_end = train_end self.batch_begin = batch_begin self.batch_end = batch_end self.after_gradient_update = after_gradient_update self.after_optimizer_step = after_optimizer_step
[docs] def execute_callback(self): self.fn_to_execute(self.train_loop_obj)
[docs] def on_train_loop_registration(self): if self.after_gradient_update or self.after_optimizer_step: self.train_loop_obj.grad_cb_used = True if self.tl_registration: self.execute_callback()
[docs] def on_epoch_begin(self): if self.epoch_begin: self.execute_callback()
[docs] def on_epoch_end(self): if self.epoch_end: self.execute_callback()
[docs] def on_train_begin(self): if self.train_begin: self.execute_callback()
[docs] def on_train_end(self): if self.train_end: self.execute_callback()
[docs] def on_batch_begin(self): if self.batch_begin: self.execute_callback()
[docs] def on_batch_end(self): if self.batch_end: self.execute_callback()
[docs] def on_after_gradient_update(self, optimizer_idx): if self.after_gradient_update: self.execute_callback()
[docs] def on_after_optimizer_step(self): if self.after_optimizer_step: self.execute_callback()