#!/usr/bin/env python
'''
Utilities to provide various misc functions.

Contains a simple URL pool, a page fetcher/link extractor, a robots.txt
wrapper and a URL sanitising helper used by the crawler.
'''

import gzip
import logging
import urllib.error
import urllib.request
import urllib.robotparser
import zlib
from urllib.parse import (urljoin, urlsplit)

import aiohttp
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

# Browser-like User-Agent sent with both page and robots.txt requests.
_USER_AGENT = ('Mozilla/5.0 (X11; Linux x86_64; rv:61.0) '
               'Gecko/20100101 Firefox/61.0')


def _inflate(data):
    '''
    Decompress a "deflate" encoded HTTP body.

    The zlib-wrapped format is tried first; if that fails the payload is
    retried as a raw deflate stream, which some servers send instead.
    Raises zlib.error if neither attempt succeeds.
    '''
    try:
        return zlib.decompress(data)
    except zlib.error:
        # Negative wbits tells zlib not to expect a zlib header/trailer.
        return zlib.decompress(data, -zlib.MAX_WBITS)


def _is_within_base(url, base_url):
    '''
    Return True if `url` lives under `base_url`.

    A plain prefix test would also accept look-alike hosts such as
    http://example.com.evil.com for the base http://example.com, so the
    prefix must end exactly on a URL boundary (end of string, '/', '?'
    or '#') unless the base itself already ends with '/'.
    '''
    if not url.startswith(base_url):
        return False
    if base_url.endswith('/'):
        return True
    remainder = url[len(base_url):]
    return not remainder or remainder[0] in '/?#'


class UrlPool(object):
    '''
    Object to manage a pool of URLs.

    The pool is a set, so each URL is held at most once and URLs are
    handed back in arbitrary order.
    '''

    def __init__(self):
        self.pool = set()

    def check_duplicate(self, new_url):
        '''
        Checks if a URL exists in the current pool.

        Returns True if `new_url` is already pooled, otherwise False.
        '''
        return new_url in self.pool

    def remove_from_pool(self):
        '''
        Remove a URL from the pool and return it to be crawled.

        Raises KeyError if the pool is empty.
        '''
        return self.pool.pop()

    def add_to_pool(self, url):
        '''
        Add `url` to the pool. Adding an existing URL is a no-op.
        '''
        self.pool.add(url)

    def list_pool(self):
        '''
        Return the pool. This is the underlying set itself, not a copy.
        '''
        return self.pool


class WebPage(object):
    '''
    Object to manage common operations required to return the data from
    each individual page.

    `url` is the page to fetch, `base_url` is the prefix a discovered
    link must sit under to be kept, and `robots` is an object exposing
    `check(url)` (see RobotsTxt) used to filter discovered links.
    '''

    # set a sane user-agent and request compression if available.
    headers = {'Accept-Encoding': 'gzip, deflate',
               'User-Agent': _USER_AGENT}

    def __init__(self, url=None, base_url=None, robots=None):
        self.url = url
        self.base_url = base_url
        self.robots = robots
        self.source = None
        # Initialised here so parse_urls() is safe to call before
        # find_links() has run.
        self.discovered_hrefs = set()
        self.urls_to_crawl = set()

    def get_source(self):
        '''
        Retrieve a page's source.

        The body is stored in `self.source` as bytes only when the
        response is text/html and uses no content encoding, or gzip or
        deflate (both are decompressed). In every other case
        `self.source` is left untouched. Errors raised while fetching or
        decompressing propagate to the caller.
        '''
        request = urllib.request.Request(self.url, headers=self.headers)
        # The context manager closes the connection even if reading or
        # decompressing fails.
        with urllib.request.urlopen(request, timeout=5) as page:
            info = page.info()
            # A missing header yields None; normalise to '' so the checks
            # below cannot raise.
            content_type = (info.get('Content-Type') or '').lower()
            if 'text/html' not in content_type:
                return
            encoding = (info.get('Content-Encoding') or '').strip().lower()
            if encoding in ('', 'identity'):
                self.source = page.read()
            elif encoding in ('gzip', 'x-gzip'):
                self.source = gzip.decompress(page.read())
            elif encoding == 'deflate':
                self.source = _inflate(page.read())
            # Any other encoding cannot be decoded here, so the page is
            # treated as not retrieved.

    def find_links(self):
        '''
        Find all URLs on a page and ensure they are absolute.

        Every href is resolved against the page's own URL: absolute
        links pass through unchanged, relative ones (with or without a
        leading slash) are resolved against it. The result is stored in
        `self.discovered_hrefs`.
        '''
        soup = BeautifulSoup(self.source, 'lxml')
        self.discovered_hrefs = {
            urljoin(self.url, link['href'].strip())
            for link in soup.find_all('a', href=True)
        }

    def parse_urls(self):
        '''
        Iterate through the list of discovered URLs and add them to the
        set of URLs to crawl if they sit under the base URL and robots
        allows them. Queries and fragments are stripped before storing.
        '''
        for url in self.discovered_hrefs:
            if _is_within_base(url, self.base_url) and self.robots.check(url):
                self.urls_to_crawl.add(sanitise_url(url=url))

    def list_urls(self):
        '''
        Returns all valid discovered URLs.
        '''
        return self.urls_to_crawl

    def run(self):
        '''
        Attempt to get the page's source and if successful, iterate
        through it to find any links we can crawl.

        Returns True if a source was retrieved and processed, otherwise
        False.
        '''
        try:
            self.get_source()
        except Exception as exc:
            # Deliberately best-effort: one unreachable or malformed page
            # must not stop the crawl, but leave a trace for debugging.
            logger.debug('Could not retrieve %s: %s', self.url, exc)

        if not self.source:
            return False

        self.find_links()
        self.parse_urls()
        return True


