# NOTE(review): the lines below were file-browser residue (path/size listing)
# accidentally pasted into the source; kept as a comment so the module parses.
# web-scraper/utils/helpers.py (334 lines, 9.3 KiB, Python)
#!/usr/bin/env python
'''
Utilities to provide various misc functions.
'''
import asyncio
import gzip
import urllib.error
import urllib.request
import urllib.robotparser
import zlib
from urllib.parse import urljoin, urlsplit

import aiohttp
from bs4 import BeautifulSoup
class AsyncCrawler(object):
    '''
    Asynchronous crawler that repeatedly pops URLs from an uncrawled pool,
    fetches their source with aiohttp, and collects newly discovered links
    until the pool is exhausted.
    '''

    def __init__(self, baseurl=None, robots=None, concurrency=None):
        # Root URL of the site being crawled; discovered links are
        # validated against it.
        self.baseurl = baseurl
        # Object exposing a `check(url=...)` method (see RobotsTxt).
        self.robots = robots
        # URLs waiting to be crawled / already crawled.
        self.uncrawled = set()
        self.crawled = set()
        self.headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        # Created in run_loop() so the session is bound to the running loop.
        self.client_session = None
        # Caps the number of concurrent in-flight fetches.
        self.semaphore = asyncio.BoundedSemaphore(concurrency)

    async def crawl_url(self, url=None):
        '''
        Fetch `url` and return the set of new, valid URLs found in its
        source. On a failed fetch the URL is not marked crawled and an
        empty set is returned.
        '''
        urls = set()
        async with self.semaphore:
            source = await self.get_source(url)
            if source:
                # mark as crawled so find_all_urls() filters it out later
                self.crawled.add(url)
                urls.update(self.find_all_urls(source))
        return urls

    def validate_url(self, url=None):
        '''
        Ensures we have a valid URL to crawl and that the site's robots.txt
        allows it. Returns the standardised URL, or False.
        '''
        # ensure the URL is in a sane format
        url = standardise_url(url=url, base_url=self.baseurl)
        if url and self.robots.check(url=url):
            return url
        return False

    async def get_source(self, url=None):
        '''
        Fetch and return the raw body of `url`, or None on any failure.
        '''
        # BUG FIX: the original only wrapped resp.read() in try/except, so
        # connection errors and timeouts raised by session.get() itself
        # escaped and crashed run_loop(); guard the whole request instead.
        try:
            async with self.client_session.get(url, timeout=5) as resp:
                return await resp.read()
        except Exception:
            # best-effort crawl: a failed fetch is simply skipped
            return None

    def find_all_urls(self, source=None):
        '''
        Find all URLs in a page's source that validate and have not been
        crawled yet; returns them as a set.
        '''
        html = BeautifulSoup(source, 'lxml')
        hrefs = html.find_all('a', href=True)
        urls = set()
        for href in hrefs:
            url = self.validate_url(url=href['href'])
            if url and url not in self.crawled:
                urls.add(url)
        return urls

    async def run_loop(self):
        '''
        Drive the crawl: seed the pool with baseurl, keep crawling until
        the uncrawled pool empties, then return the set of crawled URLs.
        '''
        print('Crawling: {}'.format(self.baseurl))
        self.client_session = aiohttp.ClientSession(headers=self.headers)
        # provide the starting URL to the crawler
        self.uncrawled.add(self.baseurl)
        try:
            while self.uncrawled:
                url = self.uncrawled.pop()
                new_urls = await self.crawl_url(url=url)
                self.uncrawled.update(new_urls)
        finally:
            # BUG FIX: close the session even if a crawl step raises;
            # the original leaked it on any unexpected exception.
            await self.client_session.close()
        return self.crawled
class UrlPool(object):
    '''
    Object to manage a pool of URLs.
    '''

    def __init__(self):
        # A set gives O(1) membership tests and automatic de-duplication.
        self.pool = set()

    def check_duplicate(self, new_url):
        '''
        Return True if `new_url` already exists in the current pool.
        '''
        # idiomatic membership test instead of if/else returning literals
        return new_url in self.pool

    def remove_from_pool(self):
        '''
        Remove an arbitrary URL from the pool and return it to be crawled.

        Raises:
            KeyError: if the pool is empty.
        '''
        return self.pool.pop()

    def add_to_pool(self, url):
        '''
        Add `url` to the pool (no-op if already present).
        '''
        self.pool.add(url)

    def list_pool(self):
        '''
        Return the underlying set of pooled URLs.
        '''
        return self.pool
