import os
import re
import shutil
import string
from collections import Counter
import numpy as np
from pyrouge import Rouge155
from rouge import Rouge
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
from torchnlp.metrics import bleu
from transformers import glue_compute_metrics, xnli_compute_metrics

from aitoolbox.experiment.core_metrics.abstract_metric import AbstractBaseMetric

[docs]class ROUGEMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, target_actual_text=False, output_text_dir=None, output_text_cleaning_regex=(r'<.*?>', r'[^a-zA-Z0-9.?! ]+')): """ROGUE score calculation From this package: Args: y_true (numpy.array or list): y_predicted (numpy.array or list): target_actual_text (bool): output_text_dir (str): output_text_cleaning_regex (list): """ self.output_text_cleaning_regex = output_text_cleaning_regex self.target_actual_text = target_actual_text self.output_text_dir = output_text_dir AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='ROGUE', np_array=False)
[docs] def calculate_metric(self): if self.output_text_dir is not None: # Not affecting the metric calculation. Just for record keeping it drops the texts to disk so they can be # reviewed self.dump_answer_text_to_disk(self.y_true, self.y_predicted, self.output_text_dir, self.output_text_cleaning_regex, self.target_actual_text) self.prepare_text() rouge_calc = Rouge() hypothesis = self.y_predicted reference = self.y_true # TODO: remove try-except... just for testing try: return rouge_calc.get_scores(hypothesis, reference, avg=True) except: print('hypothesis') print(hypothesis) print('reference') print(reference) exit()
[docs] def prepare_text(self): if not self.target_actual_text: self.y_true = [' '.join(sent) for sent in self.y_true] self.y_predicted = [' '.join(sent) if len(sent) > 0 else ' ' for sent in self.y_predicted]
[docs] @staticmethod def dump_answer_text_to_disk(true_text, pred_text, output_text_dir, output_text_cleaning_regex, target_actual_text): """ Problems: Defined regex text cleaning to deal with Illegal division by zero Args: true_text (list): pred_text (list): output_text_dir (str): output_text_cleaning_regex (list): target_actual_text (bool): Returns: """ if os.path.exists(output_text_dir): shutil.rmtree(output_text_dir) os.mkdir(output_text_dir) for i, (pred_answ, true_answ) in enumerate(zip(pred_text, true_text)): with open(os.path.join(output_text_dir, f'answer_pred_true_{i}.txt'), 'w', encoding='utf-8') as f: # Default regex cleaners: (r'<.*?>', r'[^a-zA-Z0-9.?! ]+') pred_answ_clean = ROUGEPerlMetric.regex_clean_text(pred_answ, output_text_cleaning_regex) pred_answ_clean = ' '.join(pred_answ_clean) if len(pred_answ_clean) > 0 else ' ' if target_actual_text: true_answ_clean = [true_answ] else: true_answ_clean = ROUGEPerlMetric.regex_clean_text(true_answ, output_text_cleaning_regex) true_answ_clean = ' '.join(true_answ_clean) f.write(f'Answer to question {i}:\n') f.write(f'Predicted:\t{pred_answ_clean}\n') f.write(f'True:\t{true_answ_clean}\n')
[docs]class ROUGEPerlMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, output_text_dir, output_text_cleaning_regex=(r'<.*?>', r'[^a-zA-Z0-9.?! ]+'), target_actual_text=False): """ROGUE score calculation using the Perl implementation Use this package: Problems: Defined regex text cleaning to deal with Illegal division by zero Args: y_true (numpy.array or list): gold standard summaries are ‘model’ summaries y_predicted (numpy.array or list): your summaries are ‘system’ summaries output_text_dir (str): output_text_cleaning_regex (list): target_actual_text (bool): """ self.output_text_dir = output_text_dir self.output_text_cleaning_regex = output_text_cleaning_regex self.target_actual_text = target_actual_text AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='ROGUE_Perl', np_array=False)
[docs] def calculate_metric(self): self.dump_answer_text_to_disk(self.y_true, self.y_predicted, self.output_text_dir, self.output_text_cleaning_regex, self.target_actual_text) rouge = Rouge155() # In ROUGE, your summaries are ‘system’ summaries and the gold standard summaries are ‘model’ summaries. rouge.system_dir = os.path.join(self.output_text_dir, 'pred_answer') rouge.model_dir = os.path.join(self.output_text_dir, 'true_answer') rouge.system_filename_pattern = 'pred_answer_text.(\d+).txt' rouge.model_filename_pattern = 'true_answer_text.#ID#.txt' rouge_output = rouge.convert_and_evaluate() output_dict = rouge.output_to_dict(rouge_output) return output_dict
[docs] @staticmethod def dump_answer_text_to_disk(true_text, pred_text, output_text_dir, output_text_cleaning_regex, target_actual_text): """ Problems: Defined regex text cleaning to deal with Illegal division by zero Args: true_text (list): pred_text (list): output_text_dir (str): output_text_cleaning_regex (list): target_actual_text (bool): Returns: """ if os.path.exists(output_text_dir): shutil.rmtree(output_text_dir) os.mkdir(output_text_dir) os.mkdir(os.path.join(output_text_dir, 'true_answer')) os.mkdir(os.path.join(output_text_dir, 'pred_answer')) for i, text in enumerate(true_text): # TODO: Encoding setting not tested yet with open(os.path.join(output_text_dir, f'true_answer/true_answer_text.{i}.txt'), 'w', encoding='utf-8') as f: # Default regex cleaners: (r'<.*?>', r'[^a-zA-Z0-9.?! ]+') if target_actual_text: text_clean = [text] else: text_clean = ROUGEPerlMetric.regex_clean_text(text, output_text_cleaning_regex) f.write(' '.join(text_clean)) for i, text in enumerate(pred_text): # TODO: Encoding setting not tested yet with open(os.path.join(output_text_dir, f'pred_answer/pred_answer_text.{i}.txt'), 'w', encoding='utf-8') as f: # Default regex cleaners: (r'<.*?>', r'[^a-zA-Z0-9.?! ]+') text_clean = ROUGEPerlMetric.regex_clean_text(text, output_text_cleaning_regex) f.write(' '.join(text_clean) if len(text_clean) > 0 else ' ')
[docs] @staticmethod def regex_clean_text(text, cleaning_regex_list): """ Args: text (list): cleaning_regex_list (list): Returns: list: """ # The default is: (r'<.*?>', r'[^a-zA-Z0-9.?! ]+') for cleaning_regex in cleaning_regex_list: re_pattern = re.compile(cleaning_regex) text = [re_pattern.sub('', t) for t in text if len(re_pattern.sub('', t)) > 0] return text
[docs]class ExactMatchTextMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, target_actual_text=False, output_text_dir=None): """Calculate exact match of answered strings Args: y_true (numpy.array or list): y_predicted (numpy.array or list): target_actual_text (bool): output_text_dir (str): """ if len(y_true) != len(y_predicted): raise ValueError(f'len(y_true) != len(y_predicted). Got {len(y_true)} != {len(y_predicted)}') self.target_actual_text = target_actual_text self.output_text_dir = output_text_dir AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='EM', np_array=False)
[docs] def calculate_metric(self): if self.output_text_dir is not None: # Not affecting the metric calculation. Just for record keeping it drops the texts to disk so they can be # reviewed ROUGEMetric.dump_answer_text_to_disk(self.y_true, self.y_predicted, self.output_text_dir, [], self.target_actual_text) if not self.target_actual_text: self.y_true = [' '.join(sent) for sent in self.y_true] self.y_predicted = [' '.join(sent) for sent in self.y_predicted] em = 0 for pred_answ, true_answ in zip(self.y_predicted, self.y_true): em += int(self.normalize_answer(pred_answ) == self.normalize_answer(true_answ)) return 100. * em / len(self.y_true)
[docs] @staticmethod def normalize_answer(text_str): """Convert to lowercase and remove punctuation, articles and extra whitespace. All methods below this line are from the official SQuAD 2.0 eval script Args: text_str (str): Returns: str """ def remove_articles(text): regex = re.compile(r'\b(a|an|the)\b', re.UNICODE) return re.sub(regex, ' ', text) def white_space_fix(text): return ' '.join(text.split()) def remove_punc(text): exclude = set(string.punctuation) return ''.join(ch for ch in text if ch not in exclude) def lower(text): return text.lower() return white_space_fix(remove_articles(remove_punc(lower(text_str))))
[docs]class F1TextMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, target_actual_text=False, output_text_dir=None): """Calculate F1 score of answered strings Args: y_true (numpy.array or list): y_predicted (numpy.array or list): target_actual_text (bool): output_text_dir (str): """ if len(y_true) != len(y_predicted): raise ValueError(f'len(y_true) != len(y_predicted). Got {len(y_true)} != {len(y_predicted)}') self.target_actual_text = target_actual_text self.output_text_dir = output_text_dir AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='F1', np_array=False)
[docs] def calculate_metric(self): if self.output_text_dir is not None: # Not affecting the metric calculation. Just for record keeping it drops the texts to disk so they can be # reviewed ROUGEMetric.dump_answer_text_to_disk(self.y_true, self.y_predicted, self.output_text_dir, [], self.target_actual_text) if not self.target_actual_text: self.y_true = [' '.join(sent) for sent in self.y_true] self.y_predicted = [' '.join(sent) for sent in self.y_predicted] f1 = 0 for pred_answ, true_answ in zip(self.y_predicted, self.y_true): f1 += self.compute_f1(true_answ, pred_answ) return 100. * f1 / len(self.y_true)
[docs] @staticmethod def compute_f1(a_gold, a_pred): gold_toks = F1TextMetric.get_tokens(a_gold) pred_toks = F1TextMetric.get_tokens(a_pred) common = Counter(gold_toks) & Counter(pred_toks) num_same = sum(common.values()) if len(gold_toks) == 0 or len(pred_toks) == 0: # If either is no-answer, then F1 is 1 if they agree, 0 otherwise return int(gold_toks == pred_toks) if num_same == 0: return 0 precision = 1.0 * num_same / len(pred_toks) recall = 1.0 * num_same / len(gold_toks) f1 = (2 * precision * recall) / (precision + recall) return f1
[docs] @staticmethod def get_tokens(s): if not s: return [] return ExactMatchTextMetric.normalize_answer(s).split()
[docs]class BLEUSentenceScoreMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, source_sents=None, output_text_dir=None): """BLEU score calculation NLTK provides the sentence_bleu() function for evaluating a candidate sentence against one or more reference sentences. The reference sentences must be provided as a list of sentences where each reference is a list of tokens. The candidate sentence is provided as a list of tokens. For example: reference = [['this', 'is', 'a', 'test'], ['this', 'is' 'test']] candidate = ['this', 'is', 'a', 'test'] score = sentence_bleu(reference, candidate) Args: y_true (list): y_predicted (list): source_sents (list or None): output_text_dir (str or None): """ if output_text_dir is not None and source_sents is None: raise ValueError('output_text_dir is not None and source_sents is None; ' 'if output_text_dir you must give the source_sents') self.output_text_dir = output_text_dir self.source_sents = source_sents AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='BLEU_sentence_score', np_array=False)
[docs] def calculate_metric(self): self.check_transl_sent_num_match([self.y_true, self.y_predicted]) sentence_bleu_results = [sentence_bleu([true_t], pred_t) for true_t, pred_t in zip(self.y_true, self.y_predicted)] if self.output_text_dir is not None: self.dump_translation_text_to_disk(self.source_sents, [' '.join(sent) for sent in self.y_predicted], [' '.join(sent) for sent in self.y_true], sentence_bleu_results, self.output_text_dir) return np.mean(sentence_bleu_results)
[docs] @staticmethod def dump_translation_text_to_disk(source_sents, pred_translations, true_translations, sentence_bleu_results, output_text_dir): """ Args: source_sents (list): pred_translations (list): true_translations (list): sentence_bleu_results (list): output_text_dir (str): Returns: """ BLEUSentenceScoreMetric.check_transl_sent_num_match([pred_translations, true_translations, source_sents, sentence_bleu_results]) if os.path.exists(output_text_dir): shutil.rmtree(output_text_dir) os.mkdir(output_text_dir) for i, (source, pred_transl, true_transl, bleu_result) in enumerate(zip(source_sents, pred_translations, true_translations, sentence_bleu_results)): with open(os.path.join(output_text_dir, f'transl_{i}.txt'), 'w', encoding='utf-8') as f: f.write(f'Source:\t{source}\n') f.write(f'Predicted:\t{pred_transl}\n') f.write(f'True:\t{true_transl}\n') f.write(f'BLEU: {bleu_result}\n')
[docs] @staticmethod def check_transl_sent_num_match(sent_types): """ Args: sent_types (list): list of lists Raises: ValueError """ num_sents = len(sent_types[0]) for sent_t in sent_types: if len(sent_t) != num_sents: raise ValueError(f"The length of list elements across different text types does not match " f"The featured lengths are: {', '.join([str(len(el)) for el in sent_types])}")
[docs]class BLEUCorpusScoreMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, source_sents=None, output_text_dir=None): """BLEU corpus score calculation Function called corpus_bleu() for calculating the BLEU score for multiple sentences such as a paragraph or a document. The references must be specified as a list of documents where each document is a list of references and each alternative reference is a list of tokens, e.g. a list of lists of lists of tokens. The candidate documents must be specified as a list where each document is a list of tokens, e.g. a list of lists of tokens. references = [[['this', 'is', 'a', 'test'], ['this', 'is' 'test']]] candidates = [['this', 'is', 'a', 'test']] score = corpus_bleu(references, candidates) Args: y_true (list): y_predicted (list): source_sents (list or None): output_text_dir (str or None): """ self.output_text_dir = output_text_dir self.source_sents = source_sents AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='BLEU_corpus_score', np_array=False)
[docs] def calculate_metric(self): BLEUSentenceScoreMetric.check_transl_sent_num_match([self.y_true, self.y_predicted]) if self.output_text_dir is not None: BLEUSentenceScoreMetric.dump_translation_text_to_disk(self.source_sents, [' '.join(sent) for sent in self.y_predicted], [' '.join(sent) for sent in self.y_true], ['na'] * len(self.y_predicted), self.output_text_dir) return corpus_bleu([[sent] for sent in self.y_true], self.y_predicted)
[docs]class BLEUScoreStrTorchNLPMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, lowercase=False, source_sents=None, output_text_dir=None): """BLEU score calculation using the TorchNLP implementation Example: hypotheses = [ "The brown fox jumps over the dog 笑", "The brown fox jumps over the dog 2 笑" ] references = [ "The quick brown fox jumps over the lazy dog 笑", "The quick brown fox jumps over the lazy dog 笑" ] get_moses_multi_bleu(hypotheses, references, lowercase=True) 46.51 Args: y_true (list): y_predicted (list): lowercase (bool): source_sents (list or None): output_text_dir (str or None): """ self.output_text_dir = output_text_dir self.source_sents = source_sents self.lowercase = lowercase AbstractBaseMetric.__init__(self, y_true, y_predicted, metric_name='BLEU_str_torchNLP_score', np_array=False)
[docs] def calculate_metric(self): BLEUSentenceScoreMetric.check_transl_sent_num_match([self.y_true, self.y_predicted]) sentence_bleu_results = [ bleu.get_moses_multi_bleu([' '.join(true_t)], [' '.join(pred_t)], lowercase=self.lowercase) for true_t, pred_t in zip(self.y_true, self.y_predicted) ] if self.output_text_dir is not None: BLEUSentenceScoreMetric.dump_translation_text_to_disk(self.source_sents, [' '.join(sent) for sent in self.y_predicted], [' '.join(sent) for sent in self.y_true], sentence_bleu_results, self.output_text_dir) return float(np.mean(sentence_bleu_results))
[docs]class PerplexityMetric(AbstractBaseMetric): def __init__(self, batch_losses): """Perplexity metric used in MT Args: batch_losses (numpy.array or list): """ AbstractBaseMetric.__init__(self, None, batch_losses, metric_name='Perplexity', np_array=False)
[docs] def calculate_metric(self): return np.exp(np.mean(self.y_predicted))
[docs]class GLUEMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted, task_name): """GLUE evaluation metrics Wrapper around HF Transformers ``glue_compute_metrics()`` Args: y_true: y_predicted: task_name (str): name of the GLUE task """ self.task_name = task_name super().__init__(y_true, y_predicted, metric_name=f'GLUE_{task_name}')
[docs] def calculate_metric(self): metric_dict = glue_compute_metrics(task_name=self.task_name, preds=self.y_predicted, labels=self.y_true) metric_dict = {k.replace('/', '_'): v for k, v in metric_dict.items()} return metric_dict
[docs]class XNLIMetric(AbstractBaseMetric): def __init__(self, y_true, y_predicted): """XNLI evaluation metrics Wrapper around HF Transformers ``xnli_compute_metrics()`` Args: y_true: y_predicted: """ super().__init__(y_true, y_predicted, metric_name='xnli_accuracy')
[docs] def calculate_metric(self): metric_dict = xnli_compute_metrics(task_name='xnli', preds=self.y_predicted, labels=self.y_true) metric_dict = {k.replace('/', '_'): v for k, v in metric_dict.items()} return metric_dict