# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import re
from abc import ABC, abstractmethod
from enum import Enum
from typing import List
from tqdm import tqdm
from .resource_loader import ResourceLoader
from .components._config import (
ENGLISH_LANGUAGE_CODE,
ENGLISH_US_LOCALE,
)
from .components.translators import NoOpTranslator, TranslatorFactory
from .text_preparation.spacy_model_factory import SpacyModelFactory
from .system_entity_recognizer import (
DucklingRecognizer,
duckling_item_to_query_entity,
)
from .markup import load_query, dump_queries
from .core import Entity, Span, QueryEntity, _get_overlap, NestedEntity
from .exceptions import MarkupError
from .models.helpers import register_annotator
from .constants import (
DUCKLING_TO_SYS_ENTITY_MAPPINGS,
ANNOTATOR_TO_SYS_ENTITY_MAPPINGS,
SPACY_SYS_ENTITIES_NOT_IN_DUCKLING,
CURRENCY_SYMBOLS,
SYSTEM_ENTITY_PREFIX,
)
from .components import NaturalLanguageProcessor
from .path import get_entity_types
from .query_factory import QueryFactory
logger = logging.getLogger(__name__)
class AnnotatorAction(Enum):
    """The two operations an Annotator can perform on a MindMeld app's query files."""

    # Add entity annotations to queries matched by the configured rules.
    ANNOTATE = "annotate"
    # Strip entity annotations from queries matched by the configured rules.
    UNANNOTATE = "unannotate"
