"""
Examining feeds and extracting links for further processing.
"""
import json
import logging
import re
from itertools import islice
from time import sleep
from typing import List, Optional
from courlan import (
check_url,
clean_url,
filter_urls,
fix_relative_urls,
get_hostinfo,
is_valid_url,
)
from .downloads import fetch_url
from .settings import MAX_LINKS
from .utils import is_similar_domain, load_html, uniquify_list
LOGGER = logging.getLogger(__name__)
# https://www.iana.org/assignments/media-types/media-types.xhtml
# standard + potential types
FEED_TYPES = {
"application/atom", # not IANA-compatible
"application/atom+xml",
"application/feed+json", # not IANA-compatible
"application/json",
"application/rdf", # not IANA-compatible
"application/rdf+xml",
"application/rss", # not IANA-compatible
"application/rss+xml",
"application/x.atom+xml", # not IANA-compatible
"application/x-atom+xml", # not IANA-compatible
"application/xml",
"text/atom", # not IANA-compatible
"text/atom+xml",
"text/plain",
"text/rdf", # not IANA-compatible
"text/rdf+xml",
"text/rss", # not IANA-compatible
"text/rss+xml",
"text/xml",
}
FEED_OPENING = re.compile(r"<(feed|rss|\?xml)")
LINK_ATTRS = re.compile(r'<link .*?href=".+?"')
LINK_HREF = re.compile(r'href="(.+?)"')
LINK_ELEMENTS = re.compile(
    r"<link>(?:\s*)(?:<!\[CDATA\[)?(.+?)(?:\]\]>)?(?:\s*)</link>",
    re.DOTALL,
)
BLACKLIST = re.compile(r"\bcomments\b")  # exclude comment feeds
LINK_VALIDATION_RE = re.compile(
r"\.(?:atom|rdf|rss|xml)$|"
r"\b(?:atom|rss)\b|"
r"\?type=100$|" # Typo3
r"feeds/posts/default/?$|" # Blogger
r"\?feed=(?:atom|rdf|rss|rss2)|"
r"feed$" # Generic
)
class FeedParameters:
"Store necessary information to proceed a feed."
__slots__ = ["base", "domain", "ext", "lang", "ref"]
def __init__(
self,
baseurl: str,
domainname: str,
reference: str,
external: bool = False,
target_lang: Optional[str] = None,
) -> None:
self.base: str = baseurl
self.domain: str = domainname
self.ext: bool = external
self.lang: Optional[str] = target_lang
self.ref: str = reference
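
# Construction sketch (hypothetical values): FeedParameters is normally filled
# from get_hostinfo() output plus the caller's options, e.g.
#   params = FeedParameters(
#       baseurl="https://example.org",
#       domainname="example.org",
#       reference="https://example.org/blog/",
#   )
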
def handle_link_list(linklist: List[str], params: FeedParameters) -> List[str]:
"""Examine links to determine if they are valid and
lead to a web page"""
output_links = []
# sort and uniq
for item in sorted(set(linklist)):
# fix and check
link = fix_relative_urls(params.base, item)
# control output for validity
checked = check_url(link, language=params.lang)
if checked is not None:
if (
not params.ext
and not "feed" in link
and not is_similar_domain(params.domain, checked[1])
):
LOGGER.warning(
"Rejected, diverging domain names: %s %s", params.domain, checked[1]
)
else:
output_links.append(checked[0])
# Feedburner/Google feeds
elif "feedburner" in item or "feedproxy" in item:
output_links.append(item)
return output_links
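
# Behavior sketch for handle_link_list(), assuming params built for example.org:
# a relative item such as "/feed.xml" is resolved against params.base to
# "https://example.org/feed.xml", validated with check_url(), and rejected if it
# points to an unrelated domain, unless params.ext is True or "feed" is in the link.
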
def extract_links(feed_string: Optional[str], params: FeedParameters) -> List[str]:
"""Extract links from Atom and RSS feeds"""
feed_links = []
# check if it's a feed
if feed_string is None:
LOGGER.debug("Empty feed: %s", params.domain)
return feed_links
feed_string = feed_string.strip()
    # the typical opening may not come first: also inspect the first 100 chars
if not FEED_OPENING.match(feed_string) and not (
"<rss" in feed_string[:100] or "<feed" in feed_string[:100]
):
# could be JSON
if feed_string.startswith("{"):
try:
feed_dict = json.loads(feed_string)
if "items" in feed_dict:
for item in feed_dict["items"]:
# fallback: https://www.jsonfeed.org/version/1.1/
if "url" in item or "id" in item:
feed_links.append(item.get("url") or item.get("id"))
except json.decoder.JSONDecodeError:
LOGGER.debug("JSON decoding error: %s", params.domain)
else:
LOGGER.debug("Possibly invalid feed: %s", params.domain)
return feed_links
# could be Atom
if "<link " in feed_string:
for link in (m[0] for m in islice(LINK_ATTRS.finditer(feed_string), MAX_LINKS)):
if "atom+xml" in link or 'rel="self"' in link:
continue
            feedlink = LINK_HREF.search(link)[1]
            feed_links.append(feedlink)
# could be RSS
elif "<link>" in feed_string:
feed_links.extend(
[
m[1].strip()
                for m in islice(LINK_ELEMENTS.finditer(feed_string), MAX_LINKS)
]
)
# refine
output_links = handle_link_list(feed_links, params)
    output_links = [
        link for link in output_links if link != params.ref and link.count("/") > 2
    ]
# log result
if feed_links:
LOGGER.debug(
"Links found: %s of which %s valid", len(feed_links), len(output_links)
)
else:
LOGGER.debug("Invalid feed for %s", params.domain)
return output_links
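
# Input sketch: a minimal Atom document (assumed, not a live feed) from which
# extract_links() would collect one candidate URL before validation:
#   <feed xmlns="http://www.w3.org/2005/Atom">
#     <link href="https://example.org/posts/1"/>
#   </feed>
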
def determine_feed(htmlstring: str, params: FeedParameters) -> List[str]:
"""Try to extract the feed URL from the home page.
Adapted from http://www.aaronsw.com/2002/feedfinder/"""
# parse the page to look for feeds
tree = load_html(htmlstring)
# safeguard
if tree is None:
LOGGER.debug("Invalid HTML/Feed page: %s", params.base)
return []
feed_urls = []
for linkelem in tree.xpath('//link[@rel="alternate"][@href]'):
# most common case + websites like geo.de
if (
"type" in linkelem.attrib and linkelem.get("type") in FEED_TYPES
) or LINK_VALIDATION_RE.search(linkelem.get("href", "")):
feed_urls.append(linkelem.get("href"))
# backup
if not feed_urls:
for linkelem in tree.xpath("//a[@href]"):
link = linkelem.get("href", "")
if LINK_VALIDATION_RE.search(link):
feed_urls.append(link)
# refine
output_urls = []
for link in uniquify_list(feed_urls):
link = fix_relative_urls(params.base, link)
link = clean_url(link)
if link is None or link == params.ref or not is_valid_url(link):
continue
if BLACKLIST.search(link):
continue
output_urls.append(link)
# log result
LOGGER.debug(
"Feed URLs found: %s of which %s valid", len(feed_urls), len(output_urls)
)
return output_urls
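
# Typical page markup picked up by determine_feed() (illustrative values):
#   <link rel="alternate" type="application/rss+xml" title="Feed"
#         href="https://example.org/feed/"/>
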
def find_feed_urls(
    url: str,
    target_lang: Optional[str] = None,
    external: bool = False,
    sleep_time: int = 2,
) -> List[str]:
"""Try to find feed URLs.
Args:
url: Webpage or feed URL as string.
Triggers URL-based filter if the webpage isn't a homepage.
target_lang: Define a language to filter URLs based on heuristics
(two-letter string, ISO 639-1 format).
external: Similar hosts only or external URLs
(boolean, defaults to False).
sleep_time: Wait between requests on the same website.
Returns:
The extracted links as a list (sorted list of unique links).
"""
domainname, baseurl = get_hostinfo(url)
if domainname is None:
LOGGER.warning("Invalid URL: %s", url)
return []
params = FeedParameters(baseurl, domainname, url, external, target_lang)
urlfilter = None
downloaded = fetch_url(url)
if downloaded is not None:
# assume it's a feed
feed_links = extract_links(downloaded, params)
        if not feed_links:
# assume it's a web page
for feed in determine_feed(downloaded, params):
feed_string = fetch_url(feed)
feed_links.extend(extract_links(feed_string, params))
            # the given URL is more specific than the homepage: use it as filter
            if len(url) > len(baseurl) + 2:
                urlfilter = url
# return links found
        if feed_links:
feed_links = filter_urls(feed_links, urlfilter)
LOGGER.debug("%s feed links found for %s", len(feed_links), domainname)
return feed_links
LOGGER.debug("No usable feed links found: %s", url)
else:
LOGGER.error("Could not download web page: %s", url)
if url.strip("/") != baseurl:
sleep(sleep_time)
return try_homepage(baseurl, target_lang)
# try alternative: Google News
if target_lang is not None:
downloaded = fetch_url(
f"https://news.google.com/rss/search?q=site:{baseurl}&hl={target_lang}&scoring=n&num=100"
)
if downloaded is not None:
feed_links = extract_links(downloaded, params)
feed_links = filter_urls(feed_links, urlfilter)
LOGGER.debug(
"%s Google news links found for %s", len(feed_links), domainname
)
return feed_links
return []
def try_homepage(baseurl: str, target_lang: Optional[str]) -> List[str]:
"""Shift into reverse and try the homepage instead of the particular feed
page that was given as input."""
LOGGER.debug("Probing homepage for feeds instead: %s", baseurl)
return find_feed_urls(baseurl, target_lang)
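
if __name__ == "__main__":
    # Minimal demonstration sketch: the call performs real network requests
    # and the default URL below is purely illustrative.
    import sys

    demo_url = sys.argv[1] if len(sys.argv) > 1 else "https://www.example.org/"
    for feed_link in find_feed_urls(demo_url):
        print(feed_link)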