Source code for trafilatura.spider

# pylint:disable-msg=E0611,E1101,I1101
"""
Functions dedicated to website navigation and crawling/spidering.
"""

import logging
import urllib.robotparser

from collections import deque
from time import sleep

from courlan import extract_links, fix_relative_urls, get_hostinfo, get_host_and_path, is_navigation_page, is_not_crawlable
from lxml import etree

from .core import baseline
from .downloads import fetch_url
# from .feeds import find_feed_urls # extract_links and extract_feed_links
from .settings import DEFAULT_CONFIG
from .utils import decode_response, load_html, uniquify_list

# language detection
try:
    import cld3
    LANGID_FLAG = True
except ImportError:
    LANGID_FLAG = False

LOGGER = logging.getLogger(__name__)


def refresh_detection(htmlstring, homepage):
    "Check if there could be a redirection by meta-refresh tag."
    if '"refresh"' in htmlstring or '"REFRESH"' in htmlstring:
        try:
            html_tree = load_html(htmlstring)
            # test meta-refresh redirection
            # https://stackoverflow.com/questions/2318446/how-to-follow-meta-refreshes-in-python
            attr = html_tree.xpath('//meta[@http-equiv="refresh"]/@content|//meta[@http-equiv="REFRESH"]/@content')[0]
            _, text = attr.split(';')
            text = text.strip().lower()
            if text.startswith('url=') or text.startswith('URL='):
                url2 = text[4:]
                if not url2.startswith('http'):
                    # Relative URL, adapt
                    _, base_url = get_hostinfo(url2)
                    url2 = fix_relative_urls(base_url, url2)
                # second fetch
                newhtmlstring = fetch_url(url2)
                if newhtmlstring is None:
                    logging.warning('failed redirect: %s', url2)
                    return None, None
                #else:
                htmlstring, homepage = newhtmlstring, url2
                logging.info('successful redirect: %s', url2)
        except (IndexError, etree.ParserError, etree.XMLSyntaxError, etree.XPathEvalError) as err:
            logging.info('no redirect found: %s %s', homepage, err)
    return htmlstring, homepage
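

# Usage sketch (illustrative only): a page whose markup contains a meta-refresh tag
# pointing to another address. The URLs are placeholders and the second fetch needs
# network access, so the outcome described below is hypothetical.
#   html = '<html><head><meta http-equiv="refresh" content="0; url=https://example.org/new"/></head></html>'
#   htmlstring, homepage = refresh_detection(html, 'https://example.org/old')
#   # on a successful redirect, homepage becomes 'https://example.org/new'
#   # and htmlstring holds the newly fetched document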


def probe_alternative_homepage(homepage):
    "Check if the homepage is redirected and return appropriate values."
    response = fetch_url(homepage, decode=False)
    if response is None or response == '':
        return None, None, None
    # get redirected URL here?
    if response.url != homepage:
        logging.info('followed redirect: %s', response.url)
        homepage = response.url
    # decode response
    htmlstring = decode_response(response.data)
    # is there a meta-refresh on the page?
    htmlstring, homepage = refresh_detection(htmlstring, homepage)
    logging.info('fetching homepage OK: %s', homepage)
    _, base_url = get_hostinfo(homepage)
    return htmlstring, homepage, base_url


def is_known_link(link, known_links):
    "Compare the link to the existing link base."
    #if link in known_links:
    #    return True
    test1 = link.rstrip('/')
    test2 = test1 + '/'
    if test1 in known_links or test2 in known_links:
        return True
    if link[:5] == 'https':
        testlink = link[:4] + link[5:]
    else:
        testlink = ''.join([link[:4], 's', link[4:]])
    test1, test2 = testlink.rstrip('/'), testlink.rstrip('/') + '/'
    return testlink in known_links or test1 in known_links or test2 in known_links
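

# Usage sketch (illustrative only, placeholder URLs): trailing-slash and http/https
# variants of a link count as already known.
#   known = {'https://example.org/page/'}
#   is_known_link('https://example.org/page', known)   # True, trailing slash ignored
#   is_known_link('http://example.org/page', known)    # True, protocol variant matched
#   is_known_link('https://example.org/other', known)  # False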


def find_new_links(htmlstring, base_url, known_links, language=None, rules=None):
    """Extract and filter new internal links after an optional language check."""
    new_links = []
    # reference=None
    # optional language check: run baseline extraction + language identifier
    if language is not None and LANGID_FLAG is True:
        _, text, _ = baseline(htmlstring)
        result = cld3.get_language(text)
        if result is not None and result.language != language:
            return new_links, known_links
    # iterate through the links and filter them
    for link in extract_links(htmlstring, base_url, False, language=language, with_nav=True):
        # check robots.txt rules
        if rules is not None and not rules.can_fetch("*", link):
            continue
        # sanity check
        if is_known_link(link, known_links) is True or is_not_crawlable(link):
            continue
        new_links.append(link)
        known_links.add(link)
    return new_links, known_links
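

# Usage sketch (illustrative only, placeholder URLs): extract fresh internal links
# from an HTML snippet without robots.txt rules or a language filter; which links
# survive depends on the courlan filters.
#   html = '<html><body><a href="/tag/news/">News</a> <a href="/post/1">Post</a></body></html>'
#   new, known = find_new_links(html, 'https://example.org', set())
#   # new holds the absolute form of the links that passed the filters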


def store_todo_links(todo, new_links, shortform=False):
    """Store the retrieved internal links in todo-list while prioritizing
       the navigation ones."""
    # add links to deque
    if todo is None:
        todo = deque()
    # prioritize navigation links
    # fall back to the shortest links if there are no navigation links?
    for link in new_links:
        if shortform is True:
            link = get_host_and_path(link)[1]
        if is_navigation_page(link):
            todo.appendleft(link)
        else:
            todo.append(link)
    # unique list while preserving order
    return deque(uniquify_list(todo))
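

# Usage sketch (illustrative only, placeholder URLs): links classified as navigation
# pages by courlan are appended on the left and thus crawled first; with
# shortform=True only the path component would be stored.
#   links = ['https://example.org/post/abc', 'https://example.org/category/news/']
#   todo = store_todo_links(None, links)
#   # a navigation-looking URL such as '.../category/news/' would typically
#   # end up at the front of the returned deque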


def process_links(htmlstring, base_url, known_links, todo, language=None, shortform=False, rules=None):
    """Examine the HTML code and process the retrieved internal links. Store
       the links in todo-list while prioritizing the navigation ones."""
    new_links, known_links = find_new_links(htmlstring, base_url, known_links, language, rules)
    todo = store_todo_links(todo, new_links, shortform)
    return todo, known_links


def process_response(response, todo, known_links, base_url, language, shortform=False, rules=None):
    """Convert urllib3 response object and extract links."""
    htmlstring = None
    # add final document URL to known_links
    if response is not None:
        known_links.add(response.url)
        if response.data is not None and response.data != '':
            # convert urllib3 response to string
            htmlstring = decode_response(response.data)
            # proceed to link extraction
            todo, known_links = process_links(htmlstring, base_url, known_links, todo, language=language, shortform=shortform, rules=rules)
    return todo, known_links, htmlstring


def init_crawl(homepage, todo, known_links, language=None, shortform=False, rules=None):
    """Start crawl by initializing variables and potentially examining the starting page."""
    # config=DEFAULT_CONFIG
    _, base_url = get_hostinfo(homepage)
    known_links = known_links or set()
    i = 0
    # fetch and parse robots.txt file if necessary
    if rules is None:
        rules = urllib.robotparser.RobotFileParser()
        rules.set_url(base_url + '/robots.txt')
        # exceptions happening here
        try:
            rules.read()
        except Exception as exc:
            LOGGER.error('cannot read robots.txt: %s', exc)
            rules = None
    # initialize crawl by visiting homepage if necessary
    if todo is None:
        todo = deque([homepage])
        todo, known_links, i, _ = crawl_page(i, base_url, todo, known_links, lang=language, shortform=shortform, rules=rules, initial=True)
    return todo, known_links, base_url, i, rules


def crawl_page(i, base_url, todo, known_links, lang=None, rules=None, initial=False, shortform=False):
    """Examine a webpage, extract navigation links and links."""
    # config=DEFAULT_CONFIG
    url = todo.popleft()
    known_links.add(url)
    if initial is True:
        # probe and process homepage
        htmlstring, homepage, base_url = probe_alternative_homepage(url)
        known_links.add(homepage) # add potentially "new" homepage
        # extract links on homepage
        todo, known_links = process_links(htmlstring, base_url, known_links, None, language=lang, shortform=shortform, rules=rules)
    else:
        response = fetch_url(url, decode=False)
        todo, known_links, htmlstring = process_response(response, todo, known_links, base_url, lang, shortform=shortform, rules=rules)
    # optional backup of gathered pages without nav-pages
    # ...
    i += 1
    return todo, known_links, i, htmlstring


def focused_crawler(homepage, max_seen_urls=10, max_known_urls=100000, todo=None, known_links=None, lang=None, config=DEFAULT_CONFIG, rules=None):
    """Basic crawler targeting pages of interest within a website.

    Args:
        homepage: URL of the first page to fetch, preferably the homepage of a website.
        max_seen_urls: maximum number of pages to visit, stop iterations at this number
            or at the exhaustion of pages on the website, whichever comes first.
        max_known_urls: stop if the total number of pages "known" exceeds this number.
        todo: provide a previously generated list of pages to visit / crawl frontier,
            must be in collections.deque format.
        known_links: provide a previously generated set of links.
        lang: try to target links according to language heuristics.
        config: use a different configuration (configparser format).
        rules: provide politeness rules (urllib.robotparser.RobotFileParser() format).
            New in version 0.9.1.

    Returns:
        List of pages to visit, deque format, possibly empty if there are no further
        pages to visit. Set of known links.

    """
    todo, known_links, base_url, i, rules = init_crawl(homepage, todo, known_links, language=lang, rules=rules)
    # visit pages until a limit is reached
    while todo and i < max_seen_urls and len(known_links) <= max_known_urls:
        todo, known_links, i, _ = crawl_page(i, base_url, todo, known_links, lang=lang, rules=rules)
        sleep(get_crawl_delay(rules, default=config.getfloat('DEFAULT', 'SLEEP_TIME')))
    # refocus todo-list on URLs without navigation?
    # [u for u in todo if not is_navigation_page(u)]
    return todo, known_links
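

# Usage sketch (illustrative only, placeholder URL and limits): crawl a website in
# two runs, passing the frontier and the link base from the first call to the second.
#   to_visit, links = focused_crawler('https://example.org', max_seen_urls=5)
#   to_visit, links = focused_crawler('https://example.org', max_seen_urls=50,
#                                     todo=to_visit, known_links=links)
#   # is_still_navigation(to_visit) tells whether navigation pages remain in the queue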


def get_crawl_delay(rules, default=5):
    """Define sleeping time between requests (in seconds)."""
    # the rules object can be None if fetching robots.txt failed
    delay = (rules.crawl_delay("*") or 0) if rules is not None else 0
    # backup
    if delay == 0:
        delay = default
    return delay


def is_still_navigation(todo):
    """Probe if there are still navigation URLs in the queue."""
    return any(is_navigation_page(url) for url in todo)
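

# Usage sketch (illustrative only, placeholder URL): derive the politeness delay from
# a parsed robots.txt file, falling back to the default when no crawl-delay directive
# is present.
#   import urllib.robotparser
#   parser = urllib.robotparser.RobotFileParser()
#   parser.set_url('https://example.org/robots.txt')
#   parser.read()
#   seconds = get_crawl_delay(parser, default=30)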