# Source code for trafilatura.spider

# pylint:disable-msg=E0611,E1101,I1101
"""
Functions dedicated to website navigation and crawling/spidering.
"""

import logging

from configparser import ConfigParser
from time import sleep
from typing import List, Optional, Tuple
from urllib.robotparser import RobotFileParser

from courlan import (
    UrlStore,
    extract_links,
    fix_relative_urls,
    get_base_url,
    is_navigation_page,
    is_not_crawlable,
)

try:
    import py3langid  # type: ignore
except ImportError:
    pass

from lxml.etree import XPath, tostring

from .core import baseline, prune_unwanted_nodes
from .downloads import Response, fetch_response, fetch_url
from .settings import DEFAULT_CONFIG
from .utils import LANGID_FLAG, decode_file, load_html


# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# URL store instance shared by the crawling functions of this module.
URL_STORE = UrlStore(compressed=False, strict=False)

# Path appended to a website's base URL to locate its robots.txt file.
ROBOTS_TXT_URL = "/robots.txt"
# Default value of the `max_seen_urls` argument of focused_crawler().
MAX_SEEN_URLS = 10
# Default value of the `max_known_urls` argument of focused_crawler().
MAX_KNOWN_URLS = 100000


class CrawlParameters:
    "Hold the state and settings needed to run a focused crawl."
    __slots__ = [
        "start",
        "base",
        "lang",
        "rules",
        "ref",
        "i",
        "known_num",
        "is_on",
        "prune_xpath",
    ]

    def __init__(
        self,
        start: str,
        lang: Optional[str] = None,
        rules: Optional[RobotFileParser] = None,
        prune_xpath: Optional[str] = None,
    ) -> None:
        # entry point of the crawl and the URLs derived from it
        self.start: str = start
        self.base: str = self._get_base_url(start)
        self.ref: str = self._get_reference(start)
        # optional target language for link and content filtering
        self.lang: Optional[str] = lang
        # use the provided robots.txt rules, otherwise fetch them for the base URL
        self.rules: Optional[RobotFileParser] = (
            rules if rules else get_rules(self.base)
        )
        # counters and status flag, refreshed while crawling
        self.i: int = 0
        self.known_num: int = 0
        self.is_on: bool = True
        # optional XPath expression used to prune pages before link extraction
        self.prune_xpath: Optional[str] = prune_xpath

    def _get_base_url(self, start: str) -> str:
        "Derive the base URL of the crawl, raise ValueError if there is none."
        domain_root: str = get_base_url(start)
        if domain_root:
            return domain_root
        raise ValueError(f"cannot start crawl: {start}")

    def _get_reference(self, start: str) -> str:
        "Return the start URL without its last path segment, if it has a path."
        if start.count("/") < 3:
            # scheme and host only: nothing to cut
            return start
        parent, _ = start.rsplit("/", 1)
        return parent

    def update_metadata(self, url_store: UrlStore) -> None:
        "Refresh crawl status and known URL count from the URL store."
        unvisited = url_store.find_unvisited_urls(self.base)
        self.is_on = bool(unvisited)
        known = url_store.find_known_urls(self.base)
        self.known_num = len(known)

    def filter_list(self, todo: Optional[List[str]]) -> List[str]:
        "Keep the URLs of the todo list which match the reference, minus the start URL."
        kept: List[str] = []
        for candidate in todo or []:
            if candidate == self.start:
                continue
            if self.ref in candidate:
                kept.append(candidate)
        return kept

    def is_valid_link(self, link: str) -> bool:
        "Check a link against robots.txt rules, crawl breadth and URL type."
        # robots.txt rules only apply if they are available
        if self.rules and not self.rules.can_fetch("*", link):
            return False
        # the link has to contain the reference URL
        if self.ref not in link:
            return False
        return not is_not_crawlable(link)


