# Source code for subs2vec.clean_subs

"""Clean and concatenate OpenSubtitles archives into a single training corpus."""
import os
import zipfile
import argparse
import re
from .utensils import log_timer
from lxml import etree
import logging
logging.basicConfig(format='[{levelname}] {message}', style='{', level=logging.INFO)


def _strip_upos(tree):
    """Flatten a parsed subtitle tree into ``word_POS`` tokens.

    Every ``s`` element contributes a newline and every ``w`` element
    contributes its text joined to its ``upos`` attribute by an underscore,
    followed by a single space.

    :param tree: root element of the parsed subtitle document
    :return: string of tagged tokens
    """
    # format [word]_[POS tag]
    pieces = []
    for element in tree.iter():
        tag = element.tag
        if tag == 'w':
            pieces.append('{}_{} '.format(element.text, element.get('upos')))
        elif tag == 's':
            # sentence boundary
            pieces.append('\n')
    return ''.join(pieces)


def _strip_lemma(tree):
    """Flatten a parsed subtitle tree into ``lemma_POS`` tokens.

    Every ``s`` element contributes a newline and every ``w`` element
    contributes its ``lemma`` attribute joined to its ``upos`` attribute by an
    underscore, followed by a single space.

    :param tree: root element of the parsed subtitle document
    :return: string of tagged lemmas
    """
    # format [lemmatized word]_[POS tag]
    def _tokens():
        for element in tree.iter():
            if element.tag == 's':
                # sentence boundary
                yield '\n'
            elif element.tag == 'w':
                yield '{}_{} '.format(element.get('lemma'), element.get('upos'))

    return ''.join(_tokens())


def _strip_txt(tree):
    """Return the plain text of a subtitle tree with ``meta`` elements removed.

    :param tree: root lxml element of the parsed subtitle document; it is
        modified in place (``meta`` elements are detached from it)
    :return: text content of the remaining tree as a string
    """
    # format [sentence]
    # Collect the meta nodes before touching the tree: removing elements while
    # tree.iter() is still walking can cut the traversal short.
    meta_nodes = [node for node in tree.iter() if node.tag == 'meta']
    for node in meta_nodes:
        # Remove through the node's own parent so a meta element that is not a
        # direct child of the root is handled as well.
        parent = node.getparent()
        if parent is not None:
            parent.remove(node)
    return etree.tostring(tree, encoding=str, method='text')


def _strip_viz(tree):
    """Render timestamped sentences from a parsed subtitle tree.

    Only ``s`` elements whose first child is a ``time`` element are kept. The
    ``value`` attribute of that child has its colons removed and its commas
    turned into dots; the sentence text has its newlines removed.

    :param tree: root element of the parsed subtitle document
    :return: one ``[timestamp] sentence`` entry per line
    """
    # format [timestamp] [sentence]
    lines = []
    for sentence in tree.iter():
        if sentence.tag != 's':
            continue
        first_child = next(iter(sentence), None)
        if first_child is None or first_child.tag != 'time':
            continue
        stamp = first_child.get('value')
        stamp = stamp.replace(':', '').replace(',', '.')
        text = etree.tostring(sentence, encoding=str, method='text')
        lines.append('[' + stamp + '] ' + text.replace('\n', ''))
    return '\n'.join(lines)


def _strip_xml(text, xmlparser, ioformat='txt'):
    """Parse one subtitle document and strip it according to ``ioformat``.

    :param text: raw XML content of a single subtitle file
    :param xmlparser: parser instance handed to ``etree.fromstring``
    :param ioformat: one of ``upos``, ``lemma``, ``txt`` or ``viz``, default is txt
    :return: stripped text, or None when ``ioformat`` is none of the above
    """
    root = etree.fromstring(text, xmlparser)
    strippers = (
        ('upos', _strip_upos),
        ('lemma', _strip_lemma),
        ('txt', _strip_txt),
        ('viz', _strip_viz),
    )
    for name, stripper in strippers:
        if ioformat == name:
            return stripper(root)
    # unknown formats fall through without producing output
    return None


@log_timer
def strip_archive(lang, ioformat='txt', years=(1900, 2100)):
    """Strip xml from a compressed OpenSubtitles archive.

    Reads ``{lang}.zip`` and writes the stripped output directly to
    ``{lang}_stripped.zip`` (opened in append mode).

    :param lang: language of the archive to strip (selects the correct archive)
    :param ioformat: input/output format, default is txt
    :param years: tuple of first and last year to select only subtitles from a range of years,
        default is 1900 to 2100; the bounds are passed to ``range``, so the last year itself is excluded
    :raises ValueError: if ioformat is not one of txt, viz, upos or lemma
    """
    if ioformat in ['txt', 'viz']:
        dirpath = 'OpenSubtitles/raw'
    elif ioformat in ['upos', 'lemma']:
        dirpath = 'OpenSubtitles/parsed'
    else:
        raise ValueError(f'unsupported ioformat: {ioformat!r}')

    # Zip member names always use forward slashes, whatever the host OS is.
    prefix = f'{dirpath}/{lang}'
    year_range = range(*years)
    # XML parser recover option is needed to deal with malformed XML in subs
    xmlparser = etree.XMLParser(recover=True, encoding='utf-8')

    # Context managers make sure both archives are closed, which also
    # guarantees the output archive's central directory gets written.
    with zipfile.ZipFile(f'{lang}.zip', 'r') as read_zip, \
            zipfile.ZipFile(f'{lang}_stripped.zip', 'a') as write_zip:
        filepaths = sorted(
            filepath for filepath in read_zip.namelist()
            if filepath.endswith('xml')
            and filepath.startswith(prefix)
            # the fourth path component is the year directory
            and int(filepath.split('/')[3]) in year_range
        )
        logging.info(f'stripping xml from {len(filepaths)} subtitles in {lang}')
        for filepath in filepaths:
            # Swap only the trailing 'xml' extension, not every 'xml' in the path.
            out_path = filepath[:-len('xml')] + ioformat
            write_zip.writestr(out_path, _strip_xml(read_zip.read(filepath), xmlparser, ioformat))
