mirror of
https://github.com/KevinMidboe/spotify-downloader.git
synced 2025-10-29 18:00:15 +00:00
De-clutter metadata search
This commit is contained in:
@@ -20,6 +20,7 @@ import spotdl.config
|
||||
|
||||
from spotdl.command_line.exceptions import NoYouTubeVideoFoundError
|
||||
from spotdl.command_line.exceptions import NoYouTubeVideoMatchError
|
||||
from spotdl.metadata_search import MetadataSearch
|
||||
|
||||
from spotdl.helpers.spotify import SpotifyHelpers
|
||||
|
||||
@@ -28,133 +29,7 @@ import os
|
||||
import urllib.request
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(name=__name__)
|
||||
|
||||
def search_lyrics(query):
|
||||
provider = Genius()
|
||||
try:
|
||||
lyrics = provider.from_query(query)
|
||||
except LyricsNotFoundError:
|
||||
lyrics = None
|
||||
return lyrics
|
||||
|
||||
|
||||
def search_metadata_on_spotify(query):
|
||||
provider = ProviderSpotify()
|
||||
try:
|
||||
metadata = provider.from_query(query)
|
||||
except SpotifyMetadataNotFoundError:
|
||||
metadata = {}
|
||||
return metadata
|
||||
|
||||
|
||||
def prompt_for_youtube_search_result(videos):
|
||||
max_index_length = len(str(len(videos)))
|
||||
max_title_length = max(len(v["title"]) for v in videos)
|
||||
print(" 0. Skip downloading this track", file=sys.stderr)
|
||||
for index, video in enumerate(videos, 1):
|
||||
vid_details = "{index:>{max_index}}. {title:<{max_title}} {url} [{duration}]".format(
|
||||
index=index,
|
||||
max_index=max_index_length,
|
||||
title=video["title"],
|
||||
max_title=max_title_length,
|
||||
url=video["url"],
|
||||
duration=video["duration"],
|
||||
)
|
||||
print(vid_details, file=sys.stderr)
|
||||
print("", file=sys.stderr)
|
||||
|
||||
selection = spotdl.util.prompt_user_for_selection(range(1, len(videos)+1))
|
||||
|
||||
if selection is None:
|
||||
return None
|
||||
return videos[selection-1]
|
||||
|
||||
|
||||
def search_metadata(track, search_format="{artist} - {track-name} lyrics", manual=False):
|
||||
# TODO: Clean this function
|
||||
youtube = ProviderYouTube()
|
||||
youtube_searcher = YouTubeSearch()
|
||||
|
||||
if spotdl.util.is_spotify(track):
|
||||
logger.debug("Input track is a Spotify URL.")
|
||||
spotify = ProviderSpotify()
|
||||
spotify_metadata = spotify.from_url(track)
|
||||
lyric_query = spotdl.metadata.format_string(
|
||||
"{artist} - {track-name}",
|
||||
spotify_metadata,
|
||||
)
|
||||
search_query = spotdl.metadata.format_string(search_format, spotify_metadata)
|
||||
youtube_videos = youtube_searcher.search(search_query)
|
||||
if not youtube_videos:
|
||||
raise NoYouTubeVideoFoundError(
|
||||
'YouTube returned no videos for the search query "{}".'.format(search_query)
|
||||
)
|
||||
if manual:
|
||||
youtube_video = prompt_for_youtube_search_result(youtube_videos)
|
||||
else:
|
||||
youtube_video = youtube_videos.bestmatch()
|
||||
|
||||
if youtube_video is None:
|
||||
raise NoYouTubeVideoMatchError(
|
||||
'No matching videos found on YouTube for the search query "{}".'.format(
|
||||
search_query
|
||||
)
|
||||
)
|
||||
|
||||
youtube_metadata = youtube.from_url(youtube_video["url"])
|
||||
metadata = spotdl.util.merge(
|
||||
youtube_metadata,
|
||||
spotify_metadata
|
||||
)
|
||||
|
||||
elif spotdl.util.is_youtube(track):
|
||||
logger.debug("Input track is a YouTube URL.")
|
||||
metadata = youtube.from_url(track)
|
||||
lyric_query = spotdl.metadata.format_string(
|
||||
"{artist} - {track-name}",
|
||||
metadata,
|
||||
)
|
||||
|
||||
else:
|
||||
logger.debug("Input track is a search query.")
|
||||
search_query = track
|
||||
lyric_query = search_query
|
||||
spotify_metadata = spotdl.util.ThreadWithReturnValue(
|
||||
target=search_metadata_on_spotify,
|
||||
args=(search_query,)
|
||||
)
|
||||
spotify_metadata.start()
|
||||
youtube_videos = youtube_searcher.search(search_query)
|
||||
if not youtube_videos:
|
||||
raise NoYouTubeVideoFoundError(
|
||||
'YouTube returned no videos for the search query "{}".'.format(search_query)
|
||||
)
|
||||
if manual:
|
||||
youtube_video = prompt_for_youtube_search_result(youtube_videos)
|
||||
else:
|
||||
youtube_video = youtube_videos.bestmatch()
|
||||
|
||||
if youtube_video is None:
|
||||
raise NoYouTubeVideoMatchError(
|
||||
'No apparent matching videos found on YouTube for the search query "{}"'.format(
|
||||
search_query
|
||||
)
|
||||
)
|
||||
|
||||
youtube_metadata = youtube.from_url(youtube_video["url"])
|
||||
metadata = spotdl.util.merge(
|
||||
youtube_metadata,
|
||||
spotify_metadata.join()
|
||||
)
|
||||
|
||||
metadata["lyrics"] = spotdl.util.ThreadWithReturnValue(
|
||||
target=search_lyrics,
|
||||
args=(lyric_query,)
|
||||
)
|
||||
|
||||
metadata["lyrics"].start()
|
||||
return metadata
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Spotdl:
|
||||
@@ -200,13 +75,14 @@ class Spotdl:
|
||||
self.download_track(track)
|
||||
elif self.arguments["list"]:
|
||||
if self.arguments["write_m3u"]:
|
||||
youtube_tools.generate_m3u(
|
||||
track_file=self.arguments["list"]
|
||||
self.write_m3u(
|
||||
self.arguments["list"],
|
||||
self.arguments["write_to"]
|
||||
)
|
||||
else:
|
||||
list_download = {
|
||||
"synchronous": self.download_tracks_from_file,
|
||||
"threaded" : self.download_tracks_from_file_threaded,
|
||||
# "threaded" : self.download_tracks_from_file_threaded,
|
||||
}[self.arguments["processor"]]
|
||||
|
||||
list_download(
|
||||
@@ -226,14 +102,75 @@ class Spotdl:
|
||||
playlist = spotify_tools.fetch_playlist(playlist_url)
|
||||
spotify_tools.write_playlist_tracks(playlist, self.arguments["write_to"])
|
||||
|
||||
def write_m3u(self, track_file, target_file=None):
|
||||
with open(track_file, "r") as fin:
|
||||
tracks = fin.read().splitlines()
|
||||
|
||||
logger.info(
|
||||
"Checking and removing any duplicate tracks in {}.".format(track_file)
|
||||
)
|
||||
# Remove duplicates and empty elements
|
||||
# Also strip whitespaces from elements (if any)
|
||||
spotdl.util.remove_duplicates(
|
||||
tracks,
|
||||
condition=lambda x: x,
|
||||
operation=str.strip
|
||||
)
|
||||
|
||||
if target_file is None:
|
||||
target_file = "{}.m3u".format(track_file.split(".")[0])
|
||||
|
||||
total_tracks = len(tracks)
|
||||
logger.info("Generating {0} from {1} YouTube URLs".format(target_file, total_tracks))
|
||||
with open(target_file, "w") as output_file:
|
||||
output_file.write("#EXTM3U\n\n")
|
||||
|
||||
youtube_searcher = YouTubeSearch()
|
||||
videos = []
|
||||
for n, track in enumerate(tracks, 1):
|
||||
try:
|
||||
search_results = youtube_searcher.search(search_query)
|
||||
if not search_results:
|
||||
raise NoYouTubeVideoFoundError(
|
||||
'YouTube returned no videos for the search query "{}".'.format(search_query)
|
||||
)
|
||||
video = search_results.bestmatch()
|
||||
if not video:
|
||||
raise NoYouTubeVideoMatchError(
|
||||
'No matching videos found on YouTube for the search query "{}".'.format(search_query)
|
||||
)
|
||||
except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
|
||||
logger.error(e.args[0])
|
||||
else:
|
||||
print(video)
|
||||
exit()
|
||||
logger.info(
|
||||
"Matched track {0}/{1} ({2})".format(
|
||||
str(n).zfill(len(str(total_tracks))),
|
||||
total_tracks,
|
||||
content.watchv_url
|
||||
)
|
||||
)
|
||||
logger.debug(track)
|
||||
m3u_key = "#EXTINF:{duration},{title}\n{youtube_url}\n".format(
|
||||
duration=internals.get_sec(content.duration),
|
||||
title=content.title,
|
||||
youtube_url=content.watchv_url,
|
||||
)
|
||||
logger.debug(m3u_key)
|
||||
with open(target_file, "a") as output_file:
|
||||
output_file.write(m3u_key)
|
||||
|
||||
def download_track(self, track):
|
||||
logger.info('Downloading "{}"'.format(track))
|
||||
search_metadata = MetadataSearch(
|
||||
track,
|
||||
lyrics=True,
|
||||
yt_search_format=self.arguments["search_format"],
|
||||
yt_manual=self.arguments["manual"]
|
||||
)
|
||||
try:
|
||||
metadata = search_metadata(
|
||||
track,
|
||||
search_format=self.arguments["search_format"],
|
||||
manual=self.arguments["manual"],
|
||||
)
|
||||
metadata = search_metadata.on_youtube_and_spotify()
|
||||
except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
|
||||
logger.error(e.args[0])
|
||||
else:
|
||||
@@ -316,7 +253,6 @@ class Spotdl:
|
||||
"Checking and removing any duplicate tracks in {}.".format(path)
|
||||
)
|
||||
with open(path, "r") as fin:
|
||||
# Read tracks into a list and remove any duplicates
|
||||
tracks = fin.read().splitlines()
|
||||
|
||||
# Remove duplicates and empty elements
|
||||
@@ -334,13 +270,19 @@ class Spotdl:
|
||||
print("", file=sys.stderr)
|
||||
|
||||
for number, track in enumerate(tracks, 1):
|
||||
search_metadata = MetadataSearch(
|
||||
track,
|
||||
lyrics=True,
|
||||
yt_search_format=self.arguments["search_format"],
|
||||
yt_manual=self.arguments["manual"]
|
||||
)
|
||||
try:
|
||||
log_track_query = '{position}. Downloading "{track}"'.format(
|
||||
position=number,
|
||||
track=track
|
||||
)
|
||||
logger.info(log_track_query)
|
||||
metadata = search_metadata(track, self.arguments["search_format"])
|
||||
metadata = search_metadata.on_youtube_and_spotify()
|
||||
self.download_track_from_metadata(metadata)
|
||||
except (urllib.request.URLError, TypeError, IOError) as e:
|
||||
logger.exception(e.args[0])
|
||||
@@ -360,6 +302,7 @@ class Spotdl:
|
||||
fout.writelines(tracks[number-1:])
|
||||
print("", file=sys.stderr)
|
||||
|
||||
"""
|
||||
def download_tracks_from_file_threaded(self, path):
|
||||
# FIXME: Can we make this function cleaner?
|
||||
|
||||
@@ -430,4 +373,5 @@ class Spotdl:
|
||||
current_iteration += 1
|
||||
with open(path, "w") as fout:
|
||||
fout.writelines(tracks)
|
||||
"""
|
||||
|
||||
|
||||
0
spotdl/helpers/__init__.py
Normal file
0
spotdl/helpers/__init__.py
Normal file
@@ -5,6 +5,9 @@ import json
|
||||
from spotdl.lyrics.lyric_base import LyricBase
|
||||
from spotdl.lyrics.exceptions import LyricsNotFoundError
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BASE_URL = "https://genius.com"
|
||||
BASE_SEARCH_URL = BASE_URL + "/api/search/multi?per_page=1&q="
|
||||
|
||||
@@ -40,7 +43,7 @@ class Genius(LyricBase):
|
||||
response = urllib.request.urlopen(request, timeout=timeout)
|
||||
except urllib.request.HTTPError:
|
||||
raise LyricsNotFoundError(
|
||||
"Could not find lyrics at URL: {}".format(url)
|
||||
"Could not find Genius lyrics at URL: {}".format(url)
|
||||
)
|
||||
else:
|
||||
return response.read()
|
||||
@@ -55,7 +58,7 @@ class Genius(LyricBase):
|
||||
return lyrics_paragraph.get_text()
|
||||
else:
|
||||
raise LyricsNotFoundError(
|
||||
"The lyrics for this track are yet to be released."
|
||||
"The lyrics for this track are yet to be released on Genius."
|
||||
)
|
||||
|
||||
def _fetch_search_page(self, url, timeout=None):
|
||||
@@ -68,7 +71,7 @@ class Genius(LyricBase):
|
||||
metadata = json.loads(response.read())
|
||||
if len(metadata["response"]["sections"][0]["hits"]) == 0:
|
||||
raise LyricsNotFoundError(
|
||||
"Could not find any search results for URL: {}".format(url)
|
||||
"Genius returned no lyric results for the search URL: {}".format(url)
|
||||
)
|
||||
return metadata
|
||||
|
||||
@@ -91,8 +94,8 @@ class Genius(LyricBase):
|
||||
|
||||
if lyric_url is None:
|
||||
raise LyricsNotFoundError(
|
||||
"Could not find any valid lyric paths in the "
|
||||
"API response for the query {}".format(query)
|
||||
"Could not find any valid lyric paths in Genius "
|
||||
"lyrics API response for the query {}.".format(query)
|
||||
)
|
||||
|
||||
return self.base_url + lyric_url
|
||||
@@ -102,8 +105,14 @@ class Genius(LyricBase):
|
||||
Returns the lyric string for the track best matching the
|
||||
given query.
|
||||
"""
|
||||
lyric_url = self.best_matching_lyric_url_from_query(query)
|
||||
return self.from_url(lyric_url, linesep, timeout=timeout)
|
||||
try:
|
||||
lyric_url = self.best_matching_lyric_url_from_query(query)
|
||||
except LyricsNotFoundError:
|
||||
raise LyricsNotFoundError(
|
||||
'Genius returned no lyric results for the search query "{}".'.format(query)
|
||||
)
|
||||
else:
|
||||
return self.from_url(lyric_url, linesep, timeout=timeout)
|
||||
|
||||
def from_artist_and_track(self, artist, track, linesep="\n", timeout=None):
|
||||
"""
|
||||
|
||||
@@ -50,7 +50,7 @@ class EmbedderDefault(EmbedderBase):
|
||||
|
||||
def as_mp3(self, path, metadata, cached_albumart=None):
|
||||
""" Embed metadata to MP3 files. """
|
||||
logger.debug('Writing MP3 metadata to "{path}"'.format(path=path))
|
||||
logger.debug('Writing MP3 metadata to "{path}".'.format(path=path))
|
||||
# EasyID3 is fun to use ;)
|
||||
# For supported easyid3 tags:
|
||||
# https://github.com/quodlibet/mutagen/blob/master/mutagen/easyid3.py
|
||||
@@ -108,7 +108,7 @@ class EmbedderDefault(EmbedderBase):
|
||||
|
||||
def as_m4a(self, path, metadata, cached_albumart=None):
|
||||
""" Embed metadata to M4A files. """
|
||||
logger.debug('Writing M4A metadata to "{path}"'.format(path=path))
|
||||
logger.debug('Writing M4A metadata to "{path}".'.format(path=path))
|
||||
audiofile = MP4(path)
|
||||
self._embed_basic_metadata(audiofile, metadata, "m4a", preset=M4A_TAG_PRESET)
|
||||
if metadata["year"]:
|
||||
@@ -132,7 +132,7 @@ class EmbedderDefault(EmbedderBase):
|
||||
audiofile.save()
|
||||
|
||||
def as_flac(self, path, metadata, cached_albumart=None):
|
||||
logger.debug('Writing FLAC metadata to "{path}"'.format(path=path))
|
||||
logger.debug('Writing FLAC metadata to "{path}".'.format(path=path))
|
||||
audiofile = FLAC(path)
|
||||
self._embed_basic_metadata(audiofile, metadata, "flac")
|
||||
if metadata["year"]:
|
||||
|
||||
@@ -5,6 +5,7 @@ from spotdl.metadata import ProviderBase
|
||||
from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
|
||||
|
||||
from spotdl.authorize.services import AuthorizeSpotify
|
||||
import spotdl.util
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -26,15 +27,18 @@ class ProviderSpotify(ProviderBase):
|
||||
return self.metadata_to_standard_form(metadata)
|
||||
|
||||
def from_query(self, query):
|
||||
tracks = self.spotify.search(query, limit=1)["tracks"]["items"]
|
||||
tracks = self.search(query)["tracks"]["items"]
|
||||
if not tracks:
|
||||
raise SpotifyMetadataNotFoundError(
|
||||
'Could not find any tracks matching the given search query ("{}")'.format(
|
||||
'Spotify returned no tracks for the search query "{}".'.format(
|
||||
query,
|
||||
)
|
||||
)
|
||||
return self.metadata_to_standard_form(tracks[0])
|
||||
|
||||
def search(self, query):
|
||||
return self.spotify.search(query)
|
||||
|
||||
def _generate_token(self, client_id, client_secret):
|
||||
""" Generate the token. """
|
||||
credentials = oauth2.SpotifyClientCredentials(
|
||||
@@ -43,15 +47,12 @@ class ProviderSpotify(ProviderBase):
|
||||
token = credentials.get_access_token()
|
||||
return token
|
||||
|
||||
def _titlecase(self, string):
|
||||
return " ".join(word.capitalize() for word in string.split())
|
||||
|
||||
def metadata_to_standard_form(self, metadata):
|
||||
artist = self.spotify.artist(metadata["artists"][0]["id"])
|
||||
album = self.spotify.album(metadata["album"]["id"])
|
||||
|
||||
try:
|
||||
metadata[u"genre"] = self._titlecase(artist["genres"][0])
|
||||
metadata[u"genre"] = spotdl.util.titlecase(artist["genres"][0])
|
||||
except IndexError:
|
||||
metadata[u"genre"] = None
|
||||
try:
|
||||
@@ -78,3 +79,4 @@ class ProviderSpotify(ProviderBase):
|
||||
del metadata["album"]["available_markets"]
|
||||
|
||||
return metadata
|
||||
|
||||
|
||||
@@ -108,10 +108,6 @@ class YouTubeSearch:
|
||||
logger.debug('Fetching YouTube results for "{}".'.format(search_url))
|
||||
html = self._fetch_response_html(search_url)
|
||||
videos = self._fetch_search_results(html)
|
||||
# print(html)
|
||||
# print("")
|
||||
# print(videos)
|
||||
# exit()
|
||||
return YouTubeVideos(videos)
|
||||
|
||||
|
||||
@@ -198,7 +194,7 @@ class YouTubeStreams(StreamsBase):
|
||||
|
||||
class ProviderYouTube(ProviderBase):
|
||||
def from_query(self, query):
|
||||
watch_urls = YouTubeSearch().search(query)
|
||||
watch_urls = self.search(query)
|
||||
if not watch_urls:
|
||||
raise YouTubeMetadataNotFoundError(
|
||||
'YouTube returned nothing for the given search '
|
||||
@@ -214,6 +210,9 @@ class ProviderYouTube(ProviderBase):
|
||||
def from_pytube_object(self, content):
|
||||
return self.metadata_to_standard_form(content)
|
||||
|
||||
def search(self, query):
|
||||
return YouTubeSearch().search(query)
|
||||
|
||||
def _fetch_publish_date(self, content):
|
||||
# FIXME: This needs to be supported in PyTube itself
|
||||
# See https://github.com/nficano/pytube/issues/595
|
||||
|
||||
275
spotdl/metadata_search.py
Normal file
275
spotdl/metadata_search.py
Normal file
@@ -0,0 +1,275 @@
|
||||
from spotdl.metadata.providers import ProviderSpotify
|
||||
from spotdl.metadata.providers import ProviderYouTube
|
||||
from spotdl.lyrics.providers import Genius
|
||||
from spotdl.lyrics.exceptions import LyricsNotFoundError
|
||||
|
||||
import spotdl.metadata
|
||||
import spotdl.util
|
||||
from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
|
||||
|
||||
from spotdl.command_line.exceptions import NoYouTubeVideoFoundError
|
||||
from spotdl.command_line.exceptions import NoYouTubeVideoMatchError
|
||||
|
||||
import sys
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PROVIDERS = {
|
||||
"spotify": ProviderSpotify,
|
||||
"youtube": ProviderYouTube,
|
||||
}
|
||||
|
||||
|
||||
def prompt_for_youtube_search_result(videos):
|
||||
max_index_length = len(str(len(videos)))
|
||||
max_title_length = max(len(v["title"]) for v in videos)
|
||||
print(" 0. Skip downloading this track", file=sys.stderr)
|
||||
for index, video in enumerate(videos, 1):
|
||||
vid_details = "{index:>{max_index}}. {title:<{max_title}}\n{new_line_gap} {url} [{duration}]".format(
|
||||
index=index,
|
||||
max_index=max_index_length,
|
||||
title=video["title"],
|
||||
max_title=max_title_length,
|
||||
new_line_gap=" " * max_index_length,
|
||||
url=video["url"],
|
||||
duration=video["duration"],
|
||||
)
|
||||
print(vid_details, file=sys.stderr)
|
||||
print("", file=sys.stderr)
|
||||
|
||||
selection = spotdl.util.prompt_user_for_selection(range(1, len(videos)+1))
|
||||
|
||||
if selection is None:
|
||||
return None
|
||||
return videos[selection-1]
|
||||
|
||||
|
||||
class MetadataSearch:
|
||||
def __init__(self, track, lyrics=False, yt_search_format="{artist} - {track-name}", yt_manual=False, providers=PROVIDERS):
|
||||
self.track = track
|
||||
self.track_type = spotdl.util.track_type(track)
|
||||
self.lyrics = lyrics
|
||||
self.yt_search_format = yt_search_format
|
||||
self.yt_manual = yt_manual
|
||||
self.providers = {}
|
||||
for provider, parent in providers.items():
|
||||
self.providers[provider] = parent()
|
||||
self.lyric_provider = Genius()
|
||||
|
||||
def get_lyrics(self, query):
|
||||
try:
|
||||
lyrics = self.lyric_provider.from_query(query)
|
||||
except LyricsNotFoundError as e:
|
||||
logger.warning(e.args[0])
|
||||
lyrics = None
|
||||
return lyrics
|
||||
|
||||
def _make_lyric_search_query(self, metadata):
|
||||
if self.track_type == "query":
|
||||
lyric_query = self.track
|
||||
else:
|
||||
lyric_search_format = "{artist} - {track-name}"
|
||||
lyric_query = spotdl.metadata.format_string(
|
||||
lyric_search_format,
|
||||
metadata
|
||||
)
|
||||
return lyric_query
|
||||
|
||||
def on_youtube_and_spotify(self):
|
||||
track_type_mapper = {
|
||||
"spotify": self._on_youtube_and_spotify_for_type_spotify,
|
||||
"youtube": self._on_youtube_and_spotify_for_type_youtube,
|
||||
"query": self._on_youtube_and_spotify_for_type_query,
|
||||
}
|
||||
caller = track_type_mapper[self.track_type]
|
||||
metadata = caller()
|
||||
|
||||
if not self.lyrics:
|
||||
return metadata
|
||||
|
||||
lyric_query = self._make_lyric_search_query(metadata)
|
||||
metadata["lyrics"] = spotdl.util.ThreadWithReturnValue(
|
||||
target=self.get_lyrics,
|
||||
args=(lyric_query,),
|
||||
)
|
||||
metadata["lyrics"].start()
|
||||
|
||||
return metadata
|
||||
|
||||
def on_youtube(self):
|
||||
track_type_mapper = {
|
||||
"spotify": self._on_youtube_for_type_spotify,
|
||||
"youtube": self._on_youtube_for_type_youtube,
|
||||
"query": self._on_youtube_for_type_query,
|
||||
}
|
||||
caller = track_type_mapper[self.track_type]
|
||||
metadata = caller(self.track)
|
||||
|
||||
if not self.lyrics:
|
||||
return metadata
|
||||
|
||||
lyric_query = self._make_lyric_search_query(metadata)
|
||||
metadata["lyrics"] = spotdl.util.ThreadWithReturnValue(
|
||||
target=self.get_lyrics,
|
||||
arguments=(lyric_query,),
|
||||
)
|
||||
metadata["lyrics"].start()
|
||||
|
||||
return metadata
|
||||
|
||||
def on_spotify(self):
|
||||
track_type_mapper = {
|
||||
"spotify": self._on_spotify_for_type_spotify,
|
||||
"youtube": self._on_spotify_for_type_youtube,
|
||||
"query": self._on_spotify_for_type_query,
|
||||
}
|
||||
caller = track_type_mapper[self.track_type]
|
||||
metadata = caller(self.track)
|
||||
|
||||
if not self.lyrics:
|
||||
return metadata
|
||||
|
||||
lyric_query = self._make_lyric_search_query(metadata)
|
||||
metadata["lyrics"] = spotdl.util.ThreadWithReturnValue(
|
||||
target=self.get_lyrics,
|
||||
arguments=(lyric_query,),
|
||||
)
|
||||
metadata["lyrics"].start()
|
||||
|
||||
return metadata
|
||||
|
||||
def _on_youtube_and_spotify_for_type_spotify(self):
|
||||
logger.debug("Extracting YouTube and Spotify metadata for input Spotify URI.")
|
||||
spotify_metadata = self._on_spotify_for_type_spotify(self.track)
|
||||
lyric_query = spotdl.metadata.format_string(
|
||||
"{artist} - {track-name}",
|
||||
spotify_metadata,
|
||||
)
|
||||
search_query = spotdl.metadata.format_string(self.yt_search_format, spotify_metadata)
|
||||
videos = self.providers["youtube"].search(search_query)
|
||||
if not videos:
|
||||
raise NoYouTubeVideoFoundError(
|
||||
'YouTube returned no videos for the search query "{}".'.format(search_query)
|
||||
)
|
||||
if self.yt_manual:
|
||||
youtube_video = prompt_for_youtube_search_result(videos)
|
||||
else:
|
||||
youtube_video = videos.bestmatch()
|
||||
|
||||
if youtube_video is None:
|
||||
raise NoYouTubeVideoMatchError(
|
||||
'No matching videos found on YouTube for the search query "{}".'.format(
|
||||
search_query
|
||||
)
|
||||
)
|
||||
|
||||
youtube_metadata = self.providers["youtube"].from_url(youtube_video["url"])
|
||||
metadata = spotdl.util.merge(
|
||||
youtube_metadata,
|
||||
spotify_metadata
|
||||
)
|
||||
return metadata
|
||||
|
||||
def _on_youtube_and_spotify_for_type_youtube(self):
|
||||
logger.debug("Extracting YouTube and Spotify metadata for input YouTube URL.")
|
||||
youtube_metadata = self._on_youtube_for_type_youtube(self.track)
|
||||
search_query = spotdl.metadata.format_string(self.yt_search_format, youtube_metadata)
|
||||
spotify_metadata = self._on_spotify_for_type_query(search_query)
|
||||
metadata = spotdl.util.merge(
|
||||
youtube_metadata,
|
||||
spotify_metadata
|
||||
)
|
||||
return metadata
|
||||
|
||||
def _on_youtube_and_spotify_for_type_query(self):
|
||||
logger.debug("Extracting YouTube and Spotify metadata for input track query.")
|
||||
search_query = self.track
|
||||
lyric_query = search_query
|
||||
# Make use of threads here to search on both YouTube & Spotify
|
||||
# at the same time.
|
||||
spotify_metadata = spotdl.util.ThreadWithReturnValue(
|
||||
target=self._on_spotify_for_type_query,
|
||||
args=(search_query,)
|
||||
)
|
||||
spotify_metadata.start()
|
||||
youtube_metadata = self._on_youtube_for_type_query(search_query)
|
||||
metadata = spotdl.util.merge(
|
||||
youtube_metadata,
|
||||
spotify_metadata.join()
|
||||
)
|
||||
return metadata
|
||||
|
||||
def _on_youtube_for_type_spotify(self):
|
||||
logger.debug("Extracting YouTube metadata for input Spotify URI.")
|
||||
spotify_metadata = self._on_spotify_for_type_spotify(self.track)
|
||||
lyric_query = spotdl.metadata.format_string(
|
||||
"{artist} - {track-name}",
|
||||
spotify_metadata,
|
||||
)
|
||||
search_query = spotdl.metadata.format_string(self.yt_search_format, spotify_metadata)
|
||||
videos = self.providers["youtube"].search(search_query)
|
||||
if not videos:
|
||||
raise NoYouTubeVideoFoundError(
|
||||
'YouTube returned no videos for the search query "{}".'.format(search_query)
|
||||
)
|
||||
if self.yt_manual:
|
||||
youtube_video = prompt_for_youtube_search_result(videos)
|
||||
else:
|
||||
youtube_video = videos.bestmatch()
|
||||
|
||||
if youtube_video is None:
|
||||
raise NoYouTubeVideoMatchError(
|
||||
'No matching videos found on YouTube for the search query "{}".'.format(
|
||||
search_query
|
||||
)
|
||||
)
|
||||
|
||||
youtube_metadata = self.providers["youtube"].from_url(youtube_video["url"])
|
||||
return youtube_metadata
|
||||
|
||||
def _on_youtube_for_type_youtube(self, url):
|
||||
logger.debug("Extracting YouTube metadata for input YouTube URL.")
|
||||
youtube_metadata = self.providers["youtube"].from_url(url)
|
||||
return youtube_metadata
|
||||
|
||||
def _on_youtube_for_type_query(self, query):
|
||||
logger.debug("Extracting YouTube metadata for input track query.")
|
||||
videos = self.providers["youtube"].search(query)
|
||||
if not videos:
|
||||
raise NoYouTubeVideoFoundError(
|
||||
'YouTube returned no videos for the search query "{}".'.format(query)
|
||||
)
|
||||
if self.yt_manual:
|
||||
youtube_video = prompt_for_youtube_search_result(videos)
|
||||
else:
|
||||
youtube_video = videos.bestmatch()
|
||||
if youtube_video is None:
|
||||
raise NoYouTubeVideoMatchError(
|
||||
'No matching videos found on YouTube for the search query "{}".'.format(
|
||||
query
|
||||
)
|
||||
)
|
||||
youtube_metadata = self.providers["youtube"].from_url(youtube_video["url"])
|
||||
return youtube_metadata
|
||||
|
||||
def _on_spotify_for_type_youtube(self, url):
|
||||
logger.debug("Extracting Spotify metadata for input YouTube URL.")
|
||||
youtube_metadata = self.providers["youtube"].from_url(url)
|
||||
search_query = spotdl.metadata.format_string(self.yt_search_format, youtube_metadata)
|
||||
spotify_metadata = self.providers["spotify"].from_query(search_query)
|
||||
return spotify_metadata
|
||||
|
||||
def _on_spotify_for_type_spotify(self, url):
|
||||
logger.debug("Extracting Spotify metadata for input Spotify URI.")
|
||||
spotify_metadata = self.providers["spotify"].from_url(url)
|
||||
return spotify_metadata
|
||||
|
||||
def _on_spotify_for_type_query(self, query):
|
||||
logger.debug("Extracting Spotify metadata for input track query.")
|
||||
try:
|
||||
spotify_metadata = self.providers["spotify"].from_query(query)
|
||||
except SpotifyMetadataNotFoundError as e:
|
||||
logger.warn(e.args[0])
|
||||
spotify_metadata = {}
|
||||
return spotify_metadata
|
||||
|
||||
@@ -66,22 +66,33 @@ def prompt_user_for_selection(items):
|
||||
logger.warning("Choose a valid number!")
|
||||
|
||||
|
||||
def is_spotify(raw_song):
|
||||
def is_spotify(track):
|
||||
""" Check if the input song is a Spotify link. """
|
||||
status = len(raw_song) == 22 and raw_song.replace(" ", "%20") == raw_song
|
||||
status = status or raw_song.find("spotify") > -1
|
||||
status = len(track) == 22 and track.replace(" ", "%20") == track
|
||||
status = status or track.find("spotify") > -1
|
||||
return status
|
||||
|
||||
|
||||
def is_youtube(raw_song):
|
||||
def is_youtube(track):
|
||||
""" Check if the input song is a YouTube link. """
|
||||
status = len(raw_song) == 11 and raw_song.replace(" ", "%20") == raw_song
|
||||
status = status and not raw_song.lower() == raw_song
|
||||
status = status or "youtube.com/watch?v=" in raw_song
|
||||
status = len(track) == 11 and track.replace(" ", "%20") == track
|
||||
status = status and not track.lower() == track
|
||||
status = status or "youtube.com/watch?v=" in track
|
||||
return status
|
||||
|
||||
|
||||
def sanitize(string, ok="-_()[]{}", spaces_to_underscores=False):
|
||||
def track_type(track):
|
||||
track_types = {
|
||||
"spotify": is_spotify,
|
||||
"youtube": is_youtube,
|
||||
}
|
||||
for provider, fn in track_types.items():
|
||||
if fn(track):
|
||||
return provider
|
||||
return "query"
|
||||
|
||||
|
||||
def sanitize(string, ok="&-_()[]{}", spaces_to_underscores=False):
|
||||
""" Generate filename of the song to be downloaded. """
|
||||
if spaces_to_underscores:
|
||||
string = string.replace(" ", "_")
|
||||
@@ -197,3 +208,7 @@ def content_available(url):
|
||||
else:
|
||||
return response.getcode() < 300
|
||||
|
||||
|
||||
def titlecase(string):
|
||||
return " ".join(word.capitalize() for word in string.split())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user