def refresh_detection(
    htmlstring: str, homepage: str
) -> Tuple[Optional[str], Optional[str]]:
    """Check if there could be a redirection by meta-refresh tag.

    Args:
        htmlstring: decoded HTML code of the page which has been fetched.
        homepage: URL of that page, also used to resolve relative targets.

    Returns:
        The unchanged ``(htmlstring, homepage)`` pair if no usable
        meta-refresh target is found, the ``(HTML, URL)`` pair of the
        redirect target if it could be fetched, ``(None, None)`` if
        fetching the target failed.
    """
    # cheap substring test to avoid parsing pages without a refresh tag
    if '"refresh"' not in htmlstring and '"REFRESH"' not in htmlstring:
        return htmlstring, homepage

    html_tree = load_html(htmlstring)
    if html_tree is None:
        return htmlstring, homepage

    # test meta-refresh redirection
    # https://stackoverflow.com/questions/2318446/how-to-follow-meta-refreshes-in-python
    results = html_tree.xpath(
        './/meta[@http-equiv="refresh" or @http-equiv="REFRESH"]/@content'
    )

    result = results[0] if results else ""

    if not result or ";" not in result:
        LOGGER.info("no redirect found: %s", homepage)
        return htmlstring, homepage

    # split once only: the target itself may contain semicolons
    url2 = result.split(";", 1)[1].strip()
    # remove the "url=" prefix whatever its case, but leave the target
    # untouched: paths and queries are case-sensitive and may contain "url="
    prefix, separator, remainder = url2.partition("=")
    if separator and prefix.strip().lower() == "url":
        url2 = remainder.strip()
    # the target is sometimes wrapped in quotes
    url2 = url2.strip("'\"").strip()

    if not url2:
        # an empty target only reloads the current page
        LOGGER.info("no redirect found: %s", homepage)
        return htmlstring, homepage

    if not url2.lower().startswith("http"):
        # Relative URL: resolve it against the page it was found on
        base_url = get_base_url(homepage)
        url2 = fix_relative_urls(base_url, url2)

    # second fetch
    newhtmlstring = fetch_url(url2)
    if newhtmlstring is None:
        LOGGER.warning("failed redirect: %s", url2)
        return None, None

    LOGGER.info("successful redirect: %s", url2)
    return newhtmlstring, url2