class Annotator(ABC):
    """
    Abstract Annotator class that can be used to build a custom Annotation class.

    Subclasses must implement :meth:`parse` and :attr:`supported_entity_types`.
    """

    # pylint: disable=W0613
    def __init__(
        self,
        app_path,
        annotation_rules=None,
        language=ENGLISH_LANGUAGE_CODE,
        locale=ENGLISH_US_LOCALE,
        overwrite=False,
        unannotate_supported_entities_only=True,
        unannotation_rules=None,
        **kwargs,
    ):
        """Initializes an annotator.

        Args:
            app_path (str): The location of the MindMeld app.
            annotation_rules (list): List of Annotation rules.
            language (str, optional): Language as specified using a 639-1/2 code.
            locale (str, optional): The locale representing the ISO 639-1 language code and \
                ISO3166 alpha 2 country code separated by an underscore character.
            overwrite (bool): Whether to overwrite existing annotations with conflicting spans.
            unannotate_supported_entities_only (bool): Only allow removal of supported entities.
            unannotation_rules (list): List of Annotation rules.
        """
        self.app_path = app_path
        self.language = language
        self.locale = locale
        self.overwrite = overwrite
        self.annotation_rules = annotation_rules or []
        self.unannotate_supported_entities_only = unannotate_supported_entities_only
        self.unannotation_rules = unannotation_rules or []
        self._resource_loader = ResourceLoader.create_resource_loader(app_path)
        self.duckling = DucklingRecognizer.get_instance()

    def _get_file_entities_map(self, action: AnnotatorAction):
        """Creates a dictionary that maps file paths to entities given
        regex rules defined in the config.

        Args:
            action (AnnotatorAction): Can be "annotate" or "unannotate". Used as a key
                to access a list of regex rules in the config dictionary.

        Returns:
            file_entities_map (dict): A dictionary that maps file paths in an
                App to a list of entities.
        """
        all_file_paths = self._resource_loader.get_all_file_paths()
        file_entities_map = {path: [] for path in all_file_paths}
        if action == AnnotatorAction.ANNOTATE:
            rules = self.annotation_rules
        elif action == AnnotatorAction.UNANNOTATE:
            rules = self.unannotation_rules
        else:
            raise AssertionError(f"{action} is an invalid Annotator action.")
        for rule in rules:
            pattern = Annotator._get_pattern(rule)
            compiled_pattern = re.compile(pattern)
            filtered_paths = self._resource_loader.filter_file_paths(
                compiled_pattern=compiled_pattern, file_paths=all_file_paths
            )
            for path in filtered_paths:
                # Later rules overwrite earlier ones for the same path.
                entities = self._get_entities(rule)
                file_entities_map[path] = entities
        return file_entities_map

    @staticmethod
    def _get_pattern(rule):
        """Convert a rule represented as a dictionary with the keys "domains", "intents",
        and "files" into a regex pattern.

        Args:
            rule (dict): Annotation/Unannotation rule.

        Returns:
            pattern (str): Regex pattern specifying allowed file paths.
        """
        pattern = [rule[x] for x in ["domains", "intents", "files"]]
        return ".*/" + "/".join(pattern)

    def _get_entities(self, rule):
        """Process the entities specified in a rule dictionary. Check if they are valid
        for the given annotator.

        Args:
            rule (dict): Annotation/Unannotation rule with an "entities" key.

        Returns:
            valid_entities (list): List of valid entities specified in the rule.
        """
        # A wildcard pattern means "all entities supported by this annotator".
        if rule["entities"].strip() in ["*", ".*", ".+"]:
            return ["*"]
        # Entities may be written as an alternation group, e.g. "(a|b|c)".
        entities = re.sub("[()]", "", rule["entities"]).split("|")
        valid_entities = []
        for entity in entities:
            entity = entity.strip()
            if self.valid_entity_check(entity):
                valid_entities.append(entity)
            else:
                logger.warning("%s is not a valid entity. Skipping entity.", entity)
        return valid_entities

    @property
    @abstractmethod
    def supported_entity_types(self):
        """
        Returns:
            supported_entity_types (list): List of supported entity types.
        """
        raise NotImplementedError("Subclasses must implement this method")

    def valid_entity_check(self, entity):
        """Determine if an entity type is valid.

        Args:
            entity (str): Name of entity to annotate.

        Returns:
            bool: Whether entity is valid.
        """
        entity = entity.lower().strip()
        return entity in self.supported_entity_types

    def annotate(self):
        """Annotate data."""
        if not self.annotation_rules:
            logger.warning(
                """'annotate' field is not configured or misconfigured in the `config.py`.
             We can't find any file to annotate."""
            )
            return
        self._modify_queries(action=AnnotatorAction.ANNOTATE)

    def unannotate(self):
        """Unannotate data."""
        # Bug fix: this previously tested `self.unannotate` (the bound method,
        # which is always truthy), so the warning below could never fire and
        # _modify_queries always ran. Test the configured rules instead.
        if not self.unannotation_rules:
            logger.warning(
                """'unannotate' field is not configured or misconfigured in the `config.py`.
             We can't find any file to unannotate."""
            )
            return
        self._modify_queries(action=AnnotatorAction.UNANNOTATE)

    def _modify_queries(self, action: AnnotatorAction):
        """Iterates through App files and annotates or unannotates queries.

        Args:
            action (AnnotatorAction): Can be "annotate" or "unannotate".
        """
        file_entities_map = self._get_file_entities_map(action=action)
        query_factory = QueryFactory.create_query_factory(self.app_path)
        # Only touch files that have at least one entity rule attached.
        path_list = [p for p in file_entities_map if file_entities_map[p]]
        for path in path_list:
            processed_queries = Annotator._get_processed_queries(
                file_path=path, query_factory=query_factory
            )
            tqdm_desc = "Processing " + path + ": "
            for processed_query in tqdm(processed_queries, ascii=True, desc=tqdm_desc):
                entity_types = file_entities_map[path]
                if action == AnnotatorAction.ANNOTATE:
                    self._annotate_query(
                        processed_query=processed_query,
                        entity_types=entity_types,
                    )
                elif action == AnnotatorAction.UNANNOTATE:
                    self._unannotate_query(
                        processed_query=processed_query,
                        remove_entities=entity_types,
                    )
            # The context manager closes the file; no explicit close() needed.
            with open(path, "w") as outfile:
                outfile.write("".join(list(dump_queries(processed_queries))))

    @staticmethod
    def _get_processed_queries(file_path, query_factory):
        """Converts queries in a given path to processed queries.
        Skips and presents a warning if loading the query creates an error.

        Args:
            file_path (str): Path to file containing queries.
            query_factory (QueryFactory): Used to generate processed queries.

        Returns:
            processed_queries (list): List of processed queries from file.
        """
        with open(file_path) as infile:
            queries = infile.readlines()
        processed_queries = []
        # MindMeld layout is .../<domain>/<intent>/<file>, so the domain and
        # intent are the two path components directly above the file.
        domain, intent = file_path.split(os.sep)[-3:-1]
        for query in queries:
            try:
                processed_query = load_query(
                    markup=query,
                    domain=domain,
                    intent=intent,
                    query_factory=query_factory,
                )
                processed_queries.append(processed_query)
            except (AssertionError, MarkupError):
                logger.warning("Skipping query. Error in processing: %s", query)
        return processed_queries

    def _annotate_query(self, processed_query, entity_types):
        """Updates the entities of a processed query with newly
        annotated entities.

        Args:
            processed_query (ProcessedQuery): The processed query to update.
            entity_types (list): List of entities allowed for annotation.
        """
        current_entities = list(processed_query.entities)
        annotated_entities = self._get_annotated_entities(
            processed_query=processed_query, entity_types=entity_types
        )
        # When overwriting, new annotations win span conflicts; otherwise the
        # existing annotations win and new ones only fill unclaimed spans.
        final_entities = Annotator._resolve_conflicts(
            target_entities=annotated_entities if self.overwrite else current_entities,
            other_entities=current_entities if self.overwrite else annotated_entities,
        )
        processed_query.entities = tuple(final_entities)

    def _get_annotated_entities(self, processed_query, entity_types=None):
        """Creates a list of query entities after parsing the text of a
        processed query.

        Args:
            processed_query (ProcessedQuery): A processed query.
            entity_types (list): List of entities allowed for annotation.

        Returns:
            query_entities (list): List of query entities.
        """
        if len(entity_types) == 0:
            return []
        # ["*"] means "no restriction"; parse() treats None as "all types".
        entity_types = None if entity_types == ["*"] else entity_types
        return self.parse(
            sentence=processed_query.query.text,
            entity_types=entity_types,
            domain=processed_query.domain,
            intent=processed_query.intent,
        )

    @staticmethod
    def _item_to_query_entity(item, processed_query):
        """Converts an item returned from parse into a query entity.

        Args:
            item (dict): Dictionary representing an entity with the keys -
                "body", "start", "end", "value", "dim". ("role" is an optional attribute.)
            processed_query (ProcessedQuery): The processed query that the
                entity is found in.

        Returns:
            query_entity (QueryEntity): The converted query entity.
        """
        # Span.end is inclusive while item["end"] is exclusive, hence the -1.
        span = Span(start=item["start"], end=item["end"] - 1)
        role = item.get("role")
        entity = Entity(
            text=item["body"], entity_type=item["dim"], role=role, value=item["value"]
        )
        query_entity = QueryEntity.from_query(
            query=processed_query.query, span=span, entity=entity
        )
        return query_entity

    @staticmethod
    def _resolve_conflicts(target_entities, other_entities):
        """Resolve overlaps between existing entities and newly annotated entities.

        Args:
            target_entities (list): List of existing query entities.
            other_entities (list): List of new query entities.

        Returns:
            final_entities (list): List of resolved query entities.
        """
        additional_entities = []
        for o_entity in other_entities:
            # Keep an "other" entity only if it overlaps no "target" entity.
            no_overlaps = [
                not _get_overlap(o_entity.span, t_entity.span)
                for t_entity in target_entities
            ]
            if all(no_overlaps):
                additional_entities.append(o_entity)
        target_entities.extend(additional_entities)
        return target_entities

    # pylint: disable=R0201
    def _unannotate_query(self, processed_query, remove_entities):
        """Removes specified entities in a processed query. If all entities are being
        removed, this function will not remove entities that the annotator does not support
        unless it is explicitly specified to do so in the config with the param
        "unannotate_supported_entities_only" (bool).

        Args:
            processed_query (ProcessedQuery): A processed query.
            remove_entities (list): List of entities to remove.
        """
        keep_entities = []
        for query_entity in processed_query.entities:
            if remove_entities == ["*"]:
                is_supported_entity = self.valid_entity_check(query_entity.entity.type)
                if self.unannotate_supported_entities_only and not is_supported_entity:
                    keep_entities.append(query_entity)
            elif query_entity.entity.type not in remove_entities:
                keep_entities.append(query_entity)
        processed_query.entities = tuple(keep_entities)

    @abstractmethod
    def parse(self, sentence, **kwargs):
        """Extract entities from a sentence. Detected entities should be
        represented as dictionaries with the following keys: "body", "start"
        (start index), "end" (end index), "value", "dim" (entity type).

        Args:
            sentence (str): Sentence to detect entities.

        Returns:
            query_entities (list): List of QueryEntity objects.
        """
        raise NotImplementedError("Subclasses must implement this method")