# Ordered (regex, replacement) pairs; they are applied one after another.
regeces = [
    (r'<.*?>', ''),  # strip other xml tags
    (r'http.*?(?:[\s\n\]]|$)', ''),  # strip links
    (r'\s\(.*?\)', ''),  # remove everything in parentheses
    # break sentences at periods; the bare `|$` alternative also matches the
    # end of the text, which appends a newline there
    (r'([^\s]{2,})[\.\!\?\:\;]+?[\s\n]|$', '\\1\n'),
    (r"[-–—/']", ' '),  # replace hyphens, apostrophes and slashes with spaces
    (r'\s*\n\s*', '\n'),  # strip empty lines and lines containing whitespace
    (r'\s{2,}', ' '),  # strip excessive spaces
]
patterns = [(re.compile(expression, re.IGNORECASE), replacement) for expression, replacement in regeces]


def _strip_punctuation(txt, ioformat='txt'):
    """Strip punctuation from a string of text.

    For every format except viz the regex substitutions in ``patterns`` are
    applied first. Afterwards only alphanumeric and whitespace characters are
    kept; the lemma and upos formats additionally keep underscores.

    :param txt: text to strip punctuation from
    :param ioformat: input/output format, default is txt
    :return: stripped text
    """
    if ioformat != 'viz':
        for compiled, replacement in patterns:
            txt = compiled.sub(replacement, txt)
    # the underscore separates token and tag in the lemma and upos formats
    keep_underscore = ioformat in ['lemma', 'upos']
    return ''.join(
        letter for letter in txt
        if letter.isalnum() or letter.isspace() or (keep_underscore and letter == '_')
    )
@log_timer
def join_archive(lang, ioformat='txt', years=(1900, 2050), verbose=False):
    """Concatenate a stripped OpenSubtitles archive into a single training corpus.

    Reads ``{lang}_stripped.zip`` and writes the cleaned text of every selected
    file to ``{lang}.{ioformat}``.

    :param lang: language of the archive to join (selects the correct archive)
    :param ioformat: input/output format (default is txt)
    :param years: tuple of first and last year to select only subtitles from a range of years,
        default is 1900 to 2050; the bounds are passed to ``range``, so the last year itself is excluded
    :param verbose: boolean setting whether or not to print a progress bar to the command line, default is False
    :return: number of files that were concatenated
    :raises ValueError: if ioformat is not one of txt, viz, upos or lemma
    """
    if ioformat in ['txt', 'viz']:
        dirpath = 'OpenSubtitles/raw'
    elif ioformat in ['upos', 'lemma']:
        dirpath = 'OpenSubtitles/parsed'
    else:
        raise ValueError(f'unsupported ioformat: {ioformat!r}')

    # Zip member names always use forward slashes, whatever the host OS is.
    prefix = f'{dirpath}/{lang}'
    year_range = range(*years)
    out_fname = f'{lang}.{ioformat}'

    with zipfile.ZipFile(f'{lang}_stripped.zip', 'r') as read_zip:
        filepaths = [
            filepath for filepath in read_zip.namelist()
            if filepath.endswith(ioformat)
            and filepath.startswith(prefix)
            # the fourth path component is the year directory
            and int(filepath.split('/')[3]) in year_range
        ]
        total = len(filepaths)
        logging.info(f'joining {total} subtitles in {lang} into a single file')
        # The members are decoded as UTF-8, so write UTF-8 too rather than
        # relying on the platform's default encoding.
        with open(out_fname, 'w', encoding='utf-8') as outfile:
            for i, filepath in enumerate(filepaths, start=1):
                outfile.write(_strip_punctuation(read_zip.read(filepath).decode('utf-8'), ioformat))
                if verbose:
                    print(f'\tprogress: {(float(i) / total) * 100:5.2f}%', end='\r')
    if verbose:
        # move past the carriage-return progress line
        print('')
    return total
if __name__ == '__main__':
    # Command line entry point: optionally strip the xml archive, then
    # optionally join the stripped archive into a single corpus file.
    cli = argparse.ArgumentParser(description='clean subtitles for training distributional semantics models')
    cli.add_argument('lang', help='language to clean')
    cli.add_argument('--strip', action='store_true', help='strip xml from subtitles archive')
    cli.add_argument('--years', default=(1900, 2050), nargs=2, type=int, help='range of years to include')
    cli.add_argument('--join', action='store_true', help='concatenate stripped archive')
    cli.add_argument('--ioformat', default='txt', choices=['txt', 'lemma', 'upos', 'viz'],
                     help='input/output format')
    options = cli.parse_args()

    # both steps take the same selection arguments
    shared = {'lang': options.lang, 'ioformat': options.ioformat, 'years': options.years}
    if options.strip:
        strip_archive(**shared)
    if options.join:
        join_archive(**shared)