def probe_alternative_homepage(
    homepage: str,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Check if the homepage is redirected and return appropriate values.

    Args:
        homepage: URL of the page to fetch.

    Returns:
        A ``(HTML, homepage URL, base URL)`` triple describing the page
        reached after HTTP and meta-refresh redirects, or three ``None``
        values if the page or its meta-refresh target cannot be fetched.
    """
    response = fetch_response(homepage, decode=False)
    if not response or not response.data:
        return None, None, None

    # adopt the final URL if the request has been redirected
    if response.url not in (homepage, "/"):
        LOGGER.info("followed homepage redirect: %s", response.url)
        homepage = response.url

    # decode response
    htmlstring = decode_file(response.data)

    # is there a meta-refresh on the page?
    new_htmlstring, new_homepage = refresh_detection(htmlstring, homepage)
    if new_homepage is None:  # malformed or malicious content
        return None, None, None

    LOGGER.debug("fetching homepage OK: %s", new_homepage)
    return new_htmlstring, new_homepage, get_base_url(new_homepage)


def parse_robots(robots_url: str, data: str) -> Optional[RobotFileParser]:
    """Build a urllib.robotparser.RobotFileParser object from the content
    of a robots.txt file, return None if the content cannot be parsed."""
    # https://github.com/python/cpython/blob/main/Lib/urllib/robotparser.py
    parser = RobotFileParser()
    parser.set_url(robots_url)
    try:
        # both splitting and parsing are guarded
        parser.parse(data.splitlines())
    except Exception as error:
        LOGGER.error("cannot read robots.txt: %s", error)
        return None
    else:
        return parser


def get_rules(base_url: str) -> Optional[RobotFileParser]:
    """Download the robots.txt file of a website and parse it,
    return None if nothing could be downloaded."""
    location = base_url + ROBOTS_TXT_URL
    content = fetch_url(location)
    if not content:
        return None
    return parse_robots(location, content)


def is_target_language(htmlstring: str, language: Optional[str]) -> bool:
    """Extract the text with the baseline function and compare the
    language found by the detector with the target language.
    The check is bypassed (True) if there is no content, no target
    language or no language detector available."""
    if not (htmlstring and language and LANGID_FLAG):
        return True
    _body, text, _length = baseline(htmlstring)
    detected, _score = py3langid.classify(text)
    return True if detected == language else False


def is_still_navigation(todo: List[str]) -> bool:
    """Tell if at least one URL of the queue is a navigation page."""
    for candidate in todo:
        if is_navigation_page(candidate):
            return True
    return False


def process_links(
    htmlstring: str,
    params: CrawlParameters,
    url: Optional[str] = "",
) -> None:
    """Examine the HTML code and process the retrieved internal links.
    Extract and filter new internal links after an optional language check.
    Store the links in todo-list while prioritizing the navigation ones.

    Args:
        htmlstring: decoded HTML code of the page.
        params: crawl parameters (language, pruning expressions, link checks).
        url: URL passed to the link extraction, the base URL of the crawl
            is used if it is empty.
    """
    if not is_target_language(htmlstring, params.lang):
        return

    if htmlstring and params.prune_xpath is not None:
        # normalize to a list in a local variable only: the parameters
        # object is shared across pages and must not change type
        expressions = params.prune_xpath
        if isinstance(expressions, str):
            expressions = [expressions]
        tree = load_html(htmlstring)
        if tree is not None:
            tree = prune_unwanted_nodes(tree, [XPath(x) for x in expressions])
            htmlstring = tostring(tree).decode()

    links, links_priority = [], []
    for link in extract_links(
        pagecontent=htmlstring,
        url=url or params.base,
        external_bool=False,
        language=params.lang,
        with_nav=True,
        strict=False,
    ):
        if not params.is_valid_link(link):
            continue
        # navigation pages go to the front of the queue
        if is_navigation_page(link):
            links_priority.append(link)
        else:
            links.append(link)

    URL_STORE.add_urls(urls=links, appendleft=links_priority)


def process_response(
    response: Optional[Response],
    params: CrawlParameters,
) -> None:
    """Register the URL of a response as visited and extract
    the links found in its decoded content."""
    # nothing to do without a response or without content
    if response is not None and response.data:
        # the final document URL counts as visited
        URL_STORE.add_urls([response.url], visited=True)
        # decode the raw data before the link extraction
        html = decode_file(response.data)
        process_links(html, params, params.base)


def init_crawl(
    start: str,
    lang: Optional[str] = None,
    rules: Optional[RobotFileParser] = None,
    todo: Optional[List[str]] = None,
    known: Optional[List[str]] = None,
    prune_xpath: Optional[str] = None,
) -> CrawlParameters:
    """Set up the crawl: create the parameters, fill the URL store with
    the given URLs and rules, and visit the start page if there is no
    todo list to resume from."""
    params = CrawlParameters(start, lang=lang, rules=rules, prune_xpath=prune_xpath)

    # previously known URLs are stored as visited
    already_known = known if known else []
    URL_STORE.add_urls(urls=already_known, visited=True)
    # the crawl frontier is filtered before being stored
    frontier = params.filter_list(todo)
    URL_STORE.add_urls(urls=frontier)
    URL_STORE.store_rules(params.base, params.rules)

    if todo:
        # resuming a crawl: only synchronize the metadata
        params.update_metadata(URL_STORE)
        return params

    # fresh crawl: queue the start page and visit it
    URL_STORE.add_urls(urls=[params.start], visited=False)
    return crawl_page(params, initial=True)


def crawl_page(
    params: CrawlParameters,
    initial: bool = False,
) -> CrawlParameters:
    """Take the next URL from the store, fetch the page, extract its
    links and update the crawl parameters accordingly."""
    target = URL_STORE.get_url(params.base)

    if target:
        params.i += 1
        if initial:
            # the first page is probed for redirections
            htmlstring, homepage, new_base_url = probe_alternative_homepage(target)
            if all((htmlstring, homepage, new_base_url)):
                # the homepage may differ from the URL which was requested
                URL_STORE.add_urls([homepage])
                process_links(htmlstring, params, url=target)
        else:
            process_response(fetch_response(target, decode=False), params)
        params.update_metadata(URL_STORE)
    else:
        # no URL left for this website: the crawl stops here
        params.is_on = False
        params.known_num = len(URL_STORE.find_known_urls(params.base))

    return params


def focused_crawler(
    homepage: str,
    max_seen_urls: int = MAX_SEEN_URLS,
    max_known_urls: int = MAX_KNOWN_URLS,
    todo: Optional[List[str]] = None,
    known_links: Optional[List[str]] = None,
    lang: Optional[str] = None,
    config: ConfigParser = DEFAULT_CONFIG,
    rules: Optional[RobotFileParser] = None,
    prune_xpath: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """Basic crawler targeting pages of interest within a website.

    Args:
        homepage: URL of the first page to fetch, preferably the homepage
            of a website.
        max_seen_urls: maximum number of pages to visit, stop iterations at
            this number or at the exhaustion of pages on the website,
            whichever comes first.
        max_known_urls: stop if the total number of pages "known" exceeds
            this number.
        todo: provide a previously generated list of pages to visit /
            crawl frontier.
        known_links: provide a list of previously known pages.
        lang: try to target links according to language heuristics.
        config: use a different configuration (configparser format), its
            SLEEP_TIME value is the default delay between two pages.
        rules: provide politeness rules
            (urllib.robotparser.RobotFileParser() format).
        prune_xpath: remove unwanted elements from the HTML pages using XPath.

    Returns:
        A tuple of two lists without duplicates: the pages still to visit
        (possibly empty if there are no further pages to visit) and the
        known links.

    """
    params = init_crawl(homepage, lang, rules, todo, known_links, prune_xpath)

    # delay stored for the website, otherwise the configured default
    sleep_time = URL_STORE.get_crawl_delay(
        params.base, default=config.getfloat("DEFAULT", "SLEEP_TIME")
    )

    # visit pages until a limit is reached
    while (
        params.is_on
        and params.i < max_seen_urls
        and params.known_num < max_known_urls
    ):
        params = crawl_page(params)
        sleep(sleep_time)

    # dict.fromkeys() removes duplicates while keeping the order of the store
    todo = list(dict.fromkeys(URL_STORE.find_unvisited_urls(params.base)))
    known_links = list(dict.fromkeys(URL_STORE.find_known_urls(params.base)))
    return todo, known_links