class SpacyAnnotator(Annotator):
    """Annotator class that uses spacy to generate annotations.

    Depending on the language, supported entities can include: "sys_time", "sys_interval",
    "sys_duration", "sys_number", "sys_amount-of-money", "sys_distance", "sys_weight",
    "sys_ordinal", "sys_quantity", "sys_percent", "sys_org", "sys_loc", "sys_person",
    "sys_gpe", "sys_norp", "sys_fac", "sys_product", "sys_event", "sys_law", "sys_language",
    "sys_work-of-art", "sys_other-quantity".
    For more information on the supported entities for the Spacy Annotator check the MindMeld docs.
    """

    def __init__(self, *args, **kwargs):
        """Initializes a SpacyAnnotator.

        Args:
            app_path (str): The location of the MindMeld app.
            annotation_rules (list): List of Annotation rules.
            language (str, optional): Language as specified using a 639-1/2 code.
            locale (str, optional): The locale representing the ISO 639-1 language code and \
                ISO3166 alpha 2 country code separated by an underscore character.
            overwrite (bool): Whether to overwrite existing annotations with conflicting spans.
            spacy_model_size (str, optional): Size of the Spacy model to use. ("sm", "md", or "lg")
            unannotate_supported_entities_only (bool): Only allow removal of supported entities.
            unannotation_rules (list): List of Annotation rules.
        """
        super().__init__(*args, **kwargs)
        self.spacy_model_size = kwargs.get("spacy_model_size", "lg")
        self.nlp = SpacyModelFactory.get_spacy_language_model(
            self.language, self.spacy_model_size
        )

    @property
    def supported_entity_types(self):  # pylint: disable=W0236
        """This function generates a list of supported entities for the given language.
        These entities labels are mapped to MindMeld sys_entities.
        The "misc" spacy entity is skipped since the category is too broad to be
        helpful in an application.

        Returns:
            supported_entity_types (list): List of supported entity types.
        """
        spacy_supported_entities = [e.lower() for e in self.nlp.get_pipe("ner").labels]
        supported_entities = set()
        for entity in spacy_supported_entities:
            if entity == "misc":
                continue
            if entity in ["time", "date", "datetime"]:
                # Time-like spacy labels expand to all three MindMeld time types.
                supported_entities.update(["sys_time", "sys_duration", "sys_interval"])
            elif entity in ANNOTATOR_TO_SYS_ENTITY_MAPPINGS:
                supported_entities.add(ANNOTATOR_TO_SYS_ENTITY_MAPPINGS[entity])
            else:
                supported_entities.add(f"sys_{entity}")
        if "sys_weight" in supported_entities:
            supported_entities.update(["sys_distance", "sys_other-quantity"])
        supported_entities = self._remove_unresolvable_entities(supported_entities)
        return supported_entities

    def _remove_unresolvable_entities(self, entities):
        """Remove entities that need duckling to be resolved but are not
        supported by duckling for the given language.

        Args:
            entities (list): List of entities to filter.

        Returns:
            filtered_entities (list): Filtered entities.
        """
        filtered_entities = []
        for entity in entities:
            if entity not in SPACY_SYS_ENTITIES_NOT_IN_DUCKLING:
                # Entity requires duckling resolution; keep it only if duckling
                # supports it for this language.
                if (
                    self.language in DUCKLING_TO_SYS_ENTITY_MAPPINGS
                    and entity in DUCKLING_TO_SYS_ENTITY_MAPPINGS[self.language]
                ):
                    filtered_entities.append(entity)
            else:
                # Spacy-only entity; no duckling resolution needed.
                filtered_entities.append(entity)
        return filtered_entities

    def parse(self, sentence, entity_types=None, **kwargs):
        """Extracts entities from a sentence. Detected entities are
        represented as dictionaries with the following keys: "body", "start"
        (start index), "end" (end index), "value", "dim" (entity type).

        Args:
            sentence (str): Sentence to detect entities.
            entity_types (list): List of entity types to annotate. If None, all
                possible entity types will be annotated.

        Returns:
            query_entities (list): List of QueryEntity objects.
        """
        doc = self.nlp(sentence)
        spacy_entities = [
            {
                "body": ent.text,
                "start": ent.start_char,
                "end": ent.end_char,
                "value": {"value": ent.text},
                "dim": ent.label_.lower(),
            }
            for ent in doc.ents
        ]
        entity_resolution_func_map = {
            "time": self._resolve_time_date,
            "date": self._resolve_time_date,
            "datetime": self._resolve_time_date,
            "cardinal": self._resolve_cardinal,
            "money": self._resolve_money,
            "ordinal": self._resolve_ordinal,
            "quantity": self._resolve_quantity,
            "percent": self._resolve_percent,
            "person": self._resolve_person,
        }
        entities = []
        for entity in spacy_entities:
            # Normalize alternative person labels used by some spacy models.
            if entity["dim"] in ["per", "persName"]:
                entity["dim"] = "person"
            elif entity["dim"] == "misc":
                continue
            if entity["dim"] in entity_resolution_func_map:
                params = {"entity": entity}
                if entity["dim"] in ["time", "date", "datetime"]:
                    params["entity_types"] = entity_types
                elif entity["dim"] in ["money"]:
                    params["sentence"] = sentence
                entity = entity_resolution_func_map[entity["dim"]](**params)
            else:
                entity["dim"] = SYSTEM_ENTITY_PREFIX + entity["dim"].replace("_", "-")
            # Resolvers return None when an entity cannot be resolved.
            if entity:
                entities.append(entity)
        if entity_types:
            entities = [e for e in entities if e["dim"] in entity_types]
        processed_query = load_query(
            sentence,
            query_factory=self._resource_loader.query_factory,
            domain=kwargs.get("domain"),
            intent=kwargs.get("intent"),
        )
        return [
            Annotator._item_to_query_entity(entity, processed_query)
            for entity in entities
        ]

    def _resolve_time_date(self, entity, entity_types=None):
        """Resolves a time related entity. First, an exact match is searched for. If
        not found, the largest substring match is searched for. If the span of the entity
        does not share the exact span match with duckling entities then it is likely that
        spacy has recognized an additional word in the span. For example, "nearly 15 minutes"
        doesn't have an exact match but the largest substring match correctly resolves for
        the substring "15 minutes". Order of priority for the time entities is sys_duration,
        sys_interval, and sys_time.

        Args:
            entity (dict): A dictionary representing an entity.
            entity_types (list): List of entity types to parse. If None, all possible
                entity types will be parsed.

        Returns:
            entity (dict): A resolved entity dict or None if the entity isn't resolved.
        """
        candidates = self.duckling.get_candidates_for_text(
            entity["body"], language=self.language, locale=self.locale
        )
        if len(candidates) == 0:
            return
        time_entities = ["sys_duration", "sys_interval", "sys_time"]
        if entity_types:
            time_entities = [e for e in time_entities if e in entity_types]
        if SpacyAnnotator._resolve_time_exact_match(entity, candidates, time_entities):
            return entity
        elif SpacyAnnotator._resolve_largest_substring(
            entity, candidates, entity_types=time_entities, is_time_related=True
        ):
            return entity

    @staticmethod
    def _get_time_entity_type(candidate):
        """Determine the "sys" type given a time-related Duckling candidate dictionary.

        Args:
            candidate (dict): A Duckling candidate.

        Returns:
            entity_type (str): Entity type. ("sys_duration", "sys_interval" or "sys_time")
        """
        if candidate["dim"] == "duration":
            return "sys_duration"
        if candidate["dim"] == "time":
            if candidate["value"]["type"] == "interval":
                return "sys_interval"
            else:
                return "sys_time"

    @staticmethod
    def _resolve_time_exact_match(entity, candidates, time_entities):
        """Resolve a time-related entity given Duckling candidates on the first
        exact match.

        Args:
            entity (dict): A dictionary representing an entity.
            candidates (list): List of dictionary candidates returned by Duckling.parse().
            time_entities (list): List of allowed time-related entity types.

        Returns:
            entity (dict): A resolved entity dict or None if the entity isn't resolved.
        """
        for candidate in candidates:
            candidate_entity = SpacyAnnotator._get_time_entity_type(candidate)
            if (
                candidate_entity in time_entities
                and candidate["body"] == entity["body"]
            ):
                entity["dim"] = candidate_entity
                entity["value"] = candidate["value"]
                return entity

    @staticmethod
    def _resolve_largest_substring(entity, candidates, entity_types, is_time_related):
        """Resolve an entity by the largest substring match given Duckling candidates.

        Args:
            entity (dict): A dictionary representing an entity.
            candidates (list): List of dictionary candidates returned by Duckling.parse().
            entity_types (list): List of entity types to check.
            is_time_related (bool): Whether the entity is related to time.

        Returns:
            entity (dict): A resolved entity dict or None if the entity isn't resolved.
        """
        largest_candidate = None
        resolved_entity_type = None
        # entity_types is ordered by priority; a match for an earlier type can
        # still be displaced by a longer match for a later type.
        for entity_type in entity_types:
            for candidate in candidates:
                if is_time_related:
                    candidate_entity = SpacyAnnotator._get_time_entity_type(candidate)
                else:
                    candidate_entity = candidate["entity_type"]
                if (
                    candidate_entity == entity_type
                    and candidate["body"] in entity["body"]
                    and (
                        largest_candidate is None
                        or len(candidate["body"]) > len(largest_candidate["body"])
                    )
                ):
                    largest_candidate = candidate
                    resolved_entity_type = entity_type
        if largest_candidate:
            entity["body"] = largest_candidate["body"]
            # Candidate offsets are relative to the entity body; shift them to
            # sentence coordinates using the entity's original start.
            offset = entity["start"]
            entity["start"] = offset + largest_candidate["start"]
            entity["end"] = offset + largest_candidate["end"]
            entity["value"] = largest_candidate["value"]
            entity["dim"] = resolved_entity_type
            return entity

    def _resolve_cardinal(self, entity):
        """Resolve a cardinal entity as "sys_number" via exact or substring match."""
        if self._resolve_exact_match(entity):
            return entity
        candidates = self.duckling.get_candidates_for_text(
            entity["body"], language=self.language, locale=self.locale
        )
        if self._resolve_largest_substring(
            entity, candidates, entity_types=["sys_number"], is_time_related=False
        ):
            return entity

    def _resolve_money(self, entity, sentence):
        """Resolve a money entity, extending the span to include a preceding
        currency symbol that spacy excluded (e.g. "$" in "$10").
        """
        for symbol in CURRENCY_SYMBOLS:
            if symbol in sentence:
                start = entity["start"]
                # The symbol directly precedes the body, either at the start of
                # the sentence or after a space.
                if (start == 1 and sentence[0] == symbol) or (
                    start >= 2 and sentence[start - 2 : start] == " " + symbol
                ):
                    entity["start"] -= 1
                    entity["body"] = sentence[entity["start"] : entity["end"]]
        return self._resolve_exact_match(entity)

    def _resolve_ordinal(self, entity):
        """Resolve an ordinal entity by exact duckling match."""
        return self._resolve_exact_match(entity)

    def _resolve_exact_match(self, entity):
        """Resolves an entity by exact match and corresponding type.

        Args:
            entity (dict): A dictionary representing an entity.

        Returns:
            entity (dict): A resolved entity dict or None if the entity isn't resolved.
        """
        entity["dim"] = ANNOTATOR_TO_SYS_ENTITY_MAPPINGS[entity["dim"]]
        candidates = self.duckling.get_candidates_for_text(
            entity["body"], language=self.language, locale=self.locale
        )
        if len(candidates) == 0:
            return
        for candidate in candidates:
            if (
                candidate["entity_type"] == entity["dim"]
                and entity["body"] == candidate["body"]
            ):
                entity["value"] = candidate["value"]
                return entity

    def _resolve_quantity(self, entity):
        """Resolves a quantity related entity. First looks for an exact match, then
        for the largest substring match. Order of priority is "sys_distance" then "sys_quantity".
        Unresolved entities are labelled as "sys_other-quantity".

        Args:
            entity (dict): A dictionary representing an entity.

        Returns:
            entity (dict): A resolved entity dict or None if the entity isn't resolved.
        """
        # Bug fix: pass language/locale like every other resolver in this class;
        # previously duckling fell back to its default language here.
        candidates = self.duckling.get_candidates_for_text(
            entity["body"], language=self.language, locale=self.locale
        )
        if len(candidates) == 0:
            entity["dim"] = "sys_other-quantity"
            return entity
        entity_types = ["distance", "quantity"]
        for entity_type in entity_types:
            for candidate in candidates:
                if (
                    candidate["dim"] == entity_type
                    and candidate["body"] == entity["body"]
                ):
                    entity["value"] = candidate["value"]
                    entity["dim"] = ANNOTATOR_TO_SYS_ENTITY_MAPPINGS[entity_type]
                    return entity
        if SpacyAnnotator._resolve_largest_substring(
            entity, candidates, entity_types=entity_types, is_time_related=False
        ):
            return entity
        else:
            entity["dim"] = "sys_other-quantity"
            return entity

    def _resolve_percent(self, entity):
        """Resolves an entity related to percentage. Uses a heuristic of finding
        the largest candidate value and dividing by 100. If the candidate value is
        a float, the float value divided by 100 is immediately returned.

        Args:
            entity (dict): A dictionary representing an entity.

        Returns:
            entity (dict): A resolved entity dict or None if the entity isn't resolved.
        """
        entity["dim"] = ANNOTATOR_TO_SYS_ENTITY_MAPPINGS[entity["dim"]]
        candidates = self.duckling.get_candidates_for_text(
            entity["body"], language=self.language, locale=self.locale
        )
        if len(candidates) == 0:
            return
        possible_values = []
        for candidate in candidates:
            if candidate["entity_type"] == "sys_number":
                value = candidate["value"]["value"]
                if isinstance(value, float):
                    entity["value"]["value"] = value / 100
                    return entity
                else:
                    possible_values.append(value)
        # Robustness fix: duckling may return candidates with no "sys_number"
        # entries; previously max([]) raised a ValueError here.
        if not possible_values:
            return
        entity["value"]["value"] = max(possible_values) / 100
        return entity

    def _resolve_person(self, entity):
        """Resolves a person entity by unlabelling a possessive "'s" from the
        name if it exists.

        Args:
            entity (dict): A dictionary representing an entity.

        Returns:
            entity (dict): A resolved entity dict.
        """
        entity["dim"] = ANNOTATOR_TO_SYS_ENTITY_MAPPINGS[entity["dim"]]
        if self._is_plural_entity(entity):
            # Trim the trailing "'s" from the value, body, and span.
            entity["value"] = {"value": entity["body"][:-2]}
            entity["body"] = entity["body"][:-2]
            entity["end"] -= 2
        return entity

    def _is_plural_entity(self, entity):
        """Check if an entity ends in an English possessive "'s".

        Args:
            entity (dict): A dictionary representing an entity.

        Returns:
            is_plural (bool): Whether the entity is plural.
        """
        return (
            self.language == ENGLISH_LANGUAGE_CODE
            and len(entity["body"]) >= 2
            and entity["body"][-2:] == "'s"
        )
