Source code for trafilatura.feeds

"""
Examining feeds and extracting links for further processing.
"""

import json
import logging
import re

from itertools import islice
from time import sleep
from typing import List, Optional

from courlan import (
    check_url,
    clean_url,
    filter_urls,
    fix_relative_urls,
    get_hostinfo,
    is_valid_url,
)

from .deduplication import is_similar_domain
from .downloads import fetch_url
from .settings import MAX_LINKS
from .utils import load_html

LOGGER = logging.getLogger(__name__)

# https://www.iana.org/assignments/media-types/media-types.xhtml
# standard + potential types
FEED_TYPES = {
    "application/atom",  # not IANA-compatible
    "application/atom+xml",
    "application/feed+json",  # not IANA-compatible
    "application/json",
    "application/rdf",  # not IANA-compatible
    "application/rdf+xml",
    "application/rss",  # not IANA-compatible
    "application/rss+xml",
    "application/x.atom+xml",  # not IANA-compatible
    "application/x-atom+xml",  # not IANA-compatible
    "application/xml",
    "text/atom",  # not IANA-compatible
    "text/atom+xml",
    "text/plain",
    "text/rdf",  # not IANA-compatible
    "text/rdf+xml",
    "text/rss",  # not IANA-compatible
    "text/rss+xml",
    "text/xml",
}

FEED_OPENING = re.compile(r"<(feed|rss|\?xml)")

LINK_ATTRS = re.compile(r'<link .*?href=".+?"')
LINK_HREF = re.compile(r'href="(.+?)"')
LINK_ELEMENTS = re.compile(
    r"<link>(?:\s*)(?:<!\[CDATA\[)?(.+?)(?:\]\]>)?(?:\s*)</link>"
)

BLACKLIST = re.compile(r"\bcomments\b")  # no comment feed

LINK_VALIDATION_RE = re.compile(
    r"\.(?:atom|rdf|rss|xml)$|"
    r"\b(?:atom|rss)\b|"
    r"\?type=100$|"  # Typo3
    r"feeds/posts/default/?$|"  # Blogger
    r"\?feed=(?:atom|rdf|rss|rss2)|"
    r"feed$"  # Generic
)


class FeedParameters:
    "Store necessary information to proceed a feed."
    __slots__ = ["base", "domain", "ext", "lang", "ref"]

    def __init__(
        self,
        baseurl: str,
        domain: str,
        reference: str,
        external: bool = False,
        target_lang: Optional[str] = None,
    ) -> None:
        self.base: str = baseurl
        self.domain: str = domain
        self.ext: bool = external
        self.lang: Optional[str] = target_lang
        self.ref: str = reference


def is_potential_feed(feed_string: str) -> bool:
    "Check if the string could be a feed."
    if FEED_OPENING.match(feed_string):
        return True
    beginning = feed_string[:100]
    return "<rss" in beginning or "<feed" in beginning


def handle_link_list(linklist: List[str], params: FeedParameters) -> List[str]:
    """Examine links to determine if they are valid and
    lead to a web page"""
    output_links = []

    for item in sorted(set(linklist)):
        link = fix_relative_urls(params.base, item)
        checked = check_url(link, language=params.lang)

        if checked is not None:
            if (
                not params.ext
                and "feed" not in link
                and not is_similar_domain(params.domain, checked[1])
            ):
                LOGGER.warning(
                    "Rejected, diverging domain names: %s %s", params.domain, checked[1]
                )
            else:
                output_links.append(checked[0])
        # Feedburner/Google feeds
        elif "feedburner" in item or "feedproxy" in item:
            output_links.append(item)

    return output_links


def find_links(feed_string: str, params: FeedParameters) -> List[str]:
    "Try different feed types and return the corresponding links."
    if not is_potential_feed(feed_string):
        # JSON
        if feed_string.startswith("{"):
            try:
                # fallback: https://www.jsonfeed.org/version/1.1/
                candidates = [
                    item.get("url") or item.get("id")
                    for item in json.loads(feed_string).get("items", [])
                ]
                return [c for c in candidates if c is not None]
            except json.decoder.JSONDecodeError:
                LOGGER.debug("JSON decoding error: %s", params.domain)
        else:
            LOGGER.debug("Possibly invalid feed: %s", params.domain)
        return []

    # Atom
    if "<link " in feed_string:
        return [
            LINK_HREF.search(link)[1]
            for link in (
                m[0] for m in islice(LINK_ATTRS.finditer(feed_string), MAX_LINKS)
            )
            if "atom+xml" not in link and 'rel="self"' not in link
        ]
        # if '"' in feedlink:
        #    feedlink = feedlink.split('"')[0]

    # RSS
    if "<link>" in feed_string:
        return [
            m[1].strip()
            for m in islice(LINK_ELEMENTS.finditer(feed_string, re.DOTALL), MAX_LINKS)
        ]

    return []


