Source code for trafilatura.xml

# pylint:disable-msg=E0611,I1101
"""
All functions related to XML generation, processing and validation.
"""

import csv
import logging
import lzma

from html import unescape
from io import StringIO
from json import dumps as json_dumps
from pathlib import Path
from pickle import load as load_pickle

try:  # Python 3.8+
    from importlib.metadata import version
except ImportError:
    from importlib_metadata import version

from lxml.etree import (Element, RelaxNG, SubElement, XMLParser, fromstring,
                        tostring)


from .filters import text_chars_test
from .utils import sanitize, sanitize_tree


# module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
# version string of the installed "trafilatura" distribution;
# write_fullheader() records it in the TEI <application> element
PKG_VERSION = version("trafilatura")

# validation
# path of the pickled, LZMA-compressed TEI schema, located next to this module
TEI_SCHEMA = str(Path(__file__).parent / 'data/tei-schema-pickle.lzma')
# element names which check_tei() keeps below text/body
TEI_VALID_TAGS = {
    'ab', 'body', 'cell', 'code', 'del', 'div', 'graphic', 'head', 'hi',
    'item', 'lb', 'list', 'p', 'quote', 'ref', 'row', 'table',
}
# attribute names which check_tei() keeps on those elements
TEI_VALID_ATTRS = {'rend', 'rendition', 'role', 'target', 'type'}
# RelaxNG validator, built by validate_tei() from TEI_SCHEMA on first use
TEI_RELAXNG = None
# elements whose non-blank tail text is relocated by check_tei()
TEI_REMOVE_TAIL = {"ab", "p"}

# parser used by control_xml_output() to re-read the serialized tree;
# created with remove_blank_text=True
CONTROL_PARSER = XMLParser(remove_blank_text=True)

# text emitted by process_element() after an element's content, keyed by tag
NEWLINE_ELEMS = {
    'cell': '|',
    'item': '\n- ',
    **dict.fromkeys(
        ('code', 'graphic', 'head', 'lb', 'list', 'p', 'quote', 'row', 'table'),
        '\n',
    ),
}
# inline tags for which process_element() does not log an "unprocessed" message
SPECIAL_FORMATTING = {'del', 'head', 'hi', 'ref'}
# tags whose attributes are left untouched by clean_attributes()
WITH_ATTRIBUTES = {'cell', 'del', 'graphic', 'head', 'hi', 'item', 'list', 'ref'}

# parent tags under which strip_double_tags() leaves nested elements alone
NESTING_WHITELIST = {"cell", "figure", "item", "note", "quote"}

# document attributes copied onto the root element by add_xml_meta()
META_ATTRIBUTES = [
    'sitename',
    'title',
    'author',
    'date',
    'url',
    'hostname',
    'description',
    'categories',
    'tags',
    'license',
    'id',
    'fingerprint',
    'language',
]

# markdown delimiters for <hi> elements, keyed by their "rend" value
HI_FORMATTING = {
    '#b': '**',
    '#i': '*',
    '#u': '__',
    '#t': '`',
}


def build_json_output(docmeta):
    '''Build JSON output based on extracted information.

    Every slot of ``docmeta`` is exported. Some keys are renamed
    (``url`` -> ``source``, ``sitename`` -> ``source-hostname``,
    ``description`` -> ``excerpt``), ``categories`` and ``tags`` are joined
    with ";", and the ``body`` / ``commentsbody`` trees are converted to
    plain text under ``text`` / ``comments``.

    Args:
        docmeta: document object exposing ``__slots__``.

    Returns:
        The JSON-serialized dictionary as a string (non-ASCII kept as is).
    '''
    # slots which were never assigned are exported as None
    outputdict = {slot: getattr(docmeta, slot, None) for slot in docmeta.__slots__}
    outputdict.update({
        'source': outputdict.pop('url'),
        'source-hostname': outputdict.pop('sitename'),
        'excerpt': outputdict.pop('description'),
        # an unset list arrives here as None: join it as an empty sequence
        # instead of failing with a TypeError
        'categories': ';'.join(outputdict.pop('categories') or []),
        'tags': ';'.join(outputdict.pop('tags') or []),
        'text': xmltotxt(outputdict.pop('body'), include_formatting=False),
    })

    # the comments tree is optional and never serialized as such
    commentsbody = outputdict.pop('commentsbody')
    if commentsbody is not None:
        outputdict['comments'] = xmltotxt(commentsbody, include_formatting=False)

    return json_dumps(outputdict, ensure_ascii=False)


