# pylint: disable=E0611,I1101
"""
Module bundling all functions needed to extract the text in a webpage.
"""
## This file is available from https://github.com/adbar/trafilatura
## under GNU GPL v3 license
# standard
import logging
import re # import regex as re
import warnings
from copy import deepcopy
from lxml.etree import Element, SubElement, strip_elements, strip_tags
from lxml.html import tostring
# own
from .external import (SANITIZED_XPATH, justext_rescue, sanitize_tree,
try_readability)
from .filters import (LANGID_FLAG, check_html_lang, duplicate_test,
language_filter, text_chars_test)
from .hashing import content_fingerprint
from .htmlprocessing import (convert_tags, delete_by_link_density,
handle_textnode, link_density_test_tables,
process_node, prune_unwanted_nodes, tree_cleaning)
from .metadata import Document, extract_metadata
from .settings import DEFAULT_CONFIG, TAG_CATALOG, use_config
from .utils import is_image_file, load_html, normalize_unicode, trim, txttocsv
from .xml import (build_json_output, build_tei_output, build_xml_output,
control_xml_output, remove_empty_elements, strip_double_tags,
xmltotxt)
from .xpaths import (BODY_XPATH, COMMENTS_DISCARD_XPATH, COMMENTS_XPATH,
DISCARD_IMAGE_ELEMENTS, OVERALL_DISCARD_XPATH,
PAYWALL_DISCARD_XPATH, PRECISION_DISCARD_XPATH,
REMOVE_COMMENTS_XPATH, TEASER_DISCARD_XPATH)

LOGGER = logging.getLogger(__name__)
FORMATTING_PROTECTED = {'cell', 'head', 'hi', 'item', 'p', 'quote', 'td'}
SPACING_PROTECTED = {'code', 'hi', 'ref'}
P_FORMATTING = {'hi', 'ref'}
TABLE_ELEMS = {'td', 'th'}
TABLE_ALL = {'td', 'th', 'hi'}
FORMATTING = {'hi', 'ref', 'span'}
CODES_QUOTES = {'code', 'quote'}
NOT_AT_THE_END = {'head', 'ref'}
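# note: the tag sets above use the internal element names produced upstream by
# convert_tags(), e.g. 'hi' for b/i/em/strong and 'ref' for a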
JSON_SEARCH = re.compile(r'"articlebody": *"(.+?)(?<!\\)"', re.I)
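# e.g. matches '"articleBody": "…"' in ld+json scripts (case-insensitive),
# capturing lazily up to the first unescaped double quote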


class Extractor:
"Defines a class to store all extraction options."
__slots__ = [
'config', 'fast', 'precision', 'recall', 'comments',
'formatting', 'links', 'images', 'tables', 'dedup', 'lang',
]
# consider dataclasses for Python 3.7+

    def __init__(self, config, fast, precision, recall, comments,
formatting, links, images, tables, deduplicate,
target_language):
self.config = config
self.fast = fast
self.precision = precision
self.recall = recall
self.comments = comments
self.formatting = formatting
self.links = links
self.images = images
self.tables = tables
self.dedup = deduplicate
self.lang = target_language
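
    # illustrative sketch only: building default options, with the imported
    # DEFAULT_CONFIG standing in for a custom configuration
    # options = Extractor(DEFAULT_CONFIG, fast=False, precision=False, recall=False,
    #                     comments=True, formatting=False, links=False, images=False,
    #                     tables=True, deduplicate=False, target_language=None)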


def handle_titles(element, options):
'''Process head elements (titles)'''
if len(element) == 0:
# maybe needs attention?
# if element.tail and re.search(r'\w', element.tail):
# LOGGER.debug('tail in title, stripping: %s', element.tail)
# element.tail = None
title = process_node(element, options)
# children
else:
title = deepcopy(element)
# list instead of element.iter('*')
# TODO: write tests for it and check
for child in list(element):
# if child.tag not in potential_tags:
# LOGGER.debug('unexpected in title: %s %s %s', child.tag, child.text, child.tail)
# continue
processed_child = handle_textnode(child, options, comments_fix=False)
if processed_child is not None:
title.append(processed_child)
child.tag = 'done'
if title is not None and text_chars_test(''.join(title.itertext())) is True:
return title
return None


def handle_formatting(element, options):
'''Process formatting elements (b, i, etc. converted to hi) found
outside of paragraphs'''
formatting = process_node(element, options)
if len(element) == 0 and formatting is None:
return None
# repair orphan elements
# if formatting is None:
# formatting = Element(element.tag)
# return None
# if len(element) > 0:
# for child in element.iter('*'):
# if child.tag not in potential_tags:
# LOGGER.debug('unexpected in title: %s %s %s', child.tag, child.text, child.tail)
# continue
# processed_child = handle_textnode(child, options, comments_fix=False)
# if processed_child is not None:
# formatting.append(processed_child)
# child.tag = 'done'
# if text_chars_test(element.text) is True:
# processed_child.text = trim(element.text)
# if text_chars_test(element.tail) is True:
# processed_child.tail = trim(element.tail)
# if len(element) == 0:
# processed_element = process_node(element, options)
# children
# else:
# processed_element = Element(element.tag)
# processed_element.text, processed_element.tail = element.text, element.tail
# for child in element.iter('*'):
# processed_child = handle_textnode(child, options, comments_fix=False)
# if processed_child is not None:
# processed_element.append(processed_child)
# child.tag = 'done'
# repair orphan elements
# shorter code but triggers warning:
# parent = element.getparent() or element.getprevious()
parent = element.getparent()
if parent is None:
parent = element.getprevious()
if parent is None or parent.tag not in FORMATTING_PROTECTED:
processed_element = Element('p')
processed_element.insert(0, formatting)
else:
processed_element = formatting
return processed_element


def handle_lists(element, options):
    '''Process list elements'''
processed_element = Element(element.tag)
if element.text is not None and element.text.strip():
newchildelem = SubElement(processed_element, "item")
newchildelem.text = element.text
# if element.tail is not None:
# processed_element.tail = element.text
for child in element.iter('item'):
newchildelem = Element('item')
if len(child) == 0:
processed_child = process_node(child, options)
if processed_child is not None:
newchildelem.text = processed_child.text
if processed_child.tail is not None and processed_child.tail.strip():
newchildelem.text += " " + processed_child.tail
processed_element.append(newchildelem)
else:
newchildelem.text = child.text
# proceed with iteration, fix for nested elements
for subelem in child.iterdescendants('*'):
# beware of nested lists
if subelem.tag == 'list':
processed_subchild = handle_lists(subelem, options)
if processed_subchild is not None:
newchildelem.append(processed_subchild)
else:
processed_subchild = handle_textnode(subelem, options, comments_fix=False)
# add child element to processed_element
if processed_subchild is not None:
subchildelem = SubElement(newchildelem, processed_subchild.tag)
subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail
# set attributes
for attr in subelem.attrib:
subchildelem.set(attr, subelem.get(attr))
# strip_tags(newchildelem, 'item')
subelem.tag = 'done'
if child.tail is not None and child.tail.strip():
newchildelem_children = [el for el in newchildelem.getchildren() if el.tag != 'done']
if newchildelem_children:
last_subchild = newchildelem_children[-1]
if last_subchild.tail is None or not last_subchild.tail.strip():
last_subchild.tail = child.tail
else:
last_subchild.tail += ' ' + child.tail
if newchildelem.text or len(newchildelem) > 0:
# set attribute
if child.get('rend') is not None:
newchildelem.set('rend', child.get('rend'))
processed_element.append(newchildelem)
child.tag = 'done'
element.tag = 'done'
# test if it has children and text. Avoid double tags??
if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True:
# set attribute
if element.get('rend') is not None:
processed_element.set('rend', element.get('rend'))
return processed_element
return None


def is_code_block_element(element):
    '''Check if the element corresponds to a code block'''
# pip
if element.get('lang') is not None or element.tag == 'code':
return True
# GitHub
parent = element.getparent()
if parent is not None and 'highlight' in parent.get('class', default=''):
return True
# highlightjs
code = element.find('code')
if code is not None and len(element.getchildren()) == 1:
return True
return False
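
# markup typically caught by is_code_block_element() (illustrative): <code>,
# elements carrying a lang attribute, children of <div class="highlight">
# (GitHub) and <pre><code>…</code></pre> layouts (highlight.js)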


def handle_code_blocks(element):
    '''Turn the element into a code block and mark its children as processed'''
processed_element = deepcopy(element)
for child in element.iter('*'):
child.tag = 'done'
processed_element.tag = 'code'
return processed_element


def handle_quotes(element, options):
    '''Process quote elements'''
if is_code_block_element(element):
return handle_code_blocks(element)
processed_element = Element(element.tag)
for child in element.iter('*'):
processed_child = process_node(child, options) # handle_textnode(child, comments_fix=True)
if processed_child is not None:
newsub = SubElement(processed_element, child.tag)
newsub.text, newsub.tail = processed_child.text, processed_child.tail
child.tag = 'done'
if len(processed_element) > 0 and text_chars_test(''.join(processed_element.itertext())) is True:
# avoid double/nested tags
strip_tags(processed_element, 'quote')
return processed_element
return None


def handle_other_elements(element, potential_tags, options):
'''Handle diverse or unknown elements in the scope of relevant tags'''
# handle w3schools code
if element.tag == 'div' and 'w3-code' in element.get('class', default=''):
return handle_code_blocks(element)
# delete unwanted
if element.tag not in potential_tags:
if element.tag != 'done':
LOGGER.debug('discarding element: %s %s', element.tag, element.text)
return None
if element.tag == 'div':
# make a copy and prune it in case it contains sub-elements handled on their own?
# divcopy = deepcopy(element)
processed_element = handle_textnode(element, options, comments_fix=False)
if processed_element is not None and text_chars_test(processed_element.text) is True:
processed_element.attrib.clear()
# small div-correction # could be moved elsewhere
if processed_element.tag == 'div':
processed_element.tag = 'p'
# insert
return processed_element
else:
LOGGER.debug('unexpected element seen: %s %s', element.tag, element.text)
return None


def handle_paragraphs(element, potential_tags, options):
'''Process paragraphs (p) elements along with their children,
trim and clean the content'''
element.attrib.clear()
# strip_tags(element, 'p') # change in precision due to spaces?
# no children
if len(element) == 0:
processed_element = process_node(element, options)
if processed_element is not None:
return processed_element
return None
# children
processed_element = Element(element.tag)
for child in element.iter('*'):
if child.tag not in potential_tags and child.tag != 'done':
LOGGER.debug('unexpected in p: %s %s %s', child.tag, child.text, child.tail)
continue
# spacing = child.tag in SPACING_PROTECTED # todo: outputformat.startswith('xml')?
# todo: act on spacing here?
processed_child = handle_textnode(child, options, comments_fix=False, preserve_spaces=True)
if processed_child is not None:
# todo: needing attention!
if processed_child.tag == 'p':
LOGGER.debug('extra p within p: %s %s %s', processed_child.tag, processed_child.text,
processed_child.tail)
if processed_element.text:
processed_element.text += ' ' + processed_child.text
else:
processed_element.text = processed_child.text
continue
# handle formatting
newsub = Element(child.tag)
if processed_child.tag in P_FORMATTING:
# check depth and clean
if len(processed_child) > 0:
for item in processed_child: # children are lists
if text_chars_test(item.text) is True:
item.text = ' ' + item.text
strip_tags(processed_child, item.tag)
# correct attributes
if child.tag == 'hi':
newsub.set('rend', child.get('rend'))
elif child.tag == 'ref':
if child.get('target') is not None:
newsub.set('target', child.get('target'))
# handle line breaks
# elif processed_child.tag == 'lb':
# try:
# processed_child.tail = process_node(child, options).tail
# except AttributeError: # no text
# pass
# prepare text
# todo: to be moved to handle_textnode()
# if text_chars_test(processed_child.text) is False:
# processed_child.text = ''
# if text_chars_test(processed_child.tail) is False:
# processed_child.tail = ''
# if there are already children
# if len(processed_element) > 0:
# if text_chars_test(processed_child.tail) is True:
# newsub.tail = processed_child.text + processed_child.tail
# else:
# newsub.tail = processed_child.text
newsub.text, newsub.tail = processed_child.text, processed_child.tail
processed_element.append(newsub)
child.tag = 'done'
# finish
if len(processed_element) > 0:
# clean trailing lb-elements
if (
processed_element[-1].tag == 'lb'
and processed_element[-1].tail is None
):
processed_element[-1].getparent().remove(processed_element[-1])
return processed_element
if processed_element.text:
return processed_element
LOGGER.debug('discarding p-child: %s', tostring(processed_element))
return None


def define_cell_type(element):
'''Determine cell element type and mint new element'''
# define tag
cell_element = Element('cell')
if element.tag == 'th':
cell_element.set('role', 'head')
return cell_element


def handle_table(table_elem, potential_tags, options):
'''Process single table element'''
newtable = Element('table')
newrow = Element('row')
# strip these structural elements
strip_tags(table_elem, 'thead', 'tbody', 'tfoot')
# explore sub-elements
for subelement in table_elem.iterdescendants():
if subelement.tag == 'tr':
# process existing row
if len(newrow) > 0:
newtable.append(newrow)
newrow = Element('row')
elif subelement.tag in TABLE_ELEMS:
newchildelem = define_cell_type(subelement)
# process
if len(subelement) == 0:
processed_cell = process_node(subelement, options)
if processed_cell is not None:
newchildelem.text, newchildelem.tail = processed_cell.text, processed_cell.tail
else:
# proceed with iteration, fix for nested elements
newchildelem.text, newchildelem.tail = subelement.text, subelement.tail
subelement.tag = "done"
for child in subelement.iterdescendants():
if child.tag in TABLE_ALL:
# todo: define attributes properly
if child.tag in TABLE_ELEMS:
# subcell_elem = define_cell_type(subelement)
child.tag = 'cell'
processed_subchild = handle_textnode(child, options, preserve_spaces=True, comments_fix=True)
# todo: lists in table cells
else:
# subcell_elem = Element(child.tag)
processed_subchild = handle_textelem(child, potential_tags.union(['div']), options)
# add child element to processed_element
if processed_subchild is not None:
subchildelem = SubElement(newchildelem, processed_subchild.tag)
subchildelem.text, subchildelem.tail = processed_subchild.text, processed_subchild.tail
child.tag = 'done'
# add to tree
if newchildelem.text or len(newchildelem) > 0:
newrow.append(newchildelem)
# beware of nested tables
elif subelement.tag == 'table':
break
# cleanup
subelement.tag = 'done'
# end of processing
if len(newrow) > 0:
newtable.append(newrow)
if len(newtable) > 0:
return newtable
return None


def handle_image(element):
'''Process image element'''
# image source
processed_element = Element(element.tag)
if is_image_file(element.get('data-src')):
processed_element.set('src', element.get('data-src'))
elif is_image_file(element.get('src')):
processed_element.set('src', element.get('src'))
else:
# take the first corresponding attribute
for attr in element.attrib:
if attr.startswith('data-src') and is_image_file(element.get(attr)):
processed_element.set('src', element.get(attr))
break
# additional data
if element.get('alt') is not None:
processed_element.set('alt', element.get('alt'))
if element.get('title') is not None:
processed_element.set('title', element.get('title'))
# don't return empty elements or elements without source, just None
if len(processed_element.attrib) == 0 or not processed_element.get('src'):
return None
# post-processing: URLs
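    # e.g. a protocol-relative '//example.org/pic.jpg' becomes 'http://example.org/pic.jpg'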
url = processed_element.get('src')
processed_element.set('src', re.sub(r'^//', 'http://', url))
return processed_element


def handle_textelem(element, potential_tags, options):
'''Process text element and determine how to deal with its content'''
new_element = None
# bypass: nested elements
if element.tag == 'list':
new_element = handle_lists(element, options)
elif element.tag in CODES_QUOTES:
new_element = handle_quotes(element, options)
elif element.tag == 'head':
new_element = handle_titles(element, options)
elif element.tag == 'p':
new_element = handle_paragraphs(element, potential_tags, options)
elif element.tag == 'lb':
if text_chars_test(element.tail) is True:
element = process_node(element, options)
if element is not None:
new_element = Element('p')
new_element.text = element.tail
elif element.tag in FORMATTING:
new_element = handle_formatting(element, options) # process_node(element, options)
elif element.tag == 'table' and 'table' in potential_tags:
new_element = handle_table(element, potential_tags, options)
elif element.tag == 'graphic' and 'graphic' in potential_tags:
new_element = handle_image(element)
else:
# other elements (div, ??, ??)
new_element = handle_other_elements(element, potential_tags, options)
return new_element


def recover_wild_text(tree, result_body, options, potential_tags=TAG_CATALOG):
'''Look for all previously unconsidered wild elements, including outside of the determined
frame and throughout the document to recover potentially missing text parts'''
LOGGER.debug('Recovering wild text elements')
search_expr = './/blockquote|.//code|.//p|.//pre|.//q|.//quote|.//table|.//div[contains(@class, \'w3-code\')]'
if options.recall is True:
potential_tags.update(['div', 'lb'])
search_expr += '|.//div|.//lb|.//list'
# prune
search_tree = prune_unwanted_sections(tree, potential_tags, options)
# decide if links are preserved
if 'ref' not in potential_tags:
strip_tags(search_tree, 'a', 'ref', 'span')
else:
strip_tags(search_tree, 'span')
subelems = search_tree.xpath(search_expr)
result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options)
for e in subelems)))
return result_body


def prune_unwanted_sections(tree, potential_tags, options):
'Rule-based deletion of targeted document sections'
# prune the rest
tree = prune_unwanted_nodes(tree, OVERALL_DISCARD_XPATH, with_backup=True)
tree = prune_unwanted_nodes(tree, PAYWALL_DISCARD_XPATH)
# decide if images are preserved
if 'graphic' not in potential_tags:
tree = prune_unwanted_nodes(tree, DISCARD_IMAGE_ELEMENTS)
# balance precision/recall
if options.recall is False:
tree = prune_unwanted_nodes(tree, TEASER_DISCARD_XPATH)
if options.precision is True:
tree = prune_unwanted_nodes(tree, PRECISION_DISCARD_XPATH)
# remove elements by link density
tree = delete_by_link_density(tree, 'div', backtracking=True, favor_precision=options.precision)
tree = delete_by_link_density(tree, 'list', backtracking=False, favor_precision=options.precision)
tree = delete_by_link_density(tree, 'p', backtracking=False, favor_precision=options.precision)
# also filter fw/head, table and quote elements?
if options.precision is True:
# delete trailing titles
while len(tree) > 0 and (tree[-1].tag == 'head'):
tree[-1].getparent().remove(tree[-1])
tree = delete_by_link_density(tree, 'head', backtracking=False) # favor_precision=options.precision
tree = delete_by_link_density(tree, 'quote', backtracking=False) # favor_precision=options.precision
return tree


def extract_content(tree, options):
'''Find the main content of a page using a set of XPath expressions,
then extract relevant elements, strip them of unwanted subparts and
convert them'''
# backup
backup_tree = deepcopy(tree)
# init
result_body = Element('body')
potential_tags = set(TAG_CATALOG)
if options.tables is True:
potential_tags.update(['table', 'td', 'th', 'tr'])
if options.images is True:
potential_tags.add('graphic')
if options.links is True:
potential_tags.add('ref')
# iterate
for expr in BODY_XPATH:
# select tree if the expression has been found
try:
subtree = tree.xpath(expr)[0]
except IndexError:
continue
# prune the subtree
subtree = prune_unwanted_sections(subtree, potential_tags, options)
# second pass?
# subtree = delete_by_link_density(subtree, 'list', backtracking=False, favor_precision=options.precision)
if 'table' in potential_tags or options.precision is True:
for elem in subtree.iter('table'):
if link_density_test_tables(elem) is True:
elem.getparent().remove(elem)
# skip if empty tree
if len(subtree) == 0:
continue
# no paragraphs containing text, or not enough
ptest = subtree.xpath('//p//text()')
if options.recall is True:
factor = 5
elif options.precision is True:
factor = 1
else:
factor = 3
if not ptest or len(''.join(ptest)) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE') * factor:
potential_tags.add('div')
# polish list of potential tags
if 'ref' not in potential_tags:
strip_tags(subtree, 'ref')
if 'span' not in potential_tags:
strip_tags(subtree, 'span')
LOGGER.debug(sorted(potential_tags))
# proper extraction
subelems = subtree.xpath('.//*')
# e.g. only lb-elems in a div
if {e.tag for e in subelems} == {'lb'}:
subelems = [subtree]
# extract content
result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options) for e in subelems)))
# remove trailing titles
while len(result_body) > 0 and (result_body[-1].tag in NOT_AT_THE_END):
result_body[-1].getparent().remove(result_body[-1])
# exit the loop if the result has children
if len(result_body) > 1:
LOGGER.debug(expr)
break
temp_text = ' '.join(result_body.itertext()).strip()
# try parsing wild <p> elements if nothing found or text too short
# todo: test precision and recall settings here
if len(result_body) == 0 or len(temp_text) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
result_body = recover_wild_text(backup_tree, result_body, options, potential_tags)
temp_text = ' '.join(result_body.itertext()).strip()
# filter output
strip_elements(result_body, 'done')
strip_tags(result_body, 'div')
# return
return result_body, temp_text, len(temp_text)


def process_comments_node(elem, potential_tags, options):
'''Process comment node and determine how to deal with its content'''
if elem.tag in potential_tags:
# print(elem.tag, elem.text_content())
processed_element = handle_textnode(elem, options, comments_fix=True)
# test length and remove
if processed_element is not None: # and processed_element.text not in COMMENTS_BLACKLIST:
processed_element.attrib.clear()
# if textfilter(elem) is True: # ^Pingback
# return None
return processed_element
return None


def compare_extraction(tree, backup_tree, url, body, text, len_text, options):
'''Decide whether to choose own or external extraction
based on a series of heuristics'''
min_target_length = options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE')
# bypass for recall
if options.recall is True and len_text > min_target_length * 10:
return body, text, len_text
algo_flag, jt_result = False, False
# prior cleaning
backup_tree = prune_unwanted_nodes(backup_tree, PAYWALL_DISCARD_XPATH)
if options.precision is True:
backup_tree = prune_unwanted_nodes(backup_tree, OVERALL_DISCARD_XPATH)
# try with readability
temppost_algo = try_readability(backup_tree)
# unicode fix necessary on certain systems (#331)
algo_text = trim(tostring(temppost_algo, method='text', encoding='utf-8').decode('utf-8'))
len_algo = len(algo_text)
# compare
LOGGER.debug('extracted length: %s (algorithm) %s (extraction)', len_algo, len_text)
# conditions to use alternative algorithms
if len_algo in (0, len_text):
algo_flag = False
elif len_text == 0 and len_algo > 0:
algo_flag = True
elif len_text > 2 * len_algo:
algo_flag = False
elif len_algo > 2 * len_text:
algo_flag = True
# borderline cases
elif not body.xpath('.//p//text()') and len_algo > min_target_length * 2:
algo_flag = True
elif len(body.findall('.//table')) > len(body.findall('.//p')) and len_algo > min_target_length * 2:
algo_flag = True
# https://github.com/adbar/trafilatura/issues/354
elif options.recall is True and not body.xpath('.//head') and temppost_algo.xpath('.//h2|.//h3|.//h4') and len_algo > len_text:
algo_flag = True
else:
LOGGER.debug('extraction values: %s %s for %s', len_text, len_algo, url)
algo_flag = False
# apply decision
if algo_flag:
body, text, len_text = temppost_algo, algo_text, len_algo
LOGGER.debug('using generic algorithm: %s', url)
else:
LOGGER.debug('using custom extraction: %s', url)
# override faulty extraction: try with justext
if body.xpath(SANITIZED_XPATH) or len_text < min_target_length: # body.find(...)
# or options.recall is True ?
LOGGER.debug('unclean document triggering justext examination: %s', url)
# tree = prune_unwanted_sections(tree, {}, options)
body2, text2, len_text2, jt_result = justext_rescue(tree, url, options.lang, body, 0, '')
# prevent too short documents from replacing the main text
if jt_result is True and not len_text > 4*len_text2: # threshold could be adjusted
LOGGER.debug('using justext, length: %s', len_text2)
body, text, len_text = body2, text2, len_text2
# post-processing: remove unwanted sections
if algo_flag is True and jt_result is False:
body, text, len_text = sanitize_tree(body, options)
return body, text, len_text


def baseline(filecontent):
"""Use baseline extraction function targeting text paragraphs and/or JSON metadata.
Args:
filecontent: HTML code as binary string or string.
Returns:
A LXML <body> element containing the extracted paragraphs,
the main text as string, and its length as integer.
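
    Example (illustrative):
        postbody, text, length = baseline("<html><body><article>…</article></body></html>")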
"""
tree = load_html(filecontent)
postbody = Element('body')
if tree is None:
return postbody, '', 0
# scrape from json text
for elem in tree.iterfind('.//script[@type="application/ld+json"]'):
if elem.text and '"article' in elem.text:
mymatch = JSON_SEARCH.search(elem.text)
if mymatch:
elem = SubElement(postbody, 'p')
elem.text = trim(mymatch[1].replace('\\"', '"'))
return postbody, elem.text, len(elem.text)
# basic tree cleaning
for elem in tree.xpath('.//aside|.//footer|.//script|.//style'):
elem.getparent().remove(elem)
# scrape from article tag
article_elem = tree.find('.//article')
if article_elem is not None:
temp_text = trim(article_elem.text_content())
if len(temp_text) > 100:
elem = SubElement(postbody, 'p')
elem.text = temp_text
return postbody, temp_text, len(temp_text)
# scrape from text paragraphs
results = set()
for element in tree.iter('blockquote', 'code', 'p', 'pre', 'q', 'quote'):
entry = element.text_content()
if entry not in results:
elem = SubElement(postbody, 'p')
elem.text = entry
results.add(entry)
temp_text = trim('\n'.join(postbody.itertext()))
if len(temp_text) > 100:
return postbody, temp_text, len(temp_text)
# default strategy: clean the tree and take everything
postbody = Element('body')
body_elem = tree.find('.//body')
if body_elem is not None:
# elem.text = trim(body_elem.text_content())
text = '\n'.join([trim(e) for e in body_elem.itertext()])
if len(text) > 100:
elem = SubElement(postbody, 'p')
elem.text = text
return postbody, text, len(text)
# new fallback
text = html2txt(tree)
elem = SubElement(postbody, 'p')
elem.text = text
return postbody, text, len(text)
# old: return postbody, '', 0


def html2txt(content):
"""Run basic html2txt on a document.
Args:
content: HTML document as string or LXML element.
Returns:
The extracted text in the form of a string or an empty string.
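
    Example:
        >>> html2txt("<html><body><p>Hello   world!</p></body></html>")
        'Hello world!'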
"""
tree = load_html(content)
if tree is None:
return ''
return ' '.join(tree.text_content().split()).strip()


def determine_returnstring(document, output_format, include_formatting, tei_validation):
'''Convert XML tree to chosen format, clean the result and output it as a string'''
# XML (TEI) steps
if 'xml' in output_format:
# last cleaning
for element in document.body.iter('*'):
if element.tag != 'graphic' and len(element) == 0 and not element.text and not element.tail:
parent = element.getparent()
# do not remove elements inside <code> to preserve formatting
if parent is not None and parent.tag != 'code':
parent.remove(element)
# build output trees
strip_double_tags(document.body)
remove_empty_elements(document.body)
if output_format == 'xml':
output = build_xml_output(document)
elif output_format == 'xmltei':
output = build_tei_output(document)
# can be improved
returnstring = control_xml_output(output, output_format, tei_validation, document)
# CSV
elif output_format == 'csv':
posttext = xmltotxt(document.body, include_formatting)
if document.commentsbody is not None:
commentstext = xmltotxt(document.commentsbody, include_formatting)
else:
commentstext = ''
returnstring = txttocsv(posttext, commentstext, document)
# JSON
elif output_format == 'json':
returnstring = build_json_output(document)
# TXT
else:
returnstring = xmltotxt(document.body, include_formatting)
if document.commentsbody is not None:
comments_text = xmltotxt(document.commentsbody, include_formatting)
returnstring = f"{returnstring}\n{comments_text}".strip()
# normalize Unicode format (defaults to NFC)
return normalize_unicode(returnstring)


def timeout_handler(signum, frame):
'''Raise a timeout exception to handle rare malicious files'''
raise RuntimeError('unusual file processing time, aborting')
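
# usage sketch (POSIX only; assumes the signal module is imported and a
# max_seconds value is defined by the caller):
# signal.signal(signal.SIGALRM, timeout_handler)
# signal.alarm(max_seconds)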


# for legacy and backwards compatibility
def process_record(filecontent, url=None, record_id=None, no_fallback=False,
include_comments=True, target_language=None,
include_tables=True):
"Legacy extraction function, now deprecated."
# deprecation warning
warnings.warn(
"process_record() is deprecated, use extract() instead",
DeprecationWarning
)
return extract(filecontent, url=url, record_id=record_id, no_fallback=no_fallback,
include_comments=include_comments, target_language=target_language,
include_tables=include_tables)