class RobotsTxt(object):
    '''
    Wrapper around the robots.txt rules of a single site.

    robots.txt is downloaded once on construction and `check()` answers
    whether a given URL may be crawled.
    '''

    def __init__(self, base_url=None):
        '''
        Manually retrieve robots.txt to allow us to set the user-agent.

        If the server answers with an HTTP error status the site is
        treated as allowing everything. Other failures (for example an
        unreachable host or a timeout) are not handled here and
        propagate to the caller.
        '''
        self.base_url = base_url
        self.headers = {'User-Agent': _USER_AGENT}

        # The leading slash anchors the file at the site root regardless
        # of any path component in base_url.
        robots_url = urljoin(self.base_url, '/robots.txt')
        request = urllib.request.Request(robots_url, headers=self.headers)

        robots = urllib.robotparser.RobotFileParser()
        robots.set_url(robots_url)

        try:
            with urllib.request.urlopen(request, timeout=5) as response:
                data = response.read()
        except urllib.error.HTTPError:
            robots.allow_all = True
        else:
            # Replace undecodable bytes rather than abort on a stray
            # non-UTF-8 character.
            decoded_data = data.decode('utf-8', errors='replace')
            robots.parse(decoded_data.splitlines())

        self.robots = robots

    def check(self, url):
        '''
        Test if robots allows us to crawl that URL.

        The rules for the wildcard agent "*" are the ones consulted.
        '''
        return self.robots.can_fetch("*", url)


def sanitise_url(url, base_url=False):
    '''
    If `base_url` is True, we attempt to standardise `url` to ensure it
    can be prepended to relative URLs.

    If no scheme has been provided then we default to http as any sane
    https-only site should 301 redirect http > https.

    If `base_url` is False, we sanitise URLs to strip queries and
    fragments (we don't want to scrape in-page anchors etc).

    Returns a sanitised URL as a string.

    Raises ValueError when `base_url` is True and `url` cannot be turned
    into an http(s) base URL (unsupported scheme or empty input).
    '''
    default_proto = 'http'
    delim = '://'

    split_url = urlsplit(url)

    if not base_url:
        # Sanitise discovered URLs. We already expect them in the format
        # protocol://base_url/path
        return "".join([split_url.scheme, delim, split_url.netloc,
                        split_url.path])

    # This will sanitise the initial url for the initial page crawl.
    if split_url.scheme in ('http', 'https'):
        return "".join([split_url.scheme, delim, split_url.netloc])

    if not split_url.scheme:
        if split_url.netloc:
            # Scheme-relative input such as //example.com
            return "".join([default_proto, delim, split_url.netloc])
        if split_url.path:
            # Bare host such as example.com, which urlsplit reports as a
            # path because there is no '//' marker.
            return "".join([default_proto, delim, split_url.path])

    raise ValueError('Unable to derive a base URL from {!r}'.format(url))