class BootstrapAnnotator(Annotator):
    """Bootstrap Annotator class used to generate annotations based on existing annotations."""

    def __init__(self, *args, **kwargs):
        """Initializes a BootstrapAnnotator.

        Args:
            app_path (str): The location of the MindMeld app.
            annotation_rules (list): List of Annotation rules.
            confidence_threshold (float): The minimum confidence value to accept a detected entity.
            language (str, optional): Language as specified using a 639-1/2 code.
            locale (str, optional): The locale representing the ISO 639-1 language code and \
                ISO3166 alpha 2 country code separated by an underscore character.
            overwrite (bool): Whether to overwrite existing annotations with conflicting spans.
            unannotate_supported_entities_only (bool): Only allow removal of supported entities.
            unannotation_rules (list): List of Annotation rules.

        Raises:
            ValueError: If confidence_threshold is outside [0, 1].
        """
        super().__init__(*args, **kwargs)
        self.confidence_threshold = kwargs.get("confidence_threshold", 0)
        if self.confidence_threshold < 0 or self.confidence_threshold > 1:
            raise ValueError(
                "{!r} is not a valid confidence threshold. Select a value between 0 and 1.".format(
                    self.confidence_threshold
                )
            )
        logger.info("BootstrapAnnotator is loading %s.", self.app_path)
        # The app's own NLP pipeline generates the bootstrap annotations.
        self.nlp = NaturalLanguageProcessor(self.app_path)
        self.nlp.build()

    def parse(self, sentence, entity_types, domain: str, intent: str, **kwargs):
        """
        Args:
            sentence (str): Sentence to detect entities.
            entity_types (list): List of entity types to parse. If None, all
                possible entity types will be parsed.
            domain (str): Allowed domain.
            intent (str): Allowed intent.

        Returns:
            query_entities (list): List of QueryEntity objects.
        """
        response = self.nlp.process(
            sentence, allowed_nlp_classes={domain: {intent: {}}}, verbose=True
        )
        entities = []
        for i, entity in enumerate(response["entities"]):
            if not entity_types or entity["type"] in entity_types:
                entity_confidence = response["confidences"]["entities"][i][
                    entity["type"]
                ]
                if entity_confidence >= self.confidence_threshold:
                    entities.append(
                        {
                            "body": entity.get("text"),
                            "start": entity.get("span", {}).get("start"),
                            # Internal dicts use an exclusive end index, while
                            # the NLP response span end is inclusive.
                            "end": entity.get("span", {}).get("end") + 1,
                            "dim": entity.get("type"),
                            "value": entity.get("value"),
                            "role": entity.get("role"),
                        }
                    )
        # Bug fix: `domain` and `intent` are explicit parameters of this method
        # and therefore never present in **kwargs, so kwargs.get() always
        # returned None here. Pass the parameters through directly.
        processed_query = load_query(
            sentence,
            query_factory=self._resource_loader.query_factory,
            domain=domain,
            intent=intent,
        )
        return [
            Annotator._item_to_query_entity(entity, processed_query)
            for entity in entities
        ]

    def text_queries_to_processed_queries(self, text_queries: List[str]):
        """Converts text queries into processed queries.

        Args:
            text_queries (List[str]): List of raw text queries.

        Returns:
            processed_queries (List[ProcessedQuery]): List of processed queries.
        """
        return [
            self.nlp.process_query(query=self.nlp.create_query(q)) for q in text_queries
        ]

    @property
    def supported_entity_types(self):  # pylint: disable=W0236
        """
        Returns:
            supported_entity_types (list): List of supported entity types.
        """
        return get_entity_types(self.app_path)

    def valid_entity_check(self, entity):
        """Determine if an entity type is valid.

        Args:
            entity (str): Name of entity to annotate.

        Returns:
            bool: Whether entity is valid.
        """
        entity = entity.lower().strip()
        # System entities are always allowed in addition to app-defined ones.
        return Entity.is_system_entity(entity) or entity in self.supported_entity_types
