# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains the dialogue manager component of MindMeld"""
import asyncio
import copy
import json
import logging
import random
import warnings
from functools import cmp_to_key, partial
from typing import List, Optional
from marshmallow.exceptions import ValidationError
import immutables
from .. import path
from .request import FrozenParams, Params, Request
from .schemas import DEFAULT_FORM_SCHEMA, DEFAULT_RESPONSE_SCHEMA, ParamsSchema
from ..core import Entity, FormEntity
from ..models import entity_features, query_features
from ..models.helpers import DEFAULT_SYS_ENTITIES
mod_logger = logging.getLogger(__name__)
[docs]class DirectiveNames:
"""A constants object for directive names."""
LIST = "list"
"""A directive to display a list."""
LISTEN = "listen"
"""A directive to listen (start speech recognition)."""
REPLY = "reply"
"""A directive to display a text view."""
RESET = "reset"
"""A directive to reset."""
SPEAK = "speak"
"""A directive to speak text out loud."""
SUGGESTIONS = "suggestions"
"""A view for a list of suggestions."""
SLEEP = "sleep"
"""A directive to put the client to sleep after a specified number of milliseconds."""
[docs]class DirectiveTypes:
"""A constants object for directive types."""
VIEW = "view"
"""An action directive."""
ACTION = "action"
"""A view directive."""
[docs]class DialogueStateException(Exception):
def __init__(self, message=None, target_dialogue_state=None):
super().__init__(message)
self.target_dialogue_state = target_dialogue_state
[docs]class DialogueStateRule:
"""A rule that determines a dialogue state. Each rule represents a pattern that must match in
order to invoke a particular dialogue state.
Attributes:
dialogue_state (str): The name of the dialogue state.
domain (str): The name of the domain to match against.
entity_types (set): The set of entity types to match against.
intent (str): The name of the intent to match against.
targeted_only (bool): Whether the state is targeted only.
default (bool): Whether this is the default state.
"""
logger = mod_logger.getChild("DialogueStateRule")
"""Class logger."""
def __init__(self, dialogue_state, **kwargs):
"""Initializes a dialogue state rule.
Args:
dialogue_state (str): The name of the dialogue state.
domain (str): The name of the domain to match against.
has_entity (str): A single entity type to match.
has_entities (list, set): A list/set of entity types to match
against.
intent (str): The name of the intent to match against.
"""
self.dialogue_state = dialogue_state
key_kwargs = (
("domain",),
("intent",),
("has_entity", "has_entities"),
("targeted_only",),
("default",),
)
valid_kwargs = set()
for keys in key_kwargs:
valid_kwargs.update(keys)
for kwarg in kwargs:
if kwarg not in valid_kwargs:
raise TypeError(
(
"DialogueStateRule() got an unexpected keyword argument"
" '{!s}'"
).format(kwarg)
)
resolved = {}
for keys in key_kwargs:
if len(keys) == 2:
single, plural = keys # pylint: disable=unbalanced-tuple-unpacking
if single in kwargs and plural in kwargs:
msg = "Only one of {!r} and {!r} can be specified for a dialogue state rule"
raise ValueError(msg.format(single, plural))
elif single in kwargs and isinstance(kwargs[single], str):
resolved[plural] = {kwargs[single]}
elif plural in kwargs and isinstance(
kwargs[plural], (list, set, tuple)
):
resolved[plural] = set(kwargs[plural])
else:
if single in kwargs:
msg = "Invalid argument type {!r} for {!r}"
raise ValueError(msg.format(kwargs[single], single))
elif plural in kwargs:
msg = "Invalid argument type {!r} for {!r}"
raise ValueError(msg.format(kwargs[plural], plural))
elif keys[0] in kwargs:
resolved[keys[0]] = kwargs[keys[0]]
self.domain = resolved.get("domain", None)
self.intent = resolved.get("intent", None)
self.targeted_only = resolved.get("targeted_only", False)
self.default = resolved.get("default", False)
entities = resolved.get("has_entities", None)
self.entity_types = None
if entities is not None:
for entity in entities:
if not isinstance(entity, str):
msg = "Invalid entity specification for dialogue state rule: {!r}"
raise ValueError(msg.format(entities))
self.entity_types = frozenset(entities)
if self.targeted_only and any([self.domain, self.intent, self.entity_types]):
raise ValueError(
"For a dialogue state rule, if targeted_only is "
"True, domain, intent, and has_entity must be omitted"
)
if self.default and any(
[self.domain, self.intent, self.entity_types, self.targeted_only]
):
raise ValueError(
"For a dialogue state rule, if default is True, "
"domain, intent, has_entity, and targeted_only must be omitted"
)
[docs] def apply(self, request):
"""Applies the dialogue state rule to the given context.
Args:
request (Request): A request object.
Returns:
(bool): Whether or not the context matches.
"""
# Note: this will probably change as the details of "context" are worked out
# bail if this rule is only reachable via target_dialogue_state
if self.targeted_only:
return False
# check domain is correct
if self.domain is not None and self.domain != request.domain:
return False
# check intent is correct
if self.intent is not None and self.intent != request.intent:
return False
# check expected entity types are present
if self.entity_types is not None:
# TODO cache entity types
entity_types = set()
for entity in request.entities:
entity_types.add(entity["type"])
if len(self.entity_types & entity_types) < len(self.entity_types):
return False
return True
@property
def complexity(self):
"""Returns an integer representing the complexity of this dialogue state rule.
Components of a rule in order of increasing complexity are as follows:
default rule, domains, intents, entity types, entity mappings.
Returns:
(int): A number representing the rule complexity.
"""
complexity = [0] * 4
if self.entity_types:
complexity[0] = len(self.entity_types)
if self.intent:
complexity[1] = 1
if self.domain:
complexity[2] = 1
if self.default:
complexity[3] = 1
return tuple(complexity)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
raise NotImplementedError
def __ne__(self, other):
if isinstance(other, self.__class__):
return not self.__eq__(other)
raise NotImplementedError
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.dialogue_state)
[docs] @staticmethod
def compare(this, that):
"""Compares the complexity of two dialogue state rules.
Args:
this (DialogueStateRule): A dialogue state rule.
that (DialogueStateRule): A dialogue state rule.
Returns:
(int): The comparison result
-1: that is more complex than this
0: this and that are equally complex
1: this is more complex than that
"""
if not (
isinstance(this, DialogueStateRule) and isinstance(that, DialogueStateRule)
):
raise NotImplementedError
# https://docs.python.org/3.0/whatsnew/3.0.html#ordering-comparisons
return (this.complexity > that.complexity) - (this.complexity < that.complexity)
[docs]class DialogueManager:
logger = mod_logger.getChild("DialogueManager")
def __init__(self, responder_class=None, async_mode=False):
self.async_mode = async_mode
self.handler_map = {}
self.middlewares = []
self.rules = []
self.responder_class = responder_class or DialogueResponder
self.default_rule = None
[docs] def handle(self, **kwargs):
"""A decorator that is used to register dialogue state rules."""
def _decorator(func):
name = kwargs.pop("name", None)
self.add_dialogue_rule(name, func, **kwargs)
return func
return _decorator
[docs] def middleware(self, *args):
"""A decorator that is used to register dialogue handler middleware."""
def _decorator(func):
self.add_middleware(func)
return func
if args and callable(args[0]):
# Support syntax: @middleware
_decorator(args[0])
return args[0]
# Support syntax: @middleware()
return _decorator
[docs] def add_middleware(self, middleware):
"""Adds middleware for the dialogue manager. Middleware will be
called for each message before the dialogue state handler. Middleware
registered first will be called first.
Args:
middleware (callable): A dialogue manager middleware
function.
"""
if self.async_mode and not asyncio.iscoroutinefunction(middleware):
msg = (
"Cannot use middleware {!r} in async mode. "
"Middleware must be coroutine function."
)
raise TypeError(msg.format(middleware.__name__))
self.middlewares.append(middleware)
[docs] def add_dialogue_rule(self, name, handler, **kwargs):
"""Adds a dialogue state rule for the dialogue manager.
Args:
name (str): The name of the dialogue state.
handler (function): The dialogue state handler function.
kwargs (dict): A list of options to be passed to the DialogueStateRule initializer.
"""
if name is None:
name = handler.__name__
if self.async_mode and not asyncio.iscoroutinefunction(handler):
msg = (
"Cannot use dialogue state handler {!r} in async mode. "
"Handler must be coroutine function in async mode."
)
raise TypeError(msg.format(name))
rule = DialogueStateRule(name, **kwargs)
self.rules.append(rule)
self.rules.sort(key=cmp_to_key(DialogueStateRule.compare), reverse=True)
if handler is not None:
old_handler = self.handler_map.get(name)
if old_handler is not None and old_handler != handler:
msg = (
"Handler mapping is overwriting an existing dialogue state: %s"
% name
)
raise AssertionError(msg)
self.handler_map[name] = handler
if rule.default:
if self.default_rule:
raise AssertionError("Only one default rule may be specified")
self.default_rule = rule
[docs] def apply_handler(self, request, responder, target_dialogue_state=None):
"""Applies the dialogue state handler for the most complex matching rule.
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
target_dialogue_state (str, optional): The target dialogue state.
Returns:
(DialogueResponder): A DialogueResponder containing the dialogue state and directives.
"""
if self.async_mode:
return self._apply_handler_async(
request, responder, target_dialogue_state=target_dialogue_state
)
return self._apply_handler_sync(
request, responder, target_dialogue_state=target_dialogue_state
)
def _apply_handler_sync(self, request, responder, target_dialogue_state=None):
"""Applies the dialogue state handler for the most complex matching rule.
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
target_dialogue_state (str, optional): The target dialogue state.
Returns:
(DialogueResponder): A DialogueResponder containing the dialogue state and directives.
"""
try:
return self._attempt_handler_sync(
request, responder, target_dialogue_state=target_dialogue_state
)
except DialogueStateException as e:
if e.target_dialogue_state != target_dialogue_state:
target_dialogue_state = e.target_dialogue_state
else:
self.logger.warning(
"Ignoring target dialogue state '{}'".format(
e.target_dialogue_state
)
)
target_dialogue_state = None
if target_dialogue_state:
self.logger.warning(
"Ignoring target dialogue state '{}'".format(target_dialogue_state)
)
return self._attempt_handler_sync(request, responder)
def _attempt_handler_sync(self, request, responder, target_dialogue_state=None):
"""Tries to apply the dialogue state handler for the most complex matching rule
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
target_dialogue_state (str, optional): The target dialogue state.
Returns:
(DialogueResponder): A DialogueResponder containing the dialogue state and directives.
"""
dialogue_state = self._get_dialogue_state(request, target_dialogue_state)
handler = self._get_dialogue_handler(dialogue_state)
responder.dialogue_state = dialogue_state
res = handler(request, responder)
# Add dialogue flow's sub-dialogue_state if provided
if res and isinstance(res, dict) and "dialogue_state" in res:
# TODO: check if this flow is executed, currently not covered in tests
dialogue_state = ".".join([dialogue_state, res["dialogue_state"]])
responder.dialogue_state = dialogue_state
return responder
async def _apply_handler_async(
self, request, responder, target_dialogue_state=None
):
"""Applies the dialogue state handler for the most complex matching rule.
Args:
request (Request): The request object from the DM
responder (DialogueResponder): The responder from the DM
target_dialogue_state (str, optional): The target dialogue state
Returns:
DialogueResponder: A DialogueResponder containing the dialogue state and directives
"""
try:
return await self._attempt_handler_async(
request, responder, target_dialogue_state=target_dialogue_state
)
except DialogueStateException as e:
if e.target_dialogue_state != target_dialogue_state:
target_dialogue_state = e.target_dialogue_state
else:
self.logger.warning(
"Ignoring target dialogue state '{}'".format(
e.target_dialogue_state
)
)
target_dialogue_state = None
if target_dialogue_state:
self.logger.warning(
"Ignoring target dialogue state '{}'".format(target_dialogue_state)
)
return await self._attempt_handler_async(request, responder)
async def _attempt_handler_async(
self, request, responder, target_dialogue_state=None
):
"""Tries to apply the dialogue state handler for the most complex matching rule
Args:
request (Request): The request object from the DM
responder (DialogueResponder): The responder from the DM
target_dialogue_state (str, optional): The target dialogue state
Returns:
DialogueResponder: A DialogueResponder containing the dialogue state and directives
"""
dialogue_state = self._get_dialogue_state(request, target_dialogue_state)
handler = self._get_dialogue_handler(dialogue_state)
responder.dialogue_state = dialogue_state
result_handler = await handler(request, responder)
# Add dialogue flow's sub-dialogue_state if provided
if (
result_handler
and isinstance(result_handler, dict)
and "dialogue_state" in result_handler
):
# TODO: check if this flow is executed, currently not covered in tests
dialogue_state = "{}.{}".format(
dialogue_state, result_handler["dialogue_state"]
)
responder.dialogue_state = dialogue_state
return responder
[docs] @staticmethod
def reprocess(target_dialogue_state=None):
"""Forces the dialogue manager to back out of the flow based on the initial target
dialogue state setting and reselect a handler, following a new target dialogue state
Args:
target_dialogue_state (str, optional): a dialogue_state name to push system into
"""
raise DialogueStateException(
message="reprocess", target_dialogue_state=target_dialogue_state
)
def _get_dialogue_state(self, request, target_dialogue_state=None):
dialogue_state = None
for rule in self.rules:
if target_dialogue_state:
if target_dialogue_state == rule.dialogue_state:
dialogue_state = rule.dialogue_state
break
else:
if rule.apply(request):
dialogue_state = rule.dialogue_state
break
if dialogue_state is None:
msg = "Failed to find dialogue state for {domain}.{intent}".format(
domain=request.domain, intent=request.intent
)
self.logger.info(msg, request)
return dialogue_state
def _get_dialogue_handler(self, dialogue_state):
handler = (
self.handler_map[dialogue_state]
if dialogue_state
else self._default_handler
)
for m in reversed(self.middlewares):
handler = partial(m, handler=handler)
return handler
def _create_responder(self):
return self.responder_class(slots={})
@staticmethod
def _default_handler(context, responder):
# TODO: implement default handler
pass
[docs]class DialogueFlow(DialogueManager):
"""A special dialogue manager subclass used to implement dialogue flows.
Dialogue flows allow developers to implement multiple turn interactions
where only a subset of dialogue states should be accessible or where
different dialogue rules should apply.
Attributes:
app (Application): The application that initializes this flow.
exit_flow_states (list): The list of exit states.
"""
logger = mod_logger.getChild("DialogueFlow")
"""Class logger."""
all_flows = {}
"""The dictionary that references all dialogue flows."""
def __init__(self, name, entrance_handler, app, **kwargs):
super().__init__(async_mode=app.async_mode)
self._name = name
self.all_flows[name] = self
self.app = app
self.exit_flow_states = []
def _set_target_state(request, responder):
responder.params.target_dialogue_state = self.flow_state
return entrance_handler(request, responder)
async def _async_set_target_state(request, responder):
responder.params.target_dialogue_state = self.flow_state
return await entrance_handler(request, responder)
self._entrance_handler = (
_async_set_target_state if self.async_mode else _set_target_state
)
app.add_dialogue_rule(self.name, self._entrance_handler, **kwargs)
handler = (
self._apply_flow_handler_async
if self.async_mode
else self._apply_flow_handler_sync
)
app.add_dialogue_rule(self.flow_state, handler, targeted_only=True)
@property
def name(self):
"""The name of this flow."""
return self._name
@property
def flow_state(self):
"""The state of the flow (<name>_flow)."""
return self._name + "_flow"
@property
def dialogue_manager(self):
"""The dialogue manager which contains this flow."""
if self.app and self.app.app_manager:
return self.app.app_manager.dialogue_manager
return None
def __call__(self, ctx, responder):
return self._entrance_handler(ctx, responder)
[docs] def use_middleware(self, *args):
"""Allows a middleware to be added to this flow."""
def _decorator(func):
self.add_middleware(func)
return func
try:
# Support syntax: @middleware
func = args[0]
if not callable(func):
raise TypeError
_decorator(func)
return func
except (IndexError, TypeError):
# Support syntax: @middleware()
return _decorator
[docs] def handle(self, **kwargs):
"""The dialogue flow handler."""
def _decorator(func):
name = kwargs.pop("name", None)
exit_flow = kwargs.pop("exit_flow", False)
if exit_flow:
func_name = name or func.__name__
self.exit_flow_states.append(func_name)
self.add_dialogue_rule(name, func, **kwargs)
return func
return _decorator
def _apply_flow_handler_sync(self, request, responder):
"""Applies the dialogue state handler for the dialogue flow and set the target dialogue
state to the flow state.
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
Returns:
(dict): A dict containing the dialogue state and directives.
"""
dialogue_state = self._get_dialogue_state(request)
handler = self._get_dialogue_handler(dialogue_state)
if dialogue_state not in self.exit_flow_states:
responder.params.target_dialogue_state = self.flow_state
handler(request, responder)
return {"dialogue_state": dialogue_state, "directives": responder.directives}
async def _apply_flow_handler_async(self, request, responder):
"""Applies the dialogue state handler for the dialogue flow and sets the target dialogue
state to the flow state asynchronously.
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
Returns:
(dict): A dict containing the dialogue state and directives.
"""
dialogue_state = self._get_dialogue_state(request)
handler = self._get_dialogue_handler(dialogue_state)
if dialogue_state not in self.exit_flow_states:
responder.params.target_dialogue_state = self.flow_state
res = handler(request, responder)
if asyncio.iscoroutine(res):
await res
return {"dialogue_state": dialogue_state, "directives": responder.directives}
def _get_dialogue_handler(self, dialogue_state):
handler = (
self.handler_map[dialogue_state]
if dialogue_state
else self._default_handler
)
try:
middlewares = self.middlewares
except AttributeError:
middlewares = getattr(self, "middleware", tuple())
for m in reversed(middlewares):
handler = partial(m, handler=handler)
return handler
[docs]class AutoEntityFilling:
"""A class to implement Automatic Entity (Slot) Filling
(AEF) that allows developers to prompt users for completing the missing
requirements for entity slots.
"""
_logger = mod_logger.getChild("AutoEntityFilling")
"""Class logger."""
def __init__(self, handler, form, app):
"""
Args:
handler (func): The function to which control is returned after completion of flow.
form (dict): Developer-defined slot-filling form.
app (Application): The application that initializes this flow.
"""
self._app = app
self._app.lazy_init()
self._handler = handler
self._form = Form(entities=form.get('entities'), exit_keys=form.get('exit_keys'),
exit_msg=form.get('exit_msg'), max_retries=form.get('max_retries'))
self._local_entity_form = None
self._prompt_turn = None
self._params_schema = ParamsSchema(
context={
"nlp": self._app.app_manager.nlp,
"dialogue_handler_map": self._app.app_manager.dialogue_manager.handler_map
}
)
def _set_next_turn(self, request, responder):
"""Set target dialogue state to the entrance handler's name"""
responder.params.allowed_intents = tuple(
["{}.{}".format(request.domain, request.intent)]
)
responder.params.target_dialogue_state = self._handler.__name__
def _exit_flow(self, responder):
"""Exits this flow and clears the related parameter for re-usability"""
self._prompt_turn = None
self._local_entity_form = None
responder.params.allowed_intents = tuple()
responder.exit_flow()
def _extract_query_features(
self, text, time_zone=None, timestamp=None, locale=None, language=None
):
""" Extracts Query object from the user input and converts it into
appropriate format for entity extraction.
Args:
text (Request.text): Text in the query
time_zone (str, optional): An IANA time zone id to create the query relative to.
timestamp (int, optional): A reference unix timestamp to create the query relative to,
in seconds.
locale (str, optional): The locale representing the ISO 639-1 language code and \
ISO3166 alpha 2 country code separated by an underscore character.
language (str, optional): Language as specified using a 639-1/2 code
Returns:
Query: A newly constructed query
"""
query_factory = self._app.app_manager.nlp.resource_loader.query_factory
query = query_factory.create_query(text, time_zone, timestamp, locale, language)
return query
def _validate(self, request, slot):
"""Validates the user input based on the entity type and validation type.
Args:
request: Request object
slot: FormEntity object
Returns:
(bool): Boolean whether the user input fulfills the slot requirements
"""
text = request.text
entity_type = slot.entity
extracted_feature = {}
_resolved_value = {}
query = None
if slot.default_eval:
if entity_type in DEFAULT_SYS_ENTITIES:
# system entity validation - checks for presence of required system entity
try:
entity_text = str(request.entities[0]["value"][0]["value"])
except (KeyError, IndexError):
entity_text = text
query = self._extract_query_features(entity_text)
resources = {}
extracted_feature = dict(
query_features.extract_sys_candidates([entity_type])(
query, resources
)
)
else:
# gazetteer validation
try:
query = self._extract_query_features(
request.entities[0]["value"][0]["cname"]
)
except (KeyError, IndexError):
query = self._extract_query_features(text)
gaz = self._app.app_manager.nlp.resource_loader.get_gazetteer(
entity_type
)
# payload format for entity feature extractors:
# tuple(query (Query Object), list of entities, entity index)
_payload = (query, [query], 0)
if len(gaz) > 0:
gazetteer = {"gazetteers": {entity_type: gaz}}
extracted_feature = entity_features.extract_in_gaz_features()(
_payload, gazetteer
)
if not extracted_feature:
return False, _resolved_value
if request.entities:
_resolved_value = request.entities[0]["value"]
if slot.hints:
# hints / user-list validation
if text in slot.hints:
extracted_feature.update({"hint_validated_entity": text})
else:
return False, _resolved_value
if slot.custom_eval:
# Custom validation using function provided by developer.
# Should return True/False/Custom Resolution value for validation status.
# If false, overall validation fails. If either true or a custom resolved
# value is returned, then the validation succeeds.
custom_eval_func = self._app.registry.functions_registry[slot.custom_eval]
_validity = custom_eval_func(request)
if _validity is False:
# For checking 'false' return cases
return False, _resolved_value
if _validity is not True:
# For cases with custom resolution value return
if entity_type in DEFAULT_SYS_ENTITIES:
# for custom system entity resolution
_resolved_value = [{"value": _validity}]
else:
# for custom gazetteer entity resolution
_resolved_value = [{"cname": _validity}]
extracted_feature.update({"custom_validated_entity": text})
# return True iff user input results in extracted features (i.e. successfully validated)
return len(extracted_feature) > 0, _resolved_value
def _initial_fill(self, request):
"""Performs the first pass and fills the entity form with entity values available
in the initial query.
Args:
request (Request): The request object.
"""
for entity in request.entities:
entity_type = entity["type"]
role = entity["role"]
for slot in self._local_entity_form:
if entity_type == slot.entity:
if (slot.role is None) or (role == slot.role):
slot.value = dict(entity)
break
def _end_slot_fill(self, request, responder, async_mode):
# Returns filled entity objects as request.entities
# We pass in the previous turn's responder's params to the current request
request = self._app.app_manager.request_class(
text=request.text,
domain=request.domain,
intent=request.intent,
entities=tuple([slot.value for slot in self._local_entity_form]),
context=request.context or {},
history=request.history or [],
frame=responder.frame or {},
params=request.params,
form=immutables.Map(DEFAULT_FORM_SCHEMA.dump(self._form)),
)
self._exit_flow(responder)
if async_mode:
return self._end_slot_fill_async(request, responder)
return self._end_slot_fill_sync(request, responder)
def _end_slot_fill_sync(self, request, responder):
return self._handler(request, responder)
async def _end_slot_fill_async(self, request, responder):
return await self._handler(request, responder)
def _prompt_slot(self, responder, nlr):
"""Prompts user for missing slot.
Args:
responder (DialogueResponder): responder object.
nlr (str): natural language response to prompt for the missing slot.
"""
response_form = copy.deepcopy(self._form)
response_form.entities = self._local_entity_form
responder.form = DEFAULT_FORM_SCHEMA.dump(response_form)
responder.reply(nlr)
responder.speak(nlr)
self._retry_attempts = 0
self._prompt_turn = False
def _retry_logic(self, request, responder, nlr):
if self._retry_attempts < self._form.max_retries:
self._retry_attempts += 1
response_form = copy.deepcopy(self._form)
response_form.entities = self._local_entity_form
responder.form = DEFAULT_FORM_SCHEMA.dump(response_form)
responder.reply(nlr)
responder.speak(nlr)
else:
# max attempts exceeded, reset counter, exit auto_fill.
self._retry_attempts = 0
self._exit_flow(responder)
# reprocess query to obtain intended nlp config, ignoring previous config.
processed_query = self._app.app_manager.nlp.process(query_text=request.text)
# create new request object from the current responder object.
response_form = copy.deepcopy(self._form)
request = self._app.app_manager.request_class(
context=request.context or {},
history=request.history or [],
frame=responder.frame or {},
params=FrozenParams(**dict(responder.params)),
form=DEFAULT_FORM_SCHEMA.dump(response_form),
**processed_query,
)
# call intended handler from reprocessed query.
self._app.app_manager.dialogue_manager.apply_handler(request, responder)
def __call__(self, request, responder):
"""
The iterative call to fill missing slots in the entity form till all slots have been
filled up or the flow has been exited.
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
"""
# If form iteration in request object, continue using that.
# If None, set to original form.
if request.form and request.form["entities"]:
self._local_entity_form = [FormEntity(**copy.deepcopy(elem))
for elem in request.form["entities"]]
else:
self._local_entity_form = None
if request.text.lower() in self._form.exit_keys:
responder.reply(self._form.exit_msg)
responder.speak(self._form.exit_msg)
self._exit_flow(responder)
return
self._set_next_turn(request, responder)
if self._prompt_turn is None or not self._local_entity_form:
# Entering the flow
self._prompt_turn = True
self._local_entity_form = copy.deepcopy(self._form.entities)
self._retry_attempts = 0
# Fill the form with the entities in the first query
self._initial_fill(request)
# convert json response to FormEntity objects (deserialize)
for i, slot in enumerate(self._local_entity_form):
if isinstance(slot, dict):
self._local_entity_form[i] = FormEntity(**slot)
# Iterate through all slots, fill in empty ones
for slot in self._local_entity_form:
if not slot.value:
# check if user has been prompted for this entity slot
if self._prompt_turn:
self._prompt_slot(responder, slot.responses)
responder.listen()
return
# If already prompted,
# validate the user response and retry if invalid response
_is_valid, _resolved_value = self._validate(request, slot)
if not _is_valid:
# retry logic
self._retry_logic(request, responder, slot.retry_response)
responder.listen()
return
slot.value = Entity(
text=request.text,
entity_type=slot.entity,
role=slot.role,
value=_resolved_value,
).to_dict()
# Reset prompt for next slot
self._prompt_turn = True
# Finish slot-filling and return to handler
return self._end_slot_fill(request, responder, self._app.async_mode)
[docs] async def call_async(self, request, responder):
"""The slot-filling call for asynchronous apps
Args:
request (Request): The request object.
responder (DialogueResponder): The responder object.
"""
self(request, responder)
[docs] def invoke(self, request, responder):
"""
Invoke slot-filling as a direct call without requiring a decorator.
"""
# ensures that the slot-filling function is targeted.
kwargs = {"targeted_only": True}
name = self._handler.__name__
try:
# sets a dialogue rule for the handler passed in this invoke call to iteratively call
# the slot-filling flow till completion or exit. This rule is added temporarily for this
# flow and reset for the handler with every new invoke call.
self._app.app_manager.dialogue_manager.add_dialogue_rule(
name, self.__call__, **kwargs
)
except AssertionError:
self._app.app_manager.dialogue_manager.handler_map[name] = self.__call__
# re-run to continue flow
self(request, responder)
[docs] async def invoke_async(self, request, responder):
"""
Async invoke slot-filling as a direct call without requiring a decorator.
"""
await self.invoke(request, responder)
[docs]class DialogueResponder:
"""The dialogue responder helps generate directives and fill slots in the
system-generated natural language responses.
"""
_logger = mod_logger.getChild("DialogueResponder")
DirectiveNames = DirectiveNames
"""The list of directive names."""
DirectiveTypes = DirectiveTypes
"""The list of directive types."""
def __init__(
self,
frame=None,
params=None,
history=None,
slots=None,
request=None,
dialogue_state=None,
directives=None,
form=None,
):
"""
Initializes a dialogue responder.
Args:
frame (dict): The frame object.
params (Params): The params object.
history (list): The history of the responder.
slots (dict): The slots of the responder.
request (Request): The request object associated with the responder.
dialogue_state (str): The dialogue state.
directives (list): The directives of the responder.
form (list): Autofill entities
"""
self.directives = directives or []
self.frame = frame or {}
self.params = params
self.dialogue_state = dialogue_state
self.slots = slots or {}
self.form = form
self.request = request
self.history = history
@property
def history(self):
return self._history
@history.setter
def history(self, history):
# If any of the elements in the history list is a map, then we validate the element
# as a DialogueResponder object and serialize that to make sure the dictionary complies
# with the attributes of a DialogueResponder object
if isinstance(history, (list, tuple)) and \
any(isinstance(item, (dict, immutables.Map)) for item in history):
try:
self._history = [dict(DialogueResponder(**DEFAULT_RESPONSE_SCHEMA.load(item)))
for item in history]
except ValidationError as err:
# TODO: Fix deserialization issues between workbench and mindmeld history payloads
logging.warning("Could not deserialize history properly due to error: %s, "
"this might be due to version incompatibility. "
"We set the history to what "
"is passed in to provide backwards compatibility.", err.messages)
self._history = history
else:
self._history = history or []
@property
def request(self):
return self._request
@request.setter
def request(self, value):
if isinstance(value, dict):
if isinstance(value['params'], dict):
value['params'] = Params(**value['params'])
self._request = Request(**value)
else:
self._request = value or Request()
@property
def params(self):
return self._params
@params.setter
def params(self, value):
if isinstance(value, dict):
self._params = Params(**value)
else:
self._params = value or Params()
def __iter__(self):
for key, value in DEFAULT_RESPONSE_SCHEMA.dump(self).items():
yield key, value
[docs] def reply(self, text):
"""Adds a 'reply' directive.
Args:
text (str): The text of the reply.
"""
text = self._process_template(text)
self.display(DirectiveNames.REPLY, payload={"text": text})
[docs] def speak(self, text):
"""Adds a 'speak' directive.
Args:
text (str): The text to speak aloud.
"""
text = self._process_template(text)
self.act(DirectiveNames.SPEAK, payload={"text": text})
[docs] def list(self, items):
"""Adds a 'list' view directive.
Args:
items (list): The list of dictionary objects.
"""
items = items or []
self.display(DirectiveNames.LIST, payload=items)
[docs] def suggest(self, suggestions):
"""Adds a 'suggestions' directive.
Args:
suggestions (list): A list of suggestions.
"""
suggestions = suggestions or []
self.display(DirectiveNames.SUGGESTIONS, payload=suggestions)
[docs] def listen(self):
"""Adds a 'listen' directive."""
self.act(DirectiveNames.LISTEN)
[docs] def reset(self):
"""Adds a 'reset' directive."""
self.act(DirectiveNames.RESET)
[docs] def display(self, name, payload=None):
"""Adds an arbitrary directive of type 'view'.
Args:
name (str): The name of the directive.
payload (dict, optional): The payload for the view.
"""
self.direct(name, DirectiveTypes.VIEW, payload=payload)
[docs] def act(self, name, payload=None):
"""Adds an arbitrary directive of type 'action'.
Args:
name (str): The name of the directive.
payload (dict, optional): The payload for the action.
"""
self.direct(name, DirectiveTypes.ACTION, payload=payload)
[docs] def direct(self, name, dtype, payload=None):
"""Adds an arbitrary directive.
Args:
name (str): The name of the directive.
dtype (str): The type of the directive.
payload (dict, optional): The payload for the view.
"""
directive = {"name": name, "type": dtype}
if payload:
directive["payload"] = payload
self.directives.append(directive)
[docs] def respond(self, directive):
"""Adds an arbitrary directive.
Args:
directive (dict): A directive.
"""
msg = "respond() is deprecated. Instead use direct()."
warnings.warn(msg)
self.directives.append(directive)
[docs] def prompt(self, text):
"""Alias for `reply()`. Deprecated.
Args:
text (str): The text of the reply.
"""
msg = "prompt() is deprecated. Please use reply() and listen() instead"
warnings.warn(msg)
self.reply(text)
[docs] def sleep(self, delay=0):
"""Adds a 'sleep' directive.
Args:
delay (int): The amount of milliseconds to wait before putting the client to sleep.
"""
self.act(DirectiveNames.SLEEP, payload={"delay": delay})
@staticmethod
def _choose(items):
"""Chooses a random item from items."""
result = items
if isinstance(items, (tuple, list)):
result = random.choice(items)
elif isinstance(items, set):
result = random.choice(tuple(items))
return result
def _process_template(self, text):
return self._choose(text).format(**self.slots)
[docs] def exit_flow(self):
"""Exit the current flow by clearing the target dialogue state."""
self.params.target_dialogue_state = None
[docs]class Conversation:
"""The conversation object is a very basic MindMeld client.
It can be useful for testing out dialogue flows in python.
Example:
>>> convo = Conversation(app_path='path/to/my/app')
>>> convo.say('Hello')
['Hello. I can help you find store hours. How can I help?']
>>> convo.say('Is the store on elm open?')
['The 23 Elm Street Kwik-E-Mart is open from 7:00 to 19:00.']
Attributes:
history (list): The history of the conversation. Starts with the most recent message.
context (dict): The context of the conversation, containing user context.
default_params (Params): The default params to use with each turn. These \
defaults will be overridden by params passed for each turn.
params (FrozenParams): The params returned by the most recent turn.
force_sync (bool): Force synchronous return for `say()` and `process()` \
even when app is in async mode.
verbose (bool, optional): If True, returns class probabilities along with class \
prediction.
"""
_logger = mod_logger.getChild("Conversation")
def __init__(
self,
app=None,
app_path=None,
nlp=None,
context=None,
default_params=None,
force_sync=False,
verbose=False,
):
"""
Args:
app (Application, optional): An initialized app object. Either app or app_path must
be given.
app_path (None, optional): The path to the app data. Used to create an app object.
Either app or app_path must be given.
nlp (NaturalLanguageProcessor, optional): A natural language processor for the app.
If passed, changes to this processor will affect the response from `say()`
context (dict, optional): The context to be used in the conversation.
default_params (Params, optional): The default params to use with each turn. These
defaults will be overridden by params passed for each turn.
force_sync (bool, optional): Force synchronous return for `say()` and `process()`
even when app is in async mode.
verbose (bool, optional): If True, returns class probabilities along with class \
prediction.
"""
app = app or path.get_app(app_path)
app.lazy_init(nlp)
self._app_manager = app.app_manager
if not self._app_manager.ready:
self._app_manager.load()
self.context = context or {}
self.history = []
self.frame = {}
self.form = {}
self.default_params = default_params or Params()
self.force_sync = force_sync
self.params = FrozenParams()
self.verbose = verbose
self._params_schema = ParamsSchema(
context={
"nlp": self._app_manager.nlp,
"dialogue_handler_map": self._app_manager.dialogue_manager.handler_map
}
)
[docs] def say(self, text, params=None, force_sync=False):
"""Send a message in the conversation. The message will be
processed by the app based on the current state of the conversation and
returns the extracted messages from the directives.
Args:
text (str): The text of a message.
params (dict): The params to use with this message,
overriding any defaults which may have been set.
force_sync (bool, optional): Force synchronous response
even when app is in async mode.
Returns:
(list): A text representation of the dialogue responses.
"""
if self._app_manager.async_mode:
res = self._say_async(text, params=params)
if self.force_sync or force_sync:
return asyncio.get_event_loop().run_until_complete(res)
return res
response = self.process(text, params=params)
# handle directives
response_texts = [self._follow_directive(a) for a in response.directives]
return response_texts
async def _say_async(self, text, params=None):
"""Send a message in the conversation. The message will be
processed by the app based on the current state of the conversation and
returns the extracted messages from the directives.
Args:
text (str): The text of a message.
params (dict): The params to use with this message,
overriding any defaults which may have been set.
Returns:
(list): A text representation of the dialogue responses.
"""
response = await self.process(text, params=params)
# handle directives
response_texts = [self._follow_directive(a) for a in response.directives]
return response_texts
[docs] def process(self, text, params=None, force_sync=False):
"""Send a message in the conversation. The message will be processed by
the app based on the current state of the conversation and returns
the response.
Args:
text (str): The text of a message.
params (dict): The params to use with this message, overriding any defaults
which may have been set.
force_sync (bool, optional): Force synchronous response
even when app is in async mode.
Returns:
(dict): The dictionary response.
"""
if self._app_manager.async_mode:
res = self._process_async(text, params=params)
if self.force_sync or force_sync:
return asyncio.get_event_loop().run_until_complete(res)
return res
if not self._app_manager.ready:
self._app_manager.load()
internal_params = copy.deepcopy(self.params)
if isinstance(params, dict):
# Validate params
params = self._params_schema.load(params)
params = FrozenParams(**params)
if isinstance(internal_params, dict):
# Validate internal params
internal_params = self._params_schema.load(internal_params)
internal_params = FrozenParams(**internal_params)
if params:
# If the params arg is explicitly set, overight the internal params
for k, v in vars(params).items():
vars(internal_params)[k] = v
response = self._app_manager.parse(
text,
params=internal_params,
context=self.context,
frame=self.frame,
form=self.form,
history=self.history,
verbose=self.verbose,
)
# Validate params
response.param = Params(**self._params_schema.load(dict(response.params)))
self.history = response.history
self.frame = response.frame
self.form = response.form
self.params = response.params
return response
async def _process_async(self, text, params=None):
"""Send a message in the conversation. The message will be processed by
the app based on the current state of the conversation and returns
the response.
Args:
text (str): The text of a message.
params (dict): The params to use with this message, overriding any defaults
which may have been set.
Returns:
(DialogueResponder): The DialogueResponder Response.
"""
if not self._app_manager.ready:
await self._app_manager.load()
internal_params = copy.deepcopy(self.params)
if isinstance(params, dict):
params = self._params_schema.load(params)
params = FrozenParams(**params)
if isinstance(internal_params, dict):
internal_params = FrozenParams(**internal_params)
if params:
# If the params arg is explicitly set, overight the internal params
for k, v in vars(params).items():
vars(internal_params)[k] = v
response = await self._app_manager.parse(
text,
params=internal_params,
context=self.context,
frame=self.frame,
form=self.form,
history=self.history,
verbose=self.verbose,
)
# Validate params
response.param = Params(**self._params_schema.load(dict(response.params)))
self.history = response.history
self.frame = response.frame
self.form = response.form
self.params = response.params
return response
def _follow_directive(self, directive):
msg = ""
try:
directive_name = directive["name"]
if directive_name in [DirectiveNames.REPLY, DirectiveNames.SPEAK]:
msg = directive["payload"]["text"]
elif directive_name == DirectiveNames.SUGGESTIONS:
suggestions = directive["payload"]
if not suggestions:
raise ValueError
msg = "Suggestion{}:".format("" if len(suggestions) == 1 else "s")
texts = []
for idx, suggestion in enumerate(suggestions):
if idx > 0:
msg += ", {!r}"
else:
msg += " {!r}"
texts.append(self._generate_suggestion_text(suggestion))
msg = msg.format(*texts)
elif directive_name == DirectiveNames.LIST:
msg = "\n".join(
[
json.dumps(item, indent=4, sort_keys=True)
for item in directive["payload"]
]
)
elif directive_name == DirectiveNames.LISTEN:
msg = "Listening..."
elif directive_name == DirectiveNames.RESET:
msg = "Resetting..."
except (KeyError, ValueError, AttributeError):
msg = "Unsupported response: {!r}".format(directive)
return msg
@staticmethod
def _generate_suggestion_text(suggestion):
pieces = []
if "text" in suggestion:
pieces.append(suggestion["text"])
if suggestion["type"] != "text":
pieces.append("({})".format(suggestion["type"]))
return " ".join(pieces)
[docs] def reset(self):
"""Reset the history, frame and params of the Conversation object."""
self.history = []
self.frame = {}
self.form = {}
self.params = FrozenParams()