"""
Examining feeds and extracting links for further processing.
"""
## This file is available from https://github.com/adbar/trafilatura
## under GNU GPL v3 license
import json
import logging
import re
from itertools import islice
from courlan import (check_url, clean_url, filter_urls, fix_relative_urls,
get_hostinfo, validate_url)
from .downloads import fetch_url
from .settings import MAX_LINKS
from .utils import is_similar_domain, load_html
LOGGER = logging.getLogger(__name__)
FEED_TYPES = {
    'application/atom+xml',
    'application/json',
    'application/rdf+xml',
    'application/rss+xml',
    'application/x.atom+xml',
    'application/x-atom+xml',
    'text/atom+xml',
    'text/plain',
    'text/rdf+xml',
    'text/rss+xml',
    'text/xml',
}
FEED_OPENING = re.compile(r'<(feed|rss|\?xml)')
LINK_ATTRS = re.compile(r'<link .*?href=".+?"')
LINK_HREF = re.compile(r'href="(.+?)"')
LINK_ELEMENTS = re.compile(r'<link>(?:\s*)(?:<!\[CDATA\[)?(.+?)(?:\]\]>)?(?:\s*)</link>',
                           re.DOTALL)
BLACKLIST = re.compile(r'\bcomments\b') # no comment feed
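
# Illustrative regex behavior (hypothetical snippets, not from the test suite):
#   FEED_OPENING.match('<?xml version="1.0"?>') matches on '<?xml'
#   LINK_ATTRS.findall('<link rel="next" href="https://example.org/a"/>')
#       -> ['<link rel="next" href="https://example.org/a"']
#   LINK_ELEMENTS.search('<link><![CDATA[https://example.org/a]]></link>')[1]
#       -> 'https://example.org/a'
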
def handle_link_list(linklist, domainname, baseurl, target_lang=None):
    '''Examine links to determine if they are valid and
       lead to a web page'''
    output_links = []
    # sort and uniq
    for item in sorted(set(linklist)):
        # fix and check
        link = fix_relative_urls(baseurl, item)
        # control output for validity
        checked = check_url(link, language=target_lang)
        if checked is not None:
            if not is_similar_domain(domainname, checked[1]) and "feed" not in link:
                LOGGER.warning('Rejected, diverging domain names: %s %s', domainname, checked[1])
            else:
                output_links.append(checked[0])
        # Feedburner/Google feeds
        elif 'feedburner' in item or 'feedproxy' in item:
            output_links.append(item)
    return output_links
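
# Usage sketch for handle_link_list (placeholder values; the outcome depends
# on what courlan's check_url() accepts):
#   handle_link_list(['/blog/post-1'], 'example.org', 'https://example.org')
#   -> ['https://example.org/blog/post-1']
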
def extract_links(feed_string, domainname, baseurl, reference, target_lang=None):
    '''Extract links from Atom and RSS feeds'''
    feed_links = []
    # check if it's a feed
    if feed_string is None:
        LOGGER.debug('Empty feed: %s', domainname)
        return feed_links
    feed_string = feed_string.strip()
    # typical first and second lines absent
    if not FEED_OPENING.match(feed_string) and not \
        ('<rss' in feed_string[:100] or '<feed' in feed_string[:100]):
        # could be JSON
        if feed_string.startswith('{'):
            try:
                feed_dict = json.loads(feed_string)
                if 'items' in feed_dict:
                    for item in feed_dict['items']:
                        if 'url' in item:
                            feed_links.append(item['url'])
                        # fallback: https://www.jsonfeed.org/version/1.1/
                        elif 'id' in item:
                            feed_links.append(item['id'])
            except json.decoder.JSONDecodeError:
                LOGGER.debug('JSON decoding error: %s', domainname)
        else:
            LOGGER.debug('Possibly invalid feed: %s', domainname)
        return feed_links
    # could be Atom
    if '<link ' in feed_string:
        for link in (m[0] for m in islice(LINK_ATTRS.finditer(feed_string), MAX_LINKS)):
            if 'atom+xml' in link or 'rel="self"' in link:
                continue
            feedlink = LINK_HREF.search(link)[1]
            feed_links.append(feedlink)
    # could be RSS
    elif '<link>' in feed_string:
        feed_links.extend(
            [m[1].strip() for m in islice(LINK_ELEMENTS.finditer(feed_string), MAX_LINKS)]
        )
    # refine
    output_links = handle_link_list(feed_links, domainname, baseurl, target_lang)
    output_links = [link for link in output_links if link != reference and link.count('/') > 2]
    # log result
    if feed_links:
        LOGGER.debug('Links found: %s of which %s valid', len(feed_links), len(output_links))
    else:
        LOGGER.debug('Invalid feed for %s', domainname)
    return output_links
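
# Usage sketch for extract_links with a minimal RSS string (placeholder values;
# the result assumes check_url() accepts the link):
#   extract_links('<rss><channel><item><link>https://example.org/1</link></item></channel></rss>',
#                 'example.org', 'https://example.org', 'https://example.org/feed')
#   -> ['https://example.org/1']
# A JSON Feed such as '{"version": "https://jsonfeed.org/version/1.1",
# "items": [{"url": "https://example.org/1"}]}' is recognized by its leading '{'.
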
def determine_feed(htmlstring, baseurl, reference):
    '''Try to extract the feed URL from the home page.
       Adapted from http://www.aaronsw.com/2002/feedfinder/'''
    # parse the page to look for feeds
    tree = load_html(htmlstring)
    # safeguard
    if tree is None:
        LOGGER.debug('Invalid HTML/Feed page: %s', baseurl)
        return []
    feed_urls = []
    for linkelem in tree.xpath('//link[@rel="alternate"]'):
        # discard elements without links
        if 'href' not in linkelem.attrib:
            continue
        # most common case
        if 'type' in linkelem.attrib and linkelem.get('type') in FEED_TYPES:
            feed_urls.append(linkelem.get('href'))
        # websites like geo.de
        elif 'atom' in linkelem.get('href') or 'rss' in linkelem.get('href'):
            feed_urls.append(linkelem.get('href'))
    # backup
    if not feed_urls:
        for linkelem in tree.xpath('//a[@href]'):
            if linkelem.get('href')[-4:].lower() in ('.rss', '.rdf', '.xml'):
                feed_urls.append(linkelem.get('href'))
            elif linkelem.get('href')[-5:].lower() == '.atom':
                feed_urls.append(linkelem.get('href'))
            elif 'atom' in linkelem.get('href') or 'rss' in linkelem.get('href'):
                feed_urls.append(linkelem.get('href'))
    # refine
    output_urls = []
    for link in sorted(set(feed_urls)):
        link = fix_relative_urls(baseurl, link)
        link = clean_url(link)
        if link is None or link == reference or validate_url(link)[0] is False:
            continue
        if BLACKLIST.search(link):
            continue
        output_urls.append(link)
    # log result
    LOGGER.debug('Feed URLs found: %s of which %s valid', len(feed_urls), len(output_urls))
    return output_urls
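
# Usage sketch for determine_feed (hypothetical HTML; the <link> element carries
# one of the MIME types listed in FEED_TYPES):
#   determine_feed('<html><head><link rel="alternate" type="application/rss+xml" '
#                  'href="/feed.xml"/></head></html>',
#                  'https://example.org', 'https://example.org')
#   -> ['https://example.org/feed.xml']
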
def find_feed_urls(url, target_lang=None):
    """Try to find feed URLs.

    Args:
        url: Webpage or feed URL as string.
             Triggers URL-based filter if the webpage isn't a homepage.
        target_lang: Define a language to filter URLs based on heuristics
                     (two-letter string, ISO 639-1 format).

    Returns:
        The extracted links as a list (sorted list of unique links).

    """
    domainname, baseurl = get_hostinfo(url)
    if domainname is None:
        LOGGER.warning('Invalid URL: %s', url)
        return []
    urlfilter = None
    downloaded = fetch_url(url)
    if downloaded is not None:
        # assume it's a feed
        feed_links = extract_links(downloaded, domainname, baseurl, url, target_lang)
        if len(feed_links) == 0:
            # assume it's a web page
            for feed in determine_feed(downloaded, baseurl, url):
                feed_string = fetch_url(feed)
                feed_links.extend(extract_links(feed_string, domainname, baseurl, url, target_lang))
            # filter triggered, prepare it
            if len(url) > len(baseurl) + 2:
                urlfilter = url
        # return links found
        if len(feed_links) > 0:
            feed_links = filter_urls(feed_links, urlfilter)
            LOGGER.debug('%s feed links found for %s', len(feed_links), domainname)
            return feed_links
        LOGGER.debug('No usable feed links found: %s', url)
    else:
        LOGGER.error('Could not download web page: %s', url)
        if url.strip('/') != baseurl:
            return try_homepage(baseurl, target_lang)
    # try alternative: Google News
    if target_lang is not None:
        downloaded = fetch_url(
            f'https://news.google.com/rss/search?q=site:{baseurl}&hl={target_lang}&scoring=n&num=100'
        )
        if downloaded is not None:
            feed_links = extract_links(downloaded, domainname, baseurl, url, target_lang)
            feed_links = filter_urls(feed_links, urlfilter)
            LOGGER.debug('%s Google news links found for %s', len(feed_links), domainname)
            return feed_links
    return []

def try_homepage(baseurl, target_lang):
    '''Shift into reverse and try the homepage instead of the particular feed
       page that was given as input.'''
    LOGGER.debug('Probing homepage for feeds instead: %s', baseurl)
    return find_feed_urls(baseurl, target_lang)
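

if __name__ == '__main__':
    # Minimal demonstration sketch, not part of the library API:
    # probe a homepage for feed links ('https://www.example.org' is a
    # placeholder and network access is assumed).
    logging.basicConfig(level=logging.INFO)
    print(find_feed_urls('https://www.example.org'))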