Scrape YouTube by default and optionally use YouTube API to perform searches (#250)

* YouTube scraping

* Cleanup GenerateYouTubeURL class

* Some minor improvements

* Add test to fetch title with and without api key
This commit is contained in:
Ritiek Malhotra
2018-03-09 20:40:15 +05:30
committed by GitHub
parent 46f313777b
commit b968b5d206
7 changed files with 254 additions and 113 deletions

View File

@@ -23,6 +23,7 @@ default_conf = { 'spotify-downloader':
'music-videos-only' : False,
'no-spaces' : False,
'file-format' : '{artist} - {track_name}',
'youtube-api-key' : None,
'log-level' : 'INFO' }
}
@@ -40,7 +41,7 @@ def merge(default, config):
merged.update(config)
return merged
def get_config(config_file):
try:
with open(config_file, 'r') as ymlfile:
@@ -57,21 +58,22 @@ def override_config(config_file, parser, raw_args=None):
""" Override default dict with config dict passed as comamnd line argument. """
config_file = os.path.realpath(config_file)
config = merge(default_conf['spotify-downloader'], get_config(config_file))
parser.set_defaults(manual=config['manual'])
parser.set_defaults(no_metadata=config['no-metadata'])
parser.set_defaults(avconv=config['avconv'])
parser.set_defaults(folder=os.path.relpath(config['folder'], os.getcwd()))
parser.set_defaults(overwrite=config['overwrite'])
parser.set_defaults(input_ext=config['input-ext'])
parser.set_defaults(output_ext=config['output-ext'])
parser.set_defaults(download_only_metadata=config['download-only-metadata'])
parser.set_defaults(dry_run=config['dry-run'])
parser.set_defaults(file_format=config['file-format'])
parser.set_defaults(folder=os.path.relpath(config['folder'], os.getcwd()))
parser.set_defaults(input_ext=config['input-ext'])
parser.set_defaults(log_level=config['log-level'])
parser.set_defaults(manual=config['manual'])
parser.set_defaults(music_videos_only=config['music-videos-only'])
parser.set_defaults(no_metadata=config['no-metadata'])
parser.set_defaults(no_spaces=config['no-spaces'])
parser.set_defaults(output_ext=config['output-ext'])
parser.set_defaults(overwrite=config['overwrite'])
parser.set_defaults(file_format=config['file-format'])
parser.set_defaults(no_spaces=config['youtube-api-key'])
parser.set_defaults(log_level=config['log-level'])
return parser.parse_args(raw_args)
@@ -151,15 +153,18 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
choices=_LOG_LEVELS_STR,
type=str.upper,
help='set log verbosity')
parser.add_argument(
'-yk', '--youtube-api-key', default=config['youtube-api-key'],
help=argparse.SUPPRESS)
parser.add_argument(
'-c', '--config', default=None,
help='Replace with custom config.yml file')
help='Replace with custom config.yml file')
parsed = parser.parse_args(raw_args)
if parsed.config is not None and to_merge:
parsed = override_config(parsed.config,parser)
parsed = override_config(parsed.config, parser)
parsed.log_level = log_leveller(parsed.log_level)
return parsed

View File

