Source code for mindmeld.active_learning.alp

# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the ActiveLearningPipeline class.
"""

import logging
import math

from .data_loading import DataBucketFactory
from .results_manager import ResultsManager
from .plot_manager import PlotManager
from .classifiers import MindMeldALClassifier
from .heuristics import HeuristicsFactory

from ..constants import TuneLevel, TuningType
from ..resource_loader import ProcessedQueryList

logger = logging.getLogger(__name__)


class ActiveLearningPipeline:  # pylint: disable=R0902
    """Class that executes the strategy tuning and query selection process for
    the Active Learning Pipeline."""

    def __init__(  # pylint: disable=R0913
        self,
        app_path: str,
        train_pattern: str,
        test_pattern: str,
        train_seed_pct: float,
        n_classifiers: int,
        n_epochs: int,
        batch_size: int,
        classifier_tuning_strategies: list,
        tagger_tuning_strategies: list,
        tuning_level: list,
        classifier_selection_strategy: str,
        tagger_selection_strategy: str,
        save_sampled_queries: bool,
        aggregate_statistic: str,
        class_level_statistic: str,
        log_usage_pct: float,
        labeled_logs_pattern: str,
        unlabeled_logs_path: str,
        output_folder: str,
    ):
        """
        Args:
            app_path (str): Path to the MindMeld application.
            train_pattern (str): Regex pattern to match train files, for example ".*train.*.txt".
            test_pattern (str): Regex pattern to match test files, for example ".*test.*.txt".
            train_seed_pct (float): Percentage of training data to use as the initial seed.
            n_classifiers (int): Number of classifiers to be used by multi-model heuristics.
            n_epochs (int): Number of epochs to run tuning.
            batch_size (int): Number of queries to select at each iteration.
            tuning_level (list): The hierarchy levels to tune ("domain" or "intent", and/or "entity").
            classifier_tuning_strategies (List[str]): List of strategies to use for classifier
                tuning. (Options: "LeastConfidenceSampling", "EntropySampling", "MarginSampling",
                "RandomSampling", "KLDivergenceSampling", "DisagreementSampling",
                "EnsembleSampling".)
            tagger_tuning_strategies (List[str]): List of strategies to use for tagger tuning.
                (Options: "LeastConfidenceSampling", "EntropySampling", "MarginSampling".)
            classifier_selection_strategy (str): Single strategy to use for classifier log selection.
            tagger_selection_strategy (str): Single strategy to use for tagger log selection.
            save_sampled_queries (bool): Whether to save the queries sampled at each iteration.
            aggregate_statistic (str): Aggregate statistic to record. (Options: "accuracy",
                "f1_weighted", "f1_macro", "f1_micro".)
            class_level_statistic (str): Class-level statistic to record. (Options: "f_beta",
                "precision", "recall".)
            log_usage_pct (float): Percentage of the log data to use for selection.
            labeled_logs_pattern (str): Pattern to obtain logs already labeled in a MindMeld app.
            unlabeled_logs_path (str): Path to a log text file with unlabeled queries.
            output_folder (str): Folder to store active learning results.
""" self.app_path = app_path self.train_pattern = train_pattern self.test_pattern = test_pattern self.train_seed_pct = train_seed_pct self.n_classifiers = n_classifiers self.n_epochs = n_epochs self.batch_size = batch_size self.tuning_level = tuning_level self.classifier_tuning_strategies = classifier_tuning_strategies self.tagger_tuning_strategies = tagger_tuning_strategies self.classifier_selection_strategy = classifier_selection_strategy self.tagger_selection_strategy = tagger_selection_strategy self.save_sampled_queries = save_sampled_queries self.aggregate_statistic = MindMeldALClassifier._validate_aggregate_statistic( aggregate_statistic ) self.class_level_statistic = ( MindMeldALClassifier._validate_class_level_statistic(class_level_statistic) ) self.log_usage_pct = log_usage_pct self.labeled_logs_pattern = labeled_logs_pattern self.unlabeled_logs_path = unlabeled_logs_path self.output_folder = output_folder self.results_manager = ResultsManager(output_folder) self.mindmeld_al_classifier = self._get_mindmeld_al_classifier() self.init_unsampled_queries_ids = None self.init_sampled_queries_ids = None self.data_bucket = None if not (self.classifier_tuning_strategies or self.tagger_tuning_strategies): raise ValueError("No tuning strategy provided.") for level in self.tuning_level: if level not in [allowed_level.value for allowed_level in TuneLevel]: raise ValueError(f"Invalid tuning level: {level}") if ( TuneLevel.DOMAIN.value in self.tuning_level and TuneLevel.INTENT.value in self.tuning_level ): logger.warning( "Both 'domain' and 'intent' provided as tuning levels. " "Only one can be selected for classifier tuning. Selecting 'intent'." ) def _get_mindmeld_al_classifier(self): """ Creates an instance of a MindMeld Active Learning Classifier. """ return MindMeldALClassifier( self.app_path, self.tuning_level, self.n_classifiers, self.aggregate_statistic, self.class_level_statistic, ) @property def num_iterations(self) -> int: """Calculates the number of iterations needed for tuning. Returns: num_iterations (int): Number of iterations needed for tuning. """ # An additional iteration is added to save training data after the last sampling round. return 1 + math.ceil(len(self.init_unsampled_queries_ids) / self.batch_size) @property def __dict__(self): """ Custom dictionary method used to save key experiment params. """ return { "app_path": self.app_path, "train_pattern": self.train_pattern, "test_pattern": self.test_pattern, "train_seed_pct": self.train_seed_pct, "n_classifiers": self.n_classifiers, "n_epochs": self.n_epochs, "batch_size": self.batch_size, "tuning_level": self.tuning_level, "classifier_tuning_strategies": self.classifier_tuning_strategies, "tagger_tuning_strategies": self.tagger_tuning_strategies, "classifier_selection_strategy": self.classifier_selection_strategy, "tagger_selection_strategy": self.tagger_selection_strategy, "save_sampled_queries": self.save_sampled_queries, "log_usage_pct": self.log_usage_pct, "labeled_logs_pattern": self.labeled_logs_pattern, "unlabeled_logs_path": self.unlabeled_logs_path, "output_folder": self.output_folder, }

    def tune_strategies(self):
        """Loads the initial data bucket and then trains on every strategy."""
        logger.info("Creating output folder and saving params.")
        self.results_manager.create_experiment_folder(
            active_learning_params=self.__dict__,
            tuning_level=self.tuning_level,
            classifier_tuning_strategies=self.classifier_tuning_strategies,
            tagger_tuning_strategies=self.tagger_tuning_strategies,
        )
        logger.info("Creating strategy tuning data bucket.")
        self.data_bucket = DataBucketFactory.get_data_bucket_for_strategy_tuning(
            self.app_path,
            self.tuning_level,
            self.train_pattern,
            self.test_pattern,
            self.train_seed_pct,
        )
        self.init_sampled_queries_ids = self.data_bucket.sampled_queries.elements
        self.init_unsampled_queries_ids = self.data_bucket.unsampled_queries.elements
        logger.info("Starting tuning.")
        self._train_all_strategies()

    def select_queries(self):
        """Selects the next batch of queries to label from a set of log queries."""
        logger.info("Loading queries for active-learning selection.")
        # pylint: disable-next=too-many-function-args
        self.data_bucket = DataBucketFactory.get_data_bucket_for_query_selection(
            self.app_path,
            self.tuning_level,
            self.train_pattern,
            self.test_pattern,
            self.unlabeled_logs_path,
            self.labeled_logs_pattern,
            self.log_usage_pct,
        )
        self.init_sampled_queries_ids = self.data_bucket.sampled_queries.elements
        self.init_unsampled_queries_ids = self.data_bucket.unsampled_queries.elements
        logger.info("Starting selection of log queries.")
        if self.classifier_selection_strategy and (
            TuneLevel.DOMAIN.value in self.tuning_level
            or TuneLevel.INTENT.value in self.tuning_level
        ):
            newly_sampled_queries = self._run_strategy(
                tuning_type=TuningType.CLASSIFIER,
                strategy=self.classifier_selection_strategy,
                select_mode=True,
            )
            self.results_manager.write_log_selected_queries_json(
                strategy=self.classifier_selection_strategy,
                queries=newly_sampled_queries,
                tuning_type=TuningType.CLASSIFIER,
            )
        if (
            self.tagger_selection_strategy
            and TuneLevel.ENTITY.value in self.tuning_level
        ):
            newly_sampled_queries = self._run_strategy(
                tuning_type=TuningType.TAGGER,
                strategy=self.tagger_selection_strategy,
                select_mode=True,
            )
            self.results_manager.write_log_selected_queries_json(
                strategy=self.tagger_selection_strategy,
                queries=newly_sampled_queries,
                tuning_type=TuningType.TAGGER,
            )

    def plot(self):
        """Generates plots from the experiment results via the PlotManager."""
        plot_manager = PlotManager(
            experiment_dir_path=self.results_manager.experiment_folder,
            aggregate_statistic=self.aggregate_statistic,
            class_level_statistic=self.class_level_statistic,
            plot_entities=(
                TuneLevel.ENTITY.value in self.tuning_level
                and self.tagger_tuning_strategies
            ),
            plot_intents=(
                TuneLevel.INTENT.value in self.tuning_level
                and self.classifier_tuning_strategies
            ),
        )
        plot_manager.generate_plots()

    def _train_all_strategies(self):
        """Train with all active learning strategies."""
        # Checks to ensure classifier/tagger tuning is only run when specified in tuning levels.
        if self.classifier_tuning_strategies and (
            TuneLevel.DOMAIN.value in self.tuning_level
            or TuneLevel.INTENT.value in self.tuning_level
        ):
            for strategy in self.classifier_tuning_strategies:
                self._run_strategy(tuning_type=TuningType.CLASSIFIER, strategy=strategy)
        if (
            self.tagger_tuning_strategies
            and TuneLevel.ENTITY.value in self.tuning_level
        ):
            for strategy in self.tagger_tuning_strategies:
                self._run_strategy(tuning_type=TuningType.TAGGER, strategy=strategy)

    def _run_strategy(
        self, tuning_type: TuningType, strategy: str, select_mode: bool = False
    ):
        """Helper function to train a single strategy.

        Args:
            tuning_type (TuningType): Component to be tuned ("classifier" or "tagger").
            strategy (str): Single strategy to train.
            select_mode (bool): If True, accuracies will not be recorded and the run will
                terminate after the first iteration. If False, accuracies will be recorded.
        """
        newly_sampled_queries_ids = []
        heuristic = HeuristicsFactory.get_heuristic(strategy)
        for epoch in range(self.n_epochs):
            self._reset_data_bucket()
            for iteration in range(self.num_iterations):
                self._log_tuning_status(tuning_type, strategy, epoch, iteration)
                if iteration == 0:
                    newly_sampled_queries_ids = (
                        self.data_bucket.sampled_queries.elements
                    )
                # Run training and obtain probability distributions for each query
                (
                    eval_stats,
                    confidences_2d,
                    confidences_3d,
                    confidence_segments,
                ) = self.mindmeld_al_classifier.train(
                    self.data_bucket, heuristic, tuning_type
                )
                if not select_mode:
                    self._save_training_data(
                        tuning_type,
                        strategy,
                        epoch,
                        iteration,
                        newly_sampled_queries_ids,
                        eval_stats,
                    )
                num_unsampled = len(self.data_bucket.unsampled_queries)
                if num_unsampled > 0:
                    newly_sampled_queries_ids = self.data_bucket.sample_and_update(
                        sampling_size=self._get_sampling_size(num_unsampled),
                        confidences_2d=confidences_2d,
                        confidences_3d=confidences_3d,
                        heuristic=heuristic,
                        confidence_segments=confidence_segments,
                        tuning_type=tuning_type,
                    )
                # Terminate on the first iteration if in selection mode.
                if select_mode:
                    return self.data_bucket.get_queries(newly_sampled_queries_ids)
                # If unsampled data is exhausted, end iterations for this epoch.
                if not len(self.data_bucket.unsampled_queries):
                    break

    def _reset_data_bucket(self):
        """Reset the DataBucket to the initial DataBucket after every epoch."""
        self.data_bucket.unsampled_queries = ProcessedQueryList(
            cache=self.data_bucket.resource_loader.query_cache,
            elements=self.init_unsampled_queries_ids,
        )
        self.data_bucket.sampled_queries = ProcessedQueryList(
            cache=self.data_bucket.resource_loader.query_cache,
            elements=self.init_sampled_queries_ids,
        )

    def _log_tuning_status(self, tuning_type, strategy, epoch, iteration):
        logger.info(
            "Strategy: %s(%s). Epoch: %s. Iter: %s.",
            strategy,
            tuning_type.value,
            epoch,
            iteration,
        )
        logger.info("Sampled Elements: %s", len(self.data_bucket.sampled_queries))
        logger.info("Remaining Elements: %s", len(self.data_bucket.unsampled_queries))

    def _save_training_data(
        self,
        tuning_type,
        strategy,
        epoch,
        iteration,
        newly_sampled_queries_ids,
        eval_stats,
    ):
        """Save training data if in tuning mode.
""" self.results_manager.update_accuracies_json( tuning_type, strategy, epoch, iteration, eval_stats ) if self.save_sampled_queries: self.results_manager.update_selected_queries_json( tuning_type, strategy, epoch, iteration, self.data_bucket.get_queries(newly_sampled_queries_ids), ) def _get_sampling_size(self, num_unsampled) -> int: """Calculate the number of elements to sample based on the batch_size and remaining number of elements in the pipeline. Returns: sampling_size (int): Number of elements to sample in the next iteration. """ return self.batch_size if num_unsampled > self.batch_size else num_unsampled

class ActiveLearningPipelineFactory:
    """Creates an ActiveLearningPipeline instance from values in a config."""

    @staticmethod
    def create_from_config(config):
        """Creates an ActiveLearningPipeline instance using the provided configuration.

        Args:
            config (dict): A model configuration.

        Returns:
            ActiveLearningPipeline: An ActiveLearningPipeline instance.

        Raises:
            ValueError: When the model configuration is invalid or a required key is missing.
        """
        return ActiveLearningPipeline(
            app_path=config.get("app_path"),
            train_pattern=config.get("pre_tuning", {}).get("train_pattern"),
            test_pattern=config.get("pre_tuning", {}).get("test_pattern"),
            train_seed_pct=config.get("pre_tuning", {}).get("train_seed_pct"),
            n_classifiers=config.get("tuning", {}).get("n_classifiers"),
            n_epochs=config.get("tuning", {}).get("n_epochs"),
            batch_size=config.get("tuning", {}).get("batch_size"),
            classifier_tuning_strategies=config.get("tuning", {}).get(
                "classifier_tuning_strategies", []
            ),
            tagger_tuning_strategies=config.get("tuning", {}).get(
                "tagger_tuning_strategies", []
            ),
            tuning_level=config.get("tuning", {}).get("tuning_level", None),
            classifier_selection_strategy=config.get("query_selection", {}).get(
                "classifier_selection_strategy"
            ),
            tagger_selection_strategy=config.get("query_selection", {}).get(
                "tagger_selection_strategy"
            ),
            save_sampled_queries=config.get("tuning_output", {}).get(
                "save_sampled_queries"
            ),
            aggregate_statistic=config.get("tuning_output", {}).get(
                "aggregate_statistic"
            ),
            class_level_statistic=config.get("tuning_output", {}).get(
                "class_level_statistic"
            ),
            log_usage_pct=config.get("query_selection", {}).get("log_usage_pct"),
            labeled_logs_pattern=config.get("query_selection", {}).get(
                "labeled_logs_pattern"
            ),
            unlabeled_logs_path=config.get("query_selection", {}).get(
                "unlabeled_logs_path"
            ),
            output_folder=config.get("output_folder"),
        )
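
# Usage sketch (added for illustration; not part of the original module). It
# shows how a pipeline is typically built from a config dict whose keys mirror
# those read by create_from_config above. The concrete paths and values below
# are hypothetical placeholders; strategy and statistic names are taken from
# the options listed in the __init__ docstring.
if __name__ == "__main__":
    example_config = {
        "app_path": "path/to/mindmeld_app",  # hypothetical app path
        "output_folder": "al_results",  # hypothetical output dir
        "pre_tuning": {
            "train_pattern": ".*train.*.txt",
            "test_pattern": ".*test.*.txt",
            "train_seed_pct": 0.2,
        },
        "tuning": {
            "n_classifiers": 3,
            "n_epochs": 5,
            "batch_size": 100,
            "tuning_level": ["intent"],
            "classifier_tuning_strategies": ["LeastConfidenceSampling"],
            "tagger_tuning_strategies": [],
        },
        "tuning_output": {
            "save_sampled_queries": True,
            "aggregate_statistic": "accuracy",
            "class_level_statistic": "f_beta",
        },
        "query_selection": {
            "classifier_selection_strategy": "EntropySampling",
            "tagger_selection_strategy": "EntropySampling",
            "log_usage_pct": 1.0,
            "labeled_logs_pattern": None,
            "unlabeled_logs_path": "path/to/unlabeled_logs.txt",
        },
    }
    pipeline = ActiveLearningPipelineFactory.create_from_config(example_config)
    pipeline.tune_strategies()  # tune all configured strategies and save results
    pipeline.plot()  # plot tuning curves from the saved results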