#!/usr/bin/env python
'''
Utilities for asynchronously crawling a website and collecting the URLs it
links to.
'''
import asyncio
import urllib.error
import urllib.request
import urllib.robotparser
from urllib.parse import urljoin, urlsplit

import aiohttp
from bs4 import BeautifulSoup


class AsyncCrawler(object):
    '''
    Crawls a site asynchronously, collecting every URL that the site's
    robots.txt allows us to fetch.
    '''

    def __init__(self, baseurl=None, robots=None, concurrency=None):
        self.baseurl = baseurl
        self.robots = robots
        self.uncrawled = set()
        self.crawled = set()
        self.headers = {'Accept-Encoding': 'gzip, deflate',
                        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; '
                                      'rv:61.0) Gecko/20100101 Firefox/61.0'}
        self.client_session = None
        # Cap the number of in-flight requests; fall back to a small default
        # if no concurrency limit was given.
        self.semaphore = asyncio.BoundedSemaphore(concurrency or 10)

    async def crawl_url(self, url=None):
        ''' Crawl a single URL and return any new URLs found in its source. '''
        urls = []
        source = await self.get_source(url)
        if source:
            urls = self.find_all_urls(source)
        return urls

    def validate_url(self, url=None):
        '''
        Ensures we have a valid URL to crawl and that the site's robots.txt
        allows it.
        '''
        # ensure the URL is in a sane format
        url = standardise_url(url=url, base_url=self.baseurl)
        if url and self.robots.check(url=url):
            return url
        else:
            return False

    async def get_source(self, url=None):
        ''' Obtain a page's source, skipping anything that isn't HTML. '''
        async with self.semaphore:
            try:
                # HEAD the URL first so we only download HTML pages.
                async with self.client_session.head(
                        url, timeout=aiohttp.ClientTimeout(total=5)) as head:
                    content_type = head.headers.get('Content-Type', '')
            except Exception as e:
                print(e)
                return None
            if 'text/html' not in content_type:
                print('{0} - {1}'.format(content_type, url))
                return None
            try:
                async with self.client_session.get(
                        url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                    source = await resp.read()
                    print('crawled {0}'.format(url))
                    return source
            except Exception as e:
                print(e)
                return None

    def find_all_urls(self, source=None):
        ''' Find all URLs in a page's source. '''
        urls = []
        html = BeautifulSoup(source, 'lxml')
        hrefs = html.find_all('a', href=True)
        # build a list of URLs which are valid and haven't been crawled yet
        for href in hrefs:
            url = self.validate_url(url=href['href'])
            if url and url not in self.crawled:
                urls.append(url)
        return urls

    async def run(self, urls=None):
        ''' Crawl a batch of URLs concurrently, returning the URLs they link to. '''
        tasks = []
        all_urls = set()
        for url in urls:
            self.crawled.add(url)
            tasks.append(self.crawl_url(url))
        for task in asyncio.as_completed(tasks):
            discovered = None
            try:
                discovered = await task
            except Exception as e:
                print(e)
            if discovered:
                all_urls.update(discovered)
        return all_urls

    async def main(self):
        ''' Crawl the site breadth-first until no new URLs are discovered. '''
        self.client_session = aiohttp.ClientSession(headers=self.headers)
        to_crawl = [self.baseurl]
        while to_crawl:
            discovered_urls = await self.run(urls=to_crawl)
            to_crawl = list(discovered_urls)
        await self.client_session.close()
        return self.crawled


class RobotsTxt(object):
    '''
    Fetches and parses a site's robots.txt so URLs can be checked before
    they are crawled.
    '''

    def __init__(self, base_url=None):
        '''
        Manually retrieve robots.txt to allow us to set the user-agent.
        '''
        self.base_url = base_url
        self.headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; '
                                      'rv:61.0) Gecko/20100101 Firefox/61.0'}
        robots_url = urljoin(self.base_url, 'robots.txt')
        request = urllib.request.Request(robots_url, headers=self.headers)
        robots = urllib.robotparser.RobotFileParser()
        robots.set_url(robots_url)
        try:
            response = urllib.request.urlopen(request, timeout=5)
        except urllib.error.HTTPError:
            # No robots.txt: assume everything may be crawled.
            robots.allow_all = True
        else:
            data = response.read()
            decoded_data = data.decode('utf-8').splitlines()
            robots.parse(decoded_data)
        self.robots = robots

    def check(self, url):
        ''' Test if robots.txt allows us to crawl that URL. '''
        return self.robots.can_fetch('*', url)


def standardise_url(url=None, base_url=None):
    '''
    If `base_url` is None then we attempt to standardise the URL to ensure it
    can be prepended to relative URLs. If no scheme has been provided then we
    default to http as any sane https-only site should 301 redirect
    http > https.

    If `base_url` is set, we standardise URLs to strip queries and fragments
    (we don't want to scrape in-page anchors etc). Any relative URLs will be
    appended to the base url.

    Returns a standardised URL as a string, or None if the URL can't be
    standardised.
    '''
    default_proto = 'http'
    delim = '://'
    split_url = urlsplit(url)
    if not base_url:
        # Sanitise the initial URL provided by the user.
        if split_url.scheme and split_url.scheme.startswith('http'):
            return "".join([split_url.scheme, delim, split_url.netloc])
        elif (split_url.path and not split_url.scheme and
                not split_url.netloc):
            return "".join([default_proto, delim, split_url.path])
    else:
        # Sanitise discovered URLs. We already expect them in the format
        # protocol://base_url/path
        if url.startswith('/'):
            return urljoin(base_url, split_url.path)
        elif url.startswith(base_url):
            return "".join([split_url.scheme, delim, split_url.netloc,
                            split_url.path])
    return None
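

# Usage sketch (not part of the utilities above): a minimal example of how
# the pieces are intended to fit together, assuming Python 3.10+ and a
# placeholder start URL. 'example.com' and the concurrency limit are
# illustrative values, not anything mandated by the module.
if __name__ == '__main__':
    # Normalise whatever the user supplied into a scheme://netloc base URL.
    start_url = standardise_url(url='example.com')
    # Fetch and parse robots.txt once, up front, for that base URL.
    robots = RobotsTxt(base_url=start_url)
    crawler = AsyncCrawler(baseurl=start_url, robots=robots, concurrency=5)
    # main() crawls breadth-first until no new URLs turn up, then returns
    # every URL it attempted to crawl.
    found = asyncio.run(crawler.main())
    for crawled_url in sorted(found):
        print(crawled_url)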