import re
from functools import cached_property
from typing import Any, Dict, List, Optional, Union

import cutlet
import pypinyin
import torch
from hangul_romanize import Transliter
from hangul_romanize.rule import academic
from num2words import num2words
from spacy.lang.ar import Arabic
from spacy.lang.en import English
from spacy.lang.es import Spanish
from spacy.lang.ja import Japanese
from spacy.lang.zh import Chinese
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import WhitespaceSplit
from tokenizers.processors import TemplateProcessing
from transformers import BatchEncoding, PreTrainedTokenizerFast
from transformers.tokenization_utils_base import PaddingStrategy, TruncationStrategy

from auralis.models.xttsv2.components.tts.layers.xtts.zh_num2words import TextNorm as zh_num2words


def get_spacy_lang(lang):
    """Return a spaCy language object used for sentence splitting; defaults to English."""
    if lang == "zh":
        return Chinese()
    elif lang == "ja":
        return Japanese()
    elif lang == "ar":
        return Arabic()
    elif lang == "es":
        return Spanish()
    else:
        # English is a reasonable fallback for the remaining supported languages
        return English()


def find_best_split_point(text: str, target_pos: int, window_size: int = 30) -> int:
    """
    Find the best split point near the target position, considering punctuation
    and language markers. Added for better sentence splitting in TTS.
    """
    # Split markers with priorities, from strongest break to weakest
    markers = [
        # Sentence-final punctuation (Latin, Arabic, CJK forms)
        (r'[.!?؟。！？]+[\s]*', 1.0),
        # Paragraph breaks
        (r'[\n\r]+\s*[\n\r]+', 1.0),
        # Strong pauses: colons and semicolons (ASCII and fullwidth)
        (r'[:|;；：][\s]*', 0.9),
        # Medium pauses: commas (ASCII, fullwidth, Arabic, ideographic)
        (r'[,，،、][\s]*', 0.8),
        # Closing brackets and quotes
        (r'[)}\]）】』»›》\s]+', 0.7),
        # Dashes
        (r'[-—−]+[\s]*', 0.7),
        # Conjunction-like separators
        (r'\s+[&+=/\s]+\s+', 0.6),
        # Any whitespace, as a last resort
        (r'[\s]+', 0.5),
    ]

    # Only search within a window around the target position
    start = max(0, target_pos - window_size)
    end = min(len(text), target_pos + window_size)
    window = text[start:end]

    best_pos = target_pos
    best_score = 0

    for pattern, priority in markers:
        matches = list(re.finditer(pattern, window))
        for match in matches:
            # Convert the window-relative match position back to an absolute one
            pos = start + match.end()
            distance = abs(pos - target_pos)
            # Linearly penalize distance from the target position
            distance_score = 1 - (distance / (window_size * 2))

            score = priority * distance_score

            if score > best_score:
                best_score = score
                best_pos = pos

    return best_pos


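# Illustrative example (doctest-style, assuming the marker list above): a
# sentence-final period a couple of characters from the target beats a plain
# space at the target itself, since 1.0 * (1 - 2/60) > 0.5 * 1.0:
#
#     >>> find_best_split_point("Hello world. This is a test sentence", 15)
#     13

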
def split_sentence(text: str, lang: str, text_split_length: int = 250) -> List[str]:
    """
    Enhanced sentence splitting with language awareness and optimal breakpoints.

    Args:
        text: Input text to split
        lang: Language code
        text_split_length: Target length for splits

    Returns:
        List of text splits optimized for TTS
    """
    text = text.strip()
    if len(text) <= text_split_length:
        return [text]

    nlp = get_spacy_lang(lang)
    if "sentencizer" not in nlp.pipe_names:
        nlp.add_pipe("sentencizer")

    # Split the text into sentences first
    doc = nlp(text)
    sentences = list(doc.sents)

    splits = []
    current_split = []
    current_length = 0

    for sent in sentences:
        sentence_text = str(sent).strip()
        sentence_length = len(sentence_text)

        # The sentence fits within the current chunk
        if current_length + sentence_length <= text_split_length:
            current_split.append(sentence_text)
            current_length += sentence_length + 1

        # The sentence alone exceeds the target length: cut it at good breakpoints
        elif sentence_length > text_split_length:
            # Flush the current chunk first
            if current_split:
                splits.append(" ".join(current_split))
                current_split = []
                current_length = 0

            # Repeatedly cut near the target length at the best breakpoint
            remaining = sentence_text
            while len(remaining) > text_split_length:
                split_pos = find_best_split_point(
                    remaining,
                    text_split_length,
                    window_size=30
                )

                splits.append(remaining[:split_pos].strip())
                remaining = remaining[split_pos:].strip()

            # Whatever is left starts the next chunk
            if remaining:
                current_split = [remaining]
                current_length = len(remaining)

        # The sentence does not fit: close the current chunk and start a new one
        else:
            splits.append(" ".join(current_split))
            current_split = [sentence_text]
            current_length = sentence_length

    if current_split:
        splits.append(" ".join(current_split))

    # Replace a trailing period with a space and drop empty splits
    cleaned_sentences = [s[:-1] + ' ' if s.endswith('.') else s for s in splits if s]

    return cleaned_sentences


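# Illustrative example (assuming spaCy's sentencizer splits at the periods):
# with a target length shorter than the text, each sentence becomes its own
# chunk, and trailing periods are replaced with spaces:
#
#     >>> split_sentence("First sentence. Second sentence.", "en", text_split_length=20)
#     ['First sentence ', 'Second sentence ']

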
_whitespace_re = re.compile(r"\s+")


_abbreviations = {
    "en": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("mrs", "misess"),
            ("mr", "mister"),
            ("dr", "doctor"),
            ("st", "saint"),
            ("co", "company"),
            ("jr", "junior"),
            ("maj", "major"),
            ("gen", "general"),
            ("drs", "doctors"),
            ("rev", "reverend"),
            ("lt", "lieutenant"),
            ("hon", "honorable"),
            ("sgt", "sergeant"),
            ("capt", "captain"),
            ("esq", "esquire"),
            ("ltd", "limited"),
            ("col", "colonel"),
            ("ft", "fort"),
        ]
    ],
    "es": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("sra", "señora"),
            ("sr", "señor"),
            ("dr", "doctor"),
            ("dra", "doctora"),
            ("st", "santo"),
            ("co", "compañía"),
            ("jr", "junior"),
            ("ltd", "limitada"),
        ]
    ],
    "fr": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("mme", "madame"),
            ("mr", "monsieur"),
            ("dr", "docteur"),
            ("st", "saint"),
            ("co", "compagnie"),
            ("jr", "junior"),
            ("ltd", "limitée"),
        ]
    ],
    "de": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("fr", "frau"),
            ("dr", "doktor"),
            ("st", "sankt"),
            ("co", "firma"),
            ("jr", "junior"),
        ]
    ],
    "pt": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("sra", "senhora"),
            ("sr", "senhor"),
            ("dr", "doutor"),
            ("dra", "doutora"),
            ("st", "santo"),
            ("co", "companhia"),
            ("jr", "júnior"),
            ("ltd", "limitada"),
        ]
    ],
    "it": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("sig", "signore"),
            ("dr", "dottore"),
            ("st", "santo"),
            ("co", "compagnia"),
            ("jr", "junior"),
            ("ltd", "limitata"),
        ]
    ],
    "pl": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("p", "pani"),
            ("m", "pan"),
            ("dr", "doktor"),
            ("sw", "święty"),
            ("jr", "junior"),
        ]
    ],
    "ar": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            # currently no dotted abbreviations are expanded for Arabic
        ]
    ],
    "zh": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            # currently no dotted abbreviations are expanded for Chinese
        ]
    ],
    "cs": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("dr", "doktor"),
            ("ing", "inženýr"),
            ("p", "pan"),
        ]
    ],
    "ru": [
        # Russian abbreviations are matched without a trailing dot
        (re.compile("\\b%s\\b" % x[0], re.IGNORECASE), x[1])
        for x in [
            ("г-жа", "госпожа"),
            ("г-н", "господин"),
            ("д-р", "доктор"),
        ]
    ],
    "nl": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("dhr", "de heer"),
            ("mevr", "mevrouw"),
            ("dr", "dokter"),
            ("jhr", "jonkheer"),
        ]
    ],
    "tr": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("b", "bay"),
            ("byk", "büyük"),
            ("dr", "doktor"),
        ]
    ],
    "hu": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            ("dr", "doktor"),
            ("b", "bácsi"),
            ("nőv", "nővér"),
        ]
    ],
    "ko": [
        (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1])
        for x in [
            # currently no dotted abbreviations are expanded for Korean
        ]
    ],
}


def expand_abbreviations_multilingual(text, lang="en"):
    if lang in _abbreviations:
        for regex, replacement in _abbreviations[lang]:
            text = re.sub(regex, replacement, text)
    return text


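# Illustrative example: dotted abbreviations are replaced with their
# lower-case expansions, leaving the surrounding text untouched:
#
#     >>> expand_abbreviations_multilingual("Mr. Smith met Dr. Jones.", "en")
#     'mister Smith met doctor Jones.'

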
_symbols_multilingual = {
    "en": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " and "),
            ("@", " at "),
            ("%", " percent "),
            ("#", " hash "),
            ("$", " dollar "),
            ("£", " pound "),
            ("°", " degree "),
        ]
    ],
    "es": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " y "),
            ("@", " arroba "),
            ("%", " por ciento "),
            ("#", " numeral "),
            ("$", " dolar "),
            ("£", " libra "),
            ("°", " grados "),
        ]
    ],
    "fr": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " et "),
            ("@", " arobase "),
            ("%", " pour cent "),
            ("#", " dièse "),
            ("$", " dollar "),
            ("£", " livre "),
            ("°", " degrés "),
        ]
    ],
    "de": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " und "),
            ("@", " at "),
            ("%", " prozent "),
            ("#", " raute "),
            ("$", " dollar "),
            ("£", " pfund "),
            ("°", " grad "),
        ]
    ],
    "pt": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " e "),
            ("@", " arroba "),
            ("%", " por cento "),
            ("#", " cardinal "),
            ("$", " dólar "),
            ("£", " libra "),
            ("°", " graus "),
        ]
    ],
    "it": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " e "),
            ("@", " chiocciola "),
            ("%", " per cento "),
            ("#", " cancelletto "),
            ("$", " dollaro "),
            ("£", " sterlina "),
            ("°", " gradi "),
        ]
    ],
    "pl": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " i "),
            ("@", " małpa "),
            ("%", " procent "),
            ("#", " krzyżyk "),
            ("$", " dolar "),
            ("£", " funt "),
            ("°", " stopnie "),
        ]
    ],
    "ar": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " و "),
            ("@", " على "),
            ("%", " في المئة "),
            ("#", " رقم "),
            ("$", " دولار "),
            ("£", " جنيه "),
            ("°", " درجة "),
        ]
    ],
    "zh": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " 和 "),
            ("@", " 在 "),
            ("%", " 百分之 "),
            ("#", " 号 "),
            ("$", " 美元 "),
            ("£", " 英镑 "),
            ("°", " 度 "),
        ]
    ],
    "cs": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " a "),
            ("@", " na "),
            ("%", " procento "),
            ("#", " křížek "),
            ("$", " dolar "),
            ("£", " libra "),
            ("°", " stupně "),
        ]
    ],
    "ru": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " и "),
            ("@", " собака "),
            ("%", " процентов "),
            ("#", " номер "),
            ("$", " доллар "),
            ("£", " фунт "),
            ("°", " градус "),
        ]
    ],
    "nl": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " en "),
            ("@", " bij "),
            ("%", " procent "),
            ("#", " hekje "),
            ("$", " dollar "),
            ("£", " pond "),
            ("°", " graden "),
        ]
    ],
    "tr": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " ve "),
            ("@", " at "),
            ("%", " yüzde "),
            ("#", " diyez "),
            ("$", " dolar "),
            ("£", " sterlin "),
            ("°", " derece "),
        ]
    ],
    "hu": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " és "),
            ("@", " kukac "),
            ("%", " százalék "),
            ("#", " kettőskereszt "),
            ("$", " dollár "),
            ("£", " font "),
            ("°", " fok "),
        ]
    ],
    "ko": [
        (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1])
        for x in [
            ("&", " 그리고 "),
            ("@", " 에 "),
            ("%", " 퍼센트 "),
            ("#", " 번호 "),
            ("$", " 달러 "),
            ("£", " 파운드 "),
            ("°", " 도 "),
        ]
    ],
}


def expand_symbols_multilingual(text, lang="en"):
    if lang in _symbols_multilingual:
        for regex, replacement in _symbols_multilingual[lang]:
            text = re.sub(regex, replacement, text)
            # The padded replacements can create double spaces; collapse them
            text = text.replace("  ", " ")
    return text.strip()


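# Illustrative example: "&" and "%" are expanded and the resulting double
# spaces are collapsed:
#
#     >>> expand_symbols_multilingual("50% & more", "en")
#     '50 percent and more'

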
_ordinal_re = {
    "en": re.compile(r"([0-9]+)(st|nd|rd|th)"),
    "es": re.compile(r"([0-9]+)(º|ª|er|o|a|os|as)"),
    "fr": re.compile(r"([0-9]+)(º|ª|er|re|e|ème)"),
    "de": re.compile(r"([0-9]+)(st|nd|rd|th|º|ª|\.(?=\s|$))"),
    "pt": re.compile(r"([0-9]+)(º|ª|o|a|os|as)"),
    "it": re.compile(r"([0-9]+)(º|°|ª|o|a|i|e)"),
    "pl": re.compile(r"([0-9]+)(º|ª|st|nd|rd|th)"),
    "ar": re.compile(r"([0-9]+)(ون|ين|ث|ر|ى)"),
    # A trailing dot marks ordinals in Czech
    "cs": re.compile(r"([0-9]+)\.(?=\s|$)"),
    "ru": re.compile(r"([0-9]+)(-й|-я|-е|-ое|-ье|-го)"),
    "nl": re.compile(r"([0-9]+)(de|ste|e)"),
    "tr": re.compile(r"([0-9]+)(\.|inci|nci|uncu|üncü)"),
    "hu": re.compile(r"([0-9]+)(\.|adik|edik|odik|ödik|ödike|ik)"),
    "ko": re.compile(r"([0-9]+)(번째|번|차|째)"),
}
_number_re = re.compile(r"[0-9]+")

_currency_re = {
    "USD": re.compile(r"((\$[0-9\.\,]*[0-9]+)|([0-9\.\,]*[0-9]+\$))"),
    "GBP": re.compile(r"((£[0-9\.\,]*[0-9]+)|([0-9\.\,]*[0-9]+£))"),
    "EUR": re.compile(r"(([0-9\.\,]*[0-9]+€)|((€[0-9\.\,]*[0-9]+)))"),
}

_comma_number_re = re.compile(r"\b\d{1,3}(,\d{3})*(\.\d+)?\b")
_dot_number_re = re.compile(r"\b\d{1,3}(\.\d{3})*(\,\d+)?\b")
_decimal_number_re = re.compile(r"([0-9]+[.,][0-9]+)")


def _remove_commas(m):
    text = m.group(0)
    if "," in text:
        text = text.replace(",", "")
    return text


def _remove_dots(m):
    text = m.group(0)
    if "." in text:
        text = text.replace(".", "")
    return text


def _expand_decimal_point(m, lang="en"):
    amount = m.group(1).replace(",", ".")
    # num2words uses "cz" (not "cs") as its Czech language code
    return num2words(float(amount), lang=lang if lang != "cs" else "cz")


def _expand_currency(m, lang="en", currency="USD"):
    # Normalize the matched amount to a float; this assumes at most one
    # decimal separator survives after stripping the currency symbol
    amount = float((re.sub(r"[^\d.]", "", m.group(0).replace(",", "."))))
    full_amount = num2words(amount, to="currency", currency=currency, lang=lang if lang != "cs" else "cz")

    and_equivalents = {
        "en": ", ",
        "es": " con ",
        "fr": " et ",
        "de": " und ",
        "pt": " e ",
        "it": " e ",
        "pl": ", ",
        "cs": ", ",
        "ru": ", ",
        "nl": ", ",
        "ar": ", ",
        "tr": ", ",
        "hu": ", ",
        "ko": ", ",
    }

    # For whole amounts, drop the "zero cents" tail after the last separator
    if amount.is_integer():
        last_and = full_amount.rfind(and_equivalents.get(lang, ", "))
        if last_and != -1:
            full_amount = full_amount[:last_and]

    return full_amount


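# Worked example (exact wording depends on the installed num2words version):
# for a match like "$2", num2words(2.0, to="currency", currency="USD",
# lang="en") typically returns "two dollars, zero cents"; since the amount is
# a whole number, the tail after the last ", " is trimmed, leaving
# "two dollars".

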
def _expand_ordinal(m, lang="en"):
    return num2words(int(m.group(1)), ordinal=True, lang=lang if lang != "cs" else "cz")


def _expand_number(m, lang="en"):
    return num2words(int(m.group(0)), lang=lang if lang != "cs" else "cz")


def expand_numbers_multilingual(text, lang="en"):
    if lang == "zh":
        text = zh_num2words()(text)
    else:
        # Strip thousands separators first: commas for en/ru, dots elsewhere
        if lang in ["en", "ru"]:
            text = re.sub(_comma_number_re, _remove_commas, text)
        else:
            text = re.sub(_dot_number_re, _remove_dots, text)
        # Currency amounts; num2words can fail on unusual formats, so skip on error
        try:
            text = re.sub(_currency_re["GBP"], lambda m: _expand_currency(m, lang, "GBP"), text)
            text = re.sub(_currency_re["USD"], lambda m: _expand_currency(m, lang, "USD"), text)
            text = re.sub(_currency_re["EUR"], lambda m: _expand_currency(m, lang, "EUR"), text)
        except Exception:
            pass
        # Skip decimal expansion for Turkish, where a trailing dot marks ordinals
        if lang != "tr":
            text = re.sub(_decimal_number_re, lambda m: _expand_decimal_point(m, lang), text)
        if lang in _ordinal_re:
            text = re.sub(_ordinal_re[lang], lambda m: _expand_ordinal(m, lang), text)
        text = re.sub(_number_re, lambda m: _expand_number(m, lang), text)
    return text


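# Illustrative example: currency amounts are expanded first, then bare
# numbers (number wording comes from num2words):
#
#     >>> expand_numbers_multilingual("I have $2 and 3 apples", "en")
#     'I have two dollars and three apples'

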
def lowercase(text):
    return text.lower()


def collapse_whitespace(text):
    return re.sub(_whitespace_re, " ", text)


def multilingual_cleaners(text, lang):
    text = text.replace('"', "")
    if lang == "tr":
        # Normalize Turkish capitals that Python's lower() does not handle well
        text = text.replace("İ", "i")
        text = text.replace("Ö", "ö")
        text = text.replace("Ü", "ü")
    text = lowercase(text)
    text = expand_numbers_multilingual(text, lang)
    text = expand_abbreviations_multilingual(text, lang)
    text = expand_symbols_multilingual(text, lang=lang)
    text = collapse_whitespace(text)
    return text


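# Illustrative example of the full cleaning pipeline for English (lowercase,
# numbers, abbreviations, symbols, whitespace; number wording from num2words):
#
#     >>> multilingual_cleaners('Mr. Smith paid $5.', 'en')
#     'mister smith paid five dollars.'

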
def basic_cleaners(text):
    """Basic pipeline that lowercases and collapses whitespace without transliteration."""
    text = lowercase(text)
    text = collapse_whitespace(text)
    return text


def chinese_transliterate(text):
    # Convert to pinyin with numeric tone marks (e.g. "ni3hao3")
    return "".join(
        [p[0] for p in pypinyin.pinyin(text, style=pypinyin.Style.TONE3, heteronym=False, neutral_tone_with_five=True)]
    )


def japanese_cleaners(text, katsu):
    text = katsu.romaji(text)
    text = lowercase(text)
    return text


def korean_transliterate(text, transliter):
    return transliter.translit(text)


class XTTSTokenizerFast(PreTrainedTokenizerFast):
    """
    Fast tokenizer implementation for the XTTS model, built on HuggingFace's
    PreTrainedTokenizerFast.
    """

    def __init__(
        self,
        vocab_file: Optional[str] = None,
        tokenizer_object: Optional[Tokenizer] = None,
        unk_token: str = "[UNK]",
        pad_token: str = "[PAD]",
        bos_token: str = "[START]",
        eos_token: str = "[STOP]",
        auto_map: dict = {"AutoTokenizer": ["AstraMindAI/xtts2-gpt--tokenizer.XTTSTokenizerFast", None]},
        clean_up_tokenization_spaces: bool = True,
        **kwargs
    ):
        if tokenizer_object is None and vocab_file is not None:
            tokenizer_object = Tokenizer.from_file(vocab_file)

        if tokenizer_object is not None:
            # Split on whitespace and wrap every sequence in BOS/EOS tokens
            tokenizer_object.pre_tokenizer = WhitespaceSplit()
            tokenizer_object.post_processor = TemplateProcessing(
                single=f"{bos_token} $A {eos_token}",
                special_tokens=[
                    (bos_token, tokenizer_object.token_to_id(bos_token)),
                    (eos_token, tokenizer_object.token_to_id(eos_token)),
                ],
            )

        super().__init__(
            tokenizer_object=tokenizer_object,
            unk_token=unk_token,
            pad_token=pad_token,
            bos_token=bos_token,
            eos_token=eos_token,
            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
            **kwargs
        )

        # Per-language character limits used when splitting long inputs
        self.char_limits = {
            "en": 250, "de": 253, "fr": 273, "es": 239,
            "it": 213, "pt": 203, "pl": 224, "zh": 82,
            "ar": 166, "cs": 186, "ru": 182, "nl": 251,
            "tr": 226, "ja": 71, "hu": 224, "ko": 95,
        }

        # Lazily-initialized transliterators
        self._katsu = None
        self._korean_transliter = Transliter(academic)

        # Make sure pad_token_id is set
        if self.pad_token_id is None:
            self.pad_token_id = self.tokenizer.token_to_id(self.pad_token)


    @cached_property
    def katsu(self):
        # Build the Japanese romanizer on first use; cached afterwards
        if self._katsu is None:
            self._katsu = cutlet.Cutlet()
        return self._katsu


    def preprocess_text(self, text: str, lang: str) -> str:
        """Apply language-specific text preprocessing"""
        base_lang = lang.split("-")[0]
        if base_lang in {"ar", "cs", "de", "en", "es", "fr", "hu", "it",
                         "nl", "pl", "pt", "ru", "tr", "zh", "ko"}:
            text = multilingual_cleaners(text, base_lang)
            if base_lang == "zh":
                text = chinese_transliterate(text)
            if base_lang == "ko":
                text = korean_transliterate(text, self._korean_transliter)
        elif base_lang == "ja":
            text = japanese_cleaners(text, self.katsu)
        else:
            text = basic_cleaners(text)
        return text


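    # Illustrative example: Chinese input is cleaned and then transliterated
    # to numeric-tone pinyin, e.g. preprocess_text("你好", "zh") -> "ni3hao3";
    # Japanese input is romanized via cutlet instead.

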
    def batch_encode_with_split(self, texts: Union[str, List[str]], lang: Union[str, List[str]],
                                **kwargs) -> torch.Tensor:
        """
        Split texts into smaller chunks based on language character limits and
        encode them using the HuggingFace fast tokenizer.
        Strictly mimics the XTTSv2 tokenizer.
        """
        # Normalize single inputs to lists
        if isinstance(texts, str):
            texts = [texts]
        if isinstance(lang, str):
            lang = [lang]
        # Broadcast a single language across all texts
        if len(lang) == 1 and len(texts) > 1:
            lang = lang * len(texts)

        if len(texts) != len(lang):
            raise ValueError(f"Number of texts ({len(texts)}) does not match number of languages ({len(lang)}).")

        chunk_list = []
        chunk_langs = []

        # Split every text into chunks based on its language's character limit
        for text, text_lang in zip(texts, lang):
            base_lang = text_lang.split("-")[0]
            char_limit = self.char_limits.get(base_lang, 250)

            # Cleaning/preprocessing happens later, inside _batch_encode_plus
            splits = split_sentence(text, base_lang, text_split_length=char_limit)
            chunk_list.extend(splits)
            chunk_langs.extend([text_lang] * len(splits))

        if not self.is_fast:
            raise ValueError("The tokenizer must be a fast tokenizer.")

        # Encode all chunks, with one language entry per chunk
        encoding: BatchEncoding = self(
            chunk_list,
            lang=chunk_langs,
            add_special_tokens=False,
            padding=False,
            **kwargs
        )

        return encoding['input_ids']


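    # Usage sketch: without return_tensors in kwargs, the result is a list of
    # per-chunk id lists rather than a single padded tensor, e.g.
    #
    #     ids_per_chunk = tokenizer.batch_encode_with_split(long_text, lang="en")

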
    def _batch_encode_plus(
        self,
        batch_text_or_text_pairs,
        add_special_tokens: bool = True,
        padding_strategy=PaddingStrategy.DO_NOT_PAD,
        truncation_strategy=TruncationStrategy.DO_NOT_TRUNCATE,
        max_length: Optional[int] = None,
        stride: int = 0,
        is_split_into_words: bool = False,
        pad_to_multiple_of: Optional[int] = None,
        return_tensors: Optional[str] = None,
        return_token_type_ids: Optional[bool] = None,
        return_attention_mask: Optional[bool] = None,
        return_overflowing_tokens: bool = False,
        return_special_tokens_mask: bool = False,
        return_offsets_mapping: bool = False,
        return_length: bool = False,
        verbose: bool = True,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Override batch encoding to handle language-specific preprocessing
        """
        lang = kwargs.pop("lang", ["en"] * len(batch_text_or_text_pairs))
        if isinstance(lang, str):
            lang = [lang]
        # Broadcast a single language across the whole batch
        if len(lang) == 1 and len(batch_text_or_text_pairs) > 1:
            lang = lang * len(batch_text_or_text_pairs)

        if len(batch_text_or_text_pairs) != len(lang):
            raise ValueError(f"Number of texts ({len(batch_text_or_text_pairs)}) does not match number of languages ({len(lang)}).")

        processed_texts = []
        for text, text_lang in zip(batch_text_or_text_pairs, lang):
            if isinstance(text, str):
                # Language-specific cleaning and transliteration
                processed_text = self.preprocess_text(text, text_lang)

                # Prepend the language tag expected by the model
                base_lang = text_lang.split("-")[0]
                lang_code = "zh-cn" if base_lang == "zh" else base_lang
                processed_text = f"[{lang_code}]{processed_text}"
                # Encode spaces as a token so they survive the whitespace pre-tokenizer
                processed_text = processed_text.replace(" ", "[SPACE]")

                processed_texts.append(processed_text)
            else:
                processed_texts.append(text)

        return super()._batch_encode_plus(
            processed_texts,
            add_special_tokens=add_special_tokens,
            padding_strategy=padding_strategy,
            truncation_strategy=truncation_strategy,
            max_length=max_length,
            stride=stride,
            is_split_into_words=is_split_into_words,
            pad_to_multiple_of=pad_to_multiple_of,
            return_tensors=return_tensors,
            return_token_type_ids=return_token_type_ids,
            return_attention_mask=return_attention_mask,
            return_overflowing_tokens=return_overflowing_tokens,
            return_special_tokens_mask=return_special_tokens_mask,
            return_offsets_mapping=return_offsets_mapping,
            return_length=return_length,
            verbose=verbose,
            **kwargs
        )


    def __call__(
        self,
        text: Union[str, List[str]],
        lang: Union[str, List[str]] = "en",
        add_special_tokens: bool = True,
        padding: Union[bool, str, PaddingStrategy] = False,
        truncation: Union[bool, str, TruncationStrategy] = False,
        max_length: Optional[int] = None,
        stride: int = 0,
        return_tensors: Optional[str] = None,
        return_token_type_ids: Optional[bool] = None,
        return_attention_mask: Optional[bool] = True,
        **kwargs
    ):
        """
        Main tokenization method
        """
        # Normalize single inputs to lists
        if isinstance(text, str):
            text = [text]
        if isinstance(lang, str):
            lang = [lang]

        # Broadcast a single language across all texts
        if len(lang) == 1 and len(text) > 1:
            lang = lang * len(text)

        if len(text) != len(lang):
            raise ValueError(f"Number of texts ({len(text)}) does not match number of languages ({len(lang)}).")

        # Resolve the padding strategy
        if isinstance(padding, bool):
            padding_strategy = PaddingStrategy.LONGEST if padding else PaddingStrategy.DO_NOT_PAD
        else:
            padding_strategy = PaddingStrategy(padding)

        # Resolve the truncation strategy
        if isinstance(truncation, bool):
            truncation_strategy = TruncationStrategy.LONGEST_FIRST if truncation else TruncationStrategy.DO_NOT_TRUNCATE
        else:
            truncation_strategy = TruncationStrategy(truncation)

        # Encode the batch with language-aware preprocessing
        encoded = self._batch_encode_plus(
            text,
            add_special_tokens=add_special_tokens,
            padding_strategy=padding_strategy,
            truncation_strategy=truncation_strategy,
            max_length=max_length,
            stride=stride,
            return_tensors=return_tensors,
            return_token_type_ids=return_token_type_ids,
            return_attention_mask=return_attention_mask,
            lang=lang,
            **kwargs
        )

        return encoded
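

# Illustrative usage sketch ("vocab.json" is a placeholder path, not a file
# shipped with this module):
#
#     tokenizer = XTTSTokenizerFast(vocab_file="vocab.json")
#     batch = tokenizer(["Hello world!"], lang="en", return_tensors="pt")
#     ids_per_chunk = tokenizer.batch_encode_with_split("A very long text...", lang="en")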