bazarr/libs/auditok/core.py
Michiel van Baak Jansen 30ef713fa2
Downgrade auditok to version 0.1.5
ffsubsync pinned auditok to 0.1.5. We missed this when upgrading
ffsubsync and auditok. Since we don't run pip to install the
libraries, there are no version checks.
2021-05-01 08:07:20 -04:00

452 lines
17 KiB
Python

"""
This module gathers processing (i.e. tokenization) classes.

Class summary
=============

.. autosummary::

        StreamTokenizer
"""
from auditok.util import DataValidator
# Public API of this module: only the tokenizer class is exported.
__all__ = ["StreamTokenizer"]
class StreamTokenizer():
    """
    Class for stream tokenizers. It implements a 4-state automaton scheme
    to extract sub-sequences of interest on the fly.

    :Parameters:

        `validator` :
            instance of `DataValidator` that implements `is_valid` method.

        `min_length` : *(int)*
            Minimum number of frames of a valid token. This includes all
            tolerated non valid frames within the token.

        `max_length` : *(int)*
            Maximum number of frames of a valid token. This includes all
            tolerated non valid frames within the token.

        `max_continuous_silence` : *(int)*
            Maximum number of consecutive non-valid frames within a token.
            Note that, within a valid token, there may be many tolerated
            *silent* regions that contain each a number of non valid frames
            up to `max_continuous_silence`.

        `init_min` : *(int, default=0)*
            Minimum number of consecutive valid frames that must be
            **initially** gathered before any sequence of non valid frames
            can be tolerated. This option is not always needed, it can be
            used to drop non-valid tokens as early as possible.
            **Default = 0** means that the option is by default ineffective.

        `init_max_silence` : *(int, default=0)*
            Maximum number of tolerated consecutive non-valid frames if the
            number of already gathered valid frames has not yet reached
            `init_min`. This argument is normally used if `init_min` is
            used. **Default = 0**, by default this argument is not taken
            into consideration.

        `mode` : *(int, default=0)*
            `mode` can be:

        1. `StreamTokenizer.STRICT_MIN_LENGTH`:
        if token *i* is delivered because `max_length` is reached, and
        token *i+1* is immediately adjacent to token *i* (i.e. token *i*
        ends at frame *k* and token *i+1* starts at frame *k+1*) then accept
        token *i+1* only if it has a size of at least `min_length`. The
        default behavior is to accept token *i+1* even if it is shorter than
        `min_length` (given that the above conditions are fulfilled of
        course).

        :Examples:

        In the following code, without `STRICT_MIN_LENGTH`, the 'BB' token
        is accepted although it is shorter than `min_length` (3), because it
        immediately follows the latest delivered token:

        .. code:: python

            from auditok import StreamTokenizer, StringDataSource, DataValidator

            class UpperCaseChecker(DataValidator):
                def is_valid(self, frame):
                    return frame.isupper()

            dsource = StringDataSource("aaaAAAABBbbb")
            tokenizer = StreamTokenizer(validator=UpperCaseChecker(),
                                        min_length=3,
                                        max_length=4,
                                        max_continuous_silence=0)

            tokenizer.tokenize(dsource)

        :output:

        .. code:: python

            [(['A', 'A', 'A', 'A'], 3, 6), (['B', 'B'], 7, 8)]

        The following tokenizer will however reject the 'BB' token:

        .. code:: python

            dsource = StringDataSource("aaaAAAABBbbb")
            tokenizer = StreamTokenizer(validator=UpperCaseChecker(),
                                        min_length=3, max_length=4,
                                        max_continuous_silence=0,
                                        mode=StreamTokenizer.STRICT_MIN_LENGTH)
            tokenizer.tokenize(dsource)

        :output:

        .. code:: python

            [(['A', 'A', 'A', 'A'], 3, 6)]

        2. `StreamTokenizer.DROP_TRAILING_SILENCE`: drop all trailing
        non-valid frames from a token to be delivered if and only if it is
        not **truncated**. This can be a bit tricky. A token is actually
        delivered if:

        - a. `max_continuous_silence` is reached

        :or:

        - b. Its length reaches `max_length`. This is called a **truncated**
          token

        In the current implementation, a `StreamTokenizer`'s decision is
        only based on already seen data and on incoming data. Thus, if a
        token is truncated at a non-valid but tolerated frame (`max_length`
        is reached but `max_continuous_silence` not yet) any trailing
        silence will be kept because it can potentially be part of a valid
        token (if `max_length` was bigger). But if `max_continuous_silence`
        is reached before `max_length`, the delivered token will not be
        considered as truncated but a result of *normal* end of detection
        (i.e. no more valid data). In that case the trailing silence can be
        removed if you use the `StreamTokenizer.DROP_TRAILING_SILENCE` mode.

        :Example:

        .. code:: python

            tokenizer = StreamTokenizer(validator=UpperCaseChecker(), min_length=3,
                                        max_length=6, max_continuous_silence=3,
                                        mode=StreamTokenizer.DROP_TRAILING_SILENCE)

            dsource = StringDataSource("aaaAAAaaaBBbbbb")
            tokenizer.tokenize(dsource)

        :output:

        .. code:: python

            [(['A', 'A', 'A', 'a', 'a', 'a'], 3, 8), (['B', 'B'], 9, 10)]

        The first token is delivered with its trailing silence because it is
        truncated while the second one has its trailing frames removed.

        Without `StreamTokenizer.DROP_TRAILING_SILENCE` the output would be:

        .. code:: python

            [(['A', 'A', 'A', 'a', 'a', 'a'], 3, 8), (['B', 'B', 'b', 'b', 'b'], 9, 13)]

        3. `StreamTokenizer.STRICT_MIN_LENGTH | StreamTokenizer.DROP_TRAILING_SILENCE`:
        use both options. That means: first remove trailing silence, then
        check if the token still has at least a length of `min_length`.
    """

    # States of the automaton.
    SILENCE = 0
    POSSIBLE_SILENCE = 1
    POSSIBLE_NOISE = 2
    NOISE = 3

    # Mode flags; they can be combined with the bitwise `|` operator.
    STRICT_MIN_LENGTH = 2
    DROP_TRAILING_SILENCE = 4
    # alias kept for callers that use the historical (misspelled) name
    DROP_TAILING_SILENCE = 4

    def __init__(self, validator,
                 min_length, max_length, max_continuous_silence,
                 init_min=0, init_max_silence=0,
                 mode=0):
        """
        Validate the arguments and set up an idle tokenizer.

        See the class documentation for the meaning of each parameter.

        :Raises:

            `TypeError` if `validator` is not a `DataValidator` instance.

            `ValueError` if a length argument is out of range or if `mode`
            is not a supported value.
        """
        if not isinstance(validator, DataValidator):
            raise TypeError("'validator' must be an instance of 'DataValidator'")

        if max_length <= 0:
            raise ValueError("'max_length' must be > 0 (value={0})".format(max_length))

        if min_length <= 0 or min_length > max_length:
            raise ValueError("'min_length' must be > 0 and <= 'max_length' (value={0})".format(min_length))

        if max_continuous_silence >= max_length:
            raise ValueError("'max_continuous_silence' must be < 'max_length' (value={0})".format(max_continuous_silence))

        if init_min >= max_length:
            # report the offending `init_min` value, not another argument
            raise ValueError("'init_min' must be < 'max_length' (value={0})".format(init_min))

        self.validator = validator
        self.min_length = min_length
        self.max_length = max_length
        self.max_continuous_silence = max_continuous_silence
        self.init_min = init_min
        self.init_max_silent = init_max_silence

        # `set_mode` validates `mode` and derives the two boolean flags
        self._mode = None
        self._strict_min_length = False
        self._drop_tailing_silence = False
        self.set_mode(mode)

        self._deliver = None
        self._tokens = None
        self._state = None
        self._data = None
        self._contiguous_token = False

        self._init_count = 0
        self._silence_length = 0
        self._start_frame = 0
        self._current_frame = 0

    def set_mode(self, mode):
        """
        Set the tokenizer mode.

        :Parameters:

            `mode` : *(int)*
                New mode, must be one of:

                - `StreamTokenizer.STRICT_MIN_LENGTH`
                - `StreamTokenizer.DROP_TRAILING_SILENCE`
                - `StreamTokenizer.STRICT_MIN_LENGTH | StreamTokenizer.DROP_TRAILING_SILENCE`
                - `0`

        :Raises:

            `ValueError` if `mode` is none of the values above.

        See `StreamTokenizer.__init__` for more information about the mode.
        """
        valid_modes = (self.STRICT_MIN_LENGTH,
                       self.DROP_TRAILING_SILENCE,
                       self.STRICT_MIN_LENGTH | self.DROP_TRAILING_SILENCE,
                       0)
        if mode not in valid_modes:
            raise ValueError("Wrong value for mode")

        self._mode = mode
        self._strict_min_length = (mode & self.STRICT_MIN_LENGTH) != 0
        self._drop_tailing_silence = (mode & self.DROP_TRAILING_SILENCE) != 0

    def get_mode(self):
        """
        Return the current mode. To check whether a specific mode is
        activated use the bitwise 'and' operator `&`. Example:

        .. code:: python

            if mode & self.STRICT_MIN_LENGTH != 0:
                do_something()
        """
        return self._mode

    def _reinitialize(self):
        """Reset the automaton so that a new stream can be tokenized."""
        self._contiguous_token = False
        self._data = []
        self._tokens = []
        self._state = self.SILENCE
        # -1 so that the first frame read gets index 0
        self._current_frame = -1
        # do not carry counters over from a previous `tokenize` run
        self._init_count = 0
        self._silence_length = 0
        self._start_frame = 0
        self._deliver = self._append_token

    def tokenize(self, data_source, callback=None):
        """
        Read data from `data_source`, one frame a time, and process the read
        frames in order to detect sequences of frames that make up valid
        tokens.

        :Parameters:

            `data_source` : instance of the :class:`DataSource` class that
                implements a `read` method. 'read' should return a slice of
                signal, i.e. frame (of whatever type as long as it can be
                processed by validator) and None if there is no more signal.

            `callback` : an optional 3-argument function.
                If a `callback` function is given, it will be called each
                time a valid token is found, as `callback(data, start, end)`.

        :Returns:

            A list of tokens if `callback` is None (otherwise None). Each
            token is a tuple with the following elements:

            .. code:: python

                (data, start, end)

            where `data` is a list of read frames, `start`: index of the
            first frame in the original data and `end` : index of the last
            frame.
        """
        self._reinitialize()

        if callback is not None:
            self._deliver = callback

        while True:
            frame = data_source.read()
            if frame is None:
                break
            self._current_frame += 1
            self._process(frame)

        # flush a token that is still being gathered at end of stream
        self._post_process()

        if callback is None:
            _ret = self._tokens
            # drop our reference: the list now belongs to the caller
            self._tokens = None
            return _ret
        return None

    def _process(self, frame):
        """
        Feed one frame to the automaton and update its state.

        A token is delivered (see `_process_end_of_detection`) as soon as it
        reaches `max_length` or when too many non-valid frames follow it.
        """
        frame_is_valid = self.validator.is_valid(frame)

        if self._state == self.SILENCE:

            if frame_is_valid:
                # seems we got a valid frame after a silence
                self._init_count = 1
                self._silence_length = 0
                self._start_frame = self._current_frame
                self._data.append(frame)

                if self._init_count >= self.init_min:
                    self._state = self.NOISE
                    if len(self._data) >= self.max_length:
                        self._process_end_of_detection(True)
                else:
                    self._state = self.POSSIBLE_NOISE

        elif self._state == self.POSSIBLE_NOISE:

            if frame_is_valid:
                self._silence_length = 0
                self._init_count += 1
                self._data.append(frame)
                if self._init_count >= self.init_min:
                    self._state = self.NOISE
                    if len(self._data) >= self.max_length:
                        self._process_end_of_detection(True)

            else:
                self._silence_length += 1
                if self._silence_length > self.init_max_silent or \
                        len(self._data) + 1 >= self.max_length:
                    # either init_max_silent or max_length is reached
                    # before _init_count, back to silence
                    self._data = []
                    self._state = self.SILENCE
                else:
                    self._data.append(frame)

        elif self._state == self.NOISE:

            if frame_is_valid:
                self._data.append(frame)
                if len(self._data) >= self.max_length:
                    self._process_end_of_detection(True)

            elif self.max_continuous_silence <= 0:
                # no silence is tolerated: the token ends here. A short
                # token is still delivered if it is contiguous with the
                # previous one and STRICT_MIN_LENGTH is not set
                self._process_end_of_detection()
                self._state = self.SILENCE

            else:
                # this is the first silent frame following a valid one
                # and it is tolerated
                self._silence_length = 1
                self._data.append(frame)
                self._state = self.POSSIBLE_SILENCE
                if len(self._data) == self.max_length:
                    self._process_end_of_detection(True)
                    # don't reset _silence_length because we still
                    # need to know the total number of silent frames

        elif self._state == self.POSSIBLE_SILENCE:

            if frame_is_valid:
                self._data.append(frame)
                self._silence_length = 0
                self._state = self.NOISE
                if len(self._data) >= self.max_length:
                    self._process_end_of_detection(True)

            else:
                if self._silence_length >= self.max_continuous_silence:
                    if self._silence_length < len(self._data):
                        # deliver only if gathered frames aren't all silent
                        self._process_end_of_detection()
                    else:
                        self._data = []
                    self._state = self.SILENCE
                    self._silence_length = 0
                else:
                    self._data.append(frame)
                    self._silence_length += 1
                    if len(self._data) >= self.max_length:
                        self._process_end_of_detection(True)
                        # don't reset _silence_length because we still
                        # need to know the total number of silent frames

    def _post_process(self):
        """Deliver the token being gathered when the stream ends, if any."""
        if self._state in (self.NOISE, self.POSSIBLE_SILENCE):
            # skip buffers that are empty or made of silent frames only
            if len(self._data) > 0 and len(self._data) > self._silence_length:
                self._process_end_of_detection()

    def _process_end_of_detection(self, truncated=False):
        """
        Deliver the gathered frames if they form an acceptable token, then
        empty the buffer.

        :Parameters:

            `truncated` : *(bool, default=False)*
                True if the token ends because `max_length` was reached.
        """
        if not truncated and self._drop_tailing_silence and self._silence_length > 0:
            # happens if max_continuous_silence is reached
            # or max_length is reached at a silent frame
            self._data = self._data[:-self._silence_length]

        long_enough = len(self._data) >= self.min_length
        # a short token is tolerated right after a truncated one
        tolerated_short = (len(self._data) > 0 and
                           not self._strict_min_length and
                           self._contiguous_token)

        if long_enough or tolerated_short:
            _end_frame = self._start_frame + len(self._data) - 1
            self._deliver(self._data, self._start_frame, _end_frame)

            if truncated:
                # next token (if any) will start at _current_frame + 1
                self._start_frame = self._current_frame + 1
                # remember that it is contiguous with the just delivered one
                self._contiguous_token = True
            else:
                self._contiguous_token = False
        else:
            self._contiguous_token = False

        # fresh list: the delivered one now belongs to the consumer
        self._data = []

    def _append_token(self, data, start, end):
        """Default delivery function: store the token in the result list."""
        self._tokens.append((data, start, end))