def clean_attributes(tree):
    '''Strip all attributes from elements not listed in WITH_ATTRIBUTES.

    The tree is modified in place and returned.
    '''
    for node in tree.iter('*'):
        if node.tag in WITH_ATTRIBUTES:
            continue
        node.attrib.clear()
    return tree


def remove_empty_elements(tree):
    '''Drop childless elements whose text and tail both fail text_chars_test.

    The root element, <graphic> elements and anything directly inside <code>
    are always kept. The tree is modified in place and returned.
    '''
    for node in tree.iter('*'):  # 'head', 'hi', 'item', 'p'
        # elements with children are never candidates
        if len(node) > 0:
            continue
        if text_chars_test(node.text) is not False or text_chars_test(node.tail) is not False:
            continue
        container = node.getparent()
        # keep the root, naturally empty elements, and the content of <code>
        # (to preserve formatting)
        if container is None or node.tag == "graphic" or container.tag == 'code':
            continue
        container.remove(node)
    return tree


def strip_double_tags(tree):
    """Prevent nested tags among a fixed list of tags.

    For every <head>, <code> and <p> element (visited in reverse document
    order), descendants carrying the same tag are merged into their parent,
    unless that parent's tag is in NESTING_WHITELIST.
    The tree is modified in place and returned.
    """
    candidates = tree.xpath(".//head | .//code | .//p")
    for outer in reversed(candidates):
        for inner in outer.iterdescendants("code", "head", "p"):
            same_tag = inner.tag == outer.tag
            if same_tag and inner.getparent().tag not in NESTING_WHITELIST:
                merge_with_parent(inner)
    return tree


def build_xml_output(docmeta):
    '''Assemble the <doc> output tree from the extracted information.

    Metadata become attributes of the root; the body is renamed to <main>
    and, if present, the comments tree to <comments>. Both are passed through
    clean_attributes() before being appended.
    '''
    root = add_xml_meta(Element('doc'), docmeta)
    main = docmeta.body
    main.tag = 'main'
    # clean XML tree
    root.append(clean_attributes(main))
    comments = docmeta.commentsbody
    if comments is not None:
        comments.tag = 'comments'
        root.append(clean_attributes(comments))
    # XML invalid characters
    # https://chase-seibert.github.io/blog/2011/05/20/stripping-control-characters-in-python.html
    return root


def control_xml_output(output_tree, output_format, tei_validation, docmeta):
    '''Sanitize the tree, optionally validate it as TEI, and serialize it.

    Returns the pretty-printed XML as a stripped unicode string. The
    validation outcome is only logged at debug level.
    '''
    sanitized = sanitize_tree(output_tree)
    # serialize and parse again with CONTROL_PARSER: necessary for cleaning
    serialized = tostring(sanitized, encoding='unicode')
    reparsed = fromstring(serialized, CONTROL_PARSER)
    # validate
    wants_validation = output_format == 'xmltei' and tei_validation is True
    if wants_validation:
        outcome = validate_tei(reparsed)
        LOGGER.debug('TEI validation result: %s %s %s', outcome, docmeta.id, docmeta.url)
    pretty = tostring(reparsed, pretty_print=True, encoding='unicode')
    return pretty.strip()


def add_xml_meta(output, docmeta):
    '''Copy the metadata listed in META_ATTRIBUTES onto the output element.

    Attributes which are missing or None are skipped; non-string values are
    joined with ";". The element is modified in place and returned.
    '''
    for name in META_ATTRIBUTES:
        value = getattr(docmeta, name, None)
        if value is None:
            continue
        if not isinstance(value, str):
            value = ';'.join(value)
        output.set(name, value)
    return output


def build_tei_output(docmeta):
    '''Build the TEI-XML output tree and run the repair pass over it.'''
    # build the TEI tree, then check and repair it
    # (strips unwanted elements, just in case)
    return check_tei(write_teitree(docmeta), docmeta.url)