class NoTranslationDucklingAnnotator(Annotator):
    """The NoTranslationDucklingAnnotator detects entities by filtering non-English candidates
    from Duckling to a set containing the largest non-overlapping spans.
    Unlike the TranslationDucklingAnnotator, this annotator does not use a translation service.
    Unlike the MultiLingualAnnotator, this annotator does not use non-English Spacy NER models.
    """

    def __init__(self, *args, **kwargs):
        """Initializes a NoTranslationDucklingAnnotator.

        Args:
            app_path (str): The location of the MindMeld app.
            annotation_rules (list): List of Annotation rules.
            language (str, optional): Language as specified using a 639-1/2 code.
            locale (str, optional): The locale representing the ISO 639-1 language code and \
                ISO3166 alpha 2 country code separated by an underscore character.
            overwrite (bool): Whether to overwrite existing annotations with conflicting spans.
            unannotate_supported_entities_only (bool): Only allow removal of supported entities.
            unannotation_rules (list): List of Annotation rules.
        """
        super().__init__(*args, **kwargs)

    def parse(self, sentence, entity_types=None, **kwargs):
        """
        Args:
            sentence (str): Sentence to detect entities.
            entity_types (list): List of entity types to parse. If None, all
                possible entity types will be parsed.

        Returns:
            query_entities (list): List of QueryEntity objects.
        """
        # Duckling may return several overlapping candidate spans per text.
        duckling_candidates = self.duckling.get_candidates_for_text(
            sentence,
            entity_types=entity_types,
            language=self.language,
            locale=self.locale,
        )
        # Drop candidates known to be unreliable (see heuristics below).
        filtered_candidates = (
            NoTranslationDucklingAnnotator._filter_out_bad_duckling_candidates(
                duckling_candidates
            )
        )
        # Keep only the largest non-overlapping spans. The candidate "end"
        # index is exclusive, hence the -1 when building the inclusive Span.
        final_candidates = NestedEntity.get_largest_non_overlapping_entities(
            filtered_candidates, lambda x: Span(x["start"], x["end"] - 1))
        if entity_types:
            final_candidates = [
                e for e in final_candidates if e["entity_type"] in entity_types
            ]
        query = self._resource_loader.query_factory.create_query(sentence)
        return [
            duckling_item_to_query_entity(query, candidate)
            for candidate in final_candidates
        ]

    @property
    def supported_entity_types(self):  # pylint: disable=W0236
        """
        Returns:
            supported_entity_types (list): List of supported entity types
                (the duckling-resolvable entities for this annotator's language).
        """
        return DUCKLING_TO_SYS_ENTITY_MAPPINGS[self.language]

    @staticmethod
    def _filter_out_bad_duckling_candidates(candidates):
        """Pipeline function to filter initial list of duckling candidates using heuristics.

        Args:
            candidates (list): List of duckling candidates.

        Returns:
            filtered_candidates (list): List of filtered duckling candidates.
        """
        filtered_candidates = (
            NoTranslationDucklingAnnotator._remove_unresolved_sys_amount_of_money(
                candidates
            )
        )
        return filtered_candidates

    @staticmethod
    def _remove_unresolved_sys_amount_of_money(candidates):
        """Filter out amount-of-money candidates whose resolved currency unit
        is "unknown", since they cannot be labelled reliably. All other
        candidates are kept unchanged.
        """
        return [
            candidate
            for candidate in candidates
            if not (
                candidate["dim"] == "amount-of-money"
                and candidate["value"].get("unit") == "unknown"
            )
        ]