def extract_links(feed_string: str, params: FeedParameters) -> List[str]:
    "Extract and refine links from Atom, RSS and JSON feeds."
    if not feed_string:
        LOGGER.debug("Empty feed: %s", params.domain)
        return []

    feed_links = find_links(feed_string.strip(), params)

    output_links = [
        link
        for link in handle_link_list(feed_links, params)
        if link != params.ref and link.count("/") > 2
    ]

    if feed_links:
        LOGGER.debug(
            "Links found: %s of which %s valid", len(feed_links), len(output_links)
        )
    else:
        LOGGER.debug("Invalid feed for %s", params.domain)

    return output_links


def determine_feed(htmlstring: str, params: FeedParameters) -> List[str]:
    """Parse the HTML and try to extract feed URLs from the home page.
    Adapted from http://www.aaronsw.com/2002/feedfinder/"""
    tree = load_html(htmlstring)
    if tree is None:
        LOGGER.debug("Invalid HTML/Feed page: %s", params.base)
        return []

    # most common case + websites like geo.de
    feed_urls = [
        link.get("href", "")
        for link in tree.xpath('//link[@rel="alternate"][@href]')
        if link.get("type") in FEED_TYPES
        or LINK_VALIDATION_RE.search(link.get("href", ""))
    ]

    # backup
    if not feed_urls:
        feed_urls = [
            link.get("href", "")
            for link in tree.xpath("//a[@href]")
            if LINK_VALIDATION_RE.search(link.get("href", ""))
        ]

    # refine
    output_urls = []
    for link in dict.fromkeys(feed_urls):
        link = fix_relative_urls(params.base, link)
        link = clean_url(link)
        if (
            link
            and link != params.ref
            and is_valid_url(link)
            and not BLACKLIST.search(link)
        ):
            output_urls.append(link)

    # log result
    LOGGER.debug(
        "Feed URLs found: %s of which %s valid", len(feed_urls), len(output_urls)
    )
    return output_urls


def probe_gnews(params: FeedParameters, urlfilter: Optional[str]) -> List[str]:
    "Alternative way to gather feed links: Google News."
    if params.lang:
        downloaded = fetch_url(
            f"https://news.google.com/rss/search?q=site:{params.domain}&hl={params.lang}&scoring=n&num=100"
        )
        if downloaded:
            feed_links = extract_links(downloaded, params)
            feed_links = filter_urls(feed_links, urlfilter)
            LOGGER.debug(
                "%s Google news links found for %s", len(feed_links), params.domain
            )
            return feed_links
    return []


[docs] def find_feed_urls( url: str, target_lang: Optional[str] = None, external: bool = False, sleep_time: int = 2, ) -> List[str]: """Try to find feed URLs. Args: url: Webpage or feed URL as string. Triggers URL-based filter if the webpage isn't a homepage. target_lang: Define a language to filter URLs based on heuristics (two-letter string, ISO 639-1 format). external: Similar hosts only or external URLs (boolean, defaults to False). sleep_time: Wait between requests on the same website. Returns: The extracted links as a list (sorted list of unique links). """ domain, baseurl = get_hostinfo(url) if domain is None: LOGGER.warning("Invalid URL: %s", url) return [] params = FeedParameters(baseurl, domain, url, external, target_lang) urlfilter = None downloaded = fetch_url(url) downloaded = fetch_url(url) if downloaded is not None: # assume it's a feed feed_links = extract_links(downloaded, params) if not feed_links: # assume it's a web page for feed in determine_feed(downloaded, params): feed_string = fetch_url(feed) feed_links.extend(extract_links(feed_string, params)) # filter triggered, prepare it if len(url) > len(baseurl) + 2: urlfilter = url # return links found if feed_links: feed_links = filter_urls(feed_links, urlfilter) LOGGER.debug("%s feed links found for %s", len(feed_links), domain) return feed_links LOGGER.debug("No usable feed links found: %s", url) else: LOGGER.error("Could not download web page: %s", url) if url.strip("/") != baseurl: sleep(sleep_time) return try_homepage(baseurl, target_lang) return probe_gnews(params, urlfilter)
def try_homepage(baseurl: str, target_lang: Optional[str]) -> List[str]: """Shift into reverse and try the homepage instead of the particular feed page that was given as input.""" LOGGER.debug("Probing homepage for feeds instead: %s", baseurl) return find_feed_urls(baseurl, target_lang)