#!/usr/bin/env python
'''
Utilities providing miscellaneous helper functions.
'''
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import urllib.error
from urllib.parse import urljoin, urlsplit
import urllib.request
import urllib.robotparser


class AsyncCrawler(object):
    '''
    A concurrent recursive web crawler.

    A recursive web crawler which finds all URLs local to the domains
    specified in the `rooturl` argument.

    Arguments:
        rooturl: Root domain to begin crawling.
        robots: RobotsTxt object for the rooturl.
        concurrency: Number of pages to crawl concurrently.

    Returns:
        All discovered pages in a set (via the main() coroutine).
    '''
    def __init__(self, rooturl=None, robots=None, concurrency=None):
        self.rooturl = rooturl
        self.robots = robots
        self.crawled = set()
        self.headers = {'Accept-Encoding': 'gzip, deflate',
                        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        self.client_session = None
        self.semaphore = asyncio.BoundedSemaphore(concurrency)

    async def crawl_url(self, url=None):
        '''
        Crawls the given URL and finds all new URLs in the given page.
        '''
        urls = []
        source = await self.get_source(url)
        if source:
            urls = self.find_all_urls(source)
        return urls

    def validate_url(self, url=None):
        '''
        Ensures we have a valid URL to crawl and that the site's robots.txt
        allows it.
        '''
        # ensure the URL is in a sane format
        url = standardise_url(url=url, rooturl=self.rooturl)
        if url and self.robots.check(url=url):
            return url
        else:
            return False

    async def get_source(self, url=None):
        '''
        Obtains the URL's source, provided it is HTML. Usage of semaphores
        ensures only a certain number of coroutines can run at any given
        time.
        '''
        async with self.semaphore:
            async with self.client_session.head(url, timeout=5) as head:
                try:
                    _ = await head.read()
                except Exception:
                    pass
                # use .get() so a missing Content-Type header is treated as
                # non-HTML rather than raising a KeyError.
                if 'text/html' in head.headers.get('Content-Type', ''):
                    async with self.client_session.get(url, timeout=5) as resp:
                        try:
                            source = await resp.read()
                            return source
                        except Exception:
                            return None
                else:
                    return None

    def find_all_urls(self, source=None):
        '''
        Find all URLs in a page's source. Returns a list of URLs which have
        been validated as local to the starting URL.
        '''
        urls = []
        html = BeautifulSoup(source, 'lxml')
        hrefs = html.find_all('a', href=True)
        # build a list of URLs which are valid and haven't been crawled yet
        for href in hrefs:
            url = self.validate_url(url=href['href'])
            if url and url not in self.crawled:
                urls.append(url)
        return urls

    async def run(self, urls=None):
        '''
        Crawls a batch of URLs of any size; resource usage is bounded by a
        semaphore of size n (where n = concurrency). Returns a set of URLs to
        be added to the list of URLs which need to be crawled (find_all_urls
        only returns unseen URLs).
        '''
        tasks = []
        all_urls = set()
        for url in urls:
            # mark the URL as seen.
            self.crawled.add(url)
            # create a task to crawl the URL.
            tasks.append(self.crawl_url(url))
        # wait for all tasks to complete.
        for task in asyncio.as_completed(tasks):
            urls = None
            try:
                # collect each task's results as it completes.
                urls = await task
            except Exception:
                # skip URLs whose crawl failed; the remaining tasks still run.
                pass
            # add the URLs to a set to be returned.
            if urls:
                for url in urls:
                    all_urls.add(url)
        return all_urls

    async def main(self):
        '''
        Runs a crawl with batches of URLs. Once complete, returns a set of all
        crawled URLs.
        '''
        self.client_session = aiohttp.ClientSession(headers=self.headers)
        to_crawl = []
        # add the root URL to initialise the crawler.
        to_crawl.append(self.rooturl)
        print('Crawling: {0}'.format(self.rooturl))
        while len(to_crawl) > 0:
            discovered_urls = await self.run(urls=to_crawl)
            # empty the to_crawl list and then add all newly discovered URLs
            # for the next iteration.
            to_crawl.clear()
            to_crawl.extend(discovered_urls)
            print('{0} URLs crawled'.format(len(self.crawled)))
        # close the session once all URLs have been crawled.
        await self.client_session.close()
        return self.crawled
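

# Illustrative usage only (not part of the original module): inside an
# already-running event loop, the crawler is assumed to be driven via its
# main() coroutine, e.g.
#
#     robots = RobotsTxt(rooturl='http://example.com')
#     crawler = AsyncCrawler(rooturl='http://example.com', robots=robots,
#                            concurrency=10)
#     crawled = await crawler.main()
#
# 'http://example.com' and concurrency=10 are placeholder assumptions.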


class RobotsTxt(object):
    '''
    Retrieve and query robots.txt for a given domain.

    Retrieves and parses robots.txt for the given domain. Calling the check()
    method returns True or False depending on whether crawling of the given
    URL is allowed.
    '''
    def __init__(self, rooturl=None):
        '''
        Manually retrieve robots.txt to allow us to set the user-agent (works
        around sites which disallow access to robots.txt without a sane
        user-agent).
        '''
        self.rooturl = rooturl
        self.headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        robots_url = urljoin(self.rooturl, 'robots.txt')
        request = urllib.request.Request(robots_url, headers=self.headers)
        robots = urllib.robotparser.RobotFileParser()
        robots.set_url(robots_url)
        try:
            response = urllib.request.urlopen(request, timeout=5)
        except urllib.error.HTTPError:
            # if robots.txt doesn't exist then allow all URLs to be crawled.
            robots.allow_all = True
        else:
            data = response.read()
            decoded_data = data.decode("utf-8").splitlines()
            robots.parse(decoded_data)
        self.robots = robots

    def check(self, url):
        '''
        Test if robots.txt allows us to crawl the given URL.
        '''
        return self.robots.can_fetch("*", url)
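

# Illustrative usage only (assumed, not from the original module): check()
# simply defers to RobotFileParser.can_fetch() with a wildcard user-agent,
# so a typical call looks like:
#
#     robots = RobotsTxt(rooturl='http://example.com')
#     if robots.check('http://example.com/some/page'):
#         pass  # crawling this URL is permitted by the site's robots.txt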


def standardise_url(url=None, rooturl=None):
    '''
    If `rooturl` is None then we attempt to standardise the URL to ensure it
    can be prepended to relative URLs. If no scheme has been provided then we
    default to http, as any sane https-only site should 301 redirect http to
    https.

    If `rooturl` is set, we standardise URLs to strip queries and fragments
    (we don't want to scrape in-page anchors etc). Any relative URLs will be
    appended to the root URL.

    Returns a standardised URL as a string, or None if the URL could not be
    standardised.
    '''
    default_proto = 'http'
    delim = '://'
    file_extensions = ('/', 'htm', 'html', 'xhtml', 'php', 'asp', 'aspx',
                       'cfm')
    split_url = urlsplit(url)
    if not rooturl:
        # This will sanitise the initial URL provided by the user.
        if split_url.scheme and split_url.scheme.startswith('http'):
            return "".join([split_url.scheme, delim, split_url.netloc])
        elif (split_url.path and not split_url.scheme and not split_url.netloc):
            return "".join([default_proto, delim, split_url.path])
    else:
        # if url.endswith(file_extensions):
        # Sanitise discovered URLs. We already expect them in the format
        # protocol://rooturl/path
        if url.startswith('/'):
            return urljoin(rooturl, split_url.path)
        elif url.startswith(rooturl):
            return "".join([split_url.scheme, delim, split_url.netloc,
                            split_url.path])
    return None
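

# Illustrative entry point (a sketch, not part of the original module). It
# shows how the pieces above are assumed to fit together: standardise the
# user-supplied URL, fetch robots.txt for it, then run the crawler.
# 'example.com' and concurrency=10 are placeholder assumptions.
if __name__ == '__main__':
    rooturl = standardise_url(url='example.com')
    crawler = AsyncCrawler(rooturl=rooturl,
                           robots=RobotsTxt(rooturl=rooturl),
                           concurrency=10)
    # get_event_loop() is used rather than asyncio.run() because the
    # BoundedSemaphore is created in __init__, outside any running loop;
    # on Python 3.10+ asyncio.run(crawler.main()) works equally well.
    loop = asyncio.get_event_loop()
    crawled = loop.run_until_complete(crawler.main())
    print('Discovered {0} URLs'.format(len(crawled)))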