class TranslationDucklingAnnotator(Annotator):
    """The TranslationDucklingAnnotator detects entities in non-English sentences using
    a translation service and Duckling by following these steps:
    1. The non-English sentence is translated to English.
    2. Spacy detects entities in the translated English sentence.
    3. Duckling detects non-English entities in the non-English sentence.
    4. A heuristic in parse() is used to match and filter the non-English entities
    against the English entities.
    5. The final set of filtered non-English entities are returned.
    Unlike the NoTranslationDucklingAnnotator, this annotator uses a translation service.
    Unlike the MultiLingualAnnotator, this annotator does not use non-English Spacy NER models.
    """

    def __init__(self, *args, **kwargs):
        """Initializes a TranslationDucklingAnnotator.
        Args:
            app_path (str): The location of the MindMeld app.
            annotation_rules (list): List of Annotation rules.
            en_annotator (SpacyAnnotator): A Spacy Annotator with language set to English ("en").
            translator (str): A translator to use such as 'GoogleTranslator' or 'NoOpTranslator'.
            language (str, optional): Language as specified using a 639-1/2 code.
            locale (str, optional): The locale representing the ISO 639-1 language code and \
                ISO3166 alpha 2 country code separated by an underscore character.
            overwrite (bool): Whether to overwrite existing annotations with conflicting spans.
            unannotate_supported_entities_only (bool): Only allow removal of supported entities.
            unannotation_rules (list): List of Annotation rules.
        Raises:
            AssertionError: If 'language' is English, or 'translator' is missing or NoOp.
        """
        super().__init__(*args, **kwargs)
        # Validate with explicit raises rather than `assert` statements so the
        # checks are not stripped when Python runs with optimizations (-O).
        if self.language == ENGLISH_LANGUAGE_CODE:
            raise AssertionError(
                "The 'language' for a TranslationDucklingAnnotator cannot be set to English."
            )
        translator = kwargs.get("translator")
        if not translator:
            raise AssertionError("'translator' cannot be None.")
        if translator == NoOpTranslator.__name__:
            raise AssertionError(
                "The 'translator' for a TranslationDucklingAnnotator cannot "
                f"be set to {NoOpTranslator.__name__}."
            )
        self.translator = TranslatorFactory().get_translator(translator)
        # Reuse the provided English annotator if given; otherwise build one.
        self.en_annotator = kwargs.get("en_annotator") or SpacyAnnotator(
            app_path=self.app_path,
            language=ENGLISH_LANGUAGE_CODE,
            locale=ENGLISH_US_LOCALE,
        )

    def parse(self, sentence, entity_types=None, **kwargs):
        """Implements a heuristic to match English entities detected by Spacy on the
        translated non-English sentence against the non-English entities detected by
        Duckling on the non-English sentence.
        Args:
            sentence (str): Sentence to detect entities.
            entity_types (list): List of entity types to parse. If None, all
                possible entity types will be parsed.
        Returns:
            query_entities (list): List of QueryEntity objects.
        """
        candidates = self.en_annotator.duckling.get_candidates_for_text(
            sentence,
            entity_types=entity_types,
            language=self.language,
            locale=self.locale,
        )
        en_sentence = self.translator.translate(  # pylint: disable=E1128
            sentence, target_language=ENGLISH_LANGUAGE_CODE
        )
        en_entities = self.en_annotator.parse(en_sentence, entity_types=entity_types)
        final_candidates = []
        for entity in en_entities:
            value_matched_candidates = []
            for candidate in candidates:
                # Skip the candidate if the type does not match
                if entity.entity.type != candidate["entity_type"]:
                    continue
                # Store the candidate if there is a value match
                if entity.entity.value == candidate["value"]:
                    value_matched_candidates.append(candidate)
                # Skip the translation-match check if value-match candidates exist
                if value_matched_candidates:
                    continue
                # Check if the translated entity text matches candidate entity text
                if (
                    self.translator.translate(
                        entity.entity.text, target_language=self.language
                    )
                    == candidate["body"]
                ):
                    final_candidates.append(candidate)
                    break
            # Select the largest of the candidates with a value match
            if value_matched_candidates:
                final_candidates.append(
                    max(value_matched_candidates, key=lambda x: len(x["body"]))
                )
        if entity_types:
            final_candidates = [
                e for e in final_candidates if e["entity_type"] in entity_types
            ]
        query = self._resource_loader.query_factory.create_query(sentence)
        return [
            duckling_item_to_query_entity(query, candidate)
            for candidate in final_candidates
        ]

    @property
    def supported_entity_types(self):  # pylint: disable=W0236
        """
        Returns:
            supported_entity_types (list): List of supported entity types.
        """
        # Only types supported by BOTH the English Spacy annotator and
        # Duckling (for this language) can be matched by the heuristic.
        supported_entity_types = set(
            self.en_annotator.supported_entity_types
        ).intersection(DUCKLING_TO_SYS_ENTITY_MAPPINGS[self.language])
        return list(supported_entity_types)
