# Source code for mindmeld.active_learning.data_loading

# -*- coding: utf-8 -*-
# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains classes used to load queries for the Active Learning Pipeline.
"""

from typing import Dict, List
import logging

from .heuristics import Heuristic, stratified_random_sample, EntropySampling

from ..auto_annotator import BootstrapAnnotator
from ..components._config import DEFAULT_AUTO_ANNOTATOR_CONFIG
from ..constants import TuneLevel, TuningType, AL_MAX_LOG_USAGE_PCT
from ..core import ProcessedQuery
from ..markup import read_query_file
from ..resource_loader import ResourceLoader, ProcessedQueryList

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

class LabelMap:
    """Class that handles label encoding and mapping."""

    def __init__(self, query_tree: Dict):
        """
        Args:
            query_tree (dict): Nested Dictionary containing queries.
                Has the format: {"domain":{"intent":[Query List]}}.
        """
        self.domain_to_intents = LabelMap.get_domain_to_intents(query_tree)
        self.domain2id = LabelMap._get_domain_mappings(self.domain_to_intents)
        self.id2domain = LabelMap._reverse_dict(self.domain2id)
        self.domain_to_intent2id = LabelMap._get_intent_mappings(
            self.domain_to_intents
        )
        self.id2intent = LabelMap._reverse_nested_dict(self.domain_to_intent2id)

    @staticmethod
    def get_domain_to_intents(query_tree: Dict) -> Dict:
        """
        Args:
            query_tree (dict): Nested Dictionary containing queries.
                Has the format: {"domain":{"intent":[Query List]}}

        Returns:
            domain_to_intents (dict): Dict mapping domains to a list of intents.
        """
        # Only the intent names (the keys of each inner dict) are kept.
        return {domain: list(intents) for domain, intents in query_tree.items()}

    @staticmethod
    def _get_domain_mappings(domain_to_intents: Dict) -> Dict:
        """Creates a dictionary that maps domains to encoded ids.

        Args:
            domain_to_intents (dict): Dict mapping domains to a list of intents.

        Returns:
            domain2id (dict): dict with domain to id mappings. Ids follow the
                iteration order of ``domain_to_intents``.
        """
        return {domain: index for index, domain in enumerate(domain_to_intents)}

    @staticmethod
    def _get_intent_mappings(domain_to_intents: Dict) -> Dict:
        """Creates a dictionary that maps intents to encoded ids.

        Args:
            domain_to_intents (dict): Dict mapping domains to a list of intents.

        Returns:
            domain_to_intent2id (dict): dict with intent to id mappings. Intent ids
                restart at 0 within every domain.
        """
        return {
            domain: {intent: index for index, intent in enumerate(intents)}
            for domain, intents in domain_to_intents.items()
        }

    @staticmethod
    def _reverse_dict(dictionary: Dict[str, int]):
        """
        Args:
            dictionary (dict): Dict mapping labels to ids.

        Returns:
            reversed_dict (dict): Reversed dictionary (ids mapped back to labels).
        """
        return {value: key for key, value in dictionary.items()}

    @staticmethod
    def _reverse_nested_dict(dictionary: Dict[str, Dict[str, int]]):
        """
        Args:
            dictionary (dict): Dict mapping a parent key to a label-to-id dict.

        Returns:
            reversed_dict (dict): Dictionary with every inner dict reversed; the
                parent keys are left untouched.
        """
        return {
            parent_key: LabelMap._reverse_dict(parent_value)
            for parent_key, parent_value in dictionary.items()
        }

    @staticmethod
    def _get_entity_mappings(query_list: ProcessedQueryList) -> Dict:
        """Generates index mapping for entity labels in an application.
        Supports both BIO and BIOES tag schemes.

        Args:
            query_list (ProcessedQueryList): Data structure containing a list of
                processed queries.

        Returns:
            Dictionary mapping entity tags to index in entity vector. Tags have the
            form "domain.intent.TAG|entity_type" and are indexed in sorted order.
        """
        entity_labels = set()
        logger.info("Generating Entity Labels...")
        # NOTE(review): ``query_list.domains()`` is assumed to be the per-query
        # domain accessor that parallels ``intents()`` and ``entities()``; confirm
        # against ProcessedQueryList.
        for d, i, entities in zip(
            query_list.domains(), query_list.intents(), query_list.entities()
        ):
            for entity in entities:
                e = str(entity.entity.type)
                # Emit every BIOES prefix so both BIO and BIOES taggers are covered.
                for prefix in ("B", "I", "S", "E"):
                    entity_labels.add(f"{d}.{i}.{prefix}|{e}")
            # Every domain/intent pair gets an "outside" tag, even when the query
            # carries no entities.
            entity_labels.add(f"{d}.{i}.O|")

        return {label: index for index, label in enumerate(sorted(entity_labels))}

    @staticmethod
    def get_class_labels(
        tuning_level: list, query_list: ProcessedQueryList
    ) -> List[str]:
        """Creates a class label for a set of queries. These labels are used to split
        queries by type. Labels follow the format of "domain" or "domain.intent".
        For example, "date.get_date".

        Args:
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or
                "entity")
            query_list (ProcessedQueryList): Data structure containing a list of
                processed queries.

        Returns:
            class_labels (List[str]): list of labels for classification task.
        """
        # NOTE(review): ``query_list.domains()`` is assumed to be the per-query
        # domain accessor that parallels ``intents()``; confirm against
        # ProcessedQueryList.
        if TuneLevel.INTENT.value in tuning_level:
            return [
                f"{d}.{i}" for d, i in zip(query_list.domains(), query_list.intents())
            ]
        return [f"{d}" for d in query_list.domains()]

    @staticmethod
    def create_label_map(app_path, file_pattern):
        """Creates a label map.

        Args:
            app_path (str): Path to MindMeld application
            file_pattern (str): Regex pattern to match text files. (".*train.*.txt")

        Returns:
            label_map (LabelMap): A label map.
        """
        resource_loader = ResourceLoader.create_resource_loader(app_path)
        query_tree = resource_loader.get_labeled_queries(label_set=file_pattern)
        return LabelMap(query_tree)
class LogQueriesLoader:
    """Loads queries from a log file and converts them to processed queries."""

    def __init__(self, app_path: str, tuning_level: list, log_file_path: str):
        """This class loads data as processed queries from a specified log file.

        Args:
            app_path (str): Path to the MindMeld application.
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or
                "entity")
            log_file_path (str): Path to the log file with log queries.
        """
        self.app_path = app_path
        self.tuning_level = tuning_level
        self.log_file_path = log_file_path

    @staticmethod
    def deduplicate_raw_text_queries(log_queries_iter) -> List[str]:
        """Removes duplicates in the text queries.

        Args:
            log_queries_iter (generator): Log queries generator.

        Returns:
            filtered_text_queries (List[str]): a List of filtered text queries, in
                order of first appearance.
        """
        # dict.fromkeys keeps insertion order, so the result is reproducible from
        # run to run (a plain set's iteration order for strings is not).
        return list(dict.fromkeys(log_queries_iter))

    def convert_text_queries_to_processed(
        self, text_queries: List[str]
    ) -> List[ProcessedQuery]:
        """Converts text queries to processed queries using an annotator.

        Args:
            text_queries (List[str]): a List of text queries.

        Returns:
            queries (List[ProcessedQuery]): List of processed queries.
        """
        logger.info("Loading a Bootstrap Annotator to process log queries.")
        # Work on a copy: writing "app_path" into the shared module-level default
        # config would leak this loader's path into every later user of it.
        annotator_params = dict(DEFAULT_AUTO_ANNOTATOR_CONFIG)
        annotator_params["app_path"] = self.app_path
        bootstrap_annotator = BootstrapAnnotator(**annotator_params)
        return bootstrap_annotator.text_queries_to_processed_queries(
            text_queries=text_queries
        )

    @property
    def queries(self):
        """Processed queries built from the de-duplicated contents of the log file.

        The log file is re-read and re-annotated on every access.
        """
        log_queries_iter = read_query_file(self.log_file_path)
        filtered_text_queries = LogQueriesLoader.deduplicate_raw_text_queries(
            log_queries_iter
        )
        return self.convert_text_queries_to_processed(filtered_text_queries)
class DataBucket:
    """Class to hold data throughout the Active Learning training pipeline.
    Responsible for data conversion, filtration, and storage.
    """

    def __init__(
        self,
        label_map,
        resource_loader,
        test_queries: ProcessedQueryList,
        unsampled_queries: ProcessedQueryList,
        sampled_queries: ProcessedQueryList,
    ):
        """
        Args:
            label_map: Label map to keep alongside the queries (stored as-is).
            resource_loader: Resource loader whose ``query_cache`` is used to look
                up queries and to build updated query lists.
            test_queries (ProcessedQueryList): Queries to use for evaluation.
            unsampled_queries (ProcessedQueryList): Queries to sample from iteratively.
            sampled_queries (ProcessedQueryList): Queries currently included in the
                sample set.
        """
        self.label_map = label_map
        self.resource_loader = resource_loader
        self.test_queries = test_queries
        self.unsampled_queries = unsampled_queries
        self.sampled_queries = sampled_queries

    def get_queries(self, query_ids):
        """Method to get multiple queries from the QueryCache given a list of query ids.

        Args:
            query_ids (List[int]): List of ids corresponding to queries in the
                QueryCache.

        Returns:
            queries (List[ProcessedQuery]): List of processed queries from the cache.
        """
        fetched = []
        for query_id in query_ids:
            fetched.append(self.resource_loader.query_cache.get(query_id))
        return fetched

    def update_sampled_queries(self, newly_sampled_queries_ids):
        """Update the current set of sampled queries by adding the set of newly
        sampled queries. A new ProcessedQueryList object is created with the updated
        set of query ids.

        Args:
            newly_sampled_queries_ids (List[int]): List of ids corresponding to the
                newly sampled queries in the QueryCache.
        """
        # Existing ids come first, the new ones are appended after them.
        combined_ids = self.sampled_queries.elements + newly_sampled_queries_ids
        self.sampled_queries = ProcessedQueryList(
            cache=self.resource_loader.query_cache, elements=combined_ids
        )

    def update_unsampled_queries(self, remaining_indices):
        """Update the current set of unsampled queries by removing the set of newly
        sampled queries. A new ProcessedQueryList object is created with the updated
        set of query ids.

        Args:
            remaining_indices (List[int]): List of positions, within
                self.unsampled_queries, of the queries that remain unsampled.
        """
        current_ids = self.unsampled_queries.elements
        kept_ids = []
        for position in remaining_indices:
            kept_ids.append(current_ids[position])
        self.unsampled_queries = ProcessedQueryList(
            cache=self.resource_loader.query_cache, elements=kept_ids
        )

    def sample_and_update(
        self,
        sampling_size: int,
        confidences_2d: List[List[float]],
        confidences_3d: List[List[List[float]]],
        heuristic: Heuristic,
        confidence_segments: Dict = None,
        tuning_type: TuningType = TuningType.CLASSIFIER,
    ):
        """Method to sample a DataBucket's unsampled_queries and update its
        sampled_queries and unsampled_queries.

        Args:
            sampling_size (int): Number of elements to sample in the next iteration.
            confidences_2d (List[List[float]]): Confidence probabilities per element.
                Also the argument handed to ``rank_entities`` for tagger tuning.
            confidences_3d (List[List[List[float]]]): Confidence probabilities per
                element. When truthy, classifier tuning ranks with ``rank_3d``
                instead of ``rank_2d``.
            heuristic (Heuristic): Selection strategy.
            confidence_segments (Dict[(str, Tuple(int,int))]): A dictionary mapping
                segments to run KL Divergence.
            tuning_type (TuningType): Component to be tuned ("classifier" or "tagger")

        Returns:
            newly_sampled_queries_ids (List[int]): List of ids corresponding to the
                newly sampled queries in the QueryCache.
        """
        if tuning_type == TuningType.CLASSIFIER:
            if confidences_3d:
                rank_kwargs = {"confidences_3d": confidences_3d}
                if confidence_segments:
                    rank_kwargs["confidence_segments"] = confidence_segments
                ranking = heuristic.rank_3d(**rank_kwargs)
            else:
                ranking = heuristic.rank_2d(confidences_2d)
        else:
            try:
                ranking = heuristic.rank_entities(confidences_2d)
            except (TypeError, ValueError):
                # if heuristic does not have entity AL support default to entropy
                ranking = EntropySampling.rank_entities(confidences_2d)

        # The head of the ranking is sampled now; the tail stays unsampled.
        chosen_indices = ranking[:sampling_size]
        leftover_indices = ranking[sampling_size:]

        unsampled_ids = self.unsampled_queries.elements
        newly_sampled_queries_ids = []
        for position in chosen_indices:
            newly_sampled_queries_ids.append(unsampled_ids[position])

        self.update_sampled_queries(newly_sampled_queries_ids)
        self.update_unsampled_queries(leftover_indices)
        return newly_sampled_queries_ids

    @staticmethod
    def filter_queries_by_nlp_component(
        query_list: ProcessedQueryList, component_type: str, component_name: str
    ):
        """Filter queries for training preparation.

        Args:
            query_list (list): List of queries to filter
            component_type (str): Component type of desired queries (e.g. "domain")
            component_name (str): Component name of desired queries
                (e.g. "smart_home")

        Returns:
            filtered_queries_indices (list): List of indices of filtered queries.
            filtered_queries (list): List of filtered queries.
        """
        # Collect (index, query) pairs whose attribute named by component_type
        # equals component_name, then split the pairs into two parallel lists.
        matches = [
            (position, candidate)
            for position, candidate in enumerate(query_list.processed_queries())
            if getattr(candidate, component_type) == component_name
        ]
        filtered_queries_indices = [position for position, _ in matches]
        filtered_queries = [candidate for _, candidate in matches]
        return filtered_queries_indices, filtered_queries
class DataBucketFactory:
    """Class to generate the initial data for experimentation. (Seed Queries,
    Remaining Queries, and Test Queries). Handles initial sampling and data split
    based on configuration details.
    """

    @staticmethod
    def _load_label_map_and_train_queries(
        app_path: str, tuning_level: list, train_pattern: str
    ):
        """Builds the label map, resource loader and flattened training queries that
        both factory methods start from.

        Args:
            app_path (str): Path to MindMeld application
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or
                "entity")
            train_pattern (str): Regex pattern to match train files.

        Returns:
            label_map (LabelMap): Label map built from the training files. Entity
                mappings are attached when entity tuning is requested.
            resource_loader (ResourceLoader): Resource loader for the application.
            train_query_list (ProcessedQueryList): Flattened training queries.
        """
        label_map = LabelMap.create_label_map(app_path, train_pattern)
        resource_loader = ResourceLoader.create_resource_loader(app_path)
        train_query_list = resource_loader.get_flattened_label_set(
            label_set=train_pattern
        )
        if TuneLevel.ENTITY.value in tuning_level:
            label_map.entity2id = LabelMap._get_entity_mappings(train_query_list)
            label_map.id2entity = LabelMap._reverse_dict(label_map.entity2id)
        return label_map, resource_loader, train_query_list

    @staticmethod
    def get_data_bucket_for_strategy_tuning(
        app_path: str,
        tuning_level: list,
        train_pattern: str,
        test_pattern: str,
        train_seed_pct: float,
    ):
        """Creates a DataBucket to be used for strategy tuning.

        Args:
            app_path (str): Path to MindMeld application
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or
                "entity")
            train_pattern (str): Regex pattern to match train files. (".*train.*.txt")
            test_pattern (str): Regex pattern to match test files. (".*test.*.txt")
            train_seed_pct (float): Percentage of training data to use as the initial
                seed

        Returns:
            strategy_tuning_data_bucket (DataBucket): DataBucket for tuning
        """
        (
            label_map,
            resource_loader,
            train_query_list,
        ) = DataBucketFactory._load_label_map_and_train_queries(
            app_path, tuning_level, train_pattern
        )

        train_class_labels = LabelMap.get_class_labels(tuning_level, train_query_list)
        ranked_indices = stratified_random_sample(train_class_labels)
        sampling_size = int(train_seed_pct * len(train_query_list))

        # The first `sampling_size` ranked queries seed the sampled set; the rest
        # remain available for later sampling.
        sampled_query_ids = [
            train_query_list.elements[i] for i in ranked_indices[:sampling_size]
        ]
        unsampled_query_ids = [
            train_query_list.elements[i] for i in ranked_indices[sampling_size:]
        ]
        sampled_queries = ProcessedQueryList(
            resource_loader.query_cache, sampled_query_ids
        )
        unsampled_queries = ProcessedQueryList(
            resource_loader.query_cache, unsampled_query_ids
        )
        test_queries = resource_loader.get_flattened_label_set(label_set=test_pattern)
        return DataBucket(
            label_map, resource_loader, test_queries, unsampled_queries, sampled_queries
        )

    @staticmethod
    def get_data_bucket_for_query_selection(
        app_path: str,
        tuning_level: list,
        train_pattern: str,
        test_pattern: str,
        unlabeled_logs_path: str,
        labeled_logs_pattern: str = None,
        log_usage_pct: float = AL_MAX_LOG_USAGE_PCT,
    ):
        """Creates a DataBucket to be used for log query selection.

        Args:
            app_path (str): Path to MindMeld application
            tuning_level (list): The hierarchy levels to tune ("domain", "intent" or
                "entity")
            train_pattern (str): Regex pattern to match train files. For example,
                ".*train.*.txt"
            test_pattern (str): Regex pattern to match test files. For example,
                ".*test.*.txt"
            unlabeled_logs_path (str): Path to a logs text file with unlabeled
                queries. Only read when ``labeled_logs_pattern`` is not given.
            labeled_logs_pattern (str): Pattern to obtain logs already labeled within
                a MindMeld app
            log_usage_pct (float): Percentage of the log data to use for selection

        Returns:
            query_selection_data_bucket (DataBucket): DataBucket for log query
                selection
        """
        (
            label_map,
            resource_loader,
            train_query_list,
        ) = DataBucketFactory._load_label_map_and_train_queries(
            app_path, tuning_level, train_pattern
        )

        if labeled_logs_pattern:
            log_query_list = resource_loader.get_flattened_label_set(
                label_set=labeled_logs_pattern
            )
        else:
            # Unlabeled logs are annotated first and then stored in the query cache
            # so that they can be referenced by id like any other query.
            log_queries = LogQueriesLoader(
                app_path, tuning_level, unlabeled_logs_path
            ).queries
            log_queries_keys = [
                resource_loader.query_cache.get_key(q.domain, q.intent, q.query.text)
                for q in log_queries
            ]
            log_query_row_ids = [
                resource_loader.query_cache.put(key, query)
                for key, query in zip(log_queries_keys, log_queries)
            ]
            log_query_list = ProcessedQueryList(
                cache=resource_loader.query_cache, elements=log_query_row_ids
            )

        if log_usage_pct < AL_MAX_LOG_USAGE_PCT:
            sampling_size = int(log_usage_pct * len(log_query_list))
            # get_class_labels returns a flat list of labels, so it must not be
            # tuple-unpacked.
            log_class_labels = LabelMap.get_class_labels(tuning_level, log_query_list)
            ranked_indices = stratified_random_sample(log_class_labels)
            log_query_ids = [
                log_query_list.elements[i] for i in ranked_indices[:sampling_size]
            ]
            # Rebind log_query_list itself; otherwise the reduced set never reaches
            # the DataBucket and log_usage_pct has no effect.
            log_query_list = ProcessedQueryList(
                cache=resource_loader.query_cache, elements=log_query_ids
            )

        # The whole training set is the starting sample; it was already flattened
        # above, so it is reused instead of being loaded a second time.
        sampled_queries = train_query_list
        test_queries = resource_loader.get_flattened_label_set(label_set=test_pattern)
        return DataBucket(
            label_map, resource_loader, test_queries, log_query_list, sampled_queries
        )