class WebPage(object):
    '''
    Object to manage common operations required to return
    the data from each individual page.
    '''
    # set a sane user-agent and request compression if available.
    headers = {'Accept-Encoding': 'gzip, deflate',
               'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}

    def __init__(self, url=None, base_url=None, robots=None):
        self.url = url
        self.base_url = base_url
        # Object exposing a `check(url)` method (see RobotsTxt).
        self.robots = robots
        # Raw page bytes; stays None until get_source() succeeds.
        self.source = None
        self.urls_to_crawl = set()
        # Populated by find_links(); initialised here so parse_urls()
        # cannot hit an AttributeError if called first.
        self.discovered_hrefs = set()

    def get_source(self):
        '''
        Retrieve a page's source, decompressing it if the server applied
        a content encoding. Non-HTML responses leave self.source as None.
        '''
        request = urllib.request.Request(self.url, headers=self.headers)
        # BUG FIX: close the response on all paths (original leaked it).
        with urllib.request.urlopen(request, timeout=5) as page:
            info = page.info()
            # BUG FIX: a missing Content-Type header returned None, and
            # `'text/html' in None` raised TypeError.
            if 'text/html' not in (info.get('Content-Type') or ''):
                return
            encoding = info.get('Content-Encoding')
            data = page.read()
            if encoding == 'gzip':
                self.source = gzip.decompress(data)
            elif encoding == 'deflate':
                # BUG FIX: the original stored deflate bodies undecompressed.
                # Deflate may be zlib-wrapped or raw; try both.
                try:
                    self.source = zlib.decompress(data)
                except zlib.error:
                    self.source = zlib.decompress(data, -zlib.MAX_WBITS)
            else:
                self.source = data

    def find_links(self):
        '''
        Find all URLs on a page and ensure they are absolute. If they are
        relative then they will be appended to the base URL.
        '''
        hrefs = set()
        soup = BeautifulSoup(self.source, 'lxml')
        for link in soup.find_all('a', href=True):
            if link['href'].startswith('/'):
                hrefs.add(urljoin(self.url, link['href']))
            else:
                hrefs.add(link['href'])
        self.discovered_hrefs = hrefs

    def parse_urls(self):
        '''
        Iterate through the list of discovered URLs and add them to the
        pool if they start with the base URL and robots.txt allows them.
        '''
        for url in self.discovered_hrefs:
            if url.startswith(self.base_url) and self.robots.check(url):
                # BUG FIX: the original called undefined `sanitise_url`,
                # raising NameError; the module defines `standardise_url`.
                standardised_url = standardise_url(url=url,
                                                   base_url=self.base_url)
                self.urls_to_crawl.add(standardised_url)

    def list_urls(self):
        '''
        Returns all valid discovered URLs.
        '''
        return self.urls_to_crawl

    def run(self):
        '''
        Attempt to get the page's source and if successful, iterate through
        it to find any links we can crawl. Returns True on success.
        '''
        try:
            self.get_source()
        except Exception:
            # deliberate best-effort: skip pages we couldn't retrieve
            pass
        if self.source:
            self.find_links()
            self.parse_urls()
            return True
        return False
class RobotsTxt(object):
    '''
    Wrapper around urllib.robotparser that fetches robots.txt manually so
    a custom User-Agent can be set on the request.
    '''

    def __init__(self, base_url=None):
        '''
        Manually retrieve robots.txt to allow us to set the user-agent.
        If retrieval fails, crawling everything is allowed.
        '''
        self.base_url = base_url
        self.headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        robots_url = urljoin(self.base_url, 'robots.txt')
        request = urllib.request.Request(robots_url, headers=self.headers)
        robots = urllib.robotparser.RobotFileParser()
        robots.set_url(robots_url)
        try:
            # BUG FIX: catch URLError (the parent of HTTPError) so DNS and
            # connection failures also fall back to allow_all instead of
            # crashing; also close the response (original leaked it).
            with urllib.request.urlopen(request, timeout=5) as response:
                data = response.read()
        except urllib.error.URLError:
            robots.allow_all = True
        else:
            robots.parse(data.decode("utf-8").splitlines())
        self.robots = robots

    def check(self, url):
        '''
        Return True if robots.txt permits any agent ("*") to fetch `url`.
        '''
        return self.robots.can_fetch("*", url)
def standardise_url(url=None, base_url=None):
    '''
    If `base_url` is None then we attempt to standardise the URL to ensure it
    can be prepended to relative URLs. If no scheme has been provided then we
    default to http as any sane https-only site should 301 redirect
    http > https.
    If `base_url` is set, we standardise URLs to strip queries and fragments
    (we don't want to scrape in-page anchors etc). Any relative URLs will be
    appended to the base url.
    Returns a standardised URL as a string, or None when the URL cannot be
    standardised (non-http scheme, or a URL that belongs to another host).
    '''
    default_proto = 'http'
    delim = '://'
    split_url = urlsplit(url)
    if not base_url:
        # This will sanitise the initial url provided by the user.
        if split_url.scheme and split_url.scheme.startswith('http'):
            return "".join([split_url.scheme, delim, split_url.netloc])
        if split_url.path and not split_url.scheme and not split_url.netloc:
            # bare hostname like 'example.com' parses as a path
            return "".join([default_proto, delim, split_url.path])
    else:
        # Sanitise discovered URLs. We already expect them in the format
        # protocol://base_url/path
        # BUG FIX: protocol-relative URLs ('//otherhost/x') also start with
        # '/', and the original joined their path onto base_url, wrongly
        # attributing another host's path to the base site. Requiring an
        # empty netloc restricts this branch to genuinely relative paths.
        if url.startswith('/') and not split_url.netloc:
            return urljoin(base_url, split_url.path)
        if url.startswith(base_url):
            # rebuild without query or fragment
            return "".join([split_url.scheme, delim, split_url.netloc,
                            split_url.path])
    return None