class MultiLingualAnnotator(Annotator):
    """The MultiLingualAnnotator detects entities in English and non-English sentences.
    1. If the 'language' is English, this annotator solely uses the Spacy's English NER model to
    detect entities.
    2. If the 'language' is not English, this annotator will detect entities using both Spacy
    non-English NER models and a Duckling-based Annotator.
        A. The TranslationDucklingAnnotator will be used if a 'translator' service is available
        (E.g. "GoogleTranslator"). Non-English duckling candidates are matched to English
        entities detected by Spacy's English NER model.
        B. The NoTranslationDucklingAnnotator will be used if a 'translator' service is not
        available. The set of Non-English duckling candidates with the largest non-overlapping
        spans is selected.
    """

    def __init__(self, *args, **kwargs):
        """Initializes a MultiLingualAnnotator.
        Args:
            app_path (str): The location of the MindMeld app.
            annotation_rules (list): List of Annotation rules.
            translator (str): A translator to use such as 'GoogleTranslator' or 'NoOpTranslator'.
            language (str, optional): Language as specified using a 639-1/2 code.
            locale (str, optional): The locale representing the ISO 639-1 language code and \
                ISO3166 alpha 2 country code separated by an underscore character.
            overwrite (bool): Whether to overwrite existing annotations with conflicting spans.
            unannotate_supported_entities_only (bool): Only allow removal of supported entities.
            unannotation_rules (list): List of Annotation rules.
        """
        super().__init__(*args, **kwargs)
        # Default to the no-op translator when no translation service is configured.
        self.translator = kwargs.get("translator", NoOpTranslator.__name__)
        self.en_annotator = SpacyAnnotator(
            app_path=self.app_path,
            language=ENGLISH_LANGUAGE_CODE,
            locale=ENGLISH_US_LOCALE,
        )
        if self.language != ENGLISH_LANGUAGE_CODE:
            # Non-English apps additionally need a duckling-based annotator and
            # a Spacy NER model for the target language.
            self.duckling_annotator = self._get_duckling_annotator()
            self.non_en_annotator = SpacyAnnotator(
                app_path=self.app_path,
                language=self.language,
                locale=self.locale,
            )

    def _get_duckling_annotator(self):
        """Selects a duckling-based annotator depending on translator availability.

        Returns:
            annotator (Annotator): A TranslationDucklingAnnotator if a real
                translation service is configured, otherwise a
                NoTranslationDucklingAnnotator.
        """
        if self.translator != NoOpTranslator.__name__:
            return TranslationDucklingAnnotator(
                app_path=self.app_path,
                language=self.language,
                locale=self.locale,
                en_annotator=self.en_annotator,
                translator=self.translator,
            )
        return NoTranslationDucklingAnnotator(
            app_path=self.app_path,
            language=self.language,
            locale=self.locale,
        )

    def parse(self, sentence, entity_types=None, **kwargs):
        """
        Args:
            sentence (str): Sentence to detect entities.
            entity_types (list): List of entity types to parse. If None, all
                possible entity types will be parsed.
        Returns:
            query_entities (list): List of QueryEntity objects.
        """
        if self.language == ENGLISH_LANGUAGE_CODE:
            return self.en_annotator.parse(sentence, entity_types=entity_types)
        non_en_spacy_entities = self.non_en_annotator.parse(
            sentence, entity_types=entity_types
        )
        duckling_entities = self.duckling_annotator.parse(
            sentence, entity_types=entity_types
        )
        # Merge the two entity sets, resolving overlapping spans.
        merged_entities = Annotator._resolve_conflicts(
            non_en_spacy_entities, duckling_entities
        )
        return merged_entities

    @property
    def supported_entity_types(self):  # pylint: disable=W0236
        """
        Returns:
            supported_entity_types (list): List of supported entity types.
        """
        if self.language == ENGLISH_LANGUAGE_CODE:
            return self.en_annotator.supported_entity_types
        supported_entities = set(self.non_en_annotator.supported_entity_types)
        if self.language in DUCKLING_TO_SYS_ENTITY_MAPPINGS:
            supported_entities.update(self.duckling_annotator.supported_entity_types)
        # Return a list (not a set) to match the documented return type and the
        # English-language branch above.
        return list(supported_entities)
def register_all_annotators():
    """Registers every built-in annotator class with the annotator registry."""
    annotator_classes = {
        "SpacyAnnotator": SpacyAnnotator,
        "BootstrapAnnotator": BootstrapAnnotator,
        "MultiLingualAnnotator": MultiLingualAnnotator,
    }
    for annotator_name, annotator_class in annotator_classes.items():
        register_annotator(annotator_name, annotator_class)