def check_tei(xmldoc, url):
    '''Check if the resulting XML file is conform and scrub remaining tags.

    The tree is modified in place in three passes: <head> elements are turned
    into <ab type="header">, <lb/> children of a <div> carrying non-blank tail
    text become <p>, and every element below text/body is checked against
    TEI_VALID_TAGS and TEI_VALID_ATTRS.

    Args:
        xmldoc: root element of the tree to repair.
        url: source URL, only used in log messages.

    Returns:
        The same xmldoc object.
    '''
    # convert head tags
    for elem in xmldoc.iter('head'):
        elem.tag = 'ab'
        elem.set('type', 'header')
        parent = elem.getparent()
        # a head with child elements is rebuilt as a new <ab> element
        if len(elem) > 0:
            new_elem = _tei_handle_complex_head(elem)
            parent.replace(elem, new_elem)
            elem = new_elem
        # a header inside a paragraph is moved next to that paragraph
        if parent.tag == "p":
            _move_element_one_level_up(elem)
    # convert <lb/> when child of <div> to <p>
    for element in xmldoc.findall(".//text/body//div/lb"):
        if element.tail and element.tail.strip():
            element.tag = 'p'
            element.text = element.tail
            element.tail = None
    # look for elements that are not valid
    for element in xmldoc.findall('.//text/body//*'):
        # non-blank tails on <ab> and <p> are relocated first
        if element.tag in TEI_REMOVE_TAIL and element.tail and element.tail.strip():
            _handle_unwanted_tails(element)
        # check elements
        if element.tag not in TEI_VALID_TAGS:
            # disable warnings for chosen categories
            # if element.tag not in ('div', 'span'):
            LOGGER.warning('not a TEI element, removing: %s %s', element.tag, url)
            # the element is removed; its text and tail are kept in the parent
            merge_with_parent(element)
            continue
        if element.tag == "div":
            _handle_text_content_of_div_nodes(element)
            _wrap_unwanted_siblings_of_div(element)
        # check attributes
        # NOTE(review): attributes are popped while element.attrib is being
        # iterated; presumably safe with lxml's attribute proxy, confirm
        for attribute in element.attrib:
            if attribute not in TEI_VALID_ATTRS:
                LOGGER.warning('not a valid TEI attribute, removing: %s in %s %s', attribute, element.tag, url)
                element.attrib.pop(attribute)
    return xmldoc


def validate_tei(xmldoc):  # , filename=""
    '''Check if an XML document is conform to the guidelines of the Text Encoding Initiative.

    The RelaxNG validator is built on first use from the pickled schema at
    TEI_SCHEMA and cached in the module-level TEI_RELAXNG.

    Returns:
        The result of the RelaxNG validation; a failure is also logged as a
        warning together with the last validation error.
    '''
    global TEI_RELAXNG
    if TEI_RELAXNG is None:
        # load validator
        # NOTE: unpickles the schema file referenced by TEI_SCHEMA; that file
        # must never be replaced by untrusted content
        with lzma.open(TEI_SCHEMA, 'rb') as schema_handle:
            raw_schema = load_pickle(schema_handle)
        TEI_RELAXNG = RelaxNG(fromstring(raw_schema))
    is_valid = TEI_RELAXNG.validate(xmldoc)
    if is_valid is False:
        LOGGER.warning('not a valid TEI document: %s', TEI_RELAXNG.error_log.last_error)
    return is_valid
