Source code for trafilatura.core

# pylint:disable-msg=E0611,I1101
"""
Module bundling all functions needed to extract the text in a webpage.
"""

import logging
import re  # import regex as re
import warnings
from copy import deepcopy

from lxml.etree import Element, SubElement, XPath, strip_elements, strip_tags, tostring

# own
from .external import (SANITIZED_XPATH, justext_rescue, sanitize_tree,
                       try_readability)
from .filters import (LANGID_FLAG, check_html_lang, duplicate_test,
                      language_filter, text_chars_test)
from .hashing import content_fingerprint
from .htmlprocessing import (convert_tags, delete_by_link_density,
                             handle_textnode, link_density_test_tables,
                             process_node, prune_unwanted_nodes, tree_cleaning)
from .metadata import Document, extract_metadata
from .settings import BASIC_CLEAN_XPATH, DEFAULT_CONFIG, TAG_CATALOG, use_config
from .utils import (is_image_file, load_html, normalize_unicode, trim,
                    FORMATTING_PROTECTED)
from .xml import (build_json_output, build_tei_output, build_xml_output, control_xml_output,
                  remove_empty_elements, strip_double_tags, xmltotxt, xmltocsv)
from .xpaths import (BODY_XPATH, COMMENTS_DISCARD_XPATH, COMMENTS_XPATH,
                     DISCARD_IMAGE_ELEMENTS, OVERALL_DISCARD_XPATH,
                     PAYWALL_DISCARD_XPATH, PRECISION_DISCARD_XPATH,
                     REMOVE_COMMENTS_XPATH, TEASER_DISCARD_XPATH)

LOGGER = logging.getLogger(__name__)

P_FORMATTING = {'hi', 'ref'}
TABLE_ELEMS = {'td', 'th'}
TABLE_ALL = {'td', 'th', 'hi'}
FORMATTING = {'hi', 'ref', 'span'}
CODES_QUOTES = {'code', 'quote'}
NOT_AT_THE_END = {'head', 'ref'}

JSON_SEARCH = re.compile(r'"articlebody": *"(.+?)(?<!\\)"', re.I)
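
# Illustrative note (editor's sketch, not part of the upstream module): the
# pattern above targets the "articleBody" field of JSON-LD metadata and stops
# at the first double quote that is not escaped by a backslash, e.g.
#   >>> JSON_SEARCH.search('"articleBody": "Text with \\"quotes\\" inside."')[1]
#   'Text with \\"quotes\\" inside.'
# baseline() below later unescapes the captured value with .replace('\\"', '"').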


class Extractor:
    "Defines a class to store all extraction options."
    __slots__ = [
        'config', 'fast', 'precision', 'recall', 'comments',
        'formatting', 'links', 'images', 'tables', 'dedup', 'lang',
    ]
    # consider dataclasses for Python 3.7+
    def __init__(self, config, fast, precision, recall, comments,
                 formatting, links, images, tables, deduplicate,
                 target_language):
        self.config = config
        self.fast = fast
        self.precision = precision
        self.recall = recall
        self.comments = comments
        self.formatting = formatting
        self.links = links
        self.images = images
        self.tables = tables
        self.dedup = deduplicate
        self.lang = target_language
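
# Usage sketch (editor's example, mirroring how bare_extraction() builds the
# options object further below; the concrete values shown are assumptions):
#   >>> options = Extractor(use_config(), False, False, True, True,
#   ...                     False, False, False, True, False, "de")
#   >>> options.recall, options.lang
#   (True, 'de')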


def handle_titles(element, options):
    '''Process head elements (titles)'''
    if len(element) == 0:
        # maybe needs attention?
        # if element.tail and re.search(r'\w', element.tail):
        #    LOGGER.debug('tail in title, stripping: %s', element.tail)
        #    element.tail = None
        title = process_node(element, options)
    # children
    else:
        title = deepcopy(element)
        # list instead of element.iter('*')
        # TODO: write tests for it and check
        for child in list(element):
            # if child.tag not in potential_tags:
            #    LOGGER.debug('unexpected in title: %s %s %s', child.tag, child.text, child.tail)
            #    continue
            processed_child = handle_textnode(child, options, comments_fix=False)
            if processed_child is not None:
                title.append(processed_child)
            child.tag = 'done'
    if title is not None and text_chars_test(''.join(title.itertext())) is True:
        return title
    return None


def handle_formatting(element, options):
    '''Process formatting elements (b, i, etc. converted to hi) found
       outside of paragraphs'''
    formatting = process_node(element, options)
    if len(element) == 0 and formatting is None:
        return None
    # repair orphan elements
    # if formatting is None:
    #    formatting = Element(element.tag)
    #     return None
    # if len(element) > 0:
    #    for child in element.iter('*'):
    #        if child.tag not in potential_tags:
    #            LOGGER.debug('unexpected in title: %s %s %s', child.tag, child.text, child.tail)
    #            continue
    #        processed_child = handle_textnode(child, options, comments_fix=False)
    #        if processed_child is not None:
    #            formatting.append(processed_child)
    #        child.tag = 'done'
    # if text_chars_test(element.text) is True:
    #    processed_child.text = trim(element.text)
    # if text_chars_test(element.tail) is True:
    #    processed_child.tail = trim(element.tail)
    # if len(element) == 0:
    #    processed_element = process_node(element, options)
    # children
    # else:
    #    processed_element = Element(element.tag)
    #    processed_element.text, processed_element.tail = element.text, element.tail
    #    for child in element.iter('*'):
    #        processed_child = handle_textnode(child, options, comments_fix=False)
    #        if processed_child is not None:
    #            processed_element.append(processed_child)
    #        child.tag = 'done'
    # repair orphan elements
    # shorter code but triggers warning:
    # parent = element.getparent() or element.getprevious()
    parent = element.getparent()
    if parent is None:
        parent = element.getprevious()
    if parent is None or parent.tag not in FORMATTING_PROTECTED:
        processed_element = Element('p')
        processed_element.insert(0, formatting)
    else:
        processed_element = formatting
    return processed_element


def handle_lists(element, options):
    '''Process list elements including their children'''
    processed_element = Element(element.tag)
    if element.text is not None and element.text.strip():
        newchildelem = SubElement(processed_element, "item")
        newchildelem.text = element.text
    # if element.tail is not None:
    #    processed_element.tail = element.text
    for child in element.iter('item'):
        newchildelem = Element('item')
        if len(child) == 0:
            processed_child = process_node(child, options)
            if processed_child is not None:
                newchildelem.text = processed_child.text
                if processed_child.tail is not None and processed_child.tail.strip():
                    newchildelem.text += " " + processed_child.tail
                processed_element.append(newchildelem)
        else:
            newchildelem.text = child.text
            # proceed with iteration, fix for nested elements
            for subelem in child.iterdescendants('*'):
                # beware of nested lists
                if subelem.tag == 'list':
                    processed_subchild = handle_lists(subelem, options)
                    if processed_subchild is not None:
                        newchildelem.append(processed_subchild)
                else:
                    processed_subchild = handle_textnode(subelem, options, comments_fix=False)
                    # add child element to processed_element
                    if processed_subchild is not None:
                        subchildelem = SubElement(newchildelem, processed_subchild.tag)
                        subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail
                        # set attributes
                        for attr in subelem.attrib:
                            subchildelem.set(attr, subelem.get(attr))
                # strip_tags(newchildelem, 'item')
                subelem.tag = 'done'
            if child.tail is not None and child.tail.strip():
                newchildelem_children = [el for el in newchildelem.getchildren() if el.tag != 'done']
                if newchildelem_children:
                    last_subchild = newchildelem_children[-1]
                    if last_subchild.tail is None or not last_subchild.tail.strip():
                        last_subchild.tail = child.tail
                    else:
                        last_subchild.tail += ' ' + child.tail
        if newchildelem.text or len(newchildelem) > 0:
            # set attribute
            if child.get('rend') is not None:
                newchildelem.set('rend', child.get('rend'))
            processed_element.append(newchildelem)
        child.tag = 'done'
    element.tag = 'done'
    # test if it has children and text. Avoid double tags??
    if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True:
        # set attribute
        if element.get('rend') is not None:
            processed_element.set('rend', element.get('rend'))
        return processed_element
    return None


def is_code_block_element(element):
    "Check if the element is part of a code block (pip docs, GitHub, highlight.js markup)."
    # pip
    if element.get('lang') is not None or element.tag == 'code':
        return True
    # GitHub
    parent = element.getparent()
    if parent is not None and 'highlight' in parent.get('class', default=''):
        return True
    # highlightjs
    code = element.find('code')
    if code is not None and len(element.getchildren()) == 1:
        return True
    return False


def handle_code_blocks(element):
    "Turn the element into a code block and neutralize its children."
    processed_element = deepcopy(element)
    for child in element.iter('*'):
        child.tag = 'done'
    processed_element.tag = 'code'
    return processed_element


def handle_quotes(element, options):
    '''Process quotes elements'''
    if is_code_block_element(element):
        return handle_code_blocks(element)

    processed_element = Element(element.tag)
    for child in element.iter('*'):
        processed_child = process_node(child, options)  # handle_textnode(child, comments_fix=True)
        if processed_child is not None:
            newsub = SubElement(processed_element, child.tag)
            newsub.text, newsub.tail = processed_child.text, processed_child.tail
        child.tag = 'done'
    if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True:
        # avoid double/nested tags
        strip_tags(processed_element, 'quote')
        return processed_element
    return None


def handle_other_elements(element, potential_tags, options):
    '''Handle diverse or unknown elements in the scope of relevant tags'''
    # handle w3schools code
    if element.tag == 'div' and 'w3-code' in element.get('class', default=''):
        return handle_code_blocks(element)
    # delete unwanted
    if element.tag not in potential_tags:
        if element.tag != 'done':
            LOGGER.debug('discarding element: %s %s', element.tag, element.text)
        return None
    if element.tag == 'div':
        # make a copy and prune it in case it contains sub-elements handled on their own?
        # divcopy = deepcopy(element)
        processed_element = handle_textnode(element, options, comments_fix=False, preserve_spaces=True)
        if processed_element is not None and text_chars_test(processed_element.text) is True:
            processed_element.attrib.clear()
            # small div-correction # could be moved elsewhere
            if processed_element.tag == 'div':
                processed_element.tag = 'p'
            # insert
            return processed_element
    else:
        LOGGER.debug('unexpected element seen: %s %s', element.tag, element.text)
    return None


def handle_paragraphs(element, potential_tags, options):
    '''Process paragraph (p) elements along with their children,
       trim and clean the content'''
    element.attrib.clear()
    # strip_tags(element, 'p') # change in precision due to spaces?
    # no children
    if len(element) == 0:
        processed_element = process_node(element, options)
        if processed_element is not None:
            return processed_element
        return None
    # children
    processed_element = Element(element.tag)
    for child in element.iter('*'):
        if child.tag not in potential_tags and child.tag != 'done':
            LOGGER.debug('unexpected in p: %s %s %s', child.tag, child.text, child.tail)
            continue
        # spacing = child.tag in SPACING_PROTECTED  # todo: outputformat.startswith('xml')?
        # todo: act on spacing here?
        processed_child = handle_textnode(child, options, comments_fix=False, preserve_spaces=True)
        if processed_child is not None:
            # todo: needing attention!
            if processed_child.tag == 'p':
                LOGGER.debug('extra p within p: %s %s %s', processed_child.tag, processed_child.text,
                             processed_child.tail)
                if processed_element.text:
                    processed_element.text += ' ' + processed_child.text
                else:
                    processed_element.text = processed_child.text
                continue
            # handle formatting
            newsub = Element(child.tag)
            if processed_child.tag in P_FORMATTING:
                # check depth and clean
                if len(processed_child) > 0:
                    for item in processed_child:  # children are lists
                        if text_chars_test(item.text) is True:
                            item.text = ' ' + item.text
                        strip_tags(processed_child, item.tag)
                # correct attributes
                if child.tag == 'hi':
                    newsub.set('rend', child.get('rend'))
                elif child.tag == 'ref':
                    if child.get('target') is not None:
                        newsub.set('target', child.get('target'))
            # handle line breaks
            # elif processed_child.tag == 'lb':
            #    try:
            #        processed_child.tail = process_node(child, options).tail
            #    except AttributeError:  # no text
            #        pass
            # prepare text
            # todo: to be moved to handle_textnode()
            # if text_chars_test(processed_child.text) is False:
            #    processed_child.text = ''
            # if text_chars_test(processed_child.tail) is False:
            #    processed_child.tail = ''
            # if there are already children
            # if len(processed_element) > 0:
            #    if text_chars_test(processed_child.tail) is True:
            #        newsub.tail = processed_child.text + processed_child.tail
            #    else:
            #        newsub.tail = processed_child.text
            newsub.text, newsub.tail = processed_child.text, processed_child.tail
            processed_element.append(newsub)
        child.tag = 'done'
    # finish
    if len(processed_element) > 0:
        # clean trailing lb-elements
        if (
                processed_element[-1].tag == 'lb'
                and processed_element[-1].tail is None
        ):
            processed_element[-1].getparent().remove(processed_element[-1])
        return processed_element
    if processed_element.text:
        return processed_element
    LOGGER.debug('discarding p-child: %s', tostring(processed_element))
    return None


def define_cell_type(element):
    '''Determine cell element type and mint new element'''
    # define tag
    cell_element = Element('cell')
    if element.tag == 'th':
        cell_element.set('role', 'head')
    return cell_element


def handle_table(table_elem, potential_tags, options):
    '''Process single table element'''
    newtable = Element('table')
    newrow = Element('row')
    # strip these structural elements
    strip_tags(table_elem, 'thead', 'tbody', 'tfoot')
    # explore sub-elements
    for subelement in table_elem.iterdescendants():
        if subelement.tag == 'tr':
            # process existing row
            if len(newrow) > 0:
                newtable.append(newrow)
                newrow = Element('row')
        elif subelement.tag in TABLE_ELEMS:
            newchildelem = define_cell_type(subelement)
            # process
            if len(subelement) == 0:
                processed_cell = process_node(subelement, options)
                if processed_cell is not None:
                    newchildelem.text, newchildelem.tail = processed_cell.text, processed_cell.tail
            else:
                # proceed with iteration, fix for nested elements
                newchildelem.text, newchildelem.tail = subelement.text, subelement.tail
                subelement.tag = "done"
                for child in subelement.iterdescendants():
                    if child.tag in TABLE_ALL:
                        # todo: define attributes properly
                        if child.tag in TABLE_ELEMS:
                            # subcell_elem = define_cell_type(subelement)
                            child.tag = 'cell'
                        processed_subchild = handle_textnode(child, options, preserve_spaces=True, comments_fix=True)
                    # todo: lists in table cells
                    else:
                        # subcell_elem = Element(child.tag)
                        processed_subchild = handle_textelem(child, potential_tags.union(['div']), options)
                    # add child element to processed_element
                    if processed_subchild is not None:
                        subchildelem = SubElement(newchildelem, processed_subchild.tag)
                        subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail
                    child.tag = 'done'
            # add to tree
            if newchildelem.text or len(newchildelem) > 0:
                newrow.append(newchildelem)
        # beware of nested tables
        elif subelement.tag == 'table':
            break
        # cleanup
        subelement.tag = 'done'
    # end of processing
    if len(newrow) > 0:
        newtable.append(newrow)
    if len(newtable) > 0:
        return newtable
    return None


def handle_image(element):
    '''Process image element'''
    # image source
    processed_element = Element(element.tag)
    if is_image_file(element.get('data-src')):
        processed_element.set('src', element.get('data-src'))
    elif is_image_file(element.get('src')):
        processed_element.set('src', element.get('src'))
    else:
        # take the first corresponding attribute
        for attr in element.attrib:
            if attr.startswith('data-src') and is_image_file(element.get(attr)):
                processed_element.set('src', element.get(attr))
                break
    # additional data
    if element.get('alt') is not None:
        processed_element.set('alt', element.get('alt'))
    if element.get('title') is not None:
        processed_element.set('title', element.get('title'))
    # don't return empty elements or elements without source, just None
    if len(processed_element.attrib) == 0 or not processed_element.get('src'):
        return None
    # post-processing: URLs
    url = processed_element.get('src')
    processed_element.set('src', re.sub(r'^//', 'http://', url))
    return processed_element
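
# Editor's note (illustrative sketch, not upstream code): handle_image() prefers
# data-src over src and normalizes protocol-relative links, e.g. a hypothetical
#   <graphic data-src="//cdn.example.org/pic.jpg" alt="pic"/>
# comes out as <graphic src="http://cdn.example.org/pic.jpg" alt="pic"/>.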


def handle_textelem(element, potential_tags, options):
    '''Process text element and determine how to deal with its content'''
    new_element = None
    # bypass: nested elements
    if element.tag == 'list':
        new_element = handle_lists(element, options)
    elif element.tag in CODES_QUOTES:
        new_element = handle_quotes(element, options)
    elif element.tag == 'head':
        new_element = handle_titles(element, options)
    elif element.tag == 'p':
        new_element = handle_paragraphs(element, potential_tags, options)
    elif element.tag == 'lb':
        if text_chars_test(element.tail) is True:
            element = process_node(element, options)
            if element is not None:
                new_element = Element('p')
                new_element.text = element.tail
    elif element.tag in FORMATTING:
        new_element = handle_formatting(element, options)  # process_node(element, options)
    elif element.tag == 'table' and 'table' in potential_tags:
        new_element = handle_table(element, potential_tags, options)
    elif element.tag == 'graphic' and 'graphic' in potential_tags:
        new_element = handle_image(element)
    else:
        # other elements (div, ??, ??)
        new_element = handle_other_elements(element, potential_tags, options)
    return new_element


def recover_wild_text(tree, result_body, options, potential_tags=TAG_CATALOG):
    '''Look for all previously unconsidered wild elements, including outside of the determined
       frame and throughout the document to recover potentially missing text parts'''
    LOGGER.debug('Recovering wild text elements')
    search_expr = './/blockquote|.//code|.//p|.//pre|.//q|.//quote|.//table|.//div[contains(@class, \'w3-code\')]'
    if options.recall is True:
        potential_tags.update(['div', 'lb'])
        search_expr += '|.//div|.//lb|.//list'
    # prune
    search_tree = prune_unwanted_sections(tree, potential_tags, options)
    # decide if links are preserved
    if 'ref' not in potential_tags:
        strip_tags(search_tree, 'a', 'ref', 'span')
    else:
        strip_tags(search_tree, 'span')
    subelems = search_tree.xpath(search_expr)
    result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options)
                       for e in subelems)))
    return result_body


def prune_unwanted_sections(tree, potential_tags, options):
    'Rule-based deletion of targeted document sections'
    # prune the rest
    tree = prune_unwanted_nodes(tree, OVERALL_DISCARD_XPATH, with_backup=True)
    tree = prune_unwanted_nodes(tree, PAYWALL_DISCARD_XPATH)
    # decide if images are preserved
    if 'graphic' not in potential_tags:
        tree = prune_unwanted_nodes(tree, DISCARD_IMAGE_ELEMENTS)
    # balance precision/recall
    if options.recall is False:
        tree = prune_unwanted_nodes(tree, TEASER_DISCARD_XPATH)
        if options.precision is True:
            tree = prune_unwanted_nodes(tree, PRECISION_DISCARD_XPATH)
    # remove elements by link density
    tree = delete_by_link_density(tree, 'div', backtracking=True, favor_precision=options.precision)
    tree = delete_by_link_density(tree, 'list', backtracking=False, favor_precision=options.precision)
    tree = delete_by_link_density(tree, 'p', backtracking=False, favor_precision=options.precision)
    # also filter fw/head, table and quote elements?
    if options.precision is True:
        # delete trailing titles
        while len(tree) > 0 and (tree[-1].tag == 'head'):
            tree[-1].getparent().remove(tree[-1])
        tree = delete_by_link_density(tree, 'head', backtracking=False)  # favor_precision=options.precision
        tree = delete_by_link_density(tree, 'quote', backtracking=False)  # favor_precision=options.precision
    return tree


def extract_content(tree, options):
    '''Find the main content of a page using a set of XPath expressions,
       then extract relevant elements, strip them of unwanted subparts and
       convert them'''
    # backup
    backup_tree = deepcopy(tree)
    # init
    result_body = Element('body')
    potential_tags = set(TAG_CATALOG)
    if options.tables is True:
        potential_tags.update(['table', 'td', 'th', 'tr'])
    if options.images is True:
        potential_tags.add('graphic')
    if options.links is True:
        potential_tags.add('ref')
    # iterate
    for expr in BODY_XPATH:
        # select tree if the expression has been found
        try:
            subtree = expr(tree)[0]
        except IndexError:
            continue
        # prune the subtree
        subtree = prune_unwanted_sections(subtree, potential_tags, options)
        # second pass?
        # subtree = delete_by_link_density(subtree, 'list', backtracking=False, favor_precision=options.precision)
        if 'table' in potential_tags or options.precision is True:
            for elem in subtree.iter('table'):
                if link_density_test_tables(elem) is True:
                    elem.getparent().remove(elem)
        # skip if empty tree
        if len(subtree) == 0:
            continue
        # no paragraphs containing text, or not enough
        ptest = subtree.xpath('//p//text()')
        if options.recall is True:
            factor = 5
        elif options.precision is True:
            factor = 1
        else:
            factor = 3
        if not ptest or len(''.join(ptest)) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE') * factor:
            potential_tags.add('div')
        # polish list of potential tags
        if 'ref' not in potential_tags:
            strip_tags(subtree, 'ref')
        if 'span' not in potential_tags:
            strip_tags(subtree, 'span')
        LOGGER.debug(sorted(potential_tags))
        # proper extraction
        subelems = subtree.xpath('.//*')
        # e.g. only lb-elems in a div
        if {e.tag for e in subelems} == {'lb'}:
            subelems = [subtree]
        # extract content
        result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options) for e in subelems)))
        # remove trailing titles
        while len(result_body) > 0 and (result_body[-1].tag in NOT_AT_THE_END):
            result_body[-1].getparent().remove(result_body[-1])
        # exit the loop if the result has children
        if len(result_body) > 1:
            LOGGER.debug(expr)
            break
    temp_text = ' '.join(result_body.itertext()).strip()
    # try parsing wild <p> elements if nothing found or text too short
    # todo: test precision and recall settings here
    if len(result_body) == 0 or len(temp_text) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
        result_body = recover_wild_text(backup_tree, result_body, options, potential_tags)
        temp_text = ' '.join(result_body.itertext()).strip()
    # filter output
    strip_elements(result_body, 'done')
    strip_tags(result_body, 'div')
    # return
    return result_body, temp_text, len(temp_text)


def process_comments_node(elem, potential_tags, options):
    '''Process comment node and determine how to deal with its content'''
    if elem.tag in potential_tags:
        # print(elem.tag, elem.text_content())
        processed_element = handle_textnode(elem, options, comments_fix=True)
        # test length and remove
        if processed_element is not None:  # and processed_element.text not in COMMENTS_BLACKLIST:
            processed_element.attrib.clear()
            # if textfilter(elem) is True:  # ^Pingback
            #    return None
            return processed_element
    return None


def extract_comments(tree, options):
    '''Try and extract comments out of potential sections in the HTML'''
    comments_body = Element('body')
    # define iteration strategy
    potential_tags = set(TAG_CATALOG)  # 'span'
    # potential_tags.add('div') trouble with <div class="comment-author meta">
    for expr in COMMENTS_XPATH:
        # select tree if the expression has been found
        subtree = expr(tree)
        if not subtree:
            continue
        subtree = subtree[0]
        # prune
        subtree = prune_unwanted_nodes(subtree, COMMENTS_DISCARD_XPATH)
        # todo: unified stripping function, taking include_links into account
        strip_tags(subtree, 'a', 'ref', 'span')
        # extract content
        # for elem in subtree.xpath('.//*'):
        #    processed_elem = process_comments_node(elem, potential_tags)
        #    if processed_elem is not None:
        #        comments_body.append(processed_elem)
        # processed_elems = (process_comments_node(elem, potential_tags, options) for elem in
        #                    subtree.xpath('.//*'))
        comments_body.extend(filter(lambda x: x is not None, (process_comments_node(e, potential_tags, options) for e in subtree.xpath('.//*'))))
        # control
        if len(comments_body) > 0:  # if it has children
            LOGGER.debug(expr)
            # remove corresponding subtree
            subtree.getparent().remove(subtree)
            break
    # lengths
    temp_comments = ' '.join(comments_body.itertext()).strip()
    return comments_body, temp_comments, len(temp_comments), tree


def compare_extraction(tree, backup_tree, url, body, text, len_text, options):
    '''Decide whether to choose own or external extraction
       based on a series of heuristics'''
    min_target_length = options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE')
    # bypass for recall
    if options.recall is True and len_text > min_target_length * 10:
        return body, text, len_text
    algo_flag, jt_result = False, False
    # prior cleaning
    backup_tree = prune_unwanted_nodes(backup_tree, PAYWALL_DISCARD_XPATH)
    if options.precision is True:
        backup_tree = prune_unwanted_nodes(backup_tree, OVERALL_DISCARD_XPATH)
    # try with readability
    temppost_algo = try_readability(backup_tree)
    # unicode fix necessary on certain systems (#331)
    algo_text = trim(tostring(temppost_algo, method='text', encoding='utf-8').decode('utf-8'))
    len_algo = len(algo_text)
    # compare
    LOGGER.debug('extracted length: %s (algorithm) %s (extraction)', len_algo, len_text)
    # conditions to use alternative algorithms
    if len_algo in (0, len_text):
        algo_flag = False
    elif len_text == 0 and len_algo > 0:
        algo_flag = True
    elif len_text > 2 * len_algo:
        algo_flag = False
    elif len_algo > 2 * len_text:
        algo_flag = True
    # borderline cases
    elif not body.xpath('.//p//text()') and len_algo > min_target_length * 2:
        algo_flag = True
    elif len(body.findall('.//table')) > len(body.findall('.//p')) and len_algo > min_target_length * 2:
        algo_flag = True
    # https://github.com/adbar/trafilatura/issues/354
    elif options.recall is True and not body.xpath('.//head') and temppost_algo.xpath('.//h2|.//h3|.//h4') and len_algo > len_text:
        algo_flag = True
    else:
        LOGGER.debug('extraction values: %s %s for %s', len_text, len_algo, url)
        algo_flag = False
    # apply decision
    if algo_flag:
        body, text, len_text = temppost_algo, algo_text, len_algo
        LOGGER.debug('using generic algorithm: %s', url)
    else:
        LOGGER.debug('using custom extraction: %s', url)
    # override faulty extraction: try with justext
    if body.xpath(SANITIZED_XPATH) or len_text < min_target_length:  # body.find(...)
        # or options.recall is True ?
        LOGGER.debug('unclean document triggering justext examination: %s', url)
        # tree = prune_unwanted_sections(tree, {}, options)
        body2, text2, len_text2, jt_result = justext_rescue(tree, url, options.lang, body, 0, '')
        # prevent too short documents from replacing the main text
        if jt_result is True and not len_text > 4*len_text2:  # threshold could be adjusted
            LOGGER.debug('using justext, length: %s', len_text2)
            body, text, len_text = body2, text2, len_text2
    # post-processing: remove unwanted sections
    if algo_flag is True and jt_result is False:
        body, text, len_text = sanitize_tree(body, options)
    return body, text, len_text


def basic_cleaning(tree):
    "Remove a few section types from the document."
    for elem in BASIC_CLEAN_XPATH(tree):
        elem.getparent().remove(elem)
    return tree


def baseline(filecontent):
    """Use baseline extraction function targeting text paragraphs and/or JSON metadata.

    Args:
        filecontent: HTML code as binary string or string.

    Returns:
        A LXML <body> element containing the extracted paragraphs,
        the main text as string, and its length as integer.

    """
    tree = load_html(filecontent)
    postbody = Element('body')
    if tree is None:
        return postbody, '', 0
    # scrape from json text
    for elem in tree.iterfind('.//script[@type="application/ld+json"]'):
        if elem.text and '"article' in elem.text:
            mymatch = JSON_SEARCH.search(elem.text)
            if mymatch:
                elem = SubElement(postbody, 'p')
                elem.text = trim(mymatch[1].replace('\\"', '"'))
                return postbody, elem.text, len(elem.text)
    tree = basic_cleaning(tree)
    # scrape from article tag
    article_elem = tree.find('.//article')
    if article_elem is not None:
        temp_text = trim(article_elem.text_content())
        if len(temp_text) > 100:
            elem = SubElement(postbody, 'p')
            elem.text = temp_text
            return postbody, temp_text, len(temp_text)
    # scrape from text paragraphs
    results = set()
    for element in tree.iter('blockquote', 'code', 'p', 'pre', 'q', 'quote'):
        entry = element.text_content()
        if entry not in results:
            elem = SubElement(postbody, 'p')
            elem.text = entry
            results.add(entry)
    temp_text = trim('\n'.join(postbody.itertext()))
    if len(temp_text) > 100:
        return postbody, temp_text, len(temp_text)
    # default strategy: clean the tree and take everything
    postbody = Element('body')
    body_elem = tree.find('.//body')
    if body_elem is not None:
        # elem.text = trim(body_elem.text_content())
        text = '\n'.join([trim(e) for e in body_elem.itertext()])
        if len(text) > 100:
            elem = SubElement(postbody, 'p')
            elem.text = text
            return postbody, text, len(text)
    # new fallback
    text = html2txt(tree)
    elem = SubElement(postbody, 'p')
    elem.text = text
    return postbody, text, len(text)
    # old: return postbody, '', 0
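
# Usage sketch (editor's example, hypothetical HTML input):
#   >>> html = "<html><body><article><p>" + "Sample sentence. " * 20 + "</p></article></body></html>"
#   >>> body, text, length = baseline(html)
#   >>> length > 100
#   True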


def html2txt(content):
    """Run basic html2txt on a document.

    Args:
        content: HTML document as string or LXML element.

    Returns:
        The extracted text in the form of a string or an empty string.

    """
    tree = load_html(content)
    if tree is None:
        return ""
    body = tree.find(".//body")
    if body is None:
        return ""
    tree = basic_cleaning(tree)
    return " ".join(body.text_content().split()).strip()
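
# Usage sketch (editor's example, hypothetical input):
#   >>> html2txt("<html><body><p>Hello   world</p></body></html>")
#   'Hello world'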


def determine_returnstring(document, output_format, include_formatting, tei_validation):
    '''Convert XML tree to chosen format, clean the result and output it as a string'''
    # XML (TEI) steps
    if 'xml' in output_format:
        # last cleaning
        for element in document.body.iter('*'):
            if element.tag != 'graphic' and len(element) == 0 and not element.text and not element.tail:
                parent = element.getparent()
                # do not remove elements inside <code> to preserve formatting
                if parent is not None and parent.tag != 'code':
                    parent.remove(element)
        # build output trees
        strip_double_tags(document.body)
        remove_empty_elements(document.body)
        if output_format == 'xml':
            output = build_xml_output(document)
        elif output_format == 'xmltei':
            output = build_tei_output(document)
        # can be improved
        returnstring = control_xml_output(output, output_format, tei_validation, document)
    # CSV
    elif output_format == 'csv':
        returnstring = xmltocsv(document, include_formatting)
    # JSON
    elif output_format == 'json':
        returnstring = build_json_output(document)
    # TXT
    else:
        returnstring = xmltotxt(document.body, include_formatting)
        if document.commentsbody is not None:
            comments_text = xmltotxt(document.commentsbody, include_formatting)
            returnstring = f"{returnstring}\n{comments_text}".strip()
    # normalize Unicode format (defaults to NFC)
    return normalize_unicode(returnstring)


def bare_extraction(filecontent, url=None, no_fallback=False,  # fast=False,
                    favor_precision=False, favor_recall=False,
                    include_comments=True, output_format='python', target_language=None,
                    include_tables=True, include_images=False, include_formatting=False,
                    include_links=False, deduplicate=False,
                    date_extraction_params=None,
                    only_with_metadata=False, with_metadata=False,
                    max_tree_size=None, url_blacklist=None, author_blacklist=None,
                    as_dict=True, prune_xpath=None,
                    config=DEFAULT_CONFIG):
    """Internal function for text extraction returning bare Python variables.

    Args:
        filecontent: HTML code as string.
        url: URL of the webpage.
        no_fallback: Use faster heuristics and skip backup extraction.
        favor_precision: prefer less text but correct extraction.
        favor_recall: prefer more text even when unsure.
        include_comments: Extract comments along with the main text.
        output_format: Define an output format, Python being the default
            and the interest of this internal function.
            Other values: "txt", "csv", "json", "xml", or "xmltei".
        target_language: Define a language to discard invalid documents (ISO 639-1 format).
        include_tables: Take into account information within the HTML <table> element.
        include_images: Take images into account (experimental).
        include_formatting: Keep structural elements related to formatting
            (present in XML format, converted to markdown otherwise).
        include_links: Keep links along with their targets (experimental).
        deduplicate: Remove duplicate segments and documents.
        date_extraction_params: Provide extraction parameters to htmldate as dict().
        only_with_metadata: Only keep documents featuring all essential metadata
            (date, title, url).
        max_tree_size: Discard documents with too many elements.
        url_blacklist: Provide a blacklist of URLs as set() to filter out documents.
        author_blacklist: Provide a blacklist of Author Names as set() to filter out authors.
        as_dict: Legacy option, return a dictionary instead of a class with attributes.
        prune_xpath: Provide an XPath expression to prune the tree before extraction.
            Can be str or list of str.
        config: Directly provide a configparser configuration.

    Returns:
        A Python dict() containing all the extracted information or None.

    Raises:
        ValueError: Extraction problem.

    """
    # init
    if url_blacklist is None:
        url_blacklist = set()

    # deprecation warnings
    if with_metadata is True:
        only_with_metadata = with_metadata
        warnings.warn(
            '"with_metadata" will be deprecated in a future version, use "only_with_metadata" instead',
            PendingDeprecationWarning
        )
    #if no_fallback is True:
    #    fast = no_fallback
    #    warnings.warn(
    #        '"no_fallback" will be deprecated in a future version, use "fast" instead',
    #        PendingDeprecationWarning
    #    )

    # load data
    try:
        tree = load_html(filecontent)
        if tree is None:
            LOGGER.error('empty HTML tree for URL %s', url)
            raise ValueError

        # quick and dirty HTML lang check
        if target_language is not None and (no_fallback is True or LANGID_FLAG is False):
            if check_html_lang(tree, target_language) is False:
                LOGGER.error('wrong HTML meta language for URL %s', url)
                raise ValueError

        # extract metadata if necessary
        if output_format != 'txt':
            if not date_extraction_params:
                date_extraction_params = {
                    "extensive_search": config.getboolean('DEFAULT', 'EXTENSIVE_DATE_SEARCH'),
                }
            document = extract_metadata(tree, url, date_extraction_params, no_fallback, author_blacklist)
            # cut short if extracted URL in blacklist
            if document.url in url_blacklist:
                LOGGER.warning('blacklisted URL: %s', url)
                raise ValueError
            # cut short if core elements are missing
            if only_with_metadata is True and any(
                x is None for x in [document.date, document.title, document.url]
            ):
                LOGGER.error('no metadata for URL %s', url)
                raise ValueError
        else:
            document = Document()

        # regroup extraction options
        options = Extractor(config, no_fallback, favor_precision, favor_recall,
                            include_comments, include_formatting, include_links,
                            include_images, include_tables, deduplicate,
                            target_language)

        # prune all xpath expressions that user specified
        # no backup as this is under the full control of the user
        if prune_xpath is not None:
            if isinstance(prune_xpath, str):
                prune_xpath = [prune_xpath]
            tree = prune_unwanted_nodes(tree, [XPath(x) for x in prune_xpath])

        # backup (or not) for further processing
        tree_backup_1 = deepcopy(tree) if no_fallback is False else None
        tree_backup_2 = deepcopy(tree)

        # clean + use LXML cleaner
        cleaned_tree = tree_cleaning(tree, options)
        cleaned_tree_backup = deepcopy(cleaned_tree)

        # convert tags, the rest does not work without conversion
        cleaned_tree = convert_tags(cleaned_tree, options, url or document.url)

        # comments first, then remove
        if include_comments is True:
            commentsbody, temp_comments, len_comments, cleaned_tree = extract_comments(cleaned_tree, options)
        else:
            commentsbody, temp_comments, len_comments = None, '', 0
        if favor_precision is True:
            cleaned_tree = prune_unwanted_nodes(cleaned_tree, REMOVE_COMMENTS_XPATH)

        # extract content
        postbody, temp_text, len_text = extract_content(cleaned_tree, options)

        # compare if necessary
        if no_fallback is False:
            postbody, temp_text, len_text = compare_extraction(cleaned_tree_backup, tree_backup_1, url, postbody, temp_text, len_text, options)
        # add baseline as additional fallback
        # rescue: try to use original/dirty tree  # and favor_precision is False?
        if len_text < config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
            postbody, temp_text, len_text = baseline(tree_backup_2)
            LOGGER.debug('non-clean extracted length: %s (extraction)', len_text)

        # tree size sanity check
        if max_tree_size is not None:
            # strip tags
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s', len(postbody))
                strip_tags(postbody, 'hi')
            # still too long, raise an error
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s, discarding file', len(postbody))
                raise ValueError
        # size checks
        if len_comments < config.getint('DEFAULT', 'MIN_EXTRACTED_COMM_SIZE'):
            LOGGER.debug('not enough comments %s', url)
        if len_text < config.getint('DEFAULT', 'MIN_OUTPUT_SIZE') and len_comments < config.getint('DEFAULT', 'MIN_OUTPUT_COMM_SIZE'):
            LOGGER.debug('text and comments not long enough: %s %s', len_text, len_comments)
            raise ValueError

        # check duplicates at body level
        if deduplicate is True and duplicate_test(postbody, config) is True:
            LOGGER.debug('discarding duplicate document for URL %s', url)
            raise ValueError

        # sanity check on language
        if target_language is not None:
            is_not_target_lang, document = language_filter(temp_text, temp_comments, target_language, document)
            if is_not_target_lang is True:
                LOGGER.debug('wrong language for URL %s', url)
                raise ValueError

    except (TypeError, ValueError):
        LOGGER.warning('discarding data for url: %s', url)  # document.url , record_id
        return None

    # special case: python variables
    if output_format == 'python':
        document.text = xmltotxt(postbody, include_formatting)
        if include_comments is True:
            document.comments = xmltotxt(commentsbody, include_formatting)
            document.commentsbody = commentsbody
        document.raw_text = document.text
        document.body = postbody
    else:
        document.raw_text, document.body, document.commentsbody = temp_text, postbody, commentsbody
    if as_dict is True:
        document = {slot: getattr(document, slot, None) for slot in document.__slots__}
    return document
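
# Usage sketch (editor's example; html_string and the URL are hypothetical):
#   >>> doc = bare_extraction(html_string, url="https://example.org/post", as_dict=True)
#   >>> # doc is a dict with keys such as 'title', 'text' and 'comments',
#   >>> # or None if the size, language or metadata checks above failed.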


def timeout_handler(signum, frame):
    '''Raise a timeout exception to handle rare malicious files'''
    raise RuntimeError('unusual file processing time, aborting')


def extract(filecontent, url=None, record_id=None, no_fallback=False,
            favor_precision=False, favor_recall=False,
            include_comments=True, output_format='txt',
            tei_validation=False, target_language=None,
            include_tables=True, include_images=False, include_formatting=False,
            include_links=False, deduplicate=False,
            date_extraction_params=None,
            only_with_metadata=False, with_metadata=False,
            max_tree_size=None, url_blacklist=None, author_blacklist=None,
            settingsfile=None, prune_xpath=None,
            config=DEFAULT_CONFIG, **kwargs):
    """Main function exposed by the package:
       Wrapper for text extraction and conversion to chosen output format.

    Args:
        filecontent: HTML code as string.
        url: URL of the webpage.
        record_id: Add an ID to the metadata.
        no_fallback: Skip the backup extraction with readability-lxml and justext.
        favor_precision: prefer less text but correct extraction.
        favor_recall: when unsure, prefer more text.
        include_comments: Extract comments along with the main text.
        output_format: Define an output format:
            'txt', 'csv', 'json', 'xml', or 'xmltei'.
        tei_validation: Validate the XML-TEI output with respect to the TEI standard.
        target_language: Define a language to discard invalid documents (ISO 639-1 format).
        include_tables: Take into account information within the HTML <table> element.
        include_images: Take images into account (experimental).
        include_formatting: Keep structural elements related to formatting
            (only valuable if output_format is set to XML).
        include_links: Keep links along with their targets (experimental).
        deduplicate: Remove duplicate segments and documents.
        date_extraction_params: Provide extraction parameters to htmldate as dict().
        only_with_metadata: Only keep documents featuring all essential metadata
            (date, title, url).
        max_tree_size: Discard documents with too many elements.
        url_blacklist: Provide a blacklist of URLs as set() to filter out documents.
        author_blacklist: Provide a blacklist of Author Names as set() to filter out authors.
        settingsfile: Use a configuration file to override the standard settings.
        prune_xpath: Provide an XPath expression to prune the tree before extraction.
            Can be str or list of str.
        config: Directly provide a configparser configuration.

    Returns:
        A string in the desired format or None.

    """
    # older, deprecated functions
    if kwargs and any([
            # output formats
            'csv_output' in kwargs,
            'json_output' in kwargs,
            'tei_output' in kwargs,
            'xml_output' in kwargs
            ]):
        raise NameError(
            'Deprecated argument: use output_format instead, e.g. output_format="xml"'
        )
    # todo: add with_metadata later

    # configuration init
    config = use_config(settingsfile, config)

    # extraction
    try:
        document = bare_extraction(
            filecontent, url=url, no_fallback=no_fallback,
            favor_precision=favor_precision, favor_recall=favor_recall,
            include_comments=include_comments, output_format=output_format,
            target_language=target_language,
            include_tables=include_tables, include_images=include_images,
            include_formatting=include_formatting, include_links=include_links,
            deduplicate=deduplicate,
            date_extraction_params=date_extraction_params,
            only_with_metadata=only_with_metadata, with_metadata=with_metadata,
            max_tree_size=max_tree_size, url_blacklist=url_blacklist,
            author_blacklist=author_blacklist,
            as_dict=False, prune_xpath=prune_xpath,
            config=config,
        )
    except RuntimeError:
        LOGGER.error('Processing timeout for %s', url)
        document = None

    # post-processing
    if document is None:
        return None
    if output_format != 'txt':
        # add record ID to metadata
        document.id = record_id
        # calculate fingerprint
        if document.raw_text is not None:
            document.fingerprint = content_fingerprint(str(document.title) + " " + str(document.raw_text))

    # return
    return determine_returnstring(document, output_format, include_formatting, tei_validation)


# for legacy and backwards compatibility
def process_record(filecontent, url=None, record_id=None, no_fallback=False,
                   include_comments=True, target_language=None,
                   include_tables=True):
    "Legacy extraction function, now deprecated."
    # deprecation warning
    warnings.warn(
        "process_record() is deprecated, use extract() instead",
        DeprecationWarning
    )
    return extract(filecontent, url=url, record_id=record_id, no_fallback=no_fallback,
                   include_comments=include_comments, target_language=target_language,
                   include_tables=include_tables)