#
# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains query selection heuristics for the Active Learning Pipeline.
"""
from abc import ABC, abstractmethod
from typing import List, Dict
from collections import defaultdict
import math
from scipy.stats import entropy as scipy_entropy
import numpy as np
from ..constants import (
ENTROPY_LOG_BASE,
ACTIVE_LEARNING_RANDOM_SEED,
)
[docs]def stratified_random_sample(labels: List) -> List[int]:
"""Reorders indices in evenly repeating pattern for as long as possible and then
shuffles and appends the remaining labels. The first part of this list will maintain
a uniform distrubition across labels, however, since the labels may not be perfectly
balanced the remaining portion will have a similar distribution as the original data.
|-------- Evenly Repeating --------||--- Shuffled Remaining ----|
For Example: ["R","B","C","R","B","C","R","B","C","B","R","R","B","B","B","R"]
Args:
labels (List[str or int]): A list of labels. (Eg: labels = ["R", "B", "B", "C"])
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
np.random.seed(ACTIVE_LEARNING_RANDOM_SEED)
label_to_indices = _get_labels_to_indices(labels)
lowest_label_freq = min([len(indices) for indices in label_to_indices.values()])
avg_label_freq = len(labels) // len(label_to_indices)
sample_per_label = min(lowest_label_freq, avg_label_freq)
selected_indices = []
for i in range(sample_per_label):
for indices in label_to_indices.values():
selected_indices.append(indices[i])
remaining_indices = [i for i in range(len(labels)) if i not in selected_indices]
np.random.shuffle(remaining_indices)
ranked_indices = selected_indices + remaining_indices
return ranked_indices
def _get_labels_to_indices(labels: List) -> defaultdict:
"""Get a Dict mapping unique labels to indices in original data.
For Example: {"R": [1,3,5], "B": [2,6], "C":[4]}
Args:
labels (List[str or int]): A list of labels. (Eg: labels = ["R", "B", "B", "C"])
Returns:
labels_to_indices (defaultdict): Dictionary that maps unique labels to indices
where the label occurred in the original data.
"""
labels_to_indices = defaultdict(list)
for i, label in enumerate(labels):
labels_to_indices[label].append(i)
for v in labels_to_indices.values():
np.random.shuffle(v)
return labels_to_indices
[docs]class Heuristic(ABC):
""" Heuristic base class used as Active Learning query selection strategies."""
[docs] @staticmethod
@abstractmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Ranking method for 2d confidence arrays.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
raise NotImplementedError("Subclasses must implement this method")
[docs] @staticmethod
@abstractmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""Ranking method for 3d confidence arrays.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
raise NotImplementedError("Subclasses must implement this method")
@staticmethod
def _convert_to_sample_ranks(ordered_sample_indices: List[int]):
"""
Args:
ordered_sample_indices (List[int]): List of indices corresponding to values ordered
from least to greatest.
Returns:
sample_ranks (List[int]): List where the value at each index is the rank of the
corresponding sample.
"""
sample_ranks = np.zeros(len(ordered_sample_indices), dtype=int)
for rank, sample_index in enumerate(ordered_sample_indices):
sample_ranks[sample_index] = rank
return sample_ranks
[docs] @staticmethod
def ordered_indices_list_to_final_rank(
ordered_sample_indices_list: List[List[int]],
):
"""Converts multiple lists of ordered indices to a final rank.
Args:
ordered_sample_indices_list (List[List[int]]): Multiple lists of ordered sample indices.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
all_sample_ranks = np.apply_along_axis(
Heuristic._convert_to_sample_ranks, axis=1, arr=ordered_sample_indices_list
)
total_sample_ranks = all_sample_ranks.sum(axis=0)
return list(np.argsort(total_sample_ranks))
[docs]class RandomSampling(ABC):
[docs] @staticmethod
def random_rank(num_elements: int) -> List[int]:
"""Randomly shuffles indices.
Args:
num_elements (int): Number of elements to randomly sample.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
ranked_indices = np.arange(num_elements)
np.random.shuffle(ranked_indices)
return list(ranked_indices)
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Randomly shuffles indices.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
num_elements = len(confidences_2d)
return RandomSampling.random_rank(num_elements)
[docs] @staticmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""Randomly shuffles indices.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
_, num_elements, _ = np.array(confidences_3d).shape
return RandomSampling.random_rank(num_elements)
[docs]class LeastConfidenceSampling(ABC):
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""First calculates the highest (max) confidences per element and then returns
the elements from lowest confidence to highest confidence.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
highest_confidence_per_element = np.max(confidences_2d, axis=1)
return list(np.argsort(highest_confidence_per_element))
[docs] @staticmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""First calculates the highest (max) confidences per element and then returns
the elements with the lowest max confidence. This is done for each confidence_2d
in a confidence_3d. The rankings are added to generate a final ranking.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
all_ordered_sample_indices = [
LeastConfidenceSampling.rank_2d(c) for c in confidences_3d
]
return Heuristic.ordered_indices_list_to_final_rank(all_ordered_sample_indices)
[docs] @staticmethod
def rank_entities(entity_confidences: List[List[List[float]]]) -> List[int]:
query_uncertainty_list = []
for sequence in entity_confidences:
most_likely_sequence_prob = 1.0
for token in sequence:
max_token_posterior = max(np.array(token))
most_likely_sequence_prob *= max_token_posterior
query_uncertainty = 1 - most_likely_sequence_prob
query_uncertainty_list.append(query_uncertainty)
high_to_low_uncertainties = np.argsort(query_uncertainty_list)[::-1]
return list(high_to_low_uncertainties)
[docs]class MarginSampling(ABC):
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Calculates the "margin" or difference between the highest and second highest
confidence score per element. Elements are ranked from lowest to highest margin.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
_, element_size = np.array(confidences_2d).shape
descending_confidences_per_element = np.partition(
confidences_2d, kth=(element_size - 2)
)
highest_val_per_element = descending_confidences_per_element[:, -1]
second_highest_val_per_element = descending_confidences_per_element[:, -2]
margin_per_element = np.abs(
highest_val_per_element - second_highest_val_per_element
)
ranked_indices_low_to_high_margin = np.argsort(margin_per_element)
return list(ranked_indices_low_to_high_margin)
[docs] @staticmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""Calculates the "margin" or difference between the highest and second highest
confidence score per element. Elements are ranked from lowest to highest margin.
This is done for each confidence_2d in a confidence_3d. The rankings are added
to generate a final ranking.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
all_ordered_sample_indices = [MarginSampling.rank_2d(c) for c in confidences_3d]
return Heuristic.ordered_indices_list_to_final_rank(all_ordered_sample_indices)
[docs] @staticmethod
def beam_search_decoder(predictions, top_k=3):
output_sequences = [([], 0)]
# looping through all the predictions
for token_probs in predictions:
new_sequences = []
# append new tokens to old sequences and re-score
for old_seq, old_score in output_sequences:
for char_index, char in enumerate(token_probs):
new_seq = old_seq + [char_index]
# considering log-likelihood for scoring
value = char
if value:
new_score = old_score + math.log(value)
else:
new_score = old_score - math.inf
new_sequences.append((new_seq, new_score))
# sort all new sequences in the de-creasing order of their score
output_sequences = sorted(
new_sequences, key=lambda val: val[1], reverse=True
)
# select top-k based on score
output_sequences = output_sequences[:top_k]
return output_sequences
[docs] @staticmethod
def rank_entities(entity_confidences: List[List[List[float]]]) -> List[int]:
"""
Queries are ranked on the basis of Margin Sampling for tag sequences. This approach uses beam search to
obtain the top 2 queries/sequences in terms of the query confidences for entities. The margin is calculated
between these top two sequences.
(For more information about this method: https://dl.acm.org/doi/pdf/10.5555/1613715.1613855)
"""
query_margin_list = []
for sequence in entity_confidences:
top_two_sequences = MarginSampling.beam_search_decoder(sequence, top_k=2)
# anti-log to get back probabilities
margin = math.exp(top_two_sequences[0][1]) - math.exp(
top_two_sequences[1][1]
)
query_margin_list.append(margin)
low_to_high_margin = np.argsort(query_margin_list)
return list(low_to_high_margin)
[docs]class EntropySampling(ABC):
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Calculates the entropy score of the confidences per element. Elements are ranked from
highest to lowest entropy.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
entropy_per_element = scipy_entropy(
np.array(confidences_2d), axis=1, base=ENTROPY_LOG_BASE
)
high_to_low_entropy = np.argsort(entropy_per_element)[::-1]
return list(high_to_low_entropy)
[docs] @staticmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""Calculates the entropy score of the confidences per element. Elements are ranked from
highest to lowest entropy. This is done for each confidence_2d in a confidence_3d. The
rankings are added to generate a final ranking.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
all_ordered_sample_indices = [
EntropySampling.rank_2d(c) for c in confidences_3d
]
return Heuristic.ordered_indices_list_to_final_rank(all_ordered_sample_indices)
[docs] @staticmethod
def rank_entities(entity_confidences: List[List[List[float]]]) -> List[int]:
"""Calculates the entropy score of the entity confidences per element.
Elements are ranked from highest to lowest entropy.
Returns:
Ranked lists based on either:
Token Entropy: Average of per token entropies across a query; or
Total Token Entropy: Sum of token entropies across a query.
"""
sequence_entropy_list = []
for sequence in entity_confidences:
entropy_per_token = scipy_entropy(
np.array(sequence), axis=1, base=ENTROPY_LOG_BASE
)
total_entropy = sum(entropy_per_token)
total_token_entropy = total_entropy / len(entropy_per_token)
sequence_entropy_list.append(total_token_entropy)
high_to_low_entropy = np.argsort(sequence_entropy_list)[::-1]
return list(high_to_low_entropy)
[docs]class DisagreementSampling(ABC):
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Need confidences_2d from more than one model (confidences_3d) to run
DisagreementSampling.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
raise NotImplementedError(
"DisagreementSampling does not support 2d confidences."
)
[docs] @staticmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""Finds the most frequent class label for a given element across all models.
Calculates the agreement per element (% of models who voted the most frequent class).
Ranks elements by highest to lowest disagreement.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
# X: Model, Y: Classes Chosen Per Element
chosen_classes_per_model = np.argmax(confidences_3d, axis=2)
disagreement_scores = 1 - np.array(
[max(np.bincount(row)) / len(row) for row in chosen_classes_per_model.T]
)
high_to_low_disagreement = np.argsort(disagreement_scores)
return list(high_to_low_disagreement)
[docs]class KLDivergenceSampling(ABC):
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Need confidences_2d from more than one model (confidences_3d) to run
KLDivergenceSampling.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
raise NotImplementedError(
"KLDivergenceSampling does not support 2d confidences."
)
[docs] @staticmethod
def rank_3d(
confidences_3d: List[List[List[float]]], confidence_segments: Dict = None
) -> List[int]:
"""Calculates the KL Divergence between the average confidence distribution across
all models for a given class and the confidence distribution for a given element in
said class. Elements are ranked from highest to lowest divergence.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
confidence_segments (Dict[(str, Tuple(int,int))]): A dictionary mapping
segments to run KL Divergence.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
if confidence_segments:
divergences = (
KLDivergenceSampling.get_divergences_per_element_with_segments(
confidences_3d, confidence_segments
)
)
else:
divergences = KLDivergenceSampling.get_divergences_per_element_no_segments(
confidences_3d
)
divergence_per_element = np.max(divergences, axis=0)
ranked_indices_high_to_low_divergence = np.argsort(divergence_per_element)[::-1]
return list(ranked_indices_high_to_low_divergence)
[docs] @staticmethod
def get_divergences_per_element_no_segments(
confidences_3d: List[List[List[float]]],
) -> List[List[float]]:
"""
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
divergences (List[List[float]]): Divergences per model for each element.
"""
# Calculate average prediction values.
q_x = np.mean(confidences_3d, axis=0)
# Duplicate the mean distribution by number of models
num_models, _, _ = np.array(confidences_3d).shape
q_x = [q_x for n in range(num_models)]
# X: Model, Y: Divergence Per Element
divergences = scipy_entropy(confidences_3d, q_x, axis=2, base=ENTROPY_LOG_BASE)
return divergences
[docs] @staticmethod
def get_divergences_per_element_with_segments(
confidences_3d: List[List[List[float]]], confidence_segments: Dict
) -> List[List[float]]:
"""Calculate divergences by segments defined in confidence segments where
p_d is the probabilities within class X and q_d is the mean probability distribution
for class X. Divergence(p_d, q_d) is calculated for each element in all classes.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
confidence_segments (Dict[(str, Tuple(int,int))]): A dictionary mapping
segments to run KL Divergence.
Returns:
divergences (List[List[float]]): Divergences per model for each element.
"""
avg_preds = np.mean(confidences_3d, axis=0)
divergences = []
# Calculate q_d
q_d = {d: [] for d in confidence_segments}
for row in avg_preds:
domain = KLDivergenceSampling.get_domain(confidence_segments, row)
q_d[domain].append(row)
for pred in confidences_3d:
# Calculate p_d
p_d = {d: [] for d in confidence_segments}
for row in pred:
domain = KLDivergenceSampling.get_domain(confidence_segments, row)
p_d[domain].append(row)
# Calculate Divergence Scores by Domain
divergence_scores_by_domain = {d: [] for d in confidence_segments}
for domain in confidence_segments:
if len(p_d[domain]) > 0 and len(q_d[domain]) > 0:
divergence_scores_by_domain[domain] = scipy_entropy(
np.array(p_d[domain]),
np.array(q_d[domain]),
axis=1,
base=ENTROPY_LOG_BASE,
)
else:
divergence_scores_by_domain[domain] = 0
# Reorder Divergence Scores by Domain to Original order
single_pred_divergence_scores = []
domain_counter = {d: 0 for d in confidence_segments}
for row in pred:
domain = KLDivergenceSampling.get_domain(confidence_segments, row)
single_pred_divergence_scores.append(
divergence_scores_by_domain[domain][domain_counter[domain]]
)
domain_counter[domain] += 1
divergences.append(single_pred_divergence_scores)
return divergences
[docs] @staticmethod
def get_domain(confidence_segments: Dict, row: List[List[float]]) -> str:
"""Get the domain for a given probability row, inferred based on the non-zero values.
Args:
confidence_segments (Dict[str, tuple(int, int)]): A mapping between domains (str) to the
corresponding indices in the probability vector. Used for intent-level KLD.
row (List[List[float]]): A single row representing a queries probability distrubition.
Returns:
domain (str): The domain that the given row belongs to.
Raises:
AssertionError: If a row does not have an associated domain.
"""
if np.sum(row) == 0:
row = list(np.repeat(1 / len(row), repeats=len(row)))
for domain in confidence_segments:
start, end = confidence_segments[domain]
if sum(row[start : end + 1]) > 0:
return domain
raise AssertionError(f"Row does not have an associated domain, Row: {row}")
[docs]class EnsembleSampling(ABC):
[docs] @staticmethod
def get_heuristics_2d() -> tuple:
return (LeastConfidenceSampling, MarginSampling, EntropySampling)
[docs] @staticmethod
def get_heuristics_3d() -> tuple:
return (
LeastConfidenceSampling,
MarginSampling,
EntropySampling,
DisagreementSampling,
KLDivergenceSampling,
)
[docs] @staticmethod
def rank_2d(confidences_2d: List[List[float]]) -> List[int]:
"""Combine ranks from all heuristics that can support ranking given 2d confidence
input.
Args:
confidences_2d (List[List[float]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
all_ordered_sample_indices = [
heuristic.rank_2d(confidences_2d)
for heuristic in EnsembleSampling.get_heuristics_2d()
]
return Heuristic.ordered_indices_list_to_final_rank(all_ordered_sample_indices)
[docs] @staticmethod
def rank_3d(confidences_3d: List[List[List[float]]]) -> List[int]:
"""Combine ranks from all heuristics that can support ranking given 3d confidence
input.
Args:
confidences_3d (List[List[List[float]]]): Confidence probabilities per element.
Returns:
ranked_indices (List[int]): Indices corresponding to elements ranked by the heuristic.
"""
all_ordered_sample_indices = [
heuristic.rank_3d(confidences_3d)
for heuristic in EnsembleSampling.get_heuristics_3d()
]
return Heuristic.ordered_indices_list_to_final_rank(all_ordered_sample_indices)
[docs]class HeuristicsFactory:
"""Heuristics Factory Class"""
[docs] @staticmethod
def get_heuristic(heuristic) -> Heuristic:
"""A static method to get a Heuristic class.
Args:
heuristic (str): Name of the desired Heuristic class
Returns:
(Heuristic): Heuristic Class
"""
heuristic_classes = [
RandomSampling,
LeastConfidenceSampling,
MarginSampling,
EntropySampling,
DisagreementSampling,
EnsembleSampling,
KLDivergenceSampling,
]
for heuristic_class in heuristic_classes:
if heuristic == heuristic_class.__name__:
return heuristic_class()
raise AssertionError(f" {heuristic} is not a valid heuristic.")