def replace_element_text(element, include_formatting):
    '''Determine element text based on **just the text** of the element.
    You must deal with the tail separately.

    Args:
        element: element whose ``text`` is converted.
        include_formatting: if True, <head>, <del>, <hi> and <code> text is
            wrapped in markdown markers.

    Returns:
        The converted text, or an empty string if there is none. <ref>
        elements are rendered as markdown links regardless of
        ``include_formatting``.
    '''
    elem_text = element.text
    # handle formatting: convert to markdown
    if include_formatting is True and elem_text is not None:
        if element.tag == 'head':
            # the heading level is the second character of "rend" (e.g. "h3");
            # fall back to level 2 if it is missing, too short or not a digit
            try:
                number = int(element.get('rend')[1])
            except (IndexError, TypeError, ValueError):
                number = 2
            elem_text = f'{"#" * number} {elem_text}'
        elif element.tag == 'del':
            elem_text = f'~~{elem_text}~~'
        elif element.tag == 'hi':
            rend = element.get('rend')
            if rend in HI_FORMATTING:
                elem_text = f'{HI_FORMATTING[rend]}{elem_text}{HI_FORMATTING[rend]}'
        elif element.tag == 'code':
            # multi-line code becomes a fenced block, otherwise inline code
            if '\n' in elem_text:
                elem_text = f'```\n{elem_text}\n```'
            else:
                elem_text = f'`{elem_text}`'
    # handle links
    if element.tag == 'ref':
        if elem_text is not None:
            link_text = f'[{elem_text}]'
            target = element.get('target')
            if target is not None:
                elem_text = f"{link_text}({target})"
            else:
                LOGGER.warning('missing link attribute: %s %s', elem_text, element.attrib)
                elem_text = link_text
        else:
            LOGGER.warning('empty link: %s %s', elem_text, element.attrib)
    # handle text
    return elem_text or ''


def merge_with_parent(element, include_formatting=False):
    '''Merge element with its parent and convert formatting to markdown.

    The element's converted text and its tail are appended to the tail of the
    previous sibling, or to the parent's text if there is no previous
    sibling. The element is then removed from its parent. Nothing happens if
    the element has no parent.
    '''
    parent = element.getparent()
    if parent is None:
        return

    full_text = replace_element_text(element, include_formatting)
    if element.tail is not None:
        full_text = f'{full_text}{element.tail}'

    previous = element.getprevious()
    if previous is not None:
        # There is a previous node, append text to its tail
        if previous.tail is not None:
            previous.tail = f'{previous.tail} {full_text}'
        else:
            previous.tail = full_text
    elif parent.text is not None:
        parent.text = f'{parent.text} {full_text}'
    else:
        parent.text = full_text
    parent.remove(element)


def process_element(element, returnlist, include_formatting):
    '''Recursively append the text fragments of element to returnlist.

    Args:
        element: element to convert, children included.
        returnlist: list of strings, extended in place.
        include_formatting: passed on to replace_element_text().
    '''
    # Process children recursively
    if element.text is not None:
        # this is the text that comes before the first child
        textelement = replace_element_text(element, include_formatting)
        returnlist.append(textelement)

    for child in element:
        process_element(child, returnlist, include_formatting)

    if element.text is None and element.tail is None:
        if element.tag == 'graphic':
            # add source, default to ''
            text = f'{element.get("title", "")} {element.get("alt", "")}'
            returnlist.extend(['![', text.strip(), ']', '(', element.get('src', ''), ')'])
        # newlines for textless elements
        if element.tag in ('graphic', 'row', 'table'):
            returnlist.append('\n')
        return  # Nothing more to do with textless elements

    # Process text

    # Common elements (Now processes end-tag logic correctly)
    if element.tag in NEWLINE_ELEMS:
        returnlist.extend([NEWLINE_ELEMS[element.tag], '\n'])
    elif element.tag == 'comments':
        returnlist.append('\n\n')
    else:
        if element.tag not in SPECIAL_FORMATTING:
            LOGGER.debug('unprocessed element in output: %s', element.tag)
        returnlist.append(' ')

    # this is text that comes after the closing tag, so it should be after any NEWLINE_ELEMS
    if element.tail is not None:
        returnlist.append(element.tail)
def xmltotxt(xmloutput, include_formatting):
    '''Convert to plain text format and optionally preserve formatting as markdown.

    The fragments collected by process_element() are concatenated, passed
    through sanitize() and HTML-unescaped.
    '''
    fragments = []
    process_element(xmloutput, fragments, include_formatting)
    joined = ''.join(fragments)
    return unescape(sanitize(joined))
