Added docstring to all functions. 🎉
This commit is contained in:
@@ -8,26 +8,41 @@ from urllib.error import URLError
|
||||
|
||||
logger = logging.getLogger('torrentSearch')
|
||||
|
||||
def build_url(ssl, baseUrl, path, args_dict=[]):
|
||||
def build_url(ssl, baseUrl, path, args_dict={}):
|
||||
"""
|
||||
Joins the given parameters together into a complete url
|
||||
:param bool ssl: if ssl is to be used or not
|
||||
:param str baseUrl: the start of the url (http://thepiratebay.org)
|
||||
:param list path: the rest of the path to the url (['search', 'lucifer', '0'])
|
||||
:param dict args_dict: a dict with the query element we want to append to the url
|
||||
:return: complete url based on the inputs
|
||||
:rtype: str
|
||||
"""
|
||||
url_parts = list(parse.urlparse(baseUrl))
|
||||
url_parts[0] = 'https' if ssl else 'http'
|
||||
if type(path) is list:
|
||||
url_parts[2] = '/'.join(path)
|
||||
else:
|
||||
url_parts[2] = path
|
||||
url_parts[2] = '/'.join(path)
|
||||
url_parts[4] = parse.urlencode(args_dict)
|
||||
return parse.urlunparse(url_parts)
|
||||
|
||||
# Converts a input string or list to percent-encoded string,
|
||||
# this is for encoding information in a Uniform Resource
|
||||
# Identifier (URI) using urllib
|
||||
def convert_query_to_percent_encoded_octets(input_query):
|
||||
"""
|
||||
Converts a string with spaces to a string separated by '%20'
|
||||
:param str input_query: the query to encode; a list of words is joined with spaces first
|
||||
:return: string with spaces replaced with '%20' if found any
|
||||
:rtype: str
|
||||
"""
|
||||
if type(input_query) is list:
|
||||
input_query = ' '.join(input_query)
|
||||
|
||||
return parse.quote(input_query)
|
||||
|
||||
def fetch_url(url):
|
||||
"""
|
||||
Call and get output for a given url
|
||||
:param str url: the url we want to make a request to
|
||||
:return: a response object with contents and status code of the request
|
||||
:rtype: http.client.HTTPResponse
|
||||
"""
|
||||
logger.debug('Fetching query: {}'.format(url))
|
||||
req = request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
|
||||
try:
|
||||
@@ -40,4 +55,4 @@ def fetch_url(url):
|
||||
elif hasattr(e, 'code'):
|
||||
logger.error('The server couldn\'t fulfill the request.')
|
||||
logger.error('Error code: ', e.code)
|
||||
sys.exit()
|
||||
sys.exit()
|
||||
@@ -12,89 +12,99 @@ from torrentSearch.utils import humansize, representsInteger
|
||||
logger = logging.getLogger('torrentSearch')
|
||||
|
||||
class Jackett(object):
|
||||
"""docstring for Jackett"""
|
||||
def __init__(self, apikey, host, path, limit, ssl):
|
||||
super(Jackett, self).__init__()
|
||||
self.apikey = apikey
|
||||
self.host = host
|
||||
self.path = path
|
||||
self.page_limit = limit
|
||||
self.ssl = ssl
|
||||
"""docstring for Jackett"""
|
||||
def __init__(self, apikey, host, path, limit, ssl):
|
||||
super(Jackett, self).__init__()
|
||||
self.apikey = apikey
|
||||
self.host = host
|
||||
self.path = path
|
||||
self.page_limit = limit
|
||||
self.ssl = ssl
|
||||
|
||||
# Returns the api key set in the initiator
|
||||
# return [string]
|
||||
def get_apikey(self):
|
||||
logger.debug('Using api key: {}'.format(self.apikey))
|
||||
return self.apikey
|
||||
def get_apikey(self):
|
||||
logger.debug('Using api key: {}'.format(self.apikey))
|
||||
return self.apikey
|
||||
|
||||
# Returns the path set in the initiator
|
||||
# return [string]
|
||||
def get_path(self):
|
||||
return self.path
|
||||
def get_path(self):
|
||||
return self.path
|
||||
|
||||
# Returns the page_limit set in the initiator
|
||||
# return [string]
|
||||
def get_page_limit(self):
|
||||
logger.debug('Current page limit: {} pages'.format(self.page_limit))
|
||||
return self.page_limit
|
||||
def get_page_limit(self):
|
||||
logger.debug('Current page limit: {} pages'.format(self.page_limit))
|
||||
return self.page_limit
|
||||
|
||||
# Starts the call to getting result from our indexer
|
||||
# query [string]
|
||||
# returns [List of Torrent objects]
|
||||
def search(self, query):
|
||||
baseUrl = 'http://' + self.host
|
||||
path = self.get_path()
|
||||
url_args = {
|
||||
'apikey': self.get_apikey(),
|
||||
'limit': self.get_page_limit(),
|
||||
'q': query
|
||||
}
|
||||
logger.debug('Url arguments for jackett search: {}'.format(url_args))
|
||||
def search(self, query):
|
||||
"""
|
||||
Starts the call to get results from our indexer
|
||||
:param jackett.Jackett self: object instance
|
||||
:param str query: query we want to search for
|
||||
:return: list of results we found from scraping jackett output based on query
|
||||
:rtype: list
|
||||
"""
|
||||
baseUrl = 'http://' + self.host
|
||||
path = self.get_path()
|
||||
url_args = {
|
||||
'apikey': self.get_apikey(),
|
||||
'limit': self.get_page_limit(),
|
||||
'q': query
|
||||
}
|
||||
logger.debug('Url arguments for jackett search: {}'.format(url_args))
|
||||
|
||||
url = build_url(self.ssl, baseUrl, path, url_args)
|
||||
res = fetch_url(url)
|
||||
url = build_url(self.ssl, baseUrl, path, url_args)
|
||||
res = fetch_url(url)
|
||||
|
||||
return self.parse_xml_for_torrents(res.read())
|
||||
return self.parse_xml_for_torrents(res.read())
|
||||
|
||||
|
||||
# def __init__(self, name, magnet=None, size=None, uploader=None, date=None,
|
||||
# seed_count=None, leech_count=None, url=None):
|
||||
def find_xml_attribute(self, xml_element, attr):
|
||||
"""
|
||||
Finds a specific XML attribute given an element name
|
||||
:param jackett.Jackett self: object instance
|
||||
:param xml.etree.ElementTree.Element xml_element: the xml tree we want to search
|
||||
:param str attr: the attribute/element name we want to find in the xml tree
|
||||
:return: the value of the element given the attr/element name
|
||||
:rtype: str
|
||||
"""
|
||||
value = xml_element.find(attr)
|
||||
if (value != None):
|
||||
logger.debug('Found attribute: {}'.format(attr))
|
||||
return value.text
|
||||
else:
|
||||
logger.warning('Could not find attribute: {}'.format(attr))
|
||||
return ''
|
||||
|
||||
def find_xml_attribute(self, xml_element, attr):
|
||||
value = xml_element.find(attr)
|
||||
if (value != None):
|
||||
logger.debug('Found attribute: {}'.format(attr))
|
||||
return value.text
|
||||
else:
|
||||
logger.warning('Could not find attribute: {}'.format(attr))
|
||||
return ''
|
||||
def parse_xml_for_torrents(self, raw_xml):
|
||||
"""
|
||||
Parses the xml page returned by jackett and collects the torrents found in it
|
||||
:param jackett.Jackett self: object instance
|
||||
:param bytes raw_xml: the xml page returned by querying jackett
|
||||
:return: all the torrents we found in the xml page
|
||||
:rtype: list
|
||||
"""
|
||||
tree = ET.fromstring(raw_xml)
|
||||
channel = tree.find('channel')
|
||||
results = []
|
||||
for child in channel.findall('item'):
|
||||
title = self.find_xml_attribute(child, 'title')
|
||||
date = self.find_xml_attribute(child, 'pubDate')
|
||||
magnet = self.find_xml_attribute(child, 'link')
|
||||
size = self.find_xml_attribute(child, 'size')
|
||||
files = self.find_xml_attribute(child, 'files')
|
||||
seeders = 0
|
||||
peers = 0
|
||||
|
||||
def parse_xml_for_torrents(self, raw_xml):
|
||||
tree = ET.fromstring(raw_xml)
|
||||
channel = tree.find('channel')
|
||||
results = []
|
||||
for child in channel.findall('item'):
|
||||
title = self.find_xml_attribute(child, 'title')
|
||||
date = self.find_xml_attribute(child, 'pubDate')
|
||||
magnet = self.find_xml_attribute(child, 'link')
|
||||
size = self.find_xml_attribute(child, 'size')
|
||||
files = self.find_xml_attribute(child, 'files')
|
||||
seeders = 0
|
||||
peers = 0
|
||||
for elm in child.findall('{http://torznab.com/schemas/2015/feed}attr'):
|
||||
if elm.get('name') == 'seeders':
|
||||
seeders = elm.get('value')
|
||||
if elm.get('name') == 'peers':
|
||||
peers = elm.get('value')
|
||||
|
||||
for elm in child.findall('{http://torznab.com/schemas/2015/feed}attr'):
|
||||
if elm.get('name') == 'seeders':
|
||||
seeders = elm.get('value')
|
||||
if elm.get('name') == 'peers':
|
||||
peers = elm.get('value')
|
||||
if (size != '' and representsInteger(size)):
|
||||
size = humansize(int(size))
|
||||
|
||||
if (size != '' and representsInteger(size)):
|
||||
size = humansize(int(size))
|
||||
logger.debug('Found torrent with info: \n\ttitle: {}\n\tmagnet: {}\n\tsize: {}\n\tdate: {}\
|
||||
\n\tseeders: {}\n\tpeers: {}'.format(title, magnet, size, date, seeders, peers))
|
||||
torrent = Torrent(title, magnet=magnet, size=size, date=date, seed_count=seeders, leech_count=peers)
|
||||
results.append(torrent)
|
||||
|
||||
logger.debug('Found torrent with info: \n\ttitle: {}\n\tmagnet: {}\n\tsize: {}\n\tdate: {}\
|
||||
\n\tseeders: {}\n\tpeers: {}'.format(title, magnet, size, date, seeders, peers))
|
||||
torrent = Torrent(title, magnet=magnet, size=size, date=date, seed_count=seeders, leech_count=peers)
|
||||
results.append(torrent)
|
||||
|
||||
return results
|
||||
return results
|
||||
|
||||
|
||||
@@ -11,98 +11,97 @@ from torrentSearch.torrent import Torrent
|
||||
logger = logging.getLogger('torrentSearch')
|
||||
|
||||
class Piratebay(object):
|
||||
"""docstring for Piratebay"""
|
||||
def __init__(self, host, path, limit, ssl):
|
||||
super(Piratebay, self).__init__()
|
||||
self.host = host
|
||||
self.path = path
|
||||
self.page_limit = limit
|
||||
self.ssl = ssl
|
||||
self.page = 0
|
||||
self.total_pages = -1
|
||||
"""docstring for Piratebay"""
|
||||
def __init__(self, host, path, limit, ssl):
|
||||
super(Piratebay, self).__init__()
|
||||
self.host = host
|
||||
self.path = path
|
||||
self.page_limit = limit
|
||||
self.ssl = ssl
|
||||
self.page = 0
|
||||
self.total_pages = -1
|
||||
|
||||
# Returns the path set in the initiator
|
||||
# return [string]
|
||||
def get_path(self):
|
||||
return self.path
|
||||
def get_path(self):
|
||||
return self.path
|
||||
|
||||
# Returns the page_limit set in the initiator
|
||||
# return [string]
|
||||
def get_page_limit(self):
|
||||
return self.page_limit
|
||||
def get_page_limit(self):
|
||||
return self.page_limit
|
||||
|
||||
# Starts the call to getting result from our indexer
|
||||
# query [string]
|
||||
# returns [List of Torrent objects]
|
||||
def search(self, query):
|
||||
search_query = convert_query_to_percent_encoded_octets(query)
|
||||
baseUrl = 'http://' + self.host
|
||||
|
||||
path = [self.get_path(), search_query, str(self.page)]
|
||||
url = build_url(self.ssl, baseUrl, path)
|
||||
def search(self, query):
|
||||
"""
|
||||
Starts the call to get results from our thepiratebay site
|
||||
:param piratebay.Piratebay self: object instance
|
||||
:param str query: query we want to search for
|
||||
:return: list of results we found from scraping thepiratebay site based on query
|
||||
:rtype: list
|
||||
"""
|
||||
search_query = convert_query_to_percent_encoded_octets(query)
|
||||
baseUrl = 'http://' + self.host
|
||||
|
||||
path = [self.get_path(), search_query, str(self.page)]
|
||||
url = build_url(self.ssl, baseUrl, path)
|
||||
|
||||
res = fetch_url(url)
|
||||
res = fetch_url(url)
|
||||
|
||||
return self.parse_raw_page_for_torrents(res.read())
|
||||
return self.parse_raw_page_for_torrents(res.read())
|
||||
|
||||
def removeHeader(self, bs4_element):
|
||||
if ('header' in bs4_element['class']):
|
||||
return bs4_element.find_next('tr')
|
||||
|
||||
def removeHeader(self, bs4_element):
|
||||
if ('header' in bs4_element['class']):
|
||||
return bs4_element.find_next('tr')
|
||||
return bs4_element
|
||||
|
||||
return bs4_element
|
||||
def has_magnet(self, href):
|
||||
return href and re.compile('magnet').search(href)
|
||||
|
||||
def has_magnet(self, href):
|
||||
return href and re.compile('magnet').search(href)
|
||||
def parse_raw_page_for_torrents(self, content):
|
||||
soup = BeautifulSoup(content, 'html.parser')
|
||||
content_searchResult = soup.body.find(id='searchResult')
|
||||
|
||||
def parse_raw_page_for_torrents(self, content):
|
||||
soup = BeautifulSoup(content, 'html.parser')
|
||||
content_searchResult = soup.body.find(id='searchResult')
|
||||
if content_searchResult is None:
|
||||
logging.info('No torrents found for the search criteria.')
|
||||
return None
|
||||
|
||||
listElements = content_searchResult.tr
|
||||
|
||||
torrentWrapper = self.removeHeader(listElements)
|
||||
|
||||
if content_searchResult is None:
|
||||
logging.info('No torrents found for the search criteria.')
|
||||
return None
|
||||
|
||||
listElements = content_searchResult.tr
|
||||
|
||||
torrentWrapper = self.removeHeader(listElements)
|
||||
torrents_found = []
|
||||
for torrentElement in torrentWrapper.find_all_next('td'):
|
||||
if torrentElement.find_all("div", class_='detName'):
|
||||
|
||||
torrents_found = []
|
||||
for torrentElement in torrentWrapper.find_all_next('td'):
|
||||
if torrentElement.find_all("div", class_='detName'):
|
||||
name = torrentElement.find('a', class_='detLink').get_text()
|
||||
url = torrentElement.find('a', class_='detLink')['href']
|
||||
magnet = torrentElement.find(href=self.has_magnet)
|
||||
|
||||
uploader = torrentElement.find('a', class_='detDesc')
|
||||
|
||||
name = torrentElement.find('a', class_='detLink').get_text()
|
||||
url = torrentElement.find('a', class_='detLink')['href']
|
||||
magnet = torrentElement.find(href=self.has_magnet)
|
||||
|
||||
uploader = torrentElement.find('a', class_='detDesc')
|
||||
if uploader is None:
|
||||
uploader = torrentElement.find('i')
|
||||
|
||||
uploader = uploader.get_text()
|
||||
|
||||
if uploader is None:
|
||||
uploader = torrentElement.find('i')
|
||||
|
||||
uploader = uploader.get_text()
|
||||
info_text = torrentElement.find('font', class_='detDesc').get_text()
|
||||
|
||||
date = return_re_match(info_text, r"(\d+\-\d+\s\d+)|(Y\-day\s\d{2}\:\d{2})")
|
||||
size = return_re_match(info_text, r"(\d+(\.\d+)?\s[a-zA-Z]+)")
|
||||
byteSize = deHumansize(size)
|
||||
|
||||
info_text = torrentElement.find('font', class_='detDesc').get_text()
|
||||
|
||||
date = return_re_match(info_text, r"(\d+\-\d+\s\d+)|(Y\-day\s\d{2}\:\d{2})")
|
||||
size = return_re_match(info_text, r"(\d+(\.\d+)?\s[a-zA-Z]+)")
|
||||
byteSize = deHumansize(size)
|
||||
# COULD NOT FIND HREF!
|
||||
if (magnet is None):
|
||||
logger.warning('Could not find magnet for {}'.format(name))
|
||||
continue
|
||||
|
||||
# COULD NOT FIND HREF!
|
||||
if (magnet is None):
|
||||
logger.warning('Could not find magnet for {}'.format(name))
|
||||
continue
|
||||
seed_and_leech = torrentElement.find_all_next(attrs={"align": "right"})
|
||||
seed = seed_and_leech[0].get_text()
|
||||
leech = seed_and_leech[1].get_text()
|
||||
|
||||
seed_and_leech = torrentElement.find_all_next(attrs={"align": "right"})
|
||||
seed = seed_and_leech[0].get_text()
|
||||
leech = seed_and_leech[1].get_text()
|
||||
torrent = Torrent(name, magnet['href'], byteSize, uploader, date, seed, leech, url)
|
||||
|
||||
torrent = Torrent(name, magnet['href'], byteSize, uploader, date, seed, leech, url)
|
||||
torrents_found.append(torrent)
|
||||
else:
|
||||
logger.warning('Could not find torrent element on thepiratebay webpage.')
|
||||
continue
|
||||
|
||||
torrents_found.append(torrent)
|
||||
else:
|
||||
logger.warning('Could not find torrent element on thepiratebay webpage.')
|
||||
continue
|
||||
|
||||
logging.info('Found %s torrents for given search criteria.' % len(torrents_found))
|
||||
return torrents_found
|
||||
logging.info('Found %s torrents for given search criteria.' % len(torrents_found))
|
||||
return torrents_found
|
||||
|
||||
@@ -79,7 +79,9 @@ def main():
|
||||
def getConfig():
|
||||
"""
|
||||
Read path and get configuration file with site settings
|
||||
Returns config [configparser]
|
||||
|
||||
:return: config settings read from 'config.ini'
|
||||
:rtype: configparser.ConfigParser
|
||||
"""
|
||||
config = configparser.ConfigParser()
|
||||
config_dir = os.path.join(BASE_DIR, 'config.ini')
|
||||
@@ -90,7 +92,10 @@ def getConfig():
|
||||
def createJSONList(torrents):
|
||||
"""
|
||||
Iterates over all torrent objects in torrents and gets all attributes which are appended to a list
|
||||
Returns: List of torrents with all their info in a JSON format
|
||||
|
||||
:param list torrents: list of torrent objects to collect the attributes from
|
||||
:return: List of torrents with all their info in a JSON format
|
||||
:rtype: str
|
||||
"""
|
||||
jsonList = []
|
||||
for torrent in torrents:
|
||||
@@ -129,7 +134,13 @@ def chooseCandidate(torrent_list):
|
||||
def searchTorrentSite(config, query, site, print_result):
|
||||
"""
|
||||
Selects site based on input and finds torrents for that site based on query
|
||||
Returns json list with results. If print_results is True in args then also prints the output to terminal
|
||||
|
||||
:param configparser.ConfigParser config: config settings for the torrent sites
|
||||
:param str query: query to search torrents for
|
||||
:param str site: the site we want to index/scrape
|
||||
:param boolean print_result: if the results should be printed to the terminal
|
||||
:return: json list with results
|
||||
:rtype: str
|
||||
"""
|
||||
logger.debug('Searching for query {} at {}'.format(query, site))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user