# Copyright (c) Facebook, Inc. and its affiliates.
"""
The processors exist in MMF to make data processing pipelines for various
datasets as similar as possible while allowing code reuse.

Processors also help maintain proper abstractions, keeping only what matters
inside the dataset's code. This allows us to keep the dataset ``__getitem__``
logic really clean, with no opinions about data types baked into it.
Processors can work on both images and text due to their generic structure.

To create a new processor, follow these steps:

1. Inherit the ``BaseProcessor`` class.
2. Implement the ``__call__`` function, which takes in a dict and returns a
   dict with the same keys preprocessed, as well as any extra keys that need
   to be returned.
3. Register the processor using ``@registry.register_processor('name')``,
   where 'name' will be used to refer to your processor later.

In a processor's config you can specify the ``preprocessor`` option to pick
the kind of preprocessor you want in your dataset.

Let's break down a processor's config inside a dataset (VQA2.0) a bit to
understand the different moving parts.

Config::

    dataset_config:
      vqa2:
        data_dir: ${env.data_dir}
        processors:
          text_processor:
            type: vocab
            params:
              max_length: 14
              vocab:
                type: intersected
                embedding_name: glove.6B.300d
                vocab_file: vqa2/defaults/extras/vocabs/vocabulary_100k.txt
              preprocessor:
                type: simple_sentence
                params: {}

``BaseDataset`` will initialize the processors and make them available inside
your dataset with the same attribute name as the key name; for example,
`text_processor` will be available as `self.text_processor` inside your
dataset. As with every module in MMF, a processor also accepts a ``DictConfig``
with `type` and `params` attributes. `params` defines the custom parameters
for each of the processors. By default, the processor initialization process
will also init a `preprocessor` attribute, which can be a processor config in
itself. The `preprocessor` can then be accessed inside the processor's
functions.

Example::

    from mmf.common.registry import registry
    from mmf.datasets.processors import BaseProcessor

    @registry.register_processor('my_processor')
    class MyProcessor(BaseProcessor):
        def __init__(self, config, *args, **kwargs):
            return

        def __call__(self, item, *args, **kwargs):
            text = item['text']
            text = [t.strip() for t in text.split(" ")]
            return {"text": text}
"""

import collections
import copy
import logging
import os
import random
import re
import warnings
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union
import numpy as np
import torch
from mmf.common.constants import IMAGE_COLOR_MEAN, IMAGE_COLOR_STD
from mmf.common.registry import registry
from mmf.common.typings import ProcessorConfigType
from mmf.utils.configuration import get_mmf_cache_dir, get_mmf_env
from mmf.utils.distributed import is_main, synchronize
from mmf.utils.file_io import PathManager
from mmf.utils.logger import log_class_usage
from mmf.utils.text import VocabDict
from mmf.utils.vocab import Vocab, WordToVectorDict
from omegaconf import DictConfig

logger = logging.getLogger(__name__)


@dataclass
class BatchProcessorConfigType:
    processors: ProcessorConfigType


class BaseProcessor:
    """Every processor in MMF needs to inherit this class for compatibility
    with MMF. The end user mainly needs to implement the ``__call__`` function.

    Args:
        config (DictConfig): Config for this processor, containing `type` and
            `params` attributes if available.

    """

    def __init__(self, *args, config: Optional[DictConfig] = None, **kwargs):
        log_class_usage("Processor", self.__class__)
        return

    def __call__(self, item: Any, *args, **kwargs) -> Any:
        """Main function of the processor. Takes in a dict and returns back
        a dict.

        Args:
            item (Dict): Some item that needs to be processed.

        Returns:
            Dict: Processed dict.

        """
        return item
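

# A minimal sketch (not part of MMF) of a custom processor built on
# BaseProcessor. The lowercasing behavior is illustrative; in real use you
# would also decorate the class with ``@registry.register_processor('name')``
# as shown in the module docstring.
def _example_custom_processor():
    class LowercaseProcessor(BaseProcessor):
        def __call__(self, item, *args, **kwargs):
            return {"text": item["text"].lower()}

    processor = LowercaseProcessor()
    return processor({"text": "Hello MMF"})  # {"text": "hello mmf"}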


class Processor:
    """Wrapper class used by MMF to initialize a processor based on its
    ``type`` as passed in the configuration. It retrieves the processor class
    registered in the registry corresponding to the ``type`` key and
    initializes it with the ``params`` passed in the configuration. All
    functions and attributes of the initialized processor are directly
    available via this class.

    Args:
        config (DictConfig): DictConfig containing ``type`` of the processor
            to be initialized and ``params`` of that processor.

    """

    def __init__(self, config: ProcessorConfigType, *args, **kwargs):
        if "type" not in config:
            raise AttributeError(
                "Config must have 'type' attribute to specify type of processor"
            )

        processor_class = registry.get_processor_class(config.type)

        if processor_class is None:
            raise ValueError(f"No processor class named {config.type} is defined.")

        params = {}
        if "params" not in config:
            logger.warning(
                "Config doesn't have 'params' attribute to "
                "specify parameters of the processor "
                f"of type {config.type}. Setting to default {{}}"
            )
        else:
            params = config.params

        self.processor = processor_class(params, *args, **kwargs)
        self._dir_representation = dir(self)

    def __call__(self, item, *args, **kwargs):
        return self.processor(item, *args, **kwargs)

    def __getattr__(self, name):
        if "_dir_representation" in self.__dict__ and name in self._dir_representation:
            return getattr(self, name)
        elif "processor" in self.__dict__ and hasattr(self.processor, name):
            return getattr(self.processor, name)
        else:
            raise AttributeError(f"The processor {name} doesn't exist in the registry.")
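

# A minimal sketch of how the ``Processor`` wrapper resolves ``type`` and
# ``params``. The config values are illustrative; it relies on the
# 'simple_word' processor registered later in this file.
def _example_processor_wrapper():
    from omegaconf import OmegaConf

    config = OmegaConf.create({"type": "simple_word", "params": {}})
    processor = Processor(config)

    # Calls and attribute lookups are forwarded to the underlying
    # SimpleWordProcessor instance.
    return processor({"text": "What is on the table?"})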


class BatchProcessor(BaseProcessor):
    """BatchProcessor is an extension of the normal processor for cases where
    a dataset works on a full batch instead of individual samples, as can be
    observed with iterable datasets.

    If provided with a ``processors`` key in its config, BatchProcessor will
    initialize a member variable ``processors`` for you containing all of the
    processors you specified, which can then be used to process your complete
    batch.

    Otherwise it behaves the same way: it expects an item and returns an item,
    which can be of any type.

    """

    def __init__(self, config: BatchProcessorConfigType, *args, **kwargs):
        extra_params = {"data_dir": get_mmf_env(key="data_dir")}
        processors_dict = config.get("processors", {})

        # Since build_processors also imports processor, import it at runtime
        # to avoid circular dependencies
        from mmf.utils.build import build_processors

        self.processors = build_processors(processors_dict, **extra_params)

    def __call__(self, item: Any) -> Any:
        return item
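

# A minimal sketch (not part of MMF) of a BatchProcessor subclass that applies
# one of its configured sub-processors across a whole batch. It assumes the
# config carries a ``processors`` node with a hypothetical 'text_processor'
# key, mirroring the layout in the module docstring, and that
# ``build_processors`` returns the built processors keyed by name.
def _example_batch_processor_subclass():
    class TextBatchProcessor(BatchProcessor):
        def __call__(self, batch):
            text_processor = self.processors["text_processor"]
            batch["text"] = [
                text_processor({"text": t})["text"] for t in batch["text"]
            ]
            return batch

    return TextBatchProcessor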
[docs]@registry.register_processor("vocab")
class VocabProcessor(BaseProcessor):
"""Use VocabProcessor when you have vocab file and you want to process
words to indices. Expects UNK token as "<unk>" and pads sentences using
"<pad>" token. Config parameters can have ``preprocessor`` property which
is used to preprocess the item passed and ``max_length`` property which
points to maximum length of the sentence/tokens which can be convert to
indices. If the length is smaller, the sentence will be padded. Parameters
for "vocab" are necessary to be passed.
**Key**: vocab
Example Config::
dataset_config:
vqa2:
data_dir: ${env.data_dir}
processors:
text_processor:
type: vocab
params:
max_length: 14
vocab:
type: intersected
embedding_name: glove.6B.300d
vocab_file: vqa2/defaults/extras/vocabs/vocabulary_100k.txt
Args:
config (DictConfig): node containing configuration parameters of
the processor
Attributes:
vocab (Vocab): Vocab class object which is abstraction over the vocab
file passed.
"""
MAX_LENGTH_DEFAULT = 50
PAD_TOKEN = "<pad>"
PAD_INDEX = 0
def __init__(self, config, *args, **kwargs):
if not hasattr(config, "vocab"):
raise AttributeError(
"config passed to the processor has no attribute vocab"
)
self.vocab = Vocab(*args, **config.vocab, **kwargs)
self._init_extras(config)
def _init_extras(self, config, *args, **kwargs):
self.preprocessor = None
if hasattr(config, "max_length"):
self.max_length = config.max_length
else:
warnings.warn(
"No 'max_length' parameter in Processor's "
"configuration. Setting to {}.".format(self.MAX_LENGTH_DEFAULT)
)
self.max_length = self.MAX_LENGTH_DEFAULT
if "preprocessor" in config:
self.preprocessor = Processor(config.preprocessor, *args, **kwargs)
if self.preprocessor is None:
raise ValueError(
f"No text processor named {config.preprocessor} is defined."
)
def __call__(self, item):
"""Call requires item to have either "tokens" attribute or either
"text" attribute. If "text" is present, it will tokenized using
the preprocessor.
Args:
item (Dict): Dict containing the "text" or "tokens".
Returns:
Dict: Dict containing indices in "text" key, "tokens" in "tokens"
key and "length" of the string in "length" key.
"""
indices = None
if not isinstance(item, dict):
raise TypeError(
"Argument passed to the processor must be "
"a dict with either 'text' or 'tokens' as "
"keys"
)
if "tokens" in item:
tokens = item["tokens"]
indices = self._map_strings_to_indices(item["tokens"])
elif "text" in item:
if self.preprocessor is None:
raise AssertionError(
"If tokens are not provided, a text "
"processor must be defined in the config"
)
tokens = self.preprocessor({"text": item["text"]})["text"]
indices = self._map_strings_to_indices(tokens)
else:
raise AssertionError(
"A dict with either 'text' or 'tokens' keys "
"must be passed to the processor"
)
tokens, length = self._pad_tokens(tokens)
return {"text": indices, "tokens": tokens, "length": length}
def _pad_tokens(self, tokens):
padded_tokens = [self.PAD_TOKEN] * self.max_length
token_length = min(len(tokens), self.max_length)
padded_tokens[:token_length] = tokens[:token_length]
token_length = torch.tensor(token_length, dtype=torch.long)
return padded_tokens, token_length
[docs] def get_pad_index(self):
"""Get index of padding <pad> token in vocabulary.
Returns:
int: index of the padding token.
"""
return self.vocab.get_pad_index()
[docs] def get_vocab_size(self):
"""Get size of the vocabulary.
Returns:
int: size of the vocabulary.
"""
return self.vocab.get_size()
def _map_strings_to_indices(self, tokens):
length = min(len(tokens), self.max_length)
tokens = tokens[:length]
output = torch.zeros(self.max_length, dtype=torch.long)
output.fill_(self.vocab.get_pad_index())
for idx, token in enumerate(tokens):
output[idx] = self.vocab.stoi[token]
return output
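

# A self-contained sketch of the padding and index-mapping logic used by
# ``VocabProcessor._pad_tokens`` and ``_map_strings_to_indices``, with a toy
# string-to-index table standing in for a real ``Vocab`` object.
def _example_pad_and_index(tokens, stoi, max_length=5, pad_token="<pad>", pad_index=0):
    padded_tokens = [pad_token] * max_length
    length = min(len(tokens), max_length)
    padded_tokens[:length] = tokens[:length]

    indices = torch.full((max_length,), pad_index, dtype=torch.long)
    for idx, token in enumerate(tokens[:length]):
        indices[idx] = stoi[token]

    # e.g. _example_pad_and_index(["what", "is", "this"],
    #                             {"what": 2, "is": 3, "this": 4})
    # -> (['what', 'is', 'this', '<pad>', '<pad>'], tensor([2, 3, 4, 0, 0]))
    return padded_tokens, indices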
[docs]@registry.register_processor("glove")
class GloVeProcessor(VocabProcessor):
"""Inherits VocabProcessor, and returns GloVe vectors for each of the
words. Maps them to index using vocab processor, and then gets GloVe vectors
corresponding to those indices.
Args:
config (DictConfig): Configuration parameters for GloVe same as
:func:`~VocabProcessor`.
"""
def __init__(self, config, *args, **kwargs):
if not hasattr(config, "vocab"):
raise AttributeError(
"Config passed to the processor has no attribute vocab"
)
vocab_processor_config = copy.deepcopy(config)
# GloVeProcessor needs vocab type to be "intersected"
vocab_processor_config.vocab.type = "intersected"
if "vocab_file" not in vocab_processor_config.vocab:
warnings.warn(
"'vocab_file' key is not present in the config."
" Switching to pretrained vocab."
)
vocab_processor_config.vocab.type = "pretrained"
self._init_extras(vocab_processor_config)
self.config = vocab_processor_config
self._already_downloaded = False
self._args = args
self._kwargs = kwargs
def __call__(self, item):
if not self._already_downloaded:
self.vocab = Vocab(*self._args, **self.config.vocab, **self._kwargs)
self._already_downloaded = True
indices = super().__call__(item)["text"]
embeddings = torch.zeros(
(len(indices), self.vocab.get_embedding_dim()), dtype=torch.float
)
for idx, index in enumerate(indices):
embeddings[idx] = self.vocab.vectors[index]
return {"text": embeddings}
[docs]@registry.register_processor("fasttext")
class FastTextProcessor(VocabProcessor):
"""FastText processor, similar to GloVe processor but returns FastText vectors.
Args:
config (DictConfig): Configuration values for the processor.
"""
def __init__(self, config, *args, **kwargs):
self._init_extras(config)
self.config = config
self._download_initially = config.get("download_initially", True)
self._already_downloaded = False
self._already_loaded = False
if self._download_initially:
self._try_download()
def _try_download(self):
_is_main = is_main()
if self._already_downloaded:
return
needs_download = False
if not hasattr(self.config, "model_file"):
if _is_main:
warnings.warn(
"'model_file' key is required but missing "
"from FastTextProcessor's config."
)
needs_download = True
model_file = self.config.model_file
# If model_file is already an existing path don't join to cache dir
if not PathManager.exists(model_file):
model_file = os.path.join(get_mmf_cache_dir(), model_file)
if not PathManager.exists(model_file):
if _is_main:
warnings.warn(f"No model file present at {model_file}.")
needs_download = True
if needs_download:
logger.info("Downloading FastText bin")
model_file = self._download_model()
self.model_file = model_file
self._already_downloaded = True
synchronize()
def _download_model(self):
_is_main = is_main()
model_file_path = os.path.join(get_mmf_cache_dir(), "wiki.en.bin")
if not _is_main:
return model_file_path
if PathManager.exists(model_file_path):
logger.info(f"Vectors already present at {model_file_path}.")
return model_file_path
import requests
from mmf.common.constants import FASTTEXT_WIKI_URL
from tqdm import tqdm
PathManager.mkdirs(os.path.dirname(model_file_path))
response = requests.get(FASTTEXT_WIKI_URL, stream=True)
with PathManager.open(model_file_path, "wb") as f:
pbar = tqdm(
total=int(response.headers["Content-Length"]) / 4096,
miniters=50,
disable=not _is_main,
)
idx = 0
for data in response.iter_content(chunk_size=4096):
if data:
if idx % 50 == 0:
pbar.update(len(data))
f.write(data)
idx += 1
pbar.close()
logger.info(f"fastText bin downloaded at {model_file_path}.")
return model_file_path
def _load_fasttext_model(self, model_file):
if self._already_loaded:
return
from fasttext import load_model
logger.info(f"Loading fasttext model now from {model_file}")
self.model = load_model(model_file)
# String to Vector
self.stov = WordToVectorDict(self.model)
logger.info("Finished loading fasttext model")
self._already_loaded = True
def _map_strings_to_indices(self, tokens):
length = min(len(tokens), self.max_length)
tokens = tokens[:length]
output = torch.full(
(self.max_length, self.model.get_dimension()),
fill_value=self.PAD_INDEX,
dtype=torch.float,
)
for idx, token in enumerate(tokens):
output[idx] = torch.from_numpy(self.stov[token])
return output
def __call__(self, item):
self._load_fasttext_model(self.model_file)
return super().__call__(item)
[docs]@registry.register_processor("vqa_answer")
class VQAAnswerProcessor(BaseProcessor):
"""Processor for generating answer scores for answers passed using VQA
accuracy formula. Using VocabDict class to represent answer vocabulary,
so parameters must specify "vocab_file". "num_answers" in parameter config
specify the max number of answers possible. Takes in dict containing
"answers" or "answers_tokens". "answers" are preprocessed to generate
"answers_tokens" if passed.
Args:
config (DictConfig): Configuration for the processor
Attributes:
answer_vocab (VocabDict): Class representing answer vocabulary
"""
DEFAULT_NUM_ANSWERS = 10
def __init__(self, config, *args, **kwargs):
if not hasattr(config, "vocab_file"):
raise AttributeError(
"'vocab_file' argument required, but not "
"present in AnswerProcessor's config"
)
self.answer_vocab = VocabDict(config.vocab_file, *args, **kwargs)
self.PAD_IDX = self.answer_vocab.word2idx("<pad>")
self.BOS_IDX = self.answer_vocab.word2idx("<s>")
self.EOS_IDX = self.answer_vocab.word2idx("</s>")
self.UNK_IDX = self.answer_vocab.UNK_INDEX
# Set EOS to something not achievable if it is not there
if self.EOS_IDX == self.UNK_IDX:
self.EOS_IDX = len(self.answer_vocab)
self.preprocessor = None
if hasattr(config, "preprocessor"):
self.preprocessor = Processor(config.preprocessor)
if self.preprocessor is None:
raise ValueError(
f"No processor named {config.preprocessor} is defined."
)
if hasattr(config, "num_answers"):
self.num_answers = config.num_answers
else:
self.num_answers = self.DEFAULT_NUM_ANSWERS
warnings.warn(
"'num_answers' not defined in the config. "
"Setting to default of {}".format(self.DEFAULT_NUM_ANSWERS)
)
def __call__(self, item):
"""Takes in dict with answers or answers_tokens, and returns back
a dict with answers (processed), "answers_indices" which point to
indices of the answers if present and "answers_scores" which represent
VQA style scores for the answers.
Args:
item (Dict): Dict containing answers or answers_tokens
Returns:
Dict: Processed answers, indices and scores.
"""
tokens = []
if not isinstance(item, dict):
raise TypeError("'item' passed to processor must be a dict")
if "answer_tokens" in item:
tokens = item["answer_tokens"]
elif "answers" in item and item["answers"] is not None:
if self.preprocessor is None:
raise AssertionError(
"'preprocessor' must be defined if you "
"don't pass 'answer_tokens'"
)
tokens = [
self.preprocessor({"text": answer})["text"]
for answer in item["answers"]
]
else:
raise AssertionError(
"'answers' or 'answer_tokens' must be passed"
" to answer processor in a dict"
)
if len(tokens) != 0:
tokens = self._increase_to_ten(tokens)
answers_indices = torch.zeros(self.DEFAULT_NUM_ANSWERS, dtype=torch.long)
answers_indices.fill_(self.answer_vocab.get_unk_index())
for idx, token in enumerate(tokens):
answers_indices[idx] = self.answer_vocab.word2idx(token)
answers_scores = self.compute_answers_scores(answers_indices)
return {
"answers": tokens,
"answers_indices": answers_indices,
"answers_scores": answers_scores,
}
[docs] def get_vocab_size(self):
"""Get vocab size of the answer vocabulary. Can also include
soft copy dynamic answer space size.
Returns:
int: size of the answer vocabulary
"""
return self.answer_vocab.num_vocab
[docs] def get_true_vocab_size(self):
"""True vocab size can be different from normal vocab size in some cases
such as soft copy where dynamic answer space is added.
Returns:
int: True vocab size.
"""
return self.answer_vocab.num_vocab
[docs] def word2idx(self, word):
"""Convert a word to its index according to vocabulary
Args:
word (str): Word to be converted to index.
Returns:
int: Index of the word.
"""
return self.answer_vocab.word2idx(word)
[docs] def idx2word(self, idx):
"""Index to word according to the vocabulary.
Args:
idx (int): Index to be converted to the word.
Returns:
str: Word corresponding to the index.
"""
return self.answer_vocab.idx2word(idx)
[docs] def compute_answers_scores(self, answers_indices):
"""Generate VQA based answer scores for answers_indices.
Args:
answers_indices (torch.LongTensor): tensor containing indices of the answers
Returns:
torch.FloatTensor: tensor containing scores.
"""
scores = torch.zeros(self.get_vocab_size(), dtype=torch.float)
gt_answers = list(enumerate(answers_indices))
unique_answers = set(answers_indices.tolist())
for answer in unique_answers:
accs = []
for gt_answer in gt_answers:
other_answers = [item for item in gt_answers if item != gt_answer]
matching_answers = [item for item in other_answers if item[1] == answer]
acc = min(1, float(len(matching_answers)) / 3)
accs.append(acc)
avg_acc = sum(accs) / len(accs)
if answer != self.answer_vocab.UNK_INDEX:
scores[answer] = avg_acc
return scores
def _increase_to_ten(self, tokens):
while len(tokens) < self.DEFAULT_NUM_ANSWERS:
tokens += tokens[: self.DEFAULT_NUM_ANSWERS - len(tokens)]
return tokens
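

# A self-contained sketch of the VQA accuracy formula used by
# ``compute_answers_scores`` above: for a candidate answer, accuracy against
# each annotator is min(1, matches-among-the-other-annotators / 3), averaged
# in a leave-one-out fashion over all 10 ground-truth answers.
def _example_vqa_accuracy(candidate, gt_answers):
    accs = []
    for i in range(len(gt_answers)):
        others = gt_answers[:i] + gt_answers[i + 1 :]
        matches = sum(1 for ans in others if ans == candidate)
        accs.append(min(1.0, matches / 3.0))
    return sum(accs) / len(accs)


# For example, if "red" appears in 3 of 10 annotator answers:
#   _example_vqa_accuracy("red", ["red"] * 3 + ["blue"] * 7)
# each "red" annotator sees 2 other matches (acc 2/3), the 7 others see 3
# matches (acc 1.0), so the score is (3 * 2/3 + 7 * 1.0) / 10 = 0.9.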
[docs]@registry.register_processor("graph_vqa_answer")
class GraphVQAAnswerProcessor(BaseProcessor):
"""Processor for generating answer scores for answers passed using VQA
accuracy formula. Using VocabDict class to represent answer vocabulary,
so parameters must specify "vocab_file". "num_answers" in parameter config
specify the max number of answers possible. Takes in dict containing
"answers" or "answers_tokens". "answers" are preprocessed to generate
"answers_tokens" if passed.
This version also takes a graph vocab and predicts a main and graph
stream simultanously
Args:
config (DictConfig): Configuration for the processor
Attributes:
answer_vocab (VocabDict): Class representing answer vocabulary
"""
DEFAULT_NUM_ANSWERS = 10
def __init__(self, config, *args, **kwargs):
if not hasattr(config, "vocab_file"):
raise AttributeError(
"'vocab_file' argument required, but not "
"present in AnswerProcessor's config"
)
self.answer_vocab = VocabDict(config.vocab_file, *args, **kwargs)
self.PAD_IDX = self.answer_vocab.word2idx("<pad>")
self.BOS_IDX = self.answer_vocab.word2idx("<s>")
self.EOS_IDX = self.answer_vocab.word2idx("</s>")
self.UNK_IDX = self.answer_vocab.UNK_INDEX
# Set EOS to something not achievable if it is not there
if self.EOS_IDX == self.UNK_IDX:
self.EOS_IDX = len(self.answer_vocab)
self.preprocessor = None
if hasattr(config, "preprocessor"):
self.preprocessor = Processor(config.preprocessor)
if self.preprocessor is None:
raise ValueError(
f"No processor named {config.preprocessor} is defined."
)
if hasattr(config, "num_answers"):
self.num_answers = config.num_answers
else:
self.num_answers = self.DEFAULT_NUM_ANSWERS
warnings.warn(
"'num_answers' not defined in the config. "
"Setting to default of {}".format(self.DEFAULT_NUM_ANSWERS)
)
# Load the graph answer vocab file
if os.path.exists(config.graph_vocab_file):
graph_vocab_file = config.graph_vocab_file
else:
graph_vocab_file = os.path.join(
os.getenv("MMF_DATA_DIR"), "datasets", config.graph_vocab_file
)
self.graph_vocab = sorted(list(torch.load(graph_vocab_file)))
self.ans2graphvocabidx = {}
for graphvocabidx, graph_ans in enumerate(self.graph_vocab):
# Right now, this does need to overlap with the regular vocab
self.ans2graphvocabidx[graph_ans] = graphvocabidx
# Make sure graph_ans actually in vocab
assert graph_ans in self.answer_vocab.word2idx_dict
self.config = config
def __call__(self, item):
"""Takes in dict with answers or answers_tokens, and returns back
a dict with answers (processed), "answers_indices" which point to
indices of the answers if present and "answers_scores" which represent
VQA style scores for the answers.
Args:
item (Dict): Dict containing answers or answers_tokens
Returns:
Dict: Processed answers, indices and scores.
"""
tokens = []
if not isinstance(item, dict):
raise TypeError("'item' passed to processor must be a dict")
if "answer_tokens" in item:
tokens = item["answer_tokens"]
elif "answers" in item and item["answers"] is not None:
if self.preprocessor is None:
raise AssertionError(
"'preprocessor' must be defined if you "
"don't pass 'answer_tokens'"
)
tokens = [
self.preprocessor({"text": answer})["text"]
for answer in item["answers"]
]
else:
raise AssertionError(
"'answers' or 'answer_tokens' must be passed"
" to answer processor in a dict"
)
if len(tokens) != 0:
tokens = self._increase_to_ten(tokens)
answers_indices = torch.zeros(self.DEFAULT_NUM_ANSWERS, dtype=torch.long)
answers_indices.fill_(self.answer_vocab.get_unk_index())
for idx, token in enumerate(tokens):
answers_indices[idx] = self.answer_vocab.word2idx(token)
answers_scores = self.compute_answers_scores(answers_indices)
# Get answer scores for the graph vocab
if self.config.concat_scores:
answers_scores_graph = torch.zeros(len(self.graph_vocab), dtype=torch.float)
unique_answers = set(answers_indices.tolist())
for answer in unique_answers:
# Get the original score
if answer != self.answer_vocab.UNK_INDEX:
score = answers_scores[answer]
# Copy into graph scores (if it's in there)
ans_str = self.answer_vocab.idx2word(answer)
if ans_str in self.ans2graphvocabidx:
graph_idx = self.ans2graphvocabidx[ans_str]
answers_scores_graph[graph_idx] = score
# Concat scores
answers_scores = torch.cat([answers_scores, answers_scores_graph], dim=0)
return {
"answers": tokens,
"answers_indices": answers_indices,
"answers_scores": answers_scores,
}
[docs] def get_vocab_size(self):
"""Get vocab size of the answer vocabulary. Can also include
soft copy dynamic answer space size.
Returns:
int: size of the answer vocabulary
"""
return self.answer_vocab.num_vocab
[docs] def get_true_vocab_size(self):
"""True vocab size can be different from normal vocab size in some cases
such as soft copy where dynamic answer space is added.
Returns:
int: True vocab size.
"""
return self.answer_vocab.num_vocab
[docs] def word2idx(self, word):
"""Convert a word to its index according to vocabulary
Args:
word (str): Word to be converted to index.
Returns:
int: Index of the word.
"""
return self.answer_vocab.word2idx(word)
[docs] def idx2word(self, idx):
"""Index to word according to the vocabulary.
Args:
idx (int): Index to be converted to the word.
Returns:
str: Word corresponding to the index.
"""
return self.answer_vocab.idx2word(idx)
[docs] def compute_answers_scores(self, answers_indices):
"""Generate VQA based answer scores for answers_indices.
Args:
answers_indices (torch.LongTensor): tensor containing indices of the answers
Returns:
torch.FloatTensor: tensor containing scores.
"""
scores = torch.zeros(self.get_vocab_size(), dtype=torch.float)
gt_answers = list(enumerate(answers_indices))
unique_answers = set(answers_indices.tolist())
for answer in unique_answers:
accs = []
for gt_answer in gt_answers:
other_answers = [item for item in gt_answers if item != gt_answer]
matching_answers = [item for item in other_answers if item[1] == answer]
acc = min(1, float(len(matching_answers)) / 3)
accs.append(acc)
avg_acc = sum(accs) / len(accs)
if answer != self.answer_vocab.UNK_INDEX:
scores[answer] = avg_acc
return scores
def _increase_to_ten(self, tokens):
while len(tokens) < self.DEFAULT_NUM_ANSWERS:
tokens += tokens[: self.DEFAULT_NUM_ANSWERS - len(tokens)]
return tokens
[docs]@registry.register_processor("multi_hot_answer_from_vocab")
class MultiHotAnswerFromVocabProcessor(VQAAnswerProcessor):
def __init__(self, config, *args, **kwargs):
super().__init__(config, *args, **kwargs)
[docs] def compute_answers_scores(self, answers_indices):
scores = torch.zeros(self.get_vocab_size(), dtype=torch.float)
scores[answers_indices] = 1
scores[self.answer_vocab.UNK_INDEX] = 0
return scores
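

# A quick sketch of the multi-hot scoring above: every ground-truth answer
# index gets score 1, and <unk> (index 0 here, for illustration) is zeroed.
def _example_multi_hot_scores():
    vocab_size, unk_index = 6, 0
    answers_indices = torch.tensor([2, 4, 4, 0])

    scores = torch.zeros(vocab_size, dtype=torch.float)
    scores[answers_indices] = 1
    scores[unk_index] = 0
    return scores  # tensor([0., 0., 1., 0., 1., 0.])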
[docs]@registry.register_processor("soft_copy_answer")
class SoftCopyAnswerProcessor(VQAAnswerProcessor):
"""Similar to Answer Processor but adds soft copy dynamic answer space to it.
Read https://arxiv.org/abs/1904.08920 for extra information on soft copy
and LoRRA.
Args:
config (DictConfig): Configuration for soft copy processor.
"""
DEFAULT_MAX_LENGTH = 50
def __init__(self, config, *args, **kwargs):
super().__init__(config, *args, **kwargs)
if hasattr(config, "max_length"):
self.max_length = config.max_length
else:
self.max_length = self.DEFAULT_MAX_LENGTH
warnings.warn(
"'max_length' not defined in the config. "
"Setting to default of {}".format(self.DEFAULT_MAX_LENGTH)
)
self.context_preprocessor = None
if hasattr(config, "context_preprocessor"):
self.context_preprocessor = Processor(config.context_preprocessor)
[docs] def get_vocab_size(self):
"""Size of Vocab + Size of Dynamic soft-copy based answer space
Returns:
int: Size of vocab + size of dynamic soft-copy answer space.
"""
answer_vocab_nums = self.answer_vocab.num_vocab
answer_vocab_nums += self.max_length
return answer_vocab_nums
[docs] def get_true_vocab_size(self):
"""Actual vocab size which only include size of the vocabulary file.
Returns:
int: Actual size of vocabs.
"""
return self.answer_vocab.num_vocab
def __call__(self, item):
answers = item["answers"]
scores = super().__call__({"answers": answers})
indices = scores["answers_indices"]
answers = scores["answers"]
scores = scores["answers_scores"]
tokens_scores = scores.new_zeros(self.max_length)
tokens = item["tokens"]
length = min(len(tokens), self.max_length)
gt_answers = list(enumerate(answers))
if self.context_preprocessor is not None:
tokens = [
self.context_preprocessor({"text": token})["text"] for token in tokens
]
answer_counter = Counter(answers)
for idx, token in enumerate(tokens[:length]):
if answer_counter[token] == 0:
continue
accs = []
for gt_answer in gt_answers:
other_answers = [item for item in gt_answers if item != gt_answer]
matching_answers = [item for item in other_answers if item[1] == token]
acc = min(1, float(len(matching_answers)) / 3)
accs.append(acc)
tokens_scores[idx] = sum(accs) / len(accs)
# Scores are already proper size, see L314. Now,
# fix scores for soft copy candidates
scores[-len(tokens_scores) :] = tokens_scores
return {
"answers": answers,
"answers_indices": indices,
"answers_scores": scores,
}
[docs]@registry.register_processor("simple_word")
class SimpleWordProcessor(BaseProcessor):
"""Tokenizes a word and processes it.
Attributes:
tokenizer (function): Type of tokenizer to be used.
"""
def __init__(self, *args, **kwargs):
from mmf.utils.text import word_tokenize
self.tokenizer = word_tokenize
def __call__(self, item, *args, **kwargs):
return {"text": self.tokenizer(item["text"], *args, **kwargs)}
[docs]@registry.register_processor("simple_sentence")
class SimpleSentenceProcessor(BaseProcessor):
"""Tokenizes a sentence and processes it.
Attributes:
tokenizer (function): Type of tokenizer to be used.
"""
def __init__(self, *args, **kwargs):
from mmf.utils.text import tokenize
self.tokenizer = tokenize
def __call__(self, item, *args, **kwargs):
return {"text": self.tokenizer(item["text"], *args, **kwargs)}
[docs]@registry.register_processor("bbox")
class BBoxProcessor(VocabProcessor):
"""Generates bboxes in proper format.
Takes in a dict which contains "info" key which is a list of dicts
containing following for each of the the bounding box
Example bbox input::
{
"info": [
{
"bounding_box": {
"top_left_x": 100,
"top_left_y": 100,
"width": 200,
"height": 300
}
},
...
]
}
This will further return a Sample in a dict with key "bbox" with last
dimension of 4 corresponding to "xyxy". So sample will look like following:
Example Sample::
Sample({
"coordinates": torch.Size(n, 4),
"width": List[number], # size n
"height": List[number], # size n
"bbox_types": List[str] # size n, either xyxy or xywh.
# currently only supports xyxy.
})
"""
def __init__(self, config, *args, **kwargs):
from mmf.utils.dataset import build_bbox_tensors
self.lambda_fn = build_bbox_tensors
self._init_extras(config)
def __call__(self, item):
info = item["info"]
if self.preprocessor is not None:
info = self.preprocessor(info)
return {"bbox": self.lambda_fn(info, self.max_length)}
[docs]@registry.register_processor("caption")
class CaptionProcessor(BaseProcessor):
"""Processes a caption with start, end and pad tokens and returns raw string.
Args:
config (DictConfig): Configuration for caption processor.
"""
def __init__(self, config, *args, **kwargs):
if not hasattr(config, "vocab"):
raise AttributeError(
"config passed to the processor has no " "attribute vocab"
)
self.vocab = Vocab(*args, **config.vocab, **kwargs)
def __call__(self, item):
for idx, v in enumerate(item):
if v == self.vocab.EOS_INDEX:
item = item[:idx]
break
tokens = [
self.vocab.get_itos()[w]
for w in item
if w
not in {self.vocab.SOS_INDEX, self.vocab.EOS_INDEX, self.vocab.PAD_INDEX}
]
caption = " ".join(tokens)
return {"tokens": tokens, "caption": caption}
[docs]@registry.register_processor("evalai_answer")
class EvalAIAnswerProcessor(BaseProcessor):
"""Processes an answer similar to Eval AI"""
CONTRACTIONS = {
"aint": "ain't",
"arent": "aren't",
"cant": "can't",
"couldve": "could've",
"couldnt": "couldn't",
"couldn'tve": "couldn't've",
"couldnt've": "couldn't've",
"didnt": "didn't",
"doesnt": "doesn't",
"dont": "don't",
"hadnt": "hadn't",
"hadnt've": "hadn't've",
"hadn'tve": "hadn't've",
"hasnt": "hasn't",
"havent": "haven't",
"hed": "he'd",
"hed've": "he'd've",
"he'dve": "he'd've",
"hes": "he's",
"howd": "how'd",
"howll": "how'll",
"hows": "how's",
"Id've": "I'd've",
"I'dve": "I'd've",
"Im": "I'm",
"Ive": "I've",
"isnt": "isn't",
"itd": "it'd",
"itd've": "it'd've",
"it'dve": "it'd've",
"itll": "it'll",
"let's": "let's",
"maam": "ma'am",
"mightnt": "mightn't",
"mightnt've": "mightn't've",
"mightn'tve": "mightn't've",
"mightve": "might've",
"mustnt": "mustn't",
"mustve": "must've",
"neednt": "needn't",
"notve": "not've",
"oclock": "o'clock",
"oughtnt": "oughtn't",
"ow's'at": "'ow's'at",
"'ows'at": "'ow's'at",
"'ow'sat": "'ow's'at",
"shant": "shan't",
"shed've": "she'd've",
"she'dve": "she'd've",
"she's": "she's",
"shouldve": "should've",
"shouldnt": "shouldn't",
"shouldnt've": "shouldn't've",
"shouldn'tve": "shouldn't've",
"somebody'd": "somebodyd",
"somebodyd've": "somebody'd've",
"somebody'dve": "somebody'd've",
"somebodyll": "somebody'll",
"somebodys": "somebody's",
"someoned": "someone'd",
"someoned've": "someone'd've",
"someone'dve": "someone'd've",
"someonell": "someone'll",
"someones": "someone's",
"somethingd": "something'd",
"somethingd've": "something'd've",
"something'dve": "something'd've",
"somethingll": "something'll",
"thats": "that's",
"thered": "there'd",
"thered've": "there'd've",
"there'dve": "there'd've",
"therere": "there're",
"theres": "there's",
"theyd": "they'd",
"theyd've": "they'd've",
"they'dve": "they'd've",
"theyll": "they'll",
"theyre": "they're",
"theyve": "they've",
"twas": "'twas",
"wasnt": "wasn't",
"wed've": "we'd've",
"we'dve": "we'd've",
"weve": "we've",
"werent": "weren't",
"whatll": "what'll",
"whatre": "what're",
"whats": "what's",
"whatve": "what've",
"whens": "when's",
"whered": "where'd",
"wheres": "where's",
"whereve": "where've",
"whod": "who'd",
"whod've": "who'd've",
"who'dve": "who'd've",
"wholl": "who'll",
"whos": "who's",
"whove": "who've",
"whyll": "why'll",
"whyre": "why're",
"whys": "why's",
"wont": "won't",
"wouldve": "would've",
"wouldnt": "wouldn't",
"wouldnt've": "wouldn't've",
"wouldn'tve": "wouldn't've",
"yall": "y'all",
"yall'll": "y'all'll",
"y'allll": "y'all'll",
"yall'd've": "y'all'd've",
"y'alld've": "y'all'd've",
"y'all'dve": "y'all'd've",
"youd": "you'd",
"youd've": "you'd've",
"you'dve": "you'd've",
"youll": "you'll",
"youre": "you're",
"youve": "you've",
}
NUMBER_MAP = {
"none": "0",
"zero": "0",
"one": "1",
"two": "2",
"three": "3",
"four": "4",
"five": "5",
"six": "6",
"seven": "7",
"eight": "8",
"nine": "9",
"ten": "10",
}
ARTICLES = ["a", "an", "the"]
PERIOD_STRIP = re.compile(r"(?!<=\d)(\.)(?!\d)")
COMMA_STRIP = re.compile(r"(?<=\d)(\,)+(?=\d)")
PUNCTUATIONS = [
";",
r"/",
"[",
"]",
'"',
"{",
"}",
"(",
")",
"=",
"+",
"\\",
"_",
"-",
">",
"<",
"@",
"`",
",",
"?",
"!",
]
def __init__(self, *args, **kwargs):
pass
def word_tokenize(self, word):
word = word.lower()
word = word.replace(",", "").replace("?", "").replace("'s", " 's")
return word.strip()
def process_punctuation(self, in_text):
out_text = in_text
for p in self.PUNCTUATIONS:
if (p + " " in in_text or " " + p in in_text) or (
re.search(self.COMMA_STRIP, in_text) is not None
):
out_text = out_text.replace(p, "")
else:
out_text = out_text.replace(p, " ")
out_text = self.PERIOD_STRIP.sub("", out_text, re.UNICODE)
return out_text
def process_digit_article(self, in_text):
out_text = []
temp_text = in_text.lower().split()
for word in temp_text:
word = self.NUMBER_MAP.setdefault(word, word)
if word not in self.ARTICLES:
out_text.append(word)
else:
pass
for word_id, word in enumerate(out_text):
if word in self.CONTRACTIONS:
out_text[word_id] = self.CONTRACTIONS[word]
out_text = " ".join(out_text)
return out_text
def __call__(self, item):
item = self.word_tokenize(item)
item = item.replace("\n", " ").replace("\t", " ").strip()
item = self.process_punctuation(item)
item = self.process_digit_article(item)
return item
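

# A quick sketch of the full EvalAI normalization pipeline defined above:
# "Isnt" is expanded via CONTRACTIONS, "two" is mapped via NUMBER_MAP, and
# the question mark is stripped.
def _example_evalai_normalization():
    processor = EvalAIAnswerProcessor()
    return processor("Isnt it two?")  # -> "isn't it 2"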
[docs]@registry.register_processor("phoc")
class PhocProcessor(VocabProcessor):
"""
Compute PHOC features from text tokens
"""
def __init__(self, config, *args, **kwargs):
from mmf.utils.phoc import build_phoc
self._build_phoc = build_phoc
self._init_extras(config)
self.config = config
def _map_strings_to_indices(self, tokens):
length = min(len(tokens), self.max_length)
tokens = tokens[:length]
phoc_dim = 604
output = torch.full(
(self.max_length, phoc_dim), fill_value=self.PAD_INDEX, dtype=torch.float
)
for idx, token in enumerate(tokens):
output[idx] = torch.from_numpy(self._build_phoc(token))
return output
[docs]@registry.register_processor("copy")
class CopyProcessor(BaseProcessor):
"""
Copy boxes from numpy array
"""
def __init__(self, config, *args, **kwargs):
self.max_length = config.max_length
def __call__(self, item):
blob = item["blob"]
final_blob = np.zeros((self.max_length,) + blob.shape[1:], blob.dtype)
final_blob[: len(blob)] = blob[: len(final_blob)]
return {"blob": torch.from_numpy(final_blob)}
[docs]@registry.register_processor("m4c_answer")
class M4CAnswerProcessor(BaseProcessor):
"""
Process a TextVQA answer for iterative decoding in M4C
"""
def __init__(self, config, *args, **kwargs):
super().__init__(config, *args, **kwargs)
self.answer_vocab = VocabDict(config.vocab_file, *args, **kwargs)
self.PAD_IDX = self.answer_vocab.word2idx("<pad>")
self.BOS_IDX = self.answer_vocab.word2idx("<s>")
self.EOS_IDX = self.answer_vocab.word2idx("</s>")
self.UNK_IDX = self.answer_vocab.UNK_INDEX
# make sure PAD_IDX, BOS_IDX and PAD_IDX are valid (not <unk>)
assert self.PAD_IDX != self.answer_vocab.UNK_INDEX
assert self.BOS_IDX != self.answer_vocab.UNK_INDEX
assert self.EOS_IDX != self.answer_vocab.UNK_INDEX
assert self.PAD_IDX == 0
self.answer_preprocessor = Processor(config.preprocessor)
assert self.answer_preprocessor is not None
self.num_answers = config.num_answers
self.max_length = config.max_length
self.max_copy_steps = config.max_copy_steps
assert self.max_copy_steps >= 1
self.match_answer_to_unk = False
def tokenize(self, sentence):
return sentence.split()
[docs] def match_answer_to_vocab_ocr_seq(
self, answer, vocab2idx_dict, ocr2inds_dict, max_match_num=20
):
"""
Match an answer to a list of sequences of indices
each index corresponds to either a fixed vocabulary or an OCR token
(in the index address space, the OCR tokens are after the fixed vocab)
"""
num_vocab = len(vocab2idx_dict)
answer_words = self.tokenize(answer)
answer_word_matches = []
for word in answer_words:
# match answer word to fixed vocabulary
matched_inds = []
if word in vocab2idx_dict:
matched_inds.append(vocab2idx_dict.get(word))
# match answer word to OCR
# we put OCR after the fixed vocabulary in the answer index space
# so add num_vocab offset to the OCR index
matched_inds.extend([num_vocab + idx for idx in ocr2inds_dict[word]])
if len(matched_inds) == 0:
if self.match_answer_to_unk:
matched_inds.append(vocab2idx_dict.get("<unk>"))
else:
return []
answer_word_matches.append(matched_inds)
# expand per-word matched indices into the list of matched sequences
if len(answer_word_matches) == 0:
return []
idx_seq_list = [()]
for matched_inds in answer_word_matches:
idx_seq_list = [
seq + (idx,) for seq in idx_seq_list for idx in matched_inds
]
if len(idx_seq_list) > max_match_num:
idx_seq_list = idx_seq_list[:max_match_num]
return idx_seq_list
def get_vocab_size(self):
answer_vocab_nums = self.answer_vocab.num_vocab
answer_vocab_nums += self.max_length
return answer_vocab_nums
def get_true_vocab_size(self):
return self.answer_vocab.num_vocab
def compute_answer_scores(self, answers):
gt_answers = list(enumerate(answers))
unique_answers = sorted(set(answers))
unique_answer_scores = [0] * len(unique_answers)
for idx, unique_answer in enumerate(unique_answers):
accs = []
for gt_answer in gt_answers:
other_answers = [item for item in gt_answers if item != gt_answer]
matching_answers = [
item for item in other_answers if item[1] == unique_answer
]
acc = min(1, float(len(matching_answers)) / 3)
accs.append(acc)
unique_answer_scores[idx] = sum(accs) / len(accs)
unique_answer2score = {
a: s for a, s in zip(unique_answers, unique_answer_scores)
}
return unique_answer2score
def __call__(self, item):
answers = item["answers"]
if not answers:
return {
"sampled_idx_seq": None,
"train_prev_inds": torch.zeros(self.max_copy_steps, dtype=torch.long),
}
answers = [self.answer_preprocessor({"text": a})["text"] for a in answers]
assert len(answers) == self.num_answers
# Step 1: calculate the soft score of ground-truth answers
unique_answer2score = self.compute_answer_scores(answers)
# Step 2: fill the first step soft scores for tokens
scores = torch.zeros(
self.max_copy_steps, self.get_vocab_size(), dtype=torch.float
)
# match answers to fixed vocabularies and OCR tokens.
ocr2inds_dict = defaultdict(list)
for idx, token in enumerate(item["tokens"]):
ocr2inds_dict[token].append(idx)
answer_dec_inds = [
self.match_answer_to_vocab_ocr_seq(
a, self.answer_vocab.word2idx_dict, ocr2inds_dict
)
for a in answers
]
# Collect all the valid decoding sequences for each answer.
# This part (idx_seq_list) was pre-computed in imdb (instead of online)
# to save time
all_idx_seq_list = []
for answer, idx_seq_list in zip(answers, answer_dec_inds):
all_idx_seq_list.extend(idx_seq_list)
# fill in the soft score for the first decoding step
score = unique_answer2score[answer]
for idx_seq in idx_seq_list:
score_idx = idx_seq[0]
# the scores for the decoding Step 0 will be the maximum
# among all answers starting with that vocab
# for example:
# if "red apple" has score 0.7 and "red flag" has score 0.8
# the score for "red" at Step 0 will be max(0.7, 0.8) = 0.8
scores[0, score_idx] = max(scores[0, score_idx], score)
# train_prev_inds is the previous prediction indices in auto-regressive
# decoding
train_prev_inds = torch.zeros(self.max_copy_steps, dtype=torch.long)
# train_loss_mask records the decoding steps where losses are applied
train_loss_mask = torch.zeros(self.max_copy_steps, dtype=torch.float)
if len(all_idx_seq_list) > 0:
# sample a random decoding answer sequence for teacher-forcing
idx_seq = all_idx_seq_list[np.random.choice(len(all_idx_seq_list))]
dec_step_num = min(1 + len(idx_seq), self.max_copy_steps)
train_loss_mask[:dec_step_num] = 1.0
train_prev_inds[0] = self.BOS_IDX
for t in range(1, dec_step_num):
train_prev_inds[t] = idx_seq[t - 1]
score_idx = idx_seq[t] if t < len(idx_seq) else self.EOS_IDX
scores[t, score_idx] = 1.0
else:
idx_seq = ()
answer_info = {
"answers": answers,
"answers_scores": scores,
"sampled_idx_seq": idx_seq,
"train_prev_inds": train_prev_inds,
"train_loss_mask": train_loss_mask,
}
return answer_info
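

# A self-contained sketch of the sequence expansion performed in
# ``match_answer_to_vocab_ocr_seq``: each answer word may match the fixed
# vocab, one or more OCR slots (offset by the vocab size), or both, and the
# candidate decoding sequences are the cross-product of the per-word matches.
def _example_match_expansion():
    vocab2idx = {"red": 5, "apple": 9}
    ocr2inds = {"red": [0], "apple": []}  # "red" also appears as OCR token 0
    num_vocab = len(vocab2idx)

    per_word_matches = []
    for word in ["red", "apple"]:
        matched = []
        if word in vocab2idx:
            matched.append(vocab2idx[word])
        matched.extend(num_vocab + idx for idx in ocr2inds[word])
        per_word_matches.append(matched)  # [[5, 2], [9]]

    idx_seq_list = [()]
    for matched in per_word_matches:
        idx_seq_list = [seq + (idx,) for seq in idx_seq_list for idx in matched]
    return idx_seq_list  # [(5, 9), (2, 9)]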
[docs]@registry.register_processor("m4c_caption")
class M4CCaptionProcessor(M4CAnswerProcessor):
def __init__(self, config, *args, **kwargs):
super().__init__(config, *args, **kwargs)
import re
self.SENTENCE_SPLIT_REGEX = re.compile(r"(\W+)")
self.match_answer_to_unk = True
def tokenize(self, sentence):
sentence = sentence.lower()
sentence = (
sentence.replace(",", "")
.replace("?", "")
.replace(".", "")
.replace("'s", " 's")
)
tokens = self.SENTENCE_SPLIT_REGEX.split(sentence)
tokens = [t.strip() for t in tokens if len(t.strip()) > 0]
return tokens
def compute_answer_scores(self, answers):
unique_answer2score = {a: 1.0 for a in answers}
return unique_answer2score
[docs]@registry.register_processor("masked_region")
class MaskedRegionProcessor(BaseProcessor):
"""
Masks a region with probability `mask_probability`
"""
def __init__(self, config, *args, **kwargs):
super().__init__(config, *args, **kwargs)
self.mask_prob = config.get("mask_probability", 0.15)
self.mask_region_prob = config.get("mask_region_probability", 0.9)
def __call__(self, item):
image_labels = []
for i in range(item.shape[0]):
prob = random.random()
# mask token with 15% probability
if prob < self.mask_prob:
prob /= self.mask_prob
if prob < self.mask_region_prob:
item[i] = 0
image_labels.append(1)
else:
# no masking token (will be ignored by loss function later)
image_labels.append(-1)
return torch.tensor(image_labels, dtype=torch.long)
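

# A quick sketch of calling the masked-region processor on a feature matrix.
# Note that it zeroes rows of ``item`` in place and returns only the labels;
# the feature shape below is illustrative.
def _example_masked_region():
    from omegaconf import OmegaConf

    processor = MaskedRegionProcessor(OmegaConf.create({}))
    features = torch.randn(36, 2048)  # e.g. 36 region features
    labels = processor(features)  # 1 = masked, -1 = ignored by the loss
    return features, labels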


@dataclass
class MultiClassFromFileConfig:
    # Vocab file containing the strings for the available classes
    vocab_file: str


@registry.register_processor("multi_class_from_file")
class MultiClassFromFile(BaseProcessor):
    """Label processor for multi-class cases where the labels are
    saved in a file.
    """

    def __init__(self, config: MultiClassFromFileConfig, *args, **kwargs):
        self.label_vocab = VocabDict(config.vocab_file, *args, **kwargs)

    def __call__(self, item: Union[Dict[str, Any], str]) -> Dict[str, Any]:
        if isinstance(item, collections.abc.Mapping):
            label = item["label"]
        else:
            label = item

        # Remove UNK by subtracting 1 from the output.
        # UNK will always be at 0, even if it is not in the vocab, as it is
        # automatically added by VocabDict.
        class_index = self.label_vocab.word2idx(label) - 1

        assert class_index != -1, f"{label} is not present in vocab file"
        return {"class_index": torch.tensor(class_index, dtype=torch.long)}
[docs]@registry.register_processor("detr_image_and_target")
class DETRImageAndTargetProcessor(BaseProcessor):
"""Process a detection image and target in consistent with DETR. At training time,
random crop is done. At test time, an image is deterministically resized with short
side equal to `image_size` (while ensuring its long side no larger than `max_size`)
"""
def __init__(self, config, *args, **kwargs):
super().__init__(config, *args, **kwargs)
from mmf.datasets.processors import detection_transforms as T
train_image_sizes = list(config.train_image_sizes)
self.training_transform = T.Compose(
[
T.RandomHorizontalFlip(),
T.RandomSelect(
T.RandomResize(train_image_sizes, max_size=config.max_size),
T.Compose(
[
T.RandomResize(list(config.train_resize_random_sizes)),
T.RandomSizeCrop(*config.train_crop_size),
T.RandomResize(train_image_sizes, max_size=config.max_size),
]
),
),
T.ToTensor(),
T.Normalize(IMAGE_COLOR_MEAN, IMAGE_COLOR_STD),
]
)
self.inference_transform = T.Compose(
[
T.RandomResize([config.test_image_size], max_size=config.max_size),
T.ToTensor(),
T.Normalize(IMAGE_COLOR_MEAN, IMAGE_COLOR_STD),
]
)
def __call__(self, item):
dataset_type = item["dataset_type"]
img = item["img"]
target = item["target"]
if dataset_type == "train":
img, target = self.training_transform(img, target)
elif dataset_type == "val" or dataset_type == "test":
img, target = self.inference_transform(img, target)
else:
raise Exception(f"unknown dataset_type: {dataset_type}")
return {"img": img, "target": target}