def xmltocsv(document, include_formatting, *, delim="\t", null="null"): "Convert the internal XML document representation to a CSV string." # preprocessing posttext = xmltotxt(document.body, include_formatting) if document.commentsbody is not None: commentstext = xmltotxt(document.commentsbody, include_formatting) else: commentstext = "" # output config output = StringIO() outputwriter = csv.writer(output, delimiter=delim, quoting=csv.QUOTE_MINIMAL) # organize fields data = [d or null for d in ( document.url, document.id, document.fingerprint, document.hostname, document.title, document.image, document.date, posttext, commentstext, document.license, document.pagetype, ) ] outputwriter.writerow(data) return output.getvalue() def write_teitree(docmeta): '''Bundle the extracted post and comments into a TEI tree''' teidoc = Element('TEI', xmlns='http://www.tei-c.org/ns/1.0') header = write_fullheader(teidoc, docmeta) textelem = SubElement(teidoc, 'text') textbody = SubElement(textelem, 'body') # post postbody = clean_attributes(docmeta.body) postbody.tag = 'div' postbody.set('type', 'entry') # rendition='#pst' textbody.append(postbody) # comments if docmeta.commentsbody is not None: commentsbody = clean_attributes(docmeta.commentsbody) commentsbody.tag = 'div' commentsbody.set('type', 'comments') # rendition='#cmt' textbody.append(commentsbody) return teidoc def _define_publisher_string(docmeta): '''Construct a publisher string to include in TEI header''' if docmeta.hostname and docmeta.sitename: publisherstring = f'{docmeta.sitename.strip()} ({docmeta.hostname})' elif docmeta.hostname: publisherstring = docmeta.hostname elif docmeta.sitename: publisherstring = docmeta.sitename else: LOGGER.warning('no publisher for URL %s', docmeta.url) publisherstring = 'N/A' return publisherstring def write_fullheader(teidoc, docmeta): '''Write TEI header based on gathered metadata''' # todo: add language info header = SubElement(teidoc, 'teiHeader') filedesc = SubElement(header, 
'fileDesc') bib_titlestmt = SubElement(filedesc, 'titleStmt') bib_titlemain = SubElement(bib_titlestmt, 'title', type='main') bib_titlemain.text = docmeta.title if docmeta.author: bib_author = SubElement(bib_titlestmt, 'author') bib_author.text = docmeta.author publicationstmt_a = SubElement(filedesc, 'publicationStmt') publisher_string = _define_publisher_string(docmeta) # license, if applicable if docmeta.license: publicationstmt_publisher = SubElement(publicationstmt_a, 'publisher') publicationstmt_publisher.text = publisher_string availability = SubElement(publicationstmt_a, 'availability') avail_p = SubElement(availability, 'p') avail_p.text = docmeta.license # insert an empty paragraph for conformity else: publicationstmt_p = SubElement(publicationstmt_a, 'p') notesstmt = SubElement(filedesc, 'notesStmt') if docmeta.id: idno = SubElement(notesstmt, 'note', type='id') idno.text = docmeta.id fingerprint = SubElement(notesstmt, 'note', type='fingerprint') fingerprint.text = docmeta.fingerprint sourcedesc = SubElement(filedesc, 'sourceDesc') source_bibl = SubElement(sourcedesc, 'bibl') # determination of sigle string if docmeta.sitename and docmeta.date: sigle = docmeta.sitename + ', ' + docmeta.date elif not docmeta.sitename and docmeta.date: sigle = docmeta.date elif docmeta.sitename: sigle = docmeta.sitename else: LOGGER.warning('no sigle for URL %s', docmeta.url) sigle = '' if docmeta.title: source_bibl.text = docmeta.title + '. ' + sigle else: source_bibl.text = '. 
' + sigle source_sigle = SubElement(sourcedesc, 'bibl', type='sigle') source_sigle.text = sigle biblfull = SubElement(sourcedesc, 'biblFull') bib_titlestmt = SubElement(biblfull, 'titleStmt') bib_titlemain = SubElement(bib_titlestmt, 'title', type='main') bib_titlemain.text = docmeta.title if docmeta.author: bib_author = SubElement(bib_titlestmt, 'author') bib_author.text = docmeta.author publicationstmt = SubElement(biblfull, 'publicationStmt') publication_publisher = SubElement(publicationstmt, 'publisher') publication_publisher.text = publisher_string if docmeta.url: publication_url = SubElement(publicationstmt, 'ptr', type='URL', target=docmeta.url) publication_date = SubElement(publicationstmt, 'date') publication_date.text = docmeta.date profiledesc = SubElement(header, 'profileDesc') abstract = SubElement(profiledesc, 'abstract') abstract_p = SubElement(abstract, 'p') abstract_p.text = docmeta.description if len(docmeta.categories) > 0 or len(docmeta.tags) > 0: textclass = SubElement(profiledesc, 'textClass') keywords = SubElement(textclass, 'keywords') if len(docmeta.categories) > 0: cat_list = SubElement(keywords, 'term', type='categories') cat_list.text = ','.join(docmeta.categories) if len(docmeta.tags) > 0: tags_list = SubElement(keywords, 'term', type='tags') tags_list.text = ','.join(docmeta.tags) encodingdesc = SubElement(header, 'encodingDesc') appinfo = SubElement(encodingdesc, 'appInfo') application = SubElement(appinfo, 'application', version=PKG_VERSION, ident='Trafilatura') label = SubElement(application, 'label') label.text = 'Trafilatura' pointer = SubElement(application, 'ptr', target='https://github.com/adbar/trafilatura') return header def _handle_text_content_of_div_nodes(element): if element.text and element.text.strip(): if element.getchildren() and element[0].tag == 'p': p_text = element[0].text or "" element[0].text = f'{element.text} {p_text}'.strip() else: new_child = Element("p") new_child.text = element.text element.insert(0, 
new_child) element.text = None if element.tail and element.tail.strip(): if element.getchildren() and element[-1].tag == 'p': p_text = element[-1].text or "" element[-1].text = f'{p_text} {element.tail}'.strip() else: new_child = Element("p") new_child.text = element.tail element.append(new_child) element.tail = None def _handle_unwanted_tails(element): "Handle tail on p and ab elements" if element.tag == 'p': if element.text: element.text += ' ' + element.tail.strip() else: element.text = element.tail else: new_sibling = Element('p') new_sibling.text = element.tail.strip() parent = element.getparent() parent.insert(parent.index(element) + 1 , new_sibling) element.tail = None def _tei_handle_complex_head(element): new_element = Element('ab', attrib=element.attrib) new_element.text = element.text.strip() if element.text is not None else None for child in element.iterchildren(): if child.tag == 'p': if len(new_element) > 0 or new_element.text: # add <lb> if <ab> has no children or last tail contains text if len(new_element) == 0 or new_element[-1].tail: SubElement(new_element, 'lb') new_element[-1].tail = child.text else: new_element.text = child.text else: new_element.append(child) if element.tail is not None and element.tail.strip(): new_element.tail = element.tail.strip() return new_element def _wrap_unwanted_siblings_of_div(div_element): new_sibling = Element("div") new_sibling_index = None parent = div_element.getparent() # check siblings after target element for sibling in div_element.itersiblings(): if sibling.tag == "div": break if sibling.tag in {"p", "list", "table", "quote", "ab"}: if new_sibling_index is None: new_sibling_index = parent.index(sibling) new_sibling.append(sibling) # some elements (e.g. 
<lb/>) can appear next to div, but # order of elements should be kept, thus add and reset new_sibling else: if new_sibling_index is not None and len(new_sibling) != 0: parent.insert(new_sibling_index, new_sibling) new_sibling = Element("div") new_sibling_index = None if new_sibling_index is not None and len(new_sibling) != 0: parent.insert(new_sibling_index, new_sibling) def _move_element_one_level_up(element): """ Fix TEI compatibility issues by moving certain p-elems up in the XML tree. There is always a n+2 nesting for p-elements with the minimal structure ./TEI/text/body/p """ parent = element.getparent() grand_parent = parent.getparent() new_elem = Element("p") new_elem.extend(sibling for sibling in element.itersiblings()) grand_parent.insert(grand_parent.index(parent) + 1, element) if element.tail and element.tail.strip(): new_elem.text = element.tail.strip() element.tail = None if parent.tail and parent.tail.strip(): new_elem.tail = parent.tail.strip() parent.tail = None if len(new_elem) != 0 or new_elem.text or new_elem.tail: grand_parent.insert(grand_parent.index(element) + 1, new_elem) if len(parent) == 0 and parent.text is None: grand_parent.remove(parent)