Source code for aitoolbox.torchtrain.callbacks.performance_eval
import copy
import os
from aitoolbox.torchtrain.callbacks.abstract import AbstractCallback, AbstractExperimentCallback
from aitoolbox.torchtrain.train_loop.components.message_passing import MessageHandling
from aitoolbox.cloud.AWS.results_save import BaseResultsSaver as BaseResultsS3Saver
from aitoolbox.cloud.GoogleCloud.results_save import BaseResultsGoogleStorageSaver
from aitoolbox.cloud import s3_available_options, gcs_available_options
from aitoolbox.experiment.local_save.local_results_save import BaseLocalResultsSaver
from aitoolbox.experiment.result_reporting.report_generator import TrainingHistoryPlotter, TrainingHistoryWriter
from aitoolbox.experiment.result_package.torch_metrics_packages import TorchMetricsPackage
[docs]class ModelPerformanceEvaluation(AbstractCallback):
def __init__(self, result_package, args,
on_each_epoch=True, on_train_data=False, on_val_data=True, eval_frequency=None,
if_available_output_to_project_dir=True):
"""Track performance metrics from result_package and store them into TrainLoop's history
This callback is different from those for model and experiment saving where performance evaluations are also
calculated. Here we only want to calculate performance and store it in memory into TrainLoop's history dict.
It is a more lightweight, on the go performance tracking without the need for the full project folder structure
construction.
Args:
result_package (:class:`~aitoolbox.experiment.result_package.abstract_result_packages.AbstractResultPackage`):
result package to be evaluated
args (dict): used hyper-parameters
on_each_epoch (bool): calculate performance results just at the end of training or at the end of each epoch
on_train_data (bool): should the evaluation be done on the training dataset
on_val_data (bool): should the evaluation be done on the validation dataset
eval_frequency (int or None): evaluation is done every specified number of epochs. Useful when predictions
are quite expensive and are slowing down the overall training
if_available_output_to_project_dir (bool): if using train loop version which builds project local folder
structure for saving checkpoints or creation of end of training reports, by setting
if_available_output_to_project_dir to True the potential additional metadata result outputs from the
result_package will be saved in the folder inside the main project folder. In this case
the result_package's output folder shouldn't be full path but just the folder name and the full folder
path pointing inside the corresponding project folder will be automatically created.
If such a functionality should to be prevented and manual full additional metadata results dump folder
is needed potentially outside the project folder, then set this argument to False and
specify a full folder path.
"""
AbstractCallback.__init__(self, 'Model performance calculator - evaluator')
self.result_package = result_package
self.args = args
self.on_each_epoch = on_each_epoch
self.on_train_data = on_train_data
self.on_val_data = on_val_data
self.eval_frequency = eval_frequency
self.if_available_output_to_project_dir = if_available_output_to_project_dir
if not on_train_data and not on_val_data:
raise ValueError('Both on_train_data and on_val_data are set to False. At least one of them has to be True')
if on_train_data:
self.train_result_package = copy.deepcopy(result_package)
[docs] def on_epoch_end(self):
if self.on_each_epoch:
if self.eval_frequency is None or \
(self.eval_frequency is not None and self.train_loop_obj.epoch % self.eval_frequency == 0):
self.evaluate_model_performance()
else:
print(f'Skipping performance evaluation on this epoch ({self.train_loop_obj.epoch}). '
f'Evaluating every {self.eval_frequency} epochs.')
if isinstance(self.result_package, TorchMetricsPackage):
if self.on_train_data:
self.train_result_package.metric_reset()
if self.on_val_data:
self.result_package.metric_reset()
[docs] def evaluate_model_performance(self, prefix=''):
"""Calculate performance based on the provided result packages
Args:
prefix (str): additional prefix for metric names that will get saved into the training history
Returns:
None
"""
if self.on_train_data:
y_pred, y_test, additional_results = self.train_loop_obj.predict_on_train_set()
if self.train_result_package.requires_loss:
additional_results['loss'] = self.train_loop_obj.evaluate_loss_on_train_set()
self.train_result_package.prepare_result_package(y_test, y_pred,
hyperparameters=self.args,
additional_results=additional_results)
if self.on_val_data:
y_pred, y_test, additional_results = self.train_loop_obj.predict_on_validation_set()
if self.result_package.requires_loss:
additional_results['loss'] = self.train_loop_obj.evaluate_loss_on_validation_set(float_dict_format=True)
self.result_package.prepare_result_package(y_test, y_pred,
hyperparameters=self.args,
additional_results=additional_results)
self.store_evaluated_metrics_to_history(prefix=prefix)
[docs] def store_evaluated_metrics_to_history(self, prefix=''):
"""Save the calculated performance results into the training history
The performance results are saved into the training history after they are calculated by the before called
evaluate_model_performance() function.
Args:
prefix (str): additional prefix for metric names that will get saved into the training history
Returns:
None
"""
evaluated_metrics = self.result_package.get_results().keys() if self.on_val_data \
else self.train_result_package.get_results().keys()
for m_name in evaluated_metrics:
if self.on_train_data:
metric_name = f'{prefix}train_{m_name}'
self.train_loop_obj.insert_metric_result_into_history(metric_name,
self.train_result_package.get_results()[m_name])
if self.on_val_data:
metric_name = f'{prefix}val_{m_name}'
self.train_loop_obj.insert_metric_result_into_history(metric_name,
self.result_package.get_results()[m_name])
[docs] def on_train_loop_registration(self):
if self.if_available_output_to_project_dir and \
hasattr(self.train_loop_obj, 'project_name') and hasattr(self.train_loop_obj, 'experiment_name') and \
hasattr(self.train_loop_obj, 'local_model_result_folder_path'):
self.result_package.set_experiment_dir_path_for_additional_results(self.train_loop_obj.project_name,
self.train_loop_obj.experiment_name,
self.train_loop_obj.experiment_timestamp,
self.train_loop_obj.local_model_result_folder_path)
if isinstance(self.result_package, TorchMetricsPackage):
if self.on_train_data:
self.train_result_package.metric.to(self.train_loop_obj.device)
if self.on_val_data:
self.result_package.metric.to(self.train_loop_obj.device)
[docs]class ModelPerformancePrintReport(AbstractCallback):
def __init__(self, metrics, on_each_epoch=True, report_frequency=None,
strict_metric_reporting=True, list_tracked_metrics=False):
"""Print the model performance to the console
Best used in combination with the callback which actually calculates some performance evaluation metrics, such
as ModelPerformanceEvaluation. Otherwise, we are limited only to automatic loss calculation reporting.
When listing callbacks for the TrainLoop it is important to list the ModelPerformanceEvaluation before
this ModelPerformancePrintReport. This ensures that the calculated results are present in the
TrainLoop.train_history before there is an attempt to print them.
Args:
metrics (list): list of string metric names which should be presented in the printed report
on_each_epoch (bool): present results just at the end of training or at the end of each epoch
report_frequency (int or None): evaluation is done every specified number of epochs. Useful when predictions
are quite expensive and are slowing down the overall training
strict_metric_reporting (bool): if False ignore missing metric in the TrainLoop.train_history, if True, in
case of missing metric throw and exception and thus interrupt the training loop
list_tracked_metrics (bool): should all tracked metrics names be listed
"""
AbstractCallback.__init__(self, 'Model performance print reporter',
execution_order=97, device_idx_execution=0)
self.metrics = metrics
self.on_each_epoch = on_each_epoch
self.report_frequency = report_frequency
self.strict_metric_reporting = strict_metric_reporting
self.list_tracked_metrics = list_tracked_metrics
if len(metrics) == 0:
raise ValueError('metrics list is empty')
[docs] def on_train_end(self):
print('----------------- End of training performance report -----------------')
self.print_performance_report(prefix='train_end_')
[docs] def on_epoch_end(self):
if self.on_each_epoch:
if self.report_frequency is None or \
(self.report_frequency is not None and self.train_loop_obj.epoch % self.report_frequency == 0):
print('------------------ End of epoch performance report -------------------')
self.print_performance_report()
[docs] def print_performance_report(self, prefix=''):
"""Print the model performance
Args:
prefix (str): additional prefix for metric names that will get saved into the training history
Returns:
None
"""
if self.list_tracked_metrics:
print(self.train_loop_obj.train_history.keys())
for metric_name in self.metrics:
metric_name = prefix + metric_name
if metric_name not in self.train_loop_obj.train_history:
if self.strict_metric_reporting:
raise ValueError(
f'Metric {metric_name} expected for the report missing from TrainLoop.train_history. '
f'Found only the following: {self.train_loop_obj.train_history.keys()}')
else:
print(f'Metric {metric_name} expected for the report missing from TrainLoop.train_history. '
f'Found only the following: {self.train_loop_obj.train_history.keys()}')
else:
print(f'{metric_name}: {self.train_loop_obj.train_history[metric_name][-1]}')
[docs]class TrainHistoryFormatter(AbstractCallback):
def __init__(self, input_metric_getter, output_metric_setter,
epoch_end=True, train_end=False, strict_metric_extract=True):
"""Format stored training history results
Args:
input_metric_getter (lambda): extract full history for the desired metric, not just the last history input.
Return should be represented as a list.
output_metric_setter (lambda): take the extracted full history of a metric and convert it as desired.
Return new / transformed metric name and transformed metric result.
epoch_end (bool): should the formatting be executed at the end of the epoch
train_end (bool): should the formatting be executed at the end of the training process
strict_metric_extract (bool): in case of (quality) problems should exception be raised on just the
notification printed to console
"""
if epoch_end == train_end:
raise ValueError(f'Only either epoch_end or train_end have to be set to True. '
f'Have set epoch_end to {epoch_end} and train_end to {train_end}')
AbstractCallback.__init__(self, 'Train history general formatter engine')
self.input_metric_getter = input_metric_getter
self.output_metric_setter = output_metric_setter
self.epoch_end = epoch_end
self.train_end = train_end
self.strict_metric_extract = strict_metric_extract
[docs] def on_epoch_end(self):
if self.epoch_end:
if self.check_if_history_updated(not self.epoch_end):
self.format_history()
[docs] def on_train_end(self):
if self.train_end:
if self.check_if_history_updated(self.train_end):
self.format_history()
[docs] def format_history(self):
input_metric = self.input_metric_getter(self.train_loop_obj.train_history)
output_metric_name, output_metric = self.output_metric_setter(input_metric)
self.train_loop_obj.insert_metric_result_into_history(output_metric_name, output_metric)
[docs] def check_if_history_updated(self, train_end_phase):
if train_end_phase:
history_elements_expected = 1
else:
history_elements_expected = self.train_loop_obj.epoch + 1
metric_result_list = self.input_metric_getter(self.train_loop_obj.train_history)
metric_result_len = len(metric_result_list)
if history_elements_expected != metric_result_len:
if self.strict_metric_extract:
raise ValueError(f'Metric found at path specified in input_metric_getter not yet updated. '
f'Expecting {history_elements_expected} history elements, '
f'but got {metric_result_len} elements.')
else:
print(f'Metric found at path specified in input_metric_getter not yet updated. '
f'Expecting {history_elements_expected} history elements, but got {metric_result_len} elements.')
return False
return True
[docs]class MetricHistoryRename(TrainHistoryFormatter):
def __init__(self, input_metric_path, new_metric_name,
epoch_end=True, train_end=False, strict_metric_extract=True):
"""Specific interface for TrainHistoryFormatter which renames the metric in the training history
Args:
input_metric_path (str or lambda): if using lambda, extract full history for the desired metric,
not just the last history input. Return should be represented as a list.
new_metric_name (str): the new metric name
epoch_end (bool): should the formatting be executed at the end of the epoch
train_end (bool): should the formatting be executed at the end of the training process
strict_metric_extract (bool): in case of (quality) problems should exception be raised on just the
notification printed to console
"""
# TODO: decide which of these two options is better
# if callable(input_metric_path):
# input_metric_getter = input_metric_path
# else:
# input_metric_getter = lambda train_history: train_history[input_metric_path]
# input_metric_getter = input_metric_path if callable(input_metric_path) \
# else lambda train_history: train_history[input_metric_path]
# output_metric_setter = lambda input_metric: (new_metric_name, input_metric[-1])
# TrainHistoryFormatter.__init__(self, input_metric_getter, output_metric_setter,
# epoch_end=True, train_end=True, strict_metric_extract=strict_metric_extract)
TrainHistoryFormatter.__init__(self,
input_metric_getter=input_metric_path if callable(input_metric_path) else
lambda train_history: train_history[input_metric_path],
output_metric_setter=lambda input_metric: (new_metric_name, input_metric[-1]),
epoch_end=epoch_end, train_end=train_end,
strict_metric_extract=strict_metric_extract)
[docs]class ModelTrainHistoryBaseCB(AbstractExperimentCallback):
def __init__(self, callback_name, execution_order=0,
epoch_end=True, train_end=False, file_format='',
project_name=None, experiment_name=None, local_model_result_folder_path=None,
cloud_save_mode=None, bucket_name=None, cloud_dir_prefix=None):
"""Base callback class to be inherited from when reporting train performance history
Args:
callback_name (str): name of the callback
execution_order (int): order of the callback execution. If all the used callbacks have the orders set to 0,
then the callbacks are executed in the order they were registered.
epoch_end (bool): should plot after every epoch
train_end (bool): should plot at the end of the training
file_format (str): output file format
project_name (str or None): root name of the project
experiment_name (str or None): name of the particular experiment
local_model_result_folder_path (str or None): root local path where project folder will be created
cloud_save_mode (str or None): Storage destination selector.
For AWS S3: 's3' / 'aws_s3' / 'aws'
For Google Cloud Storage: 'gcs' / 'google_storage' / 'google storage'
Everything else results just in local storage to disk
bucket_name (str): name of the bucket in the cloud storage
cloud_dir_prefix (str): path to the folder inside the bucket where the experiments are going to be saved
"""
AbstractExperimentCallback.__init__(self, callback_name,
project_name, experiment_name, local_model_result_folder_path,
cloud_save_mode, bucket_name, cloud_dir_prefix,
execution_order=execution_order, device_idx_execution=0)
if epoch_end is False and train_end is False:
raise ValueError('Both epoch_end and train_end are set to False. At least one of these should be True.')
self.epoch_end = epoch_end
self.train_end = train_end
self.file_format = file_format
self.cloud_results_saver = None
[docs] def prepare_results_saver(self):
"""Initialize the required results saver
Returns:
None
"""
if self.cloud_save_mode in s3_available_options:
self.cloud_results_saver = BaseResultsS3Saver(bucket_name=self.bucket_name,
cloud_dir_prefix=self.cloud_dir_prefix)
elif self.cloud_save_mode in gcs_available_options:
self.cloud_results_saver = BaseResultsGoogleStorageSaver(bucket_name=self.bucket_name,
cloud_dir_prefix=self.cloud_dir_prefix)
else:
self.cloud_results_saver = None
[docs]class ModelTrainHistoryPlot(ModelTrainHistoryBaseCB):
def __init__(self, epoch_end=True, train_end=False, file_format='png',
project_name=None, experiment_name=None, local_model_result_folder_path=None,
cloud_save_mode=None, bucket_name=None, cloud_dir_prefix=None):
"""Plot the evaluated performance metric history
Args:
epoch_end (bool): should plot after every epoch
train_end (bool): should plot at the end of the training
file_format (str): output file format. Can be either 'png' for saving separate images or 'pdf' for combining
all the plots into a single pdf file.
project_name (str or None): root name of the project
experiment_name (str or None): name of the particular experiment
local_model_result_folder_path (str or None): root local path where project folder will be created
cloud_save_mode (str or None): Storage destination selector.
For AWS S3: 's3' / 'aws_s3' / 'aws'
For Google Cloud Storage: 'gcs' / 'google_storage' / 'google storage'
Everything else results just in local storage to disk
bucket_name (str): name of the bucket in the cloud storage
cloud_dir_prefix (str): path to the folder inside the bucket where the experiments are going to be saved
"""
# execution_order=97 makes sure that any performance calculation callbacks are executed before and the most
# recent results can already be found in the train_history
ModelTrainHistoryBaseCB.__init__(self, 'Model Train history Plot report', execution_order=97,
epoch_end=epoch_end, train_end=train_end, file_format=file_format,
project_name=project_name, experiment_name=experiment_name,
local_model_result_folder_path=local_model_result_folder_path,
cloud_save_mode=cloud_save_mode, bucket_name=bucket_name,
cloud_dir_prefix=cloud_dir_prefix)
if self.file_format not in ['png', 'pdf']:
raise ValueError(f"Output format '{self.file_format}' is not supported. "
"Select one of the following: 'png' or 'pdf'.")
[docs] def on_train_loop_registration(self):
self.try_infer_experiment_details(infer_cloud_details=True)
self.prepare_results_saver()
[docs] def on_train_end(self):
if self.train_end:
self.plot_current_train_history(prefix='train_end_')
[docs] def plot_current_train_history(self, prefix=''):
"""Plot current training history snapshot in the encapsulating TrainLoop
Args:
prefix (str): plots folder name prefix
Returns:
None
"""
experiment_results_local_path = \
BaseLocalResultsSaver.create_experiment_local_results_folder(self.project_name, self.experiment_name,
self.train_loop_obj.experiment_timestamp,
self.local_model_result_folder_path)
plotter = TrainingHistoryPlotter(experiment_results_local_path=experiment_results_local_path)
saved_local_results_details = \
plotter.generate_report(training_history=self.train_loop_obj.train_history,
plots_folder_name=f'{prefix}plots_epoch_{self.train_loop_obj.epoch}',
file_format=self.file_format)
results_file_local_paths = [result_local_path for _, result_local_path in saved_local_results_details]
self.message_service.write_message('ModelTrainHistoryPlot_results_file_local_paths',
results_file_local_paths,
msg_handling_settings=MessageHandling.UNTIL_END_OF_EPOCH)
if self.cloud_results_saver is not None:
experiment_cloud_path = \
self.cloud_results_saver.create_experiment_cloud_storage_folder_structure(self.project_name,
self.experiment_name,
self.train_loop_obj.experiment_timestamp)
for results_file_path_in_cloud_results_dir, results_file_local_path in saved_local_results_details:
results_file_s3_path = os.path.join(experiment_cloud_path, results_file_path_in_cloud_results_dir)
self.cloud_results_saver.save_file(local_file_path=results_file_local_path,
cloud_file_path=results_file_s3_path)
[docs]class ModelTrainHistoryFileWriter(ModelTrainHistoryBaseCB):
def __init__(self, epoch_end=True, train_end=False, file_format='txt',
project_name=None, experiment_name=None, local_model_result_folder_path=None,
cloud_save_mode=None, bucket_name=None, cloud_dir_prefix=None):
"""Write evaluated performance metric history to the text file
Args:
epoch_end (bool): should plot after every epoch
train_end (bool): should plot at the end of the training
file_format (str): output file format. Can be either 'txt' human-readable output or
'tsv' for a tabular format or 'csv' for comma separated format.
project_name (str or None): root name of the project
experiment_name (str or None): name of the particular experiment
local_model_result_folder_path (str or None): root local path where project folder will be created
cloud_save_mode (str or None): Storage destination selector.
For AWS S3: 's3' / 'aws_s3' / 'aws'
For Google Cloud Storage: 'gcs' / 'google_storage' / 'google storage'
Everything else results just in local storage to disk
bucket_name (str): name of the bucket in the cloud storage
cloud_dir_prefix (str): path to the folder inside the bucket where the experiments are going to be saved
"""
# execution_order=97 makes sure that any performance calculation callbacks are executed before and the most
# recent results can already be found in the train_history
ModelTrainHistoryBaseCB.__init__(self, 'Model Train performance history file writer', execution_order=97,
epoch_end=epoch_end, train_end=train_end, file_format=file_format,
project_name=project_name, experiment_name=experiment_name,
local_model_result_folder_path=local_model_result_folder_path,
cloud_save_mode=cloud_save_mode, bucket_name=bucket_name,
cloud_dir_prefix=cloud_dir_prefix)
# experiment_results_local_path will be set when callback is executed inside write_current_train_history()
self.result_writer = TrainingHistoryWriter(experiment_results_local_path=None)
if self.file_format not in ['txt', 'tsv', 'csv']:
raise ValueError(f"Output format '{self.file_format}' is not supported. "
"Select one of the following: 'txt', 'tsv' or 'csv'.")
[docs] def on_train_loop_registration(self):
self.try_infer_experiment_details(infer_cloud_details=True)
self.prepare_results_saver()
[docs] def on_train_end(self):
if self.train_end:
self.write_current_train_history(prefix='train_end_')
[docs] def write_current_train_history(self, prefix=''):
"""Write to text file the current training history snapshot in the encapsulating TrainLoop
Args:
prefix (str): history text file name prefix
Returns:
None
"""
experiment_results_local_path = \
BaseLocalResultsSaver.create_experiment_local_results_folder(self.project_name, self.experiment_name,
self.train_loop_obj.experiment_timestamp,
self.local_model_result_folder_path)
self.result_writer.experiment_results_local_path = experiment_results_local_path
results_file_path_in_cloud_results_dir, results_file_local_path = \
self.result_writer.generate_report(training_history=self.train_loop_obj.train_history,
epoch=self.train_loop_obj.epoch,
file_name=f'{prefix}results.{self.file_format}',
file_format=self.file_format)
self.message_service.write_message('ModelTrainHistoryFileWriter_results_file_local_paths',
[results_file_local_path],
msg_handling_settings=MessageHandling.UNTIL_END_OF_EPOCH)
if self.cloud_results_saver is not None:
experiment_cloud_path = \
self.cloud_results_saver.create_experiment_cloud_storage_folder_structure(self.project_name,
self.experiment_name,
self.train_loop_obj.experiment_timestamp)
results_file_s3_path = os.path.join(experiment_cloud_path, results_file_path_in_cloud_results_dir)
self.cloud_results_saver.save_file(local_file_path=results_file_local_path,
cloud_file_path=results_file_s3_path)