@@ -120,6 +120,19 @@ def videotime_from_seconds(time):
return '{0}:{1:02}:{2:02}'.format((time//60)//60, (time//60) % 60, time % 60)
def get_sec(time_str):
    """ Convert a colon-separated duration ('ss', 'mm:ss' or 'hh:mm:ss')
    to a total number of seconds. Any component beyond hours is ignored. """
    total_seconds = 0
    # Walk the components right-to-left so seconds, minutes and hours
    # line up with their multipliers; zip drops any surplus component.
    parts = time_str.split(':', 3)
    for multiplier, part in zip((1, 60, 3600), reversed(parts)):
        total_seconds += multiplier * int(part)
    return total_seconds
def get_splits(url):
if '/' in url:
if url.endswith('/'):
@@ -127,4 +140,4 @@ def get_splits(url):
splits = url.split('/')
else:
splits = url.split(':')
return splits
return splits

View File

@@ -1,3 +1,5 @@
from bs4 import BeautifulSoup
import urllib
import pafy
from core import internals
@@ -7,13 +9,21 @@ import os
import pprint
log = const.log
# NOTE(review): pre-refactor module-level pafy setup shown as removed
# lines in this diff; the hard-coded key now lives in set_api_key()
# (already present in this file). The throttle workaround below has no
# replacement in the new code — confirm that is intentional.
# Please respect this YouTube token :)
pafy.set_api_key('AIzaSyAnItl3udec-Q1d5bkjKJGL-RgrKO_vU90')
# Fix download speed throttle on short duration tracks
# Read more on mps-youtube/pafy#199
pafy.g.opener.addheaders.append(('Range', 'bytes=0-'))
def set_api_key():
    """ Register a YouTube Data API key with pafy: the user-supplied
    --youtube-api-key when given, otherwise the project's bundled key. """
    # Please respect this YouTube token :)
    bundled_key = 'AIzaSyAnItl3udec-Q1d5bkjKJGL-RgrKO_vU90'
    # Truthiness check mirrors the CLI default of None / empty string.
    chosen_key = const.args.youtube_api_key or bundled_key
    pafy.set_api_key(chosen_key)
def go_pafy(raw_song, meta_tags=None):
""" Parse track from YouTube. """
if internals.is_youtube(raw_song):
@@ -58,92 +68,175 @@ def download_song(file_name, content):
return False
# NOTE(review): pre-refactor implementation removed by this commit and
# superseded by generate_youtube_url()/GenerateYouTubeURL later in the
# file; indentation was stripped by the diff renderer.
def generate_youtube_url(raw_song, meta_tags, tries_remaining=5):
""" Search for the song on YouTube and generate a URL to its video. """
# prevents an infinite loop but allows for a few retries
if tries_remaining == 0:
log.debug('No tries left. I quit.')
return
# Build a YouTube Data API search query; videos only, first 50 results.
query = { 'part' : 'snippet',
'maxResults' : 50,
'type' : 'video' }
if const.args.music_videos_only:
query['videoCategoryId'] = '10'
# With no Spotify metadata, search the raw input string verbatim.
if not meta_tags:
song = raw_song
query['q'] = song
else:
song = '{0} - {1}'.format(meta_tags['artists'][0]['name'],
meta_tags['name'])
query['q'] = song
log.debug('query: {0}'.format(query))
data = pafy.call_gdata('search', query)
# Drop non-video hits (channels/playlists have no videoId).
data['items'] = list(filter(lambda x: x['id'].get('videoId') is not None,
data['items']))
# Second API call fetches durations for the matched video ids.
query_results = {'part': 'contentDetails,snippet,statistics',
'maxResults': 50,
'id': ','.join(i['id']['videoId'] for i in data['items'])}
log.debug('query_results: {0}'.format(query_results))
vdata = pafy.call_gdata('videos', query_results)
videos = []
for x in vdata['items']:
duration_s = pafy.playlist.parseISO8591(x['contentDetails']['duration'])
youtubedetails = {'link': x['id'], 'title': x['snippet']['title'],
'videotime':internals.videotime_from_seconds(duration_s),
'seconds': duration_s}
videos.append(youtubedetails)
if not meta_tags:
break
if not videos:
return None
if const.args.manual:
log.info(song)
log.info('0. Skip downloading this song.\n')
# fetch all video links on first page on YouTube
for i, v in enumerate(videos):
log.info(u'{0}. {1} {2} {3}'.format(i+1, v['title'], v['videotime'],
"http://youtube.com/watch?v="+v['link']))
# let user select the song to download
result = internals.input_link(videos)
if not result:
return None
else:
if not meta_tags:
# if the metadata could not be acquired, take the first result
# from Youtube because the proper song length is unknown
result = videos[0]
log.debug('Since no metadata found on Spotify, going with the first result')
else:
# filter out videos that do not have a similar length to the Spotify song
duration_tolerance = 10
max_duration_tolerance = 20
possible_videos_by_duration = list()
'''
start with a reasonable duration_tolerance, and increment duration_tolerance
until one of the Youtube results falls within the correct duration or
the duration_tolerance has reached the max_duration_tolerance
'''
while len(possible_videos_by_duration) == 0:
possible_videos_by_duration = list(filter(lambda x: abs(x['seconds'] - meta_tags['duration']) <= duration_tolerance, videos))
duration_tolerance += 1
if duration_tolerance > max_duration_tolerance:
log.error("{0} by {1} was not found.\n".format(meta_tags['name'], meta_tags['artists'][0]['name']))
return None
result = possible_videos_by_duration[0]
if result:
url = "http://youtube.com/watch?v=" + result['link']
else:
url = None
# NOTE(review): the removed function's trailing `return url` appears to
# be truncated by this diff view.
def generate_search_url(song):
    """ Generate YouTube search URL for the given song. """
    # Percent-encode the query so spaces/special characters survive in the URL
    encoded_song = urllib.request.quote(song)
    # 'sp=EgIQAQ%253D%253D' is YouTube's search filter for videos only
    return 'https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={0}'.format(encoded_song)
def is_video(result):
    """ Return True when a parsed search-result node is a plain video,
    i.e. not a channel, a mix/playlist, or an advertisement. """
    # Checks run in the same order as the original boolean chain so the
    # same attributes are touched (short-circuit behavior preserved).
    if result.find('channel') is not None:
        return False
    if 'yt-lockup-channel' in result.parent.attrs['class']:
        return False
    if 'yt-lockup-channel' in result.attrs['class']:
        return False
    if 'yt-lockup-playlist' in result.parent.attrs['class']:
        return False
    if result.find('googleads') is not None:
        return False
    return True
def generate_youtube_url(raw_song, meta_tags):
    """ Return a YouTube watch URL for the track, using the Data API
    when the user supplied --youtube-api-key and scraping otherwise. """
    fetcher = GenerateYouTubeURL(raw_song, meta_tags)
    if const.args.youtube_api_key:
        return fetcher.api()
    return fetcher.scrape()
class GenerateYouTubeURL:
    """ Find the best matching YouTube video for a track, either by
    scraping YouTube's search results page (default) or via the
    YouTube Data API (when an API key is configured). """

    def __init__(self, raw_song, meta_tags):
        # raw_song: the user's search string (or YouTube URL);
        # meta_tags: Spotify metadata dict, or None when the track
        # could not be matched on Spotify.
        self.raw_song = raw_song
        self.meta_tags = meta_tags

    def _best_match(self, videos):
        """ Select the best matching video from a list of videos.

        Returns a watch URL string, or None when nothing matches or the
        user skips the song in manual mode.
        """
        # BUG FIX: guard restored from the pre-refactor implementation —
        # without it an empty result list crashes below on videos[0].
        if not videos:
            return None

        if const.args.manual:
            log.info(self.raw_song)
            log.info('0. Skip downloading this song.\n')
            # fetch all video links on first page on YouTube
            for i, v in enumerate(videos):
                log.info(u'{0}. {1} {2} {3}'.format(i+1, v['title'], v['videotime'],
                         "http://youtube.com/watch?v="+v['link']))
            # let user select the song to download
            result = internals.input_link(videos)
            if result is None:
                return None
        else:
            if not self.meta_tags:
                # if the metadata could not be acquired, take the first result
                # from Youtube because the proper song length is unknown
                result = videos[0]
                log.debug('Since no metadata found on Spotify, going with the first result')
            else:
                # filter out videos that do not have a similar length to the Spotify song
                duration_tolerance = 10
                max_duration_tolerance = 20
                possible_videos_by_duration = list()

                # start with a reasonable duration_tolerance, and increment
                # duration_tolerance until one of the Youtube results falls
                # within the correct duration or the duration_tolerance has
                # reached the max_duration_tolerance
                while len(possible_videos_by_duration) == 0:
                    possible_videos_by_duration = list(filter(lambda x: abs(x['seconds'] - self.meta_tags['duration']) <= duration_tolerance, videos))
                    duration_tolerance += 1
                    if duration_tolerance > max_duration_tolerance:
                        log.error("{0} by {1} was not found.\n".format(self.meta_tags['name'], self.meta_tags['artists'][0]['name']))
                        return None

                result = possible_videos_by_duration[0]

        if result:
            url = "http://youtube.com/watch?v=" + result['link']
        else:
            url = None

        return url

    def scrape(self, tries_remaining=5):
        """ Search and scrape YouTube to return a list of matching videos. """
        # prevents an infinite loop but allows for a few retries
        if tries_remaining == 0:
            log.debug('No tries left. I quit.')
            return

        if self.meta_tags is None:
            song = self.raw_song
        else:
            song = internals.generate_songname(const.args.file_format,
                                               self.meta_tags)
        search_url = generate_search_url(song)
        log.debug('Opening URL: {0}'.format(search_url))

        item = urllib.request.urlopen(search_url).read()
        items_parse = BeautifulSoup(item, "html.parser")

        videos = []
        for x in items_parse.find_all('div', {'class': 'yt-lockup-dismissable yt-uix-tile'}):
            if not is_video(x):
                continue

            y = x.find('div', class_='yt-lockup-content')
            link = y.find('a')['href'][-11:]
            title = y.find('a')['title']

            try:
                videotime = x.find('span', class_="video-time").get_text()
            except AttributeError:
                log.debug('Could not find video duration on YouTube, retrying..')
                # BUG FIX: previously retried via the module-level
                # generate_youtube_url(raw_song, meta_tags, tries_remaining-1),
                # but that function now takes only two arguments, so every
                # retry raised TypeError. Recurse through this method instead.
                return self.scrape(tries_remaining - 1)

            youtubedetails = {'link': link, 'title': title, 'videotime': videotime,
                              'seconds': internals.get_sec(videotime)}
            videos.append(youtubedetails)
            if self.meta_tags is None:
                break

        return self._best_match(videos)

    def api(self):
        """ Use YouTube API to search and return a list of matching videos. """
        query = { 'part' : 'snippet',
                  'maxResults' : 50,
                  'type' : 'video' }

        if const.args.music_videos_only:
            query['videoCategoryId'] = '10'

        if not self.meta_tags:
            song = self.raw_song
            query['q'] = song
        else:
            song = '{0} - {1}'.format(self.meta_tags['artists'][0]['name'],
                                      self.meta_tags['name'])
            query['q'] = song
        log.debug('query: {0}'.format(query))

        data = pafy.call_gdata('search', query)
        # Channels/playlists have no videoId — keep only real videos.
        data['items'] = list(filter(lambda x: x['id'].get('videoId') is not None,
                                    data['items']))
        # Second call fetches durations for the matched video ids.
        query_results = {'part': 'contentDetails,snippet,statistics',
                         'maxResults': 50,
                         'id': ','.join(i['id']['videoId'] for i in data['items'])}
        log.debug('query_results: {0}'.format(query_results))

        vdata = pafy.call_gdata('videos', query_results)
        videos = []
        for x in vdata['items']:
            duration_s = pafy.playlist.parseISO8591(x['contentDetails']['duration'])
            youtubedetails = {'link': x['id'], 'title': x['snippet']['title'],
                              'videotime':internals.videotime_from_seconds(duration_s),
                              'seconds': duration_s}
            videos.append(youtubedetails)
            if not self.meta_tags:
                break

        return self._best_match(videos)