# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module consists of encoders whose outputs serve as inputs to PyTorch modules
"""
import json
import logging
import os
from abc import abstractmethod, ABC
from itertools import chain
from typing import Dict, List, Union, Any, Tuple
import torch
from .helpers import BatchData, TokenizerType, ClassificationType
from .._util import _get_module_or_attr
from ..containers import HuggingfaceTransformersContainer
try:
from tokenizers import normalizers
from tokenizers.trainers import Trainer
from tokenizers.normalizers import Lowercase, NFD, StripAccents
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.models import BPE, WordPiece
from tokenizers.trainers import BpeTrainer, WordPieceTrainer
from tokenizers.processors import TemplateProcessing
NO_TOKENIZERS_MODULE = False
except ImportError:
NO_TOKENIZERS_MODULE = True
logger = logging.getLogger(__name__)
class AbstractEncoder(ABC):
"""
Defines a stateful tokenizer. Unlike the tokenizers in the text_preparation_pipeline, tokenizers
derived from this abstract class have a state, such as a vocabulary or a trained/pretrained model,
that is used to encode an input text string into a sequence of ids or a sequence of
embeddings. These outputs are used by the initial layers of neural nets.
"""
def __init__(self, **kwargs):
if "classification_type" not in kwargs:
msg = "The key 'classification_type' is required to initialize an Encoder class."
raise ValueError(msg)
self.classification_type = ClassificationType(kwargs["classification_type"])
@abstractmethod
def prepare(self, examples: List[str]):
"""
Method that fits the tokenizer and creates a state that can be dumped or used for encoding
Args:
examples: List of text strings that will be used for creating the state of the tokenizer
"""
raise NotImplementedError("Subclasses must implement this method")
@abstractmethod
def dump(self, path: str):
"""
Method that dumps the state (if any) of the tokenizer
Args:
path: The folder where the state has to be dumped
"""
raise NotImplementedError("Subclasses must implement this method")
@abstractmethod
def load(self, path: str):
"""
Method that loads the state (if any) of the tokenizer
Args:
path: The folder where the dumped state can be found. Not all tokenizers dump with the same
file names, hence we use a folder name rather than a filename.
"""
raise NotImplementedError("Subclasses must implement this method")
@abstractmethod
def _tokenize(self, text: str) -> List[str]:
"""
Method that converts a piece of text into a sequence of strings
Args:
text (str): Input text.
Returns:
tokens (List[str]): List of tokens.
"""
raise NotImplementedError("Subclasses must implement this method")
@abstractmethod
def batch_encode(
self, examples: List[str], padding_length: int = None, add_terminals: bool = False, **kwargs
) -> BatchData:
"""
Method that encodes a list of texts into a list of sequence of ids
Args:
examples: List of text strings that will be encoded as a batch
padding_length: The maximum length of each encoded input. Sequences less than this
length are padded to padding_length, longer sequences are trimmed. If not specified,
the max length of examples upon tokenization is used as padding_length.
add_terminals: A boolean flag that determines if terminal special tokens are to be added
to the tokenized examples or not.
Returns:
BatchData: A dictionary-like object for the supplied batch of data, consisting of
various tensor inputs to the neural computation graph as well as any other inputs
required during the forward computation.
Special note on `add_terminals` when used for sequence classification:
In general, this flag can be True or False. Setting it to False leads to errors with
Huggingface tokenizers, as they are generally built to add terminal tokens along with pad
tokens. Hence, the default value of `add_terminals` is False for encoders built on top of
AbstractVocabLookupEncoder and True for Huggingface-based ones; for sequence classification,
encoders based on AbstractVocabLookupEncoder accept either value.
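Example (a minimal sketch, not executed; assumes a concrete subclass such as
WhitespaceEncoder whose state has already been created via prepare()):
--------
batch = encoder.batch_encode(["set an alarm", "cancel it"], padding_length=8)
# batch["seq_ids"] is a 2d tensor of padded token ids, one row per example;
# batch["seq_lengths"] holds the unpadded length of each encoded example.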
"""
raise NotImplementedError("Subclasses must implement this method")
@abstractmethod
def get_vocab(self) -> Dict:
"""Returns a dictionary of vocab tokens as keys and their ids as values"""
raise NotImplementedError("Subclasses must implement this method")
def get_pad_token_idx(self) -> Union[None, int]:
"""
If there exists a padding token's index in the vocab, it is returned; this is useful while
initializing an embedding layer. Otherwise, returns None.
"""
if not hasattr(self, "pad_token_idx"):
return None
return getattr(self, "pad_token_idx")
@property
def number_of_terminal_tokens(self) -> int:
"""
Returns the (maximum) number of terminal tokens used by the encoder during
batch encoding when add_terminals is set to True.
"""
raise NotImplementedError
def _trim_a_list_of_sub_token_groups(
x: List[List[Any]],
max_len: int,
y: List[Any] = None
) -> Union[Tuple[List[Any], List], List[Any]]:
"""
Given a list of sub-token sequences (a.k.a. groups) produced by a tokenization step, this
method keeps the first N groups that fit within the allowed maximum length max_len.
Args:
x: List of groups of sub-words, obtained upon whitespace pre-tokenization and word-level
tokenization (e.g. using a huggingface tokenizer)
max_len: The maximum length of the ravelled output. If given a value greater than the
total number of sub-words in the input, it is clipped to that total.
y: Labels accompanying each group in x, if provided
Returns:
The trimmed list of groups and, if y is provided, the correspondingly trimmed labels.
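Example (illustrative, not executed):
--------
# sub-token groups for the text "a mug": [["a"], ["m", "##ug"]]
# _trim_a_list_of_sub_token_groups([["a"], ["m", "##ug"]], max_len=2)
# -> [["a"]]  (the second group, of length 2, does not fit in the remaining budget of 1)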
"""
max_len = min(max_len, sum([len(_x) for _x in x]))
curr_len = 0
if y:
new_x, new_y = [], []
# iter through each sub-tokens group w/ respective
# group's label (eg. group: ["m", "##ug"])
for _x, _y in zip(x, y):
if curr_len + len(_x) > max_len:
return new_x, new_y
elif curr_len + len(_x) <= max_len:
new_x.append(_x)
new_y.append(_y)
curr_len += len(_x)
return new_x, new_y
else:
new_x = []
# iter through each sub-tokens group w/o respective
# group's label (eg. group: ["m", "##ug"])
for _x in x:
if curr_len + len(_x) > max_len:
return new_x
elif curr_len + len(_x) <= max_len:
new_x.append(_x)
curr_len += len(_x)
return new_x
class AbstractVocabLookupEncoder(AbstractEncoder):
"""
Abstract class wrapped around AbstractEncoder that has a vocabulary lookup as the state.
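Example (a minimal sketch, not executed; uses the concrete WhitespaceEncoder subclass and
assumes ClassificationType accepts the string "tagger"):
--------
encoder = WhitespaceEncoder(classification_type="tagger")
encoder.prepare(["set an alarm", "cancel the alarm"])
encoder.get_vocab()
# e.g. {"set": 0, "an": 1, "alarm": 2, "cancel": 3, "the": 4, "<PAD>": 5, ...}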
"""
SPECIAL_TOKENS_DICT = {
"pad_token": "<PAD>",
"unk_token": "<UNK>",
"start_token": "<START>",
"end_token": "<END>",
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.token2id = {}
@property
def id2token(self):
return {i: t for t, i in self.token2id.items()}
def prepare(self, examples: List[str]):
examples = [ex.strip() for ex in examples]
all_tokens = dict.fromkeys(chain.from_iterable([self._tokenize(text) for text in examples]))
self.token2id = {t: i for i, t in enumerate(all_tokens)}
for name, token in self.__class__.SPECIAL_TOKENS_DICT.items():
self.token2id.update({token: len(self.token2id)})
setattr(self, f"{name}", token)
setattr(self, f"{name}_idx", self.token2id[token])
def dump(self, path: str):
filename = self._get_filename(path)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "w") as opfile:
for token in self.token2id:
opfile.write(f"{token}\n")
with open(os.path.join(path, "special_vocab.json"), "w") as fp:
json.dump(self.__class__.SPECIAL_TOKENS_DICT, fp, indent=4)
msg = f"The state of {self.__class__.__name__} is successfully dumped at '{filename}'"
logger.info(msg)
def load(self, path: str):
filename = self._get_filename(path)
with open(filename, "r") as opfile:
tokens = [line.strip() for line in opfile]
self.token2id = {t: i for i, t in enumerate(tokens)}
with open(os.path.join(path, "special_vocab.json"), "r") as fp:
special_tokens_dict = json.load(fp)
for name, token in special_tokens_dict.items():
setattr(self, f"{name}", token)
setattr(self, f"{name}_idx", self.token2id[token])
msg = f"The state of {self.__class__.__name__} is successfully loaded from '{filename}'"
logger.info(msg)
def _get_filename(self, path: str):
if not os.path.isdir(path):
msg = f"The dump method of {self.__class__.__name__} only accepts diretory as the " \
f"path argument."
logger.error(msg)
raise ValueError(msg)
filename = os.path.join(path, "vocab.txt")
return filename
@abstractmethod
def _tokenize(self, text: str) -> List[str]:
raise NotImplementedError("Subclasses must implement this method")
def batch_encode(
self, examples: List[str], padding_length: int = None, add_terminals: bool = False,
_return_tokenized_examples: bool = False, **kwargs
) -> BatchData:
n_terminals = self.number_of_terminal_tokens if add_terminals else 0
if self.classification_type == ClassificationType.TEXT:
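# Sequence (text) classification: each example is encoded as a single flat token
# sequence, so every token forms its own group of length 1 in split_lengths below.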
tokenized_examples = [self._tokenize(example) for example in examples]
max_curr_len = max([len(ex) for ex in tokenized_examples]) + n_terminals
padding_length_including_terminals = min(max_curr_len, padding_length) \
if padding_length else max_curr_len
padding_length_excluding_terminals = padding_length_including_terminals - n_terminals
_trimmed_examples = [
ex[:padding_length_excluding_terminals] for ex in tokenized_examples
]
split_lengths = [[1] * len(ex) for ex in _trimmed_examples]
# convert (sub) words into their respective token ids
seq_ids = [
self._encode_text(
example,
padding_length_including_terminals,
add_terminals
)
for example in _trimmed_examples
]
if _return_tokenized_examples:
_examples = _trimmed_examples
else:
_examples = None
else:
# We split the input text at whitespace because tagger models always use query_text_type
# 'normalized_text', which contains whitespace separators irrespective of the
# language (English, Japanese, etc.)
split_at = " "
# tokenize each word of each input separately
# get maximum length of each example, accounting for terminals (start & end tokens)
# If padding_length is None, padding has to be done to the max length of the input batch
tokenized_examples = [
[self._tokenize(word) for word in example.split(split_at)] for example in examples
]
max_curr_len = max([len(sum(t_ex, [])) for t_ex in tokenized_examples]) + n_terminals
padding_length_including_terminals = min(max_curr_len, padding_length) \
if padding_length else max_curr_len
padding_length_excluding_terminals = padding_length_including_terminals - n_terminals
_trimmed_examples = [
_trim_a_list_of_sub_token_groups(
tokenized_example, padding_length_excluding_terminals
)
for tokenized_example in tokenized_examples
] # List[List[List[str]]], innermost List[str] is a list of sub-words for a given word
split_lengths = [[len(x) for x in ex] for ex in _trimmed_examples]
# convert (sub) words into their respective token ids
seq_ids = [
self._encode_text(
sum(example, []),
padding_length_including_terminals,
add_terminals
)
for example in _trimmed_examples
]
if _return_tokenized_examples:
_examples = [sum(list_of_t, []) for list_of_t in _trimmed_examples]
else:
_examples = None
return BatchData(**{
# number of groups per example
"seq_lengths": torch.as_tensor( # Tensor1d[int]
[len(_split_lengths) + n_terminals for _split_lengths in split_lengths],
dtype=torch.long
),
# len of each subgroup; for each example, sum of its split_lengths will be equal to
# the sequence length minus terminals.
"split_lengths": [
torch.as_tensor(_split_lengths, dtype=torch.long)
for _split_lengths in split_lengths
], # List[Tensor1d[int]],
"seq_ids": torch.as_tensor(seq_ids, dtype=torch.long),
**({"_examples": _examples} if _return_tokenized_examples else {}),
})
def _encode_text(self, list_of_tokens: List[str], padding_length: int, add_terminals: bool):
"""
Encodes a list of tokens into a list of ids based on the vocab and special token ids.
Args:
list_of_tokens (List[str]): List of words or sub-words that are to be encoded
padding_length (int): Maximum length of the encoded sequence; sequences shorter than
this length are padded with a pad index while longer sequences are trimmed.
Upon encoding, the output list of ids will be of exactly this length.
add_terminals (bool): Whether terminal start and end tokens are to be added or not to
the encoded sequence
Returns:
list_of_ids (List[int]): Sequence of ids corresponding to the input tokens
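Example (illustrative, not executed; assumes the vocab and special tokens set up in prepare()):
--------
# self._encode_text(["set", "an", "alarm"], padding_length=6, add_terminals=True)
# -> ids for ["<START>", "set", "an", "alarm", "<END>", "<PAD>"]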
"""
list_of_tokens_with_terminals = (
[getattr(self, "start_token")] +
list_of_tokens +
[getattr(self, "end_token")]
) if add_terminals else list_of_tokens
list_of_tokens_with_terminals_and_padding = (
list_of_tokens_with_terminals +
[getattr(self, "pad_token")] * (padding_length - len(list_of_tokens_with_terminals))
)
list_of_ids = [
self.token2id.get(token, getattr(self, "unk_token_idx"))
for token in list_of_tokens_with_terminals_and_padding
]
return list_of_ids
def get_vocab(self) -> Dict:
return self.token2id
@property
def number_of_terminal_tokens(self) -> int:
"""
Returns the (maximum) number of terminal tokens used by the encoder during
batch encoding when add_terminals is set to True.
"""
return 2
class WhitespaceEncoder(AbstractVocabLookupEncoder):
"""
Encoder that tokenizes at whitespace. Not useful for languages such as Chinese.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.classification_type == ClassificationType.TEXT:
msg = "For languages like Japanese, Chinese, etc. that do not have whitespaces, " \
"consider using a pretrained huggingface tokenizer or a character tokenizer " \
"when not using 'query_text_type':'normalized_text'."
logger.warning(msg)
def _tokenize(self, text: str) -> List[str]:
return text.strip("\n").split(" ")
class CharEncoder(AbstractVocabLookupEncoder):
"""
A simple tokenizer that tokenizes at character level
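Example (illustrative, not executed):
--------
# self._tokenize("a mug") -> ["a", " ", "m", "u", "g"]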
"""
def _tokenize(self, text: str) -> List[str]:
return list(text.strip("\n"))
class WhitespaceAndCharDualEncoder(AbstractVocabLookupEncoder):
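"""
Encoder that splits the input at whitespace for word-level tokens and additionally encodes
each word as a sequence of character ids. It can only be used with tagger models.
"""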
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.char_token2id = {}
if self.classification_type != ClassificationType.TAGGER:
msg = "The inputted tokenizer type can only be used with a tagger model."
raise ValueError(msg)
SPECIAL_CHAR_TOKENS_DICT = {
"char_pad_token": "<CHAR_PAD>",
"char_unk_token": "<CHAR_UNK>",
"char_start_token": "<CHAR_START>",
"char_end_token": "<CHAR_END>",
}
@property
def char_id2token(self):
return {i: t for t, i in self.char_token2id.items()}
@staticmethod
def _char_tokenize(text: str) -> List[str]:
return list(text.strip("\n"))
def _tokenize(self, text: str) -> List[str]:
return text.strip("\n").split(" ")
def prepare(self, examples: List[str]):
super().prepare(examples)
examples = [ex.strip() for ex in examples]
all_tokens = dict.fromkeys(
chain.from_iterable([self._char_tokenize(text) for text in examples])
)
self.char_token2id = {t: i for i, t in enumerate(all_tokens)}
for name, token in self.__class__.SPECIAL_CHAR_TOKENS_DICT.items():
self.char_token2id.update({token: len(self.char_token2id)})
setattr(self, f"{name}", token)
setattr(self, f"{name}_idx", self.char_token2id[token])
def dump(self, path: str):
super().dump(path)
filename = self._get_char_filename(path)
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "w") as opfile:
for token in self.char_token2id:
opfile.write(f"{token}\n")
with open(os.path.join(path, "special_char_vocab.json"), "w") as fp:
json.dump(self.__class__.SPECIAL_CHAR_TOKENS_DICT, fp, indent=4)
msg = f"The state of {self.__class__.__name__} is successfully dumped at '{filename}'"
logger.info(msg)
def load(self, path: str):
super().load(path)
filename = self._get_char_filename(path)
with open(filename, "r") as opfile:
tokens = [line.strip() for line in opfile]
self.char_token2id = {t: i for i, t in enumerate(tokens)}
with open(os.path.join(path, "special_char_vocab.json"), "r") as fp:
special_char_tokens_dict = json.load(fp)
for name, token in special_char_tokens_dict.items():
setattr(self, f"{name}", token)
setattr(self, f"{name}_idx", self.char_token2id[token])
msg = f"The state of {self.__class__.__name__} is successfully loaded from '{filename}'"
logger.info(msg)
def _get_char_filename(self, path: str):
if not os.path.isdir(path):
msg = f"The dump method of {self.__class__.__name__} only accepts diretory as the " \
f"path argument."
logger.error(msg)
raise ValueError(msg)
filename = os.path.join(path, "char_vocab.txt")
return filename
@property
def number_of_char_terminal_tokens(self) -> int:
"""
Returns the number of char terminal tokens used by the encoder during batch encoding when
char_add_terminals is set to True.
"""
return 2
@property
def number_of_terminal_tokens(self) -> int:
"""
Returns the number of word-level terminal tokens used by the encoder during batch encoding.
This dual encoder does not add word-level terminal tokens, hence zero.
"""
return 0
def batch_encode(
self, examples: List[str], char_padding_length: int = None, char_add_terminals: bool = True,
add_terminals: bool = False, _return_tokenized_examples: bool = False, **kwargs
) -> BatchData:
if add_terminals:
msg = f"The param 'add_terminals' must not be True to encode a batch using " \
f"{self.__class__.__name__}."
logger.error(msg)
raise ValueError(msg)
batch_data = super().batch_encode(
examples=examples, add_terminals=False, _return_tokenized_examples=True, **kwargs
)
# use tokenized examples to obtain tokens for char tokenization
_examples = batch_data.pop("_examples")
char_seq_ids, char_seq_lengths = [], []
for _seq_tokens in _examples:
# compute padding length for character sequences
_curr_max = max([len(word) for word in _seq_tokens])
_curr_max = _curr_max + self.number_of_char_terminal_tokens \
if char_add_terminals else _curr_max
char_padding_length = (
min(char_padding_length, _curr_max) if char_padding_length else _curr_max
)
_char_seq_ids, _char_seq_lengths = zip(*[
self._encode_chars(list(word), char_padding_length, char_add_terminals)
for word in _seq_tokens
])
char_seq_ids.append(_char_seq_ids)
char_seq_lengths.append(_char_seq_lengths)
batch_data.update({
"char_seq_ids": [torch.as_tensor(_ids, dtype=torch.long) for _ids in char_seq_ids],
"char_seq_lengths": [torch.as_tensor(_ls, dtype=torch.long) for _ls in char_seq_lengths]
})
return batch_data
def _encode_chars(self, list_of_tokens: List[str], padding_length: int, add_terminals: bool):
"""
Encodes a list of characters into a list of character ids based on the encoder's char vocab.
Args:
list_of_tokens (List[str]): List of chars that are to be encoded
padding_length (int): Maximum length of the encoded sequence; sequences shorter than
this length are padded with a pad index while longer sequences are trimmed
add_terminals (bool): Whether terminal start and end tokens are to be added or not to
the encoded sequence
Returns:
list_of_ids (List[int]): Sequence of ids corresponding to the input chars
seq_length (int): The length of sequence upon encoding (before padding)
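Example (illustrative, not executed; assumes the char vocab and special tokens set up in prepare()):
--------
# self._encode_chars(list("mug"), padding_length=6, add_terminals=True)
# -> (ids for ["<CHAR_START>", "m", "u", "g", "<CHAR_END>", "<CHAR_PAD>"], 5)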
"""
list_of_chars = (
list_of_tokens[:padding_length - self.number_of_char_terminal_tokens]
if add_terminals else list_of_tokens[:padding_length]
)
list_of_chars_with_terminals = (
[getattr(self, "char_start_token")] +
list_of_chars +
[getattr(self, "char_end_token")]
) if add_terminals else list_of_chars
list_of_chars_with_terminals_and_padding = (
list_of_chars_with_terminals +
[getattr(self, "char_pad_token")] * (
padding_length - len(list_of_chars_with_terminals))
)
list_of_ids = [
self.char_token2id.get(token, getattr(self, "char_unk_token_idx"))
for token in list_of_chars_with_terminals_and_padding
]
return list_of_ids, len(list_of_chars_with_terminals)
def get_char_vocab(self) -> Dict:
return self.char_token2id
def get_char_pad_token_idx(self) -> Union[None, int]:
"""
If there exists a char padding token's index in the vocab, it is returned; this is useful
while initializing an embedding layer. Otherwise, returns None.
"""
if not hasattr(self, "char_pad_token_idx"):
return None
return getattr(self, "char_pad_token_idx")
class AbstractHuggingfaceTrainableEncoder(AbstractEncoder):
"""
Abstract class wrapped around AbstractEncoder that uses Huggingface's tokenizers library
to create and train the underlying tokenizer model (the encoder's state).
reference:
https://huggingface.co/docs/tokenizers/python/latest/pipeline.html
"""
SPECIAL_TOKENS = ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.tokenizer = None
self.trainer = Trainer
if NO_TOKENIZERS_MODULE:
msg = "Must install extra [transformers] by running " \
"'pip install mindmeld[transformers]'"
raise ImportError(msg)
if self.classification_type == ClassificationType.TEXT:
msg = f"The pre-tokenizer for {self.__class__.__name__} is set to 'Whitespace'. " \
f"For languages like Japanese, Chinese, etc. that do not have whitespaces, " \
f"consider using a pretrained huggingface tokenizer or a character tokenizer " \
f"when not using 'query_text_type':'normalized_text'."
logger.warning(msg)
def prepare(self, examples: List[str]):
"""
references:
- Huggingface: tutorials/python/training_from_memory.html @ https://tinyurl.com/6hxrtspa
- https://huggingface.co/docs/tokenizers/python/latest/index.html
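Example (a minimal sketch, not executed; uses the concrete BytePairEncodingEncoder subclass
and assumes ClassificationType accepts the string "text"):
--------
encoder = BytePairEncodingEncoder(classification_type="text")
encoder.prepare(["set an alarm for five", "cancel the alarm"])
batch = encoder.batch_encode(["set an alarm"])
# batch contains seq_ids, attention_masks, seq_lengths and split_lengths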
"""
self._prepare_pipeline()
trainer = self.trainer(
# vocab_size=30000,
vocab_size=100,
special_tokens=self.__class__.SPECIAL_TOKENS
)
self.tokenizer.train_from_iterator(examples, trainer=trainer, length=len(examples))
def _prepare_pipeline(self):
self.tokenizer.normalizer = normalizers.Sequence([NFD(), Lowercase(), StripAccents()])
# TODO: The PreTokenizer which is Whitespace currently can be made customizable so as to
# use for languages that have no whitespaces such as Japanese, Chinese, etc.
self.tokenizer.pre_tokenizer = Whitespace()
self.tokenizer.post_processor = TemplateProcessing(
single="[CLS] $A [SEP]",
pair="[CLS] $A [SEP] $B:1 [SEP]:1",
special_tokens=[
("[CLS]", 1),
("[SEP]", 2),
],
)
self.tokenizer.enable_padding(
pad_id=self.__class__.SPECIAL_TOKENS.index("[PAD]"),
pad_token="[PAD]"
)
def dump(self, path: str):
filename = self._get_filename(path)
os.makedirs(os.path.dirname(filename), exist_ok=True)
self.tokenizer.save(filename)
msg = f"The state of {self.__class__.__name__} is successfully dumped at '{filename}'"
logger.info(msg)
def load(self, path: str):
filename = self._get_filename(path)
self.tokenizer = _get_module_or_attr("tokenizers", "Tokenizer").from_file(filename)
self._prepare_pipeline()
msg = f"The state of {self.__class__.__name__} is successfully loaded from '{filename}'"
logger.info(msg)
def _get_filename(self, path: str):
if not os.path.isdir(path):
msg = f"The dump method of {self.__class__.__name__} only accepts diretory as the " \
f"path argument."
logger.error(msg)
raise ValueError(msg)
filename = os.path.join(path, "tokenizer.json")
return filename
def _tokenize(self, text: str) -> List[str]:
"""
Example:
--------
output = tokenizer.encode("Hello, y'all! How are you 😁 ?")
print(output.tokens)
# ["Hello", ",", "y", "'", "all", "!", "How", "are", "you", "[UNK]", "?"]
"""
output = self.tokenizer.encode(text, add_special_tokens=False)
# By disabling add_special_tokens, one can expect tokenized outputs without any terminal
# [CLS] and [SEP] tokens
return output.tokens
def batch_encode(
self, examples: List[str], padding_length: int = None, add_terminals: bool = True, **kwargs
) -> BatchData:
"""
Example:
--------
output = tokenizer.encode_batch(["Hello, y'all!", "How are you 😁 ?"])
print(output[1].tokens)
# ["[CLS]", "How", "are", "you", "[UNK]", "?", "[SEP]", "[PAD]"]
NOTE:
-----
Passing the argument `padding_length` to set the max length for batch encoding is not
available yet for Huggingface tokenizers
"""
if not add_terminals:
msg = f"The param 'add_terminals' must be True to encode a batch using " \
f"{self.__class__.__name__}."
raise ValueError(msg)
if padding_length is not None:
msg = f"{self.__class__.__name__} does not support setting padding length during" \
f"batch_encode() method."
logger.warning(msg)
n_terminals = self.number_of_terminal_tokens if add_terminals else 0
# We do not distinguish between ClassificationType.TEXT or ClassificationType.TAGGER here.
# Also note that the pre_tokenizer is currently set to Whitespace.
output = self.tokenizer.encode_batch(examples, add_special_tokens=True)
seq_ids = [o.ids for o in output]
attention_masks = [o.attention_mask for o in output]
words = [o.words for o in output]
split_lengths = []
for words_nums in words: # an eg. sequence [None,0,1,2,3,3,3,3,4,4,...,16,None,...,None]
curr_len = 0
curr_num = 0
_split_lengths = []
for word_num in words_nums[1:]: # the first corresponds to CLS token
if word_num == curr_num:
curr_len += 1
elif word_num is None:
_split_lengths.append(curr_len)
break
else:
assert word_num == curr_num + 1, f"unexpected word index {word_num} after {curr_num}"
_split_lengths.append(curr_len)
curr_len = 1
curr_num = word_num
split_lengths.append(_split_lengths)
return BatchData(**{
# num of groups per example
"seq_lengths": torch.as_tensor( # Tensor1d[int]
[len(_split_lengths) + n_terminals for _split_lengths in split_lengths],
dtype=torch.long
),
# len of each subgroup; for each example, sum of its split_lengths will be equal to
# the sum of attention mask minus terminals.
"split_lengths": [
torch.as_tensor(_split_lengths, dtype=torch.long)
for _split_lengths in split_lengths
], # List[Tensor1d[int]],
"seq_ids": torch.as_tensor(seq_ids, dtype=torch.long), # Tensor2d[int]
"attention_masks": torch.as_tensor(attention_masks, dtype=torch.long), # Tensor2d[int]
})
def get_vocab(self) -> Dict:
return self.tokenizer.get_vocab()
def get_pad_token_idx(self) -> int:
return self.tokenizer.token_to_id("[PAD]")
@property
def number_of_terminal_tokens(self) -> int:
"""
Returns the (maximum) number of terminal tokens used by the encoder during
batch encoding when add_terminals is set to True.
"""
return 2
class BytePairEncodingEncoder(AbstractHuggingfaceTrainableEncoder):
"""
Encoder that fits a BPE model based on the input examples
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.tokenizer = _get_module_or_attr("tokenizers", "Tokenizer")(BPE())
self.trainer = BpeTrainer
class WordPieceEncoder(AbstractHuggingfaceTrainableEncoder):
"""
Encoder that fits a WordPiece model based on the input examples
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.tokenizer = _get_module_or_attr("tokenizers", "Tokenizer")(WordPiece())
self.trainer = WordPieceTrainer
class HuggingfacePretrainedEncoder(AbstractEncoder):
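"""
Encoder that wraps a pretrained Huggingface tokenizer (and its config), loaded through
HuggingfaceTransformersContainer, for encoding inputs to pretrained transformer models.
"""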
def __init__(self, pretrained_model_name_or_path=None, **kwargs):
super().__init__(**kwargs)
self.pretrained_model_name_or_path = pretrained_model_name_or_path
self.config, self.tokenizer = None, None
self._number_of_terminal_tokens = None
self.__model_max_length = -1
def prepare(self, examples: List[str]):
del examples
if self.pretrained_model_name_or_path is None:
msg = f"Need a valid 'pretrained_model_name_or_path' path to fit " \
f"{self.__class__.__name__} but found value: {self.pretrained_model_name_or_path}"
raise ValueError(msg)
hf_trans = HuggingfaceTransformersContainer(self.pretrained_model_name_or_path)
self.config = hf_trans.get_transformer_model_config()
self.tokenizer = hf_trans.get_transformer_model_tokenizer()
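# Example (a minimal sketch, not executed; 'distilbert-base-uncased' is only an illustrative
# checkpoint name, and ClassificationType is assumed to accept the string "text"):
# encoder = HuggingfacePretrainedEncoder(
#     pretrained_model_name_or_path="distilbert-base-uncased", classification_type="text")
# encoder.prepare(examples=[])  # examples are ignored; loads the pretrained config & tokenizer
# batch = encoder.batch_encode(["set an alarm"])  # BatchData with hgf_encodings, split_lengths, ...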
def dump(self, path: str):
if not os.path.isdir(path):
msg = f"The dump method of {self.__class__.__name__} only accepts diretory as the " \
f"path argument."
logger.error(msg)
raise ValueError(msg)
os.makedirs(path, exist_ok=True)
self.config.save_pretrained(path)
self.tokenizer.save_pretrained(path)
def load(self, path: str):
if not os.path.isdir(path):
msg = f"The dump method of {self.__class__.__name__} only accepts diretory as the " \
f"path argument."
logger.error(msg)
raise ValueError(msg)
hf_trans = HuggingfaceTransformersContainer(path)
self.config = hf_trans.get_transformer_model_config()
self.tokenizer = hf_trans.get_transformer_model_tokenizer()
def _tokenize(self, text: str) -> List[str]:
return self.tokenizer.tokenize(text)
@property
def number_of_terminal_tokens(self) -> int:
"""Overwrite parent class' definition of number of terminal tokens"""
if not self._number_of_terminal_tokens:
self._number_of_terminal_tokens = len(self.tokenizer.encode(""))
return self._number_of_terminal_tokens
@property
def _model_max_length(self) -> int:
"""Returns the maximum length of tokens per example allowed for the pretrained tokenizer"""
if self.__model_max_length == -1:
try:
self.__model_max_length = self.tokenizer.model_max_length
except AttributeError as e:
# case in which the huggingface tokenizer doesn't have this attribute
logger.info(e)
self.__model_max_length = None
return self.__model_max_length
def batch_encode(
self, examples: List[str], padding_length: int = None, add_terminals: bool = True, **kwargs
) -> BatchData:
if not add_terminals:
msg = f"The param 'add_terminals' must be True to encode a batch using " \
f"{self.__class__.__name__}."
logger.error(msg)
raise ValueError(msg)
n_terminals = self.number_of_terminal_tokens if add_terminals else 0
if self.classification_type == ClassificationType.TEXT:
# https://huggingface.co/docs/transformers/v4.16.2/en/preprocessing
hgf_encodings = self.tokenizer(
examples, padding=True, truncation=True, max_length=padding_length,
return_tensors="pt"
) # Huggingface returns a BatchEncodings object; needs to be converted to a dictionary
split_lengths = [
[1] * (sum(msk) - n_terminals) for msk in hgf_encodings["attention_mask"]
]
else:
# We split the input text at whitespace because tagger models always use query_text_type
# 'normalized_text', which contains whitespace separators irrespective of the
# language (English, Japanese, etc.)
split_at = " "
if any([
"GPT2Tokenizer" in str(parent_class) for parent_class in
self.tokenizer.__class__.__mro__
]): # tokenizers like RobertaTokenizer that use Byte-level BPE (eg. distilroberta-base)
msg = "The inputted choice of pretrained huggingface tokenizer is based on " \
"Byte-level BPE (eg. 'GPT2Tokenizer', 'RobertaTokenizer', etc.) which " \
"treats spaces like parts of the tokens. " \
"This conflicts with the use of 'query_text_type':'normalized_text' for " \
"tagger models. Consider using a different pretrained model for tagging."
raise NotImplementedError(msg)
# tokenize each word of each input separately
# get maximum length of each example, accounting for terminal tokens (cls, sep)
# If padding_length is None, padding has to be done to the max length of the input batch
tokenized_examples = [
[self._tokenize(word) for word in example.split(split_at)] for example in examples
]
max_curr_len = max([len(sum(t_ex, [])) for t_ex in tokenized_examples]) + n_terminals
padding_length_including_terminals = min(max_curr_len, padding_length) \
if padding_length else max_curr_len
if self._model_max_length:
# padding_length cannot exceed the transformer model's maximum length
padding_length_including_terminals = min(
padding_length_including_terminals, self._model_max_length)
padding_length_excluding_terminals = padding_length_including_terminals - n_terminals
_trimmed_examples = [
_trim_a_list_of_sub_token_groups(tokenized_example,
padding_length_excluding_terminals)
for tokenized_example in tokenized_examples
] # List[List[List[str]]], innermost List[str] is a list of sub-words for a given word
split_lengths = [[len(x) for x in ex] for ex in _trimmed_examples]
# Why the trimmed examples must be detokenized before the tokenizer's __call__ method:
# Huggingface does not provide a way to call the tokenizer's encode method (__call__
# in the latest versions) with a list of already tokenized text.
# Computing ```tokenized_examples = [" ".join(sum(ex, [])) for ex in _trimmed_examples]```
# and then passing it to ```self.tokenizer.__call__(tokenized_examples, ...)```
# inadvertently re-tokenizes the already tokenized strings. E.g. "ttyl", tokenized
# to ["t", "##ty", "##l"] and then reverted to the string "t ##ty ##l", would be
# incorrectly re-tokenized in __call__ as ["t", "#", "#", "t", "y", "#", "#", "l"]
# and then encoded into ids.
# Note that this is not the case with some subclasses of
# AbstractHuggingfaceTrainableEncoder (e.g. BytePairEncodingEncoder)
# which might not prepend markers such as '##' to sub-words.
_detokenized_examples = [
split_at.join(
[self.tokenizer.convert_tokens_to_string(group) for group in _trimmed_example]
)
for _trimmed_example in _trimmed_examples
]
# https://huggingface.co/docs/transformers/v4.16.2/en/preprocessing
hgf_encodings = self.tokenizer(
_detokenized_examples, padding=True, truncation=True, max_length=None,
return_tensors="pt"
) # Huggingface returns a BatchEncodings object; needs to be converted to a dictionary
return BatchData(**{
# number of groups per example
"seq_lengths": torch.as_tensor( # Tensor1d[int]
[len(_split_lengths) + n_terminals for _split_lengths in split_lengths],
dtype=torch.long
),
# len of each subgroup; for each example, sum of its split_lengths will be equal to
# the sum of attention mask minus terminals.
"split_lengths": [
torch.as_tensor(_split_lengths, dtype=torch.long)
for _split_lengths in split_lengths
], # List[Tensor1d[int]],
# all the different outputs produced by huggingface's pretrained tokenizer,
# consisting of inputs_ids, attention_masks, etc.
"hgf_encodings": {**hgf_encodings},
})
def get_vocab(self) -> Dict:
return self.tokenizer.get_vocab()
def get_pad_token_idx(self) -> int:
return self.tokenizer.pad_token_id