Source code for trafilatura.downloads

# pylint:disable-msg=E0611,I1101
"""
All functions needed to steer and execute downloads of web documents.
"""

import logging
import os
import random

from concurrent.futures import ThreadPoolExecutor, as_completed
from configparser import ConfigParser
from functools import partial
from importlib.metadata import version
from io import BytesIO
from time import sleep
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)

import certifi
import urllib3

from courlan import UrlStore
from courlan.network import redirection_test

from .settings import DEFAULT_CONFIG, Extractor
from .utils import URL_BLACKLIST_REGEX, decode_file, is_acceptable_length, make_chunks

try:
    from urllib3.contrib.socks import SOCKSProxyManager

    PROXY_URL = os.environ.get("http_proxy")
except ImportError:
    PROXY_URL = None

try:
    import pycurl  # type: ignore

    CURL_SHARE = pycurl.CurlShare()
    # available options:
    # https://curl.se/libcurl/c/curl_share_setopt.html
    CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
    CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_SSL_SESSION)
    # not thread-safe
    # CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_CONNECT)
    HAS_PYCURL = True
except ImportError:
    HAS_PYCURL = False


LOGGER = logging.getLogger(__name__)

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
HTTP_POOL = None
NO_CERT_POOL = None
RETRY_STRATEGY = None


def create_pool(**args: Any) -> Union[urllib3.PoolManager, Any]:
    "Configure urllib3 download pool according to user-defined settings."
    manager_class = SOCKSProxyManager if PROXY_URL else urllib3.PoolManager
    manager_args = {"proxy_url": PROXY_URL} if PROXY_URL else {}
    manager_args["num_pools"] = 50  # type: ignore[assignment]
    return manager_class(**manager_args, **args)  # type: ignore[arg-type]


DEFAULT_HEADERS = urllib3.util.make_headers(accept_encoding=True)  # type: ignore[no-untyped-call]
USER_AGENT = (
    "trafilatura/" + version("trafilatura") + " (+https://github.com/adbar/trafilatura)"
)
DEFAULT_HEADERS["User-Agent"] = USER_AGENT

FORCE_STATUS = [
    429,
    499,
    500,
    502,
    503,
    504,
    509,
    520,
    521,
    522,
    523,
    524,
    525,
    526,
    527,
    530,
    598,
]

CURL_SSL_ERRORS = {35, 54, 58, 59, 60, 64, 66, 77, 82, 83, 91}


class Response:
    "Store information gathered in a HTTP response object."
    __slots__ = ["data", "headers", "html", "status", "url"]

    def __init__(self, data: bytes, status: int, url: str) -> None:
        self.data = data
        self.headers: Optional[Dict[str, str]] = None
        self.html: Optional[str] = None
        self.status = status
        self.url = url

    def __bool__(self) -> bool:
        return self.data is not None

    def __repr__(self) -> str:
        return self.html or decode_file(self.data)

    def store_headers(self, headerdict: Dict[str, str]) -> None:
        "Store response headers if required."
        # further control steps here
        self.headers = {k.lower(): v for k, v in headerdict.items()}

    def decode_data(self, decode: bool) -> None:
        "Decode the bytestring in data and store a string in html."
        if decode and self.data:
            self.html = decode_file(self.data)

    def as_dict(self) -> Dict[str, str]:
        "Convert the response object to a dictionary."
        return {attr: getattr(self, attr) for attr in self.__slots__}
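
# Illustrative example (not part of the original class): typical use of the
# methods above; the values are made up for demonstration.
#
#     >>> resp = Response(b"<html><body>text</body></html>", 200, "https://example.org")
#     >>> resp.store_headers({"Content-Type": "text/html"})
#     >>> resp.headers["content-type"]
#     'text/html'
#     >>> resp.decode_data(True)
#     >>> isinstance(resp.html, str)
#     True
#     >>> sorted(resp.as_dict())
#     ['data', 'headers', 'html', 'status', 'url']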


# caching throws an error
# @lru_cache(maxsize=2)
def _parse_config(config: ConfigParser) -> Tuple[Optional[List[str]], Optional[str]]:
    "Read and extract HTTP header strings from the configuration file."
    # load a series of user-agents
    myagents = config.get("DEFAULT", "USER_AGENTS", fallback="").strip()
    agent_list = myagents.splitlines() if myagents else None
    # https://developer.mozilla.org/en-US/docs/Web/HTTP/Cookies
    # todo: support for several cookies?
    mycookie = config.get("DEFAULT", "COOKIE") or None
    return agent_list, mycookie
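
# Illustrative configuration snippet (an assumption about user settings, not the
# shipped defaults): USER_AGENTS takes one user-agent per line, COOKIE one string.
#
#     [DEFAULT]
#     USER_AGENTS =
#         Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0
#         Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36
#     COOKIE = yummy_cookie=choco; tasty_cookie=strawberry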


def _determine_headers(
    config: ConfigParser, headers: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    "Internal function to decide on user-agent string."
    if config != DEFAULT_CONFIG:
        myagents, mycookie = _parse_config(config)
        headers = {}
        if myagents:
            headers["User-Agent"] = random.choice(myagents)
        if mycookie:
            headers["Cookie"] = mycookie
    return headers or DEFAULT_HEADERS
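
# Behaviour sketch (illustrative, derived from the function above): with the
# default configuration, the pre-built DEFAULT_HEADERS are returned unless
# headers are passed explicitly.
#
#     >>> _determine_headers(DEFAULT_CONFIG) is DEFAULT_HEADERS
#     True
#     >>> _determine_headers(DEFAULT_CONFIG, {"User-Agent": "test"})
#     {'User-Agent': 'test'}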


def _get_retry_strategy(config: ConfigParser) -> urllib3.util.Retry:
    "Define a retry strategy according to the config file."
    global RETRY_STRATEGY
    if not RETRY_STRATEGY:
        # or RETRY_STRATEGY.redirect != config.getint("DEFAULT", "MAX_REDIRECTS")
        RETRY_STRATEGY = urllib3.util.Retry(
            total=config.getint("DEFAULT", "MAX_REDIRECTS"),
            redirect=config.getint(
                "DEFAULT", "MAX_REDIRECTS"
            ),  # raise_on_redirect=False,
            connect=0,
            backoff_factor=config.getint("DEFAULT", "DOWNLOAD_TIMEOUT") / 2,
            status_forcelist=FORCE_STATUS,
            # unofficial: https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#Unofficial_codes
        )
    return RETRY_STRATEGY


def _initiate_pool(
    config: ConfigParser, no_ssl: bool = False
) -> Union[urllib3.PoolManager, Any]:
    "Create a urllib3 pool manager according to options in the config file and HTTPS setting."
    global HTTP_POOL, NO_CERT_POOL
    pool = NO_CERT_POOL if no_ssl else HTTP_POOL

    if not pool:
        # define settings
        pool = create_pool(
            timeout=config.getint("DEFAULT", "DOWNLOAD_TIMEOUT"),
            ca_certs=None if no_ssl else certifi.where(),
            cert_reqs="CERT_NONE" if no_ssl else "CERT_REQUIRED",
        )
        # update variables
        if no_ssl:
            NO_CERT_POOL = pool
        else:
            HTTP_POOL = pool

    return pool


def _send_urllib_request(
    url: str, no_ssl: bool, with_headers: bool, config: ConfigParser
) -> Optional[Response]:
    "Internal function to robustly send a request (SSL or not) and return its result."
    try:
        pool_manager = _initiate_pool(config, no_ssl=no_ssl)

        # execute request, stop downloading as soon as MAX_FILE_SIZE is reached
        response = pool_manager.request(
            "GET",
            url,
            headers=_determine_headers(config),
            retries=_get_retry_strategy(config),
            preload_content=False,
        )
        data = bytearray()
        for chunk in response.stream(2**17):
            data.extend(chunk)
            if len(data) > config.getint("DEFAULT", "MAX_FILE_SIZE"):
                raise ValueError("MAX_FILE_SIZE exceeded")
        response.release_conn()

        # necessary for standardization
        resp = Response(bytes(data), response.status, response.geturl())
        if with_headers:
            resp.store_headers(response.headers)
        return resp

    except urllib3.exceptions.SSLError:
        LOGGER.warning("retrying after SSLError: %s", url)
        return _send_urllib_request(url, True, with_headers, config)
    except Exception as err:
        LOGGER.error("download error: %s %s", url, err)  # sys.exc_info()[0]

    return None


def _is_suitable_response(url: str, response: Response, options: Extractor) -> bool:
    "Check if the response conforms to formal criteria."
    lentest = len(response.html or response.data or "")
    if response.status != 200:
        LOGGER.error("not a 200 response: %s for URL %s", response.status, url)
        return False
    # raise error instead?
    if not is_acceptable_length(lentest, options):
        return False
    return True


def _handle_response(
    url: str, response: Response, decode: bool, options: Extractor
) -> Optional[Union[Response, str]]:  # todo: only return str
    "Internal function to run safety checks on response result."
    if _is_suitable_response(url, response, options):
        return response.html if decode else response
    # catchall
    return None


def fetch_url(
    url: str,
    no_ssl: bool = False,
    config: ConfigParser = DEFAULT_CONFIG,
    options: Optional[Extractor] = None,
) -> Optional[str]:
    """Downloads a web page and seamlessly decodes the response.

    Args:
        url: URL of the page to fetch.
        no_ssl: Do not try to establish a secure connection (to prevent SSLError).
        config: Pass configuration values for output control.
        options: Extraction options (supersedes config).

    Returns:
        Unicode string or None in case of failed downloads and invalid results.

    """
    config = options.config if options else config
    response = fetch_response(url, decode=True, no_ssl=no_ssl, config=config)
    if response and response.data:
        if not options:
            options = Extractor(config=config)
        if _is_suitable_response(url, response, options):
            return response.html
    return None
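
# Illustrative usage (not part of the original module); the URL is a placeholder
# and the call assumes network access:
#
#     >>> downloaded = fetch_url("https://example.org")
#     >>> downloaded is None or isinstance(downloaded, str)
#     True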


def fetch_response(
    url: str,
    *,
    decode: bool = False,
    no_ssl: bool = False,
    with_headers: bool = False,
    config: ConfigParser = DEFAULT_CONFIG,
) -> Optional[Response]:
    """Downloads a web page and returns a full response object.

    Args:
        url: URL of the page to fetch.
        decode: Use html attribute to decode the data (boolean).
        no_ssl: Don't try to establish a secure connection (to prevent SSLError).
        with_headers: Keep track of the response headers.
        config: Pass configuration values for output control.

    Returns:
        Response object or None in case of failed downloads and invalid results.

    """
    dl_function = _send_urllib_request if not HAS_PYCURL else _send_pycurl_request
    LOGGER.debug("sending request: %s", url)
    response = dl_function(url, no_ssl, with_headers, config)  # Response
    if not response:  # None or ""
        LOGGER.debug("request failed: %s", url)
        return None
    response.decode_data(decode)
    return response
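
# Illustrative usage (not part of the original module); the URL is a placeholder
# and the calls assume network access:
#
#     >>> resp = fetch_response("https://example.org", decode=True, with_headers=True)
#     >>> resp is None or isinstance(resp, Response)
#     True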


def _pycurl_is_live_page(url: str) -> bool:
    "Send a basic HTTP HEAD request with pycurl."
    page_exists = False
    # Initialize pycurl object
    curl = pycurl.Curl()
    # Set the URL and HTTP method (HEAD)
    curl.setopt(pycurl.URL, url.encode("utf-8"))
    curl.setopt(pycurl.CONNECTTIMEOUT, 10)
    # no SSL verification
    curl.setopt(pycurl.SSL_VERIFYPEER, 0)
    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    # Set option to avoid getting the response body
    curl.setopt(curl.NOBODY, True)
    if PROXY_URL:
        curl.setopt(pycurl.PRE_PROXY, PROXY_URL)
    # Perform the request
    try:
        curl.perform()
        # Get the response code
        page_exists = curl.getinfo(curl.RESPONSE_CODE) < 400
    except pycurl.error as err:
        LOGGER.debug("pycurl HEAD error: %s %s", url, err)
        page_exists = False
    # Clean up
    curl.close()
    return page_exists


def _urllib3_is_live_page(url: str) -> bool:
    "Use courlan redirection test (based on urllib3) to send a HEAD request."
    try:
        _ = redirection_test(url)
    except Exception as err:
        LOGGER.debug("urllib3 HEAD error: %s %s", url, err)
        return False
    return True


def is_live_page(url: str) -> bool:
    "Send a HTTP HEAD request without taking anything else into account."
    result = _pycurl_is_live_page(url) if HAS_PYCURL else False
    # use urllib3 as backup
    return result or _urllib3_is_live_page(url)


def add_to_compressed_dict(
    inputlist: List[str],
    blacklist: Optional[Set[str]] = None,
    url_filter: Optional[str] = None,
    url_store: Optional[UrlStore] = None,
    compression: bool = False,
    verbose: bool = False,
) -> UrlStore:
    """Filter, convert input URLs and add them to domain-aware processing dictionary"""
    if url_store is None:
        url_store = UrlStore(compressed=compression, strict=False, verbose=verbose)

    inputlist = list(dict.fromkeys(inputlist))

    if blacklist:
        inputlist = [
            u for u in inputlist if URL_BLACKLIST_REGEX.sub("", u) not in blacklist
        ]

    if url_filter:
        inputlist = [u for u in inputlist if any(f in u for f in url_filter)]

    url_store.add_urls(inputlist)
    return url_store


def load_download_buffer(
    url_store: UrlStore, sleep_time: float = 5.0
) -> Tuple[List[str], UrlStore]:
    """Determine threading strategy and draw URLs respecting domain-based back-off rules."""
    while True:
        bufferlist = url_store.get_download_urls(time_limit=sleep_time, max_urls=10**5)
        if bufferlist or url_store.done:
            break
        sleep(sleep_time)
    return bufferlist, url_store


def _buffered_downloads(
    bufferlist: List[str],
    download_threads: int,
    worker: Callable[[str], Any],
    chunksize: int = 10000,
) -> Generator[Tuple[str, Any], None, None]:
    "Use a thread pool to perform a series of downloads."
    with ThreadPoolExecutor(max_workers=download_threads) as executor:
        for chunk in make_chunks(bufferlist, chunksize):
            future_to_url = {executor.submit(worker, url): url for url in chunk}
            for future in as_completed(future_to_url):
                yield future_to_url[future], future.result()


def buffered_downloads(
    bufferlist: List[str],
    download_threads: int,
    options: Optional[Extractor] = None,
) -> Generator[Tuple[str, str], None, None]:
    "Download queue consumer, single- or multi-threaded."
    worker = partial(fetch_url, options=options)
    return _buffered_downloads(bufferlist, download_threads, worker)


def buffered_response_downloads(
    bufferlist: List[str],
    download_threads: int,
    options: Optional[Extractor] = None,
) -> Generator[Tuple[str, Response], None, None]:
    "Download queue consumer, returns full Response objects."
    config = options.config if options else DEFAULT_CONFIG
    worker = partial(fetch_response, config=config)
    return _buffered_downloads(bufferlist, download_threads, worker)


def _send_pycurl_request(
    url: str, no_ssl: bool, with_headers: bool, config: ConfigParser
) -> Optional[Response]:
    """Experimental function using libcurl and pycurl to speed up downloads"""
    # https://github.com/pycurl/pycurl/blob/master/examples/retriever-multi.py
    # init
    headerlist = [
        f"{header}: {content}"
        for header, content in _determine_headers(config).items()
    ]

    # prepare curl request
    # https://curl.haxx.se/libcurl/c/curl_easy_setopt.html
    curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url.encode("utf-8"))
    # share data
    curl.setopt(pycurl.SHARE, CURL_SHARE)
    curl.setopt(pycurl.HTTPHEADER, headerlist)
    # curl.setopt(pycurl.USERAGENT, '')
    curl.setopt(pycurl.FOLLOWLOCATION, 1)
    curl.setopt(pycurl.MAXREDIRS, config.getint("DEFAULT", "MAX_REDIRECTS"))
    curl.setopt(pycurl.CONNECTTIMEOUT, config.getint("DEFAULT", "DOWNLOAD_TIMEOUT"))
    curl.setopt(pycurl.TIMEOUT, config.getint("DEFAULT", "DOWNLOAD_TIMEOUT"))
    curl.setopt(pycurl.MAXFILESIZE, config.getint("DEFAULT", "MAX_FILE_SIZE"))
    curl.setopt(pycurl.NOSIGNAL, 1)

    if no_ssl is True:
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    else:
        curl.setopt(pycurl.CAINFO, certifi.where())

    if with_headers:
        headerbytes = BytesIO()
        curl.setopt(pycurl.HEADERFUNCTION, headerbytes.write)

    if PROXY_URL:
        curl.setopt(pycurl.PRE_PROXY, PROXY_URL)

    # TCP_FASTOPEN
    # curl.setopt(pycurl.FAILONERROR, 1)
    # curl.setopt(pycurl.ACCEPT_ENCODING, '')

    # send request
    try:
        bufferbytes = curl.perform_rb()
    except pycurl.error as err:
        LOGGER.error("pycurl error: %s %s", url, err)
        # retry in case of SSL-related error
        # see https://curl.se/libcurl/c/libcurl-errors.html
        # errmsg = curl.errstr_raw()
        # additional error codes: 80, 90, 96, 98
        if no_ssl is False and err.args[0] in CURL_SSL_ERRORS:
            LOGGER.debug("retrying after SSL error: %s %s", url, err)
            return _send_pycurl_request(url, True, with_headers, config)
        # traceback.print_exc(file=sys.stderr)
        # sys.stderr.flush()
        return None

    # additional info
    # ip_info = curl.getinfo(curl.PRIMARY_IP)

    resp = Response(
        bufferbytes, curl.getinfo(curl.RESPONSE_CODE), curl.getinfo(curl.EFFECTIVE_URL)
    )
    curl.close()

    if with_headers:
        respheaders = {}
        # https://github.com/pycurl/pycurl/blob/master/examples/quickstart/response_headers.py
        for line in (
            headerbytes.getvalue().decode("iso-8859-1", errors="replace").splitlines()
        ):
            # re.split(r'\r?\n') ?
            # This will botch headers that are split on multiple lines...
            if ":" not in line:
                continue
            # Break the header line into header name and value.
            name, value = line.split(":", 1)
            # Now we can actually record the header name and value.
            respheaders[name.strip()] = value.strip()  # name.strip().lower() ?
        resp.store_headers(respheaders)

    return resp
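

# Illustrative end-to-end sketch (not part of the original module): combining the
# helpers above into a domain-aware download loop. The URLs and the thread count
# are placeholders; a real run requires network access.
#
#     url_store = add_to_compressed_dict([
#         "https://example.org/page1",
#         "https://example.org/page2",
#     ])
#     while not url_store.done:
#         bufferlist, url_store = load_download_buffer(url_store, sleep_time=5.0)
#         for url, result in buffered_downloads(bufferlist, download_threads=4):
#             if result is not None:
#                 print(url, len(result))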