From 7413c541d3bfe99cbb0b87b22b5df26c9e742c2e Mon Sep 17 00:00:00 2001 From: Ritiek Malhotra Date: Sun, 22 Mar 2020 21:44:04 +0530 Subject: [PATCH] Decouple fetching metadata --- spotdl/downloaders/youtube.py | 412 ------------------ spotdl/encode/encode_base.py | 39 +- spotdl/encode/encoders/ffmpeg.py | 26 +- spotdl/lyrics/lyric_base.py | 15 + spotdl/lyrics/providers/genius.py | 20 +- spotdl/lyrics/providers/lyricwikia_wrapper.py | 3 + spotdl/metadata/__init__.py | 2 + spotdl/metadata/metadata_base.py | 69 +++ spotdl/metadata/providers/__init__.py | 2 + spotdl/metadata/providers/spotify.py | 66 +++ spotdl/metadata/providers/youtube.py | 140 ++++++ spotdl/patch/patcher.py | 64 --- 12 files changed, 373 insertions(+), 485 deletions(-) delete mode 100644 spotdl/downloaders/youtube.py create mode 100644 spotdl/metadata/__init__.py create mode 100644 spotdl/metadata/metadata_base.py create mode 100644 spotdl/metadata/providers/__init__.py create mode 100644 spotdl/metadata/providers/spotify.py create mode 100644 spotdl/metadata/providers/youtube.py delete mode 100644 spotdl/patch/patcher.py diff --git a/spotdl/downloaders/youtube.py b/spotdl/downloaders/youtube.py deleted file mode 100644 index c8346f2..0000000 --- a/spotdl/downloaders/youtube.py +++ /dev/null @@ -1,412 +0,0 @@ -from bs4 import BeautifulSoup -import urllib -import pafy - -from slugify import slugify -from logzero import logger as log -import os - -from spotdl import spotify_tools -from spotdl import internals -from spotdl import const - -# Fix download speed throttle on short duration tracks -# Read more on mps-youtube/pafy#199 -pafy.g.opener.addheaders.append(("Range", "bytes=0-")) - -# Implement unreleased methods on Pafy object -# More info: https://github.com/mps-youtube/pafy/pull/211 -if pafy.__version__ <= "0.5.5": - from spotdl import patcher - - pafy_patcher = patcher.PatchPafy() - pafy_patcher.patch_getbestthumb() - pafy_patcher.patch_process_streams() - pafy_patcher.patch_insecure_streams() - - -def set_api_key(): - if const.args.youtube_api_key: - key = const.args.youtube_api_key - else: - # Please respect this YouTube token :) - key = "AIzaSyC6cEeKlxtOPybk9sEe5ksFN5sB-7wzYp0" - pafy.set_api_key(key) - - -def go_pafy(raw_song, meta_tags=None): - """ Parse track from YouTube. """ - if internals.is_youtube(raw_song): - track_info = pafy.new(raw_song) - else: - track_url = generate_youtube_url(raw_song, meta_tags) - - if track_url: - track_info = pafy.new(track_url) - else: - track_info = None - - return track_info - - -def match_video_and_metadata(track): - """ Get and match track data from YouTube and Spotify. """ - meta_tags = None - - def fallback_metadata(meta_tags): - fallback_metadata_info = ( - "Track not found on Spotify, falling back on YouTube metadata" - ) - skip_fallback_metadata_warning = ( - "Fallback condition not met, shall not embed metadata" - ) - if meta_tags is None: - if const.args.no_fallback_metadata: - log.warning(skip_fallback_metadata_warning) - else: - log.info(fallback_metadata_info) - meta_tags = generate_metadata(content) - return meta_tags - - if internals.is_youtube(track): - log.debug("Input song is a YouTube URL") - content = go_pafy(track, meta_tags=None) - track = slugify(content.title).replace("-", " ") - if not const.args.no_metadata: - meta_tags = spotify_tools.generate_metadata(track) - meta_tags = fallback_metadata(meta_tags) - - elif internals.is_spotify(track): - log.debug("Input song is a Spotify URL") - # Let it generate metadata, YouTube doesn't know Spotify slang - meta_tags = spotify_tools.generate_metadata(track) - content = go_pafy(track, meta_tags) - if const.args.no_metadata: - meta_tags = None - - else: - log.debug("Input song is plain text based") - if const.args.no_metadata: - content = go_pafy(track, meta_tags=None) - else: - meta_tags = spotify_tools.generate_metadata(track) - content = go_pafy(track, meta_tags=meta_tags) - meta_tags = fallback_metadata(meta_tags) - - return content, meta_tags - - -def generate_metadata(content): - """ Fetch a song's metadata from YouTube. """ - meta_tags = { - "spotify_metadata": False, - "name": content.title, - "artists": [{"name": content.author}], - "duration": content.length, - "external_urls": {"youtube": content.watchv_url}, - "album": { - "images": [{"url": content.getbestthumb()}], - "artists": [{"name": None}], - "name": None, - }, - "year": None, - "release_date": None, - "type": "track", - "disc_number": 1, - "track_number": 1, - "total_tracks": 1, - "publisher": None, - "external_ids": {"isrc": None}, - "lyrics": None, - "copyright": None, - "genre": None, - } - - # Workaround for - # https://github.com/ritiek/spotify-downloader/issues/671 - try: - meta_tags["year"] = content.published.split("-")[0] - meta_tags["release_date"] = content.published.split(" ")[0] - except pafy.util.GdataError: - pass - - return meta_tags - - -def get_youtube_title(content, number=None): - """ Get the YouTube video's title. """ - title = content.title - if number: - return "{0}. {1}".format(number, title) - else: - return title - - -def generate_m3u(track_file): - tracks = internals.get_unique_tracks(track_file) - target_file = "{}.m3u".format(track_file.split(".")[0]) - total_tracks = len(tracks) - log.info("Generating {0} from {1} YouTube URLs".format(target_file, total_tracks)) - with open(target_file, "w") as output_file: - output_file.write("#EXTM3U\n\n") - - videos = [] - for n, track in enumerate(tracks, 1): - content, _ = match_video_and_metadata(track) - if content is None: - log.warning("Skipping {}".format(track)) - else: - log.info( - "Matched track {0}/{1} ({2})".format( - n, total_tracks, content.watchv_url - ) - ) - log.debug(track) - m3u_key = "#EXTINF:{duration},{title}\n{youtube_url}\n".format( - duration=internals.get_sec(content.duration), - title=content.title, - youtube_url=content.watchv_url, - ) - log.debug(m3u_key) - with open(target_file, "a") as output_file: - output_file.write(m3u_key) - videos.append(content.watchv_url) - - return videos - - -def download_song(file_name, content): - """ Download the audio file from YouTube. """ - _, extension = os.path.splitext(file_name) - if extension in (".webm", ".m4a"): - link = content.getbestaudio(preftype=extension[1:]) - else: - log.debug("No audio streams available for {} type".format(extension)) - return False - - if link: - log.debug("Downloading from URL: " + link.url) - filepath = os.path.join(const.args.folder, file_name) - log.debug("Saving to: " + filepath) - link.download(filepath=filepath) - return True - else: - log.debug("No audio streams available") - return False - - -def generate_search_url(query): - """ Generate YouTube search URL for the given song. """ - # urllib.request.quote() encodes string with special characters - quoted_query = urllib.request.quote(query) - # Special YouTube URL filter to search only for videos - url = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={0}".format( - quoted_query - ) - return url - - -def is_video(result): - # ensure result is not a channel - not_video = ( - result.find("channel") is not None - or "yt-lockup-channel" in result.parent.attrs["class"] - or "yt-lockup-channel" in result.attrs["class"] - ) - - # ensure result is not a mix/playlist - not_video = not_video or "yt-lockup-playlist" in result.parent.attrs["class"] - - # ensure video result is not an advertisement - not_video = not_video or result.find("googleads") is not None - - video = not not_video - return video - - -def generate_youtube_url(raw_song, meta_tags): - url_fetch = GenerateYouTubeURL(raw_song, meta_tags) - if const.args.youtube_api_key: - url = url_fetch.api() - else: - url = url_fetch.scrape() - return url - - -class GenerateYouTubeURL: - def __init__(self, raw_song, meta_tags): - self.raw_song = raw_song - self.meta_tags = meta_tags - - if meta_tags is None: - self.search_query = raw_song - else: - self.search_query = internals.format_string( - const.args.search_format, meta_tags, force_spaces=True - ) - - def _best_match(self, videos): - if not videos: - log.error("No videos found on YouTube for a given search") - return None - - """ Select the best matching video from a list of videos. """ - if const.args.manual: - log.info(self.raw_song) - log.info("0. Skip downloading this song.\n") - # fetch all video links on first page on YouTube - for i, v in enumerate(videos): - log.info( - u"{0}. {1} {2} {3}".format( - i + 1, - v["title"], - v["videotime"], - "http://youtube.com/watch?v=" + v["link"], - ) - ) - # let user select the song to download - result = internals.input_link(videos) - if result is None: - return None - else: - if not self.meta_tags: - # if the metadata could not be acquired, take the first result - # from Youtube because the proper song length is unknown - result = videos[0] - log.debug( - "Since no metadata found on Spotify, going with the first result" - ) - else: - # filter out videos that do not have a similar length to the Spotify song - duration_tolerance = 10 - max_duration_tolerance = 20 - possible_videos_by_duration = [] - - # start with a reasonable duration_tolerance, and increment duration_tolerance - # until one of the Youtube results falls within the correct duration or - # the duration_tolerance has reached the max_duration_tolerance - while len(possible_videos_by_duration) == 0: - possible_videos_by_duration = list( - filter( - lambda x: abs(x["seconds"] - self.meta_tags["duration"]) - <= duration_tolerance, - videos, - ) - ) - duration_tolerance += 1 - if duration_tolerance > max_duration_tolerance: - log.error( - "{0} by {1} was not found.".format( - self.meta_tags["name"], - self.meta_tags["artists"][0]["name"], - ) - ) - return None - - result = possible_videos_by_duration[0] - - if result: - url = "http://youtube.com/watch?v={0}".format(result["link"]) - else: - url = None - - return url - - def scrape(self, bestmatch=True, tries_remaining=5): - """ Search and scrape YouTube to return a list of matching videos. """ - - # prevents an infinite loop but allows for a few retries - if tries_remaining == 0: - log.debug("No tries left. I quit.") - return - - search_url = generate_search_url(self.search_query) - log.debug("Opening URL: {0}".format(search_url)) - - item = self._fetch_response(search_url).read() - items_parse = BeautifulSoup(item, "html.parser") - - videos = [] - for x in items_parse.find_all( - "div", {"class": "yt-lockup-dismissable yt-uix-tile"} - ): - - if not is_video(x): - continue - - y = x.find("div", class_="yt-lockup-content") - link = y.find("a")["href"][-11:] - title = y.find("a")["title"] - - try: - videotime = x.find("span", class_="video-time").get_text() - except AttributeError: - log.debug("Could not find video duration on YouTube, retrying..") - return self.scrape( - bestmatch=bestmatch, tries_remaining=tries_remaining - 1 - ) - - youtubedetails = { - "link": link, - "title": title, - "videotime": videotime, - "seconds": internals.get_sec(videotime), - } - videos.append(youtubedetails) - - if bestmatch: - return self._best_match(videos) - - return videos - - def api(self, bestmatch=True): - """ Use YouTube API to search and return a list of matching videos. """ - - query = {"part": "snippet", "maxResults": 50, "type": "video"} - - if const.args.music_videos_only: - query["videoCategoryId"] = "10" - - if not self.meta_tags: - song = self.raw_song - query["q"] = song - else: - query["q"] = self.search_query - log.debug("query: {0}".format(query)) - - data = pafy.call_gdata("search", query) - data["items"] = list( - filter(lambda x: x["id"].get("videoId") is not None, data["items"]) - ) - query_results = { - "part": "contentDetails,snippet,statistics", - "maxResults": 50, - "id": ",".join(i["id"]["videoId"] for i in data["items"]), - } - log.debug("query_results: {0}".format(query_results)) - - vdata = pafy.call_gdata("videos", query_results) - - videos = [] - for x in vdata["items"]: - duration_s = pafy.playlist.parseISO8591(x["contentDetails"]["duration"]) - youtubedetails = { - "link": x["id"], - "title": x["snippet"]["title"], - "videotime": internals.videotime_from_seconds(duration_s), - "seconds": duration_s, - } - videos.append(youtubedetails) - - if bestmatch: - return self._best_match(videos) - - return videos - - @staticmethod - def _fetch_response(url): - # XXX: This method exists only because it helps us indirectly - # monkey patch `urllib.request.open`, directly monkey patching - # `urllib.request.open` causes us to end up in an infinite recursion - # during the test since `urllib.request.open` would monkeypatch itself. - return urllib.request.urlopen(url) diff --git a/spotdl/encode/encode_base.py b/spotdl/encode/encode_base.py index ec36b60..b4bd7c5 100644 --- a/spotdl/encode/encode_base.py +++ b/spotdl/encode/encode_base.py @@ -26,8 +26,18 @@ from spotdl.encode.exceptions import EncoderNotFoundError class EncoderBase(ABC): + """ + Defined encoders must inherit from this abstract base class + and implement their own functionality for the below defined + methods. + """ + @abstractmethod - def __init__(self, encoder_path, loglevel, additional_arguments): + def __init__(self, encoder_path, loglevel, additional_arguments=[]): + """ + This method must make sure whether specified encoder + is available under PATH. + """ if shutil.which(encoder_path) is None: raise EncoderNotFoundError( "{} executable does not exist or was not found in PATH.".format( @@ -40,26 +50,51 @@ class EncoderBase(ABC): @abstractmethod def set_argument(self, argument): + """ + This method must be used to set any custom functionality + for the encoder by passing arguments to it. + """ self._additional_arguments += argument.split() @abstractmethod def get_encoding(self, filename): + """ + This method must determine the encoding for a local + audio file. Such as "mp3", "wav", "m4a", etc. + """ _, extension = os.path.splitext(filename) # Ignore the initial dot from file extension return extension[1:] @abstractmethod def set_debuglog(self): + """ + This method must enable verbose logging in the defined + encoder. + """ pass @abstractmethod def _generate_encode_command(self, input_file, output_file): + """ + This method must the complete command for that would be + used to invoke the encoder and perform the encoding. + """ pass @abstractmethod def _generate_encoding_arguments(self, input_encoding, output_encoding): + """ + This method must return the core arguments for the defined + encoder such as defining the sample rate, audio bitrate, + etc. + """ pass @abstractmethod - def re_encode(self, input_encoding, output_encoding): + def re_encode(self, input_file, output_file): + """ + This method must invoke FFmpeg to encode a given input + file to a specified output file. + """ pass diff --git a/spotdl/encode/encoders/ffmpeg.py b/spotdl/encode/encoders/ffmpeg.py index 2a3db2c..b9f0273 100644 --- a/spotdl/encode/encoders/ffmpeg.py +++ b/spotdl/encode/encoders/ffmpeg.py @@ -60,9 +60,12 @@ class EncoderFFmpeg(EncoderBase): def set_debuglog(self): self._loglevel = "-loglevel debug" - def _generate_encode_command(self, input_file, output_file): - input_encoding = self.get_encoding(input_file) - output_encoding = self.get_encoding(output_file) + def _generate_encode_command(self, input_file, output_file, + input_encoding=None, output_encoding=None): + if input_encoding is None: + input_encoding = self.get_encoding(input_file) + if output_encoding is None: + output_encoding = self.get_encoding(output_file) arguments = self._generate_encoding_arguments( input_encoding, output_encoding @@ -82,9 +85,20 @@ class EncoderFFmpeg(EncoderBase): input_file, output_file ) - returncode = subprocess.call(encode_command) - encode_successful = returncode == 0 + process = subprocess.Popen(encode_command) + process.wait() + encode_successful = process.returncode == 0 if encode_successful and delete_original: os.remove(input_file) + return process + + def re_encode_from_stdin(self, input_encoding, output_file): + output_encoding = self.get_encoding(output_file) + encode_command = self._generate_encode_command( + "-", + output_file, + input_encoding=input_encoding, + ) + process = subprocess.Popen(encode_command) + return process - return returncode diff --git a/spotdl/lyrics/lyric_base.py b/spotdl/lyrics/lyric_base.py index 4468c36..6fbc4ab 100644 --- a/spotdl/lyrics/lyric_base.py +++ b/spotdl/lyrics/lyric_base.py @@ -5,10 +5,25 @@ from abc import abstractmethod class LyricBase(ABC): + """ + Defined lyric providers must inherit from this abstract base + class and implement their own functionality for the below + defined methods. + """ + @abstractmethod def __init__(self, artist, track): + """ + This method must set any protected attributes, + which may be modified from outside the class + if the need arises. + """ pass @abstractmethod def get_lyrics(self, linesep="\n", timeout=None): + """ + This method must return the lyrics string for the + given track. + """ pass diff --git a/spotdl/lyrics/providers/genius.py b/spotdl/lyrics/providers/genius.py index b599b4a..5c707d8 100644 --- a/spotdl/lyrics/providers/genius.py +++ b/spotdl/lyrics/providers/genius.py @@ -14,6 +14,11 @@ class Genius(LyricBase): self.base_url = BASE_URL def _guess_lyric_url(self): + """ + Returns the possible lyric URL for the track available + on Genius. This may not always be a valid URL, but this + is apparently the best we can do at the moment? + """ query = "/{} {} lyrics".format(self.artist, self.track) query = query.replace(" ", "-") encoded_query = urllib.request.quote(query) @@ -21,6 +26,10 @@ class Genius(LyricBase): return lyric_url def _fetch_page(self, url, timeout=None): + """ + Makes a GET request to the given URL and returns the + HTML content in the case of a valid response. + """ request = urllib.request.Request(url) request.add_header("User-Agent", "urllib") try: @@ -35,14 +44,23 @@ class Genius(LyricBase): return response.read() def _get_lyrics_text(self, html): + """ + Extracts and returns the lyric content from the + provided HTML. + """ soup = BeautifulSoup(html, "html.parser") lyrics_paragraph = soup.find("p") if lyrics_paragraph: return lyrics_paragraph.get_text() else: - raise LyricsNotFoundError("The lyrics for this track are yet to be released.") + raise LyricsNotFoundError( + "The lyrics for this track are yet to be released." + ) def get_lyrics(self, linesep="\n", timeout=None): + """ + Returns the lyric string for the given artist and track. + """ url = self._guess_lyric_url() html_page = self._fetch_page(url, timeout=timeout) lyrics = self._get_lyrics_text(html_page) diff --git a/spotdl/lyrics/providers/lyricwikia_wrapper.py b/spotdl/lyrics/providers/lyricwikia_wrapper.py index 6511657..e5a26ac 100644 --- a/spotdl/lyrics/providers/lyricwikia_wrapper.py +++ b/spotdl/lyrics/providers/lyricwikia_wrapper.py @@ -10,6 +10,9 @@ class LyricWikia(LyricBase): self.track = track def get_lyrics(self, linesep="\n", timeout=None): + """ + Returns the lyric string for the given artist and track. + """ try: lyrics = lyricwikia.get_lyrics(self.artist, self.track, linesep, timeout) except lyricwikia.LyricsNotFound as e: diff --git a/spotdl/metadata/__init__.py b/spotdl/metadata/__init__.py new file mode 100644 index 0000000..c38e961 --- /dev/null +++ b/spotdl/metadata/__init__.py @@ -0,0 +1,2 @@ +from spotdl.metadata.metadata_base import MetadataBase +from spotdl.metadata.metadata_base import StreamsBase diff --git a/spotdl/metadata/metadata_base.py b/spotdl/metadata/metadata_base.py new file mode 100644 index 0000000..4a57d76 --- /dev/null +++ b/spotdl/metadata/metadata_base.py @@ -0,0 +1,69 @@ +from abc import ABC +from abc import abstractmethod + + +class StreamsBase(ABC): + @abstractmethod + def __init__(self, streams): + """ + This method must parse audio streams into a list of + dictionaries with the keys: + "bitrate", "download_url", "encoding", "filesize". + + The list should typically be sorted in descending order + based on the audio stream's bitrate. + + This sorted list must be assigned to `self.all`. + """ + self.all = streams + + @abstractmethod + def getbest(self): + """ + This method must return the audio stream with the + highest bitrate. + """ + return self.all[0] + + @abstractmethod + def getworst(self): + """ + This method must return the audio stream with the + lowest bitrate. + """ + return self.all[-1] + + +class MetadataBase(ABC): + def set_credentials(self, client_id, client_secret): + """ + This method may or not be used depending on + whether the metadata provider requires authentication + or not. + """ + pass + + @abstractmethod + def from_url(self, url): + """ + This method must return track metadata from the + corresponding Spotify URL. + """ + pass + + @abstractmethod + def from_query(self, query): + """ + This method must return track metadata from the + corresponding search query. + """ + pass + + @abstractmethod + def metadata_to_standard_form(self, metadata): + """ + This method must transform the fetched metadata + into a format consistent with all other metadata + providers, for easy utilization. + """ + pass diff --git a/spotdl/metadata/providers/__init__.py b/spotdl/metadata/providers/__init__.py new file mode 100644 index 0000000..ff3977e --- /dev/null +++ b/spotdl/metadata/providers/__init__.py @@ -0,0 +1,2 @@ +from spotdl.metadata.providers.spotify import MetadataSpotify +from spotdl.metadata.providers.youtube import MetadataYouTube diff --git a/spotdl/metadata/providers/spotify.py b/spotdl/metadata/providers/spotify.py new file mode 100644 index 0000000..8b0ed0a --- /dev/null +++ b/spotdl/metadata/providers/spotify.py @@ -0,0 +1,66 @@ +import spotipy +import spotipy.oauth2 as oauth2 + +from spotdl.metadata import MetadataBase + + +class MetadataSpotify(MetadataBase): + def __init__(self, spotify=None): + self.spotify = spotify + + def set_credentials(self, client_id, client_secret): + token = self._generate_token(client_id, client_secret) + self.spotify = spotipy.Spotify(auth=token) + + def from_url(self, url): + metadata = self.spotify.track(url) + return self.metadata_to_standard_form(metadata) + + def from_query(self, query): + metadata = self.spotify.search(query, limit=1)["tracks"]["items"][0] + return self.metadata_to_standard_form(metadata) + + def _generate_token(self, client_id, client_secret): + """ Generate the token. """ + credentials = oauth2.SpotifyClientCredentials( + client_id=client_id, + client_secret=client_secret, + ) + token = credentials.get_access_token() + return token + + def _titlecase(self, string): + return " ".join(word.capitalize() for word in string.split()) + + def metadata_to_standard_form(self, metadata): + artist = self.spotify.artist(metadata["artists"][0]["id"]) + album = self.spotify.album(metadata["album"]["id"]) + + try: + metadata[u"genre"] = self._titlecase(artist["genres"][0]) + except IndexError: + metadata[u"genre"] = None + try: + metadata[u"copyright"] = album["copyrights"][0]["text"] + except IndexError: + metadata[u"copyright"] = None + try: + metadata[u"external_ids"][u"isrc"] + except KeyError: + metadata[u"external_ids"][u"isrc"] = None + + metadata[u"release_date"] = album["release_date"] + metadata[u"publisher"] = album["label"] + metadata[u"total_tracks"] = album["tracks"]["total"] + + # Some sugar + metadata["year"], *_ = metadata["release_date"].split("-") + metadata["duration"] = metadata["duration_ms"] / 1000.0 + metadata["provider"] = "spotify" + + # Remove unwanted parameters + del metadata["duration_ms"] + del metadata["available_markets"] + del metadata["album"]["available_markets"] + + return metadata diff --git a/spotdl/metadata/providers/youtube.py b/spotdl/metadata/providers/youtube.py new file mode 100644 index 0000000..e1b76be --- /dev/null +++ b/spotdl/metadata/providers/youtube.py @@ -0,0 +1,140 @@ +import pytube +from bs4 import BeautifulSoup + +import urllib.request + +from spotdl.metadata import StreamsBase +from spotdl.metadata import MetadataBase + +BASE_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}" + + +class YouTubeSearch: + def __init__(self): + self.base_url = BASE_URL + + def generate_search_url(self, query): + quoted_query = urllib.request.quote(query) + return self.base_url.format(quoted_query) + + def _fetch_response_html(self, url): + response = urllib.request.urlopen(url) + soup = BeautifulSoup(response.read(), "html.parser") + return soup + + def _fetch_search_results(self, html): + results = html.find_all( + "div", {"class": "yt-lockup-dismissable yt-uix-tile"} + ) + return results + + def _is_video(self, result): + # ensure result is not a channel + not_video = ( + result.find("channel") is not None + or "yt-lockup-channel" in result.parent.attrs["class"] + or "yt-lockup-channel" in result.attrs["class"] + ) + + # ensure result is not a mix/playlist + not_video = not_video or "yt-lockup-playlist" in result.parent.attrs["class"] + + # ensure video result is not an advertisement + not_video = not_video or result.find("googleads") is not None + + video = not not_video + return video + + def _parse_video_id(self, result): + details = result.find("div", class_="yt-lockup-content") + video_id = details.find("a")["href"][-11:] + return video_id + + def search(self, query, limit=10, tries_remaining=5): + """ Search and scrape YouTube to return a list of matching videos. """ + # prevents an infinite loop but allows for a few retries + if tries_remaining == 0: + # log.debug("No tries left. I quit.") + return + + search_url = self.generate_search_url(query) + # log.debug("Opening URL: {0}".format(search_url)) + html = self._fetch_response_html(search_url) + + videos = [] + for result in self._fetch_search_results(html): + if not self._is_video(result): + continue + if len(videos) >= limit: + break + video_id = self._parse_video_id(result) + videos.append("https://www.youtube.com/watch?v=" + video_id) + + return videos + + +class YouTubeStreams(StreamsBase): + def __init__(self, streams): + audiostreams = streams.filter(only_audio=True).order_by("abr").desc() + self.all = [{ + "bitrate": int(stream.abr[:-4]), + "download_url": stream.url, + "encoding": stream.audio_codec, + "filesize": stream.filesize, + } for stream in audiostreams] + + def getbest(self): + return self.all[0] + + def getworst(self): + return self.all[-1] + + +class MetadataYouTube(MetadataBase): + def from_query(self, query): + watch_urls = YouTubeSearch().search(query) + return self.from_url(watch_urls[0]) + + def from_url(self, url): + content = pytube.YouTube(url) + return self.from_pytube_object(content) + + def from_pytube_object(self, content): + return self.metadata_to_standard_form(content) + + def _fetch_publish_date(self, content): + # FIXME: This needs to be supported in PyTube itself + # See https://github.com/nficano/pytube/issues/595 + position = content.watch_html.find("publishDate") + publish_date = content.watch_html[position+16:position+25] + return publish_date + + def metadata_to_standard_form(self, content): + """ Fetch a song's metadata from YouTube. """ + streams = [] + publish_date = self._fetch_publish_date(content) + metadata = { + "name": content.title, + "artists": [{"name": content.author}], + "duration": content.length, + "external_urls": {"youtube": content.watch_url}, + "album": { + "images": [{"url": content.thumbnail_url}], + "artists": [{"name": None}], + "name": None, + }, + "year": publish_date.split("-")[0], + "release_date": publish_date, + "type": "track", + "disc_number": 1, + "track_number": 1, + "total_tracks": 1, + "publisher": None, + "external_ids": {"isrc": None}, + "lyrics": None, + "copyright": None, + "genre": None, + "streams": YouTubeStreams(content.streams), + "provider": "youtube", + } + return metadata diff --git a/spotdl/patch/patcher.py b/spotdl/patch/patcher.py deleted file mode 100644 index 16caa5b..0000000 --- a/spotdl/patch/patcher.py +++ /dev/null @@ -1,64 +0,0 @@ -from pafy import backend_youtube_dl -import pafy - -from spotdl import internals - - -def _getbestthumb(self): - url = self._ydl_info["thumbnails"][0]["url"] - if url: - return url - - part_url = "https://i.ytimg.com/vi/%s/" % self.videoid - # Thumbnail resolution sorted in descending order - thumbs = ( - "maxresdefault.jpg", - "sddefault.jpg", - "hqdefault.jpg", - "mqdefault.jpg", - "default.jpg", - ) - for thumb in thumbs: - url = part_url + thumb - if self._content_available(url): - return url - - -def _process_streams(self): - for format_index in range(len(self._ydl_info["formats"])): - try: - self._ydl_info["formats"][format_index]["url"] = self._ydl_info["formats"][ - format_index - ]["fragment_base_url"] - except KeyError: - pass - return backend_youtube_dl.YtdlPafy._old_process_streams(self) - - -@classmethod -def _content_available(cls, url): - return internals.content_available(url) - - -class PatchPafy: - """ - These patches have not been released by pafy on PyPI yet but - are useful to us. - """ - - def patch_getbestthumb(self): - # https://github.com/mps-youtube/pafy/pull/211 - pafy.backend_shared.BasePafy._bestthumb = None - pafy.backend_shared.BasePafy._content_available = _content_available - pafy.backend_shared.BasePafy.getbestthumb = _getbestthumb - - def patch_process_streams(self): - # https://github.com/mps-youtube/pafy/pull/230 - backend_youtube_dl.YtdlPafy._old_process_streams = ( - backend_youtube_dl.YtdlPafy._process_streams - ) - backend_youtube_dl.YtdlPafy._process_streams = _process_streams - - def patch_insecure_streams(self): - # https://github.com/mps-youtube/pafy/pull/235 - pafy.g.def_ydl_opts["prefer_insecure"] = False