mirror of
				https://github.com/KevinMidboe/spotify-downloader.git
				synced 2025-10-29 18:00:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			447 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			447 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from spotdl.metadata.providers import ProviderSpotify
 | |
| from spotdl.metadata.providers import ProviderYouTube
 | |
| from spotdl.metadata.providers import YouTubeSearch
 | |
| from spotdl.metadata.embedders import EmbedderDefault
 | |
| from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
 | |
| import spotdl.metadata
 | |
| 
 | |
| from spotdl.lyrics.providers import LyricWikia
 | |
| from spotdl.lyrics.providers import Genius
 | |
| from spotdl.lyrics.exceptions import LyricsNotFoundError
 | |
| 
 | |
| from spotdl.encode.encoders import EncoderFFmpeg
 | |
| 
 | |
| from spotdl.authorize.services import AuthorizeSpotify
 | |
| 
 | |
| from spotdl.track import Track
 | |
| import spotdl.util
 | |
| import spotdl.config
 | |
| 
 | |
| from spotdl.command_line.exceptions import NoYouTubeVideoFoundError
 | |
| from spotdl.command_line.exceptions import NoYouTubeVideoMatchError
 | |
| from spotdl.metadata_search import MetadataSearch
 | |
| 
 | |
| from spotdl.helpers.spotify import SpotifyHelpers
 | |
| 
 | |
| import sys
 | |
| import os
 | |
| import urllib.request
 | |
| 
 | |
| import logging
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class Spotdl:
 | |
|     def __init__(self, argument_handler):
 | |
|         self.arguments = argument_handler.run_errands()
 | |
| 
 | |
|     def __enter__(self):
 | |
|         return self
 | |
| 
 | |
|     def __exit__(self, type, value, traceback):
 | |
|         del self
 | |
| 
 | |
|     def match_arguments(self):
 | |
|         logger.debug("Received arguments:\n{}".format(self.arguments))
 | |
| 
 | |
|         if self.arguments["remove_config"]:
 | |
|             self.remove_saved_config()
 | |
|             return
 | |
|         self.save_default_config()
 | |
| 
 | |
|         AuthorizeSpotify(
 | |
|             client_id=self.arguments["spotify_client_id"],
 | |
|             client_secret=self.arguments["spotify_client_secret"]
 | |
|         )
 | |
|         spotify_tools = SpotifyHelpers()
 | |
|         if self.arguments["song"]:
 | |
|             for track in self.arguments["song"]:
 | |
|                 if track == "-":
 | |
|                     for line in sys.stdin:
 | |
|                         self.download_track(
 | |
|                             line.strip(),
 | |
|                         )
 | |
|                 else:
 | |
|                     self.download_track(track)
 | |
|         elif self.arguments["list"]:
 | |
|             if self.arguments["write_m3u"]:
 | |
|                 self.write_m3u(
 | |
|                     self.arguments["list"],
 | |
|                     self.arguments["write_to"]
 | |
|                 )
 | |
|             else:
 | |
|                 list_download = {
 | |
|                     "synchronous": self.download_tracks_from_file,
 | |
|                     # "threaded"  : self.download_tracks_from_file_threaded,
 | |
|                 }[self.arguments["processor"]]
 | |
| 
 | |
|                 list_download(
 | |
|                     self.arguments["list"],
 | |
|                 )
 | |
|         elif self.arguments["playlist"]:
 | |
|             playlist = spotify_tools.fetch_playlist(self.arguments["playlist"])
 | |
|             spotify_tools.write_playlist_tracks(playlist, self.arguments["write_to"])
 | |
|         elif self.arguments["album"]:
 | |
|             album = spotify_tools.fetch_album(self.arguments["album"])
 | |
|             spotify_tools.write_album_tracks(album, self.arguments["write_to"])
 | |
|         elif self.arguments["all_albums"]:
 | |
|             albums = spotify_tools.fetch_albums_from_artist(self.arguments["all_albums"])
 | |
|             spotify_tools.write_all_albums(albums, self.arguments["write_to"])
 | |
|         elif self.arguments["username"]:
 | |
|             playlist_url = spotify_tools.prompt_for_user_playlist(self.arguments["username"])
 | |
|             playlist = spotify_tools.fetch_playlist(playlist_url)
 | |
|             spotify_tools.write_playlist_tracks(playlist, self.arguments["write_to"])
 | |
| 
 | |
|     def save_config(self, config_file=spotdl.config.DEFAULT_CONFIG_FILE, config=spotdl.config.DEFAULT_CONFIGURATION):
 | |
|         config_dir = os.path.dirname(config_file)
 | |
|         os.makedirs(config_dir, exist_ok=True)
 | |
|         logger.info('Writing configuration to "{0}":'.format(config_file))
 | |
|         spotdl.config.dump_config(config_file=config_file, config=spotdl.config.DEFAULT_CONFIGURATION)
 | |
|         config = spotdl.config.dump_config(config=spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"])
 | |
|         for line in config.split("\n"):
 | |
|             if line.strip():
 | |
|                 logger.info(line.strip())
 | |
|         logger.info(
 | |
|             "Please note that command line arguments have higher priority "
 | |
|             "than their equivalents in the configuration file.\n"
 | |
|         )
 | |
| 
 | |
|     def save_default_config(self):
 | |
|         if not os.path.isfile(spotdl.config.DEFAULT_CONFIG_FILE):
 | |
|             self.save_config()
 | |
| 
 | |
|     def remove_saved_config(self, config_file=spotdl.config.DEFAULT_CONFIG_FILE):
 | |
|         if os.path.isfile(spotdl.config.DEFAULT_CONFIG_FILE):
 | |
|             logger.info('Removing "{}".'.format(spotdl.config.DEFAULT_CONFIG_FILE))
 | |
|             os.remove(spotdl.config.DEFAULT_CONFIG_FILE)
 | |
|         else:
 | |
|             logger.info('File does not exist: "{}".'.format(spotdl.config.DEFAULT_CONFIG_FILE))
 | |
| 
 | |
|     def write_m3u(self, track_file, target_file=None):
 | |
|         with open(track_file, "r") as fin:
 | |
|             tracks = fin.read().splitlines()
 | |
| 
 | |
|         logger.info(
 | |
|             "Checking and removing any duplicate tracks in {}.".format(track_file)
 | |
|         )
 | |
|         # Remove duplicates and empty elements
 | |
|         # Also strip whitespaces from elements (if any)
 | |
|         tracks = spotdl.util.remove_duplicates(
 | |
|             tracks,
 | |
|             condition=lambda x: x,
 | |
|             operation=str.strip
 | |
|         )
 | |
| 
 | |
|         if target_file is None:
 | |
|             target_file = "{}.m3u".format(track_file.split(".")[0])
 | |
| 
 | |
|         total_tracks = len(tracks)
 | |
|         logger.info("Generating {0} from {1} YouTube URLs.".format(target_file, total_tracks))
 | |
|         write_to_stdout = target_file == "-"
 | |
|         m3u_headers = "#EXTM3U\n\n"
 | |
|         if write_to_stdout:
 | |
|             sys.stdout.write(m3u_headers)
 | |
|         else:
 | |
|             with open(target_file, "w") as output_file:
 | |
|                 output_file.write(m3u_headers)
 | |
| 
 | |
|         videos = []
 | |
|         for n, track in enumerate(tracks, 1):
 | |
|             try:
 | |
|                 search_metadata = MetadataSearch(
 | |
|                     track,
 | |
|                     lyrics=not self.arguments["no_metadata"],
 | |
|                     yt_search_format=self.arguments["search_format"],
 | |
|                     yt_manual=self.arguments["manual"]
 | |
|                 )
 | |
|                 video = search_metadata.best_on_youtube_search()
 | |
|             except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
 | |
|                 logger.error(e.args[0])
 | |
|             else:
 | |
|                 logger.info(
 | |
|                     "Matched track {0}/{1} ({2})".format(
 | |
|                         str(n).zfill(len(str(total_tracks))),
 | |
|                         total_tracks,
 | |
|                         video["url"],
 | |
|                     )
 | |
|                 )
 | |
|                 m3u_key = "#EXTINF:{duration},{title}\n{youtube_url}\n".format(
 | |
|                     duration=spotdl.util.get_sec(video["duration"]),
 | |
|                     title=video["title"],
 | |
|                     youtube_url=video["url"],
 | |
|                 )
 | |
|                 logger.debug(m3u_key.strip())
 | |
|                 if write_to_stdout:
 | |
|                     sys.stdout.write(m3u_key)
 | |
|                 else:
 | |
|                     with open(target_file, "a") as output_file:
 | |
|                         output_file.write(m3u_key)
 | |
| 
 | |
|     def download_track(self, track):
 | |
|         logger.info('Downloading "{}"'.format(track))
 | |
|         search_metadata = MetadataSearch(
 | |
|             track,
 | |
|             lyrics=not self.arguments["no_metadata"],
 | |
|             yt_search_format=self.arguments["search_format"],
 | |
|             yt_manual=self.arguments["manual"]
 | |
|         )
 | |
|         try:
 | |
|             if self.arguments["no_metadata"]:
 | |
|                 metadata = search_metadata.on_youtube()
 | |
|             else:
 | |
|                 metadata = search_metadata.on_youtube_and_spotify()
 | |
|         except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
 | |
|             logger.error(e.args[0])
 | |
|         else:
 | |
|             self.download_track_from_metadata(metadata)
 | |
| 
 | |
|     def should_we_overwrite_existing_file(self, overwrite):
 | |
|         if overwrite == "force":
 | |
|             logger.info("Forcing overwrite on existing file.")
 | |
|             to_overwrite = True
 | |
|         elif overwrite == "prompt":
 | |
|             to_overwrite = input("Overwrite? (y/N): ").lower() == "y"
 | |
|         else:
 | |
|             logger.info("Not overwriting existing file.")
 | |
|             to_overwrite = False
 | |
| 
 | |
|         return to_overwrite
 | |
| 
 | |
|     def generate_temp_filename(self, filename, for_stdout=False):
 | |
|         if for_stdout:
 | |
|             return filename
 | |
|         return "{filename}.temp".format(filename=filename)
 | |
| 
 | |
|     def output_filename_filter(self, allow_spaces):
 | |
|         replace_spaces_with_underscores = not allow_spaces
 | |
|         if replace_spaces_with_underscores:
 | |
|             return lambda s: s.replace(" ", "_")
 | |
|         return lambda s: s
 | |
| 
 | |
|     def download_track_from_metadata(self, metadata):
 | |
|         track = Track(metadata, cache_albumart=(not self.arguments["no_metadata"]))
 | |
|         stream = metadata["streams"].get(
 | |
|             quality=self.arguments["quality"],
 | |
|             preftype=self.arguments["input_ext"],
 | |
|         )
 | |
|         if stream is None:
 | |
|             logger.error('No matching streams found for given input format: "{}".'.format(
 | |
|                 self.arguments["input_ext"]
 | |
|             ))
 | |
|             return
 | |
| 
 | |
|         if self.arguments["no_encode"]:
 | |
|             output_extension = stream["encoding"]
 | |
|         else:
 | |
|             output_extension = self.arguments["output_ext"]
 | |
| 
 | |
|         filename = spotdl.metadata.format_string(
 | |
|             self.arguments["output_file"],
 | |
|             metadata,
 | |
|             output_extension=output_extension,
 | |
|             sanitizer=lambda s: spotdl.util.sanitize(
 | |
|                 s, spaces_to_underscores=self.arguments["no_spaces"]
 | |
|             )
 | |
|         )
 | |
|         download_to_stdout = filename == "-"
 | |
|         temp_filename = self.generate_temp_filename(filename, for_stdout=download_to_stdout)
 | |
| 
 | |
|         to_skip_download = self.arguments["dry_run"]
 | |
|         if os.path.isfile(filename):
 | |
|             logger.info('A file with name "{filename}" already exists.'.format(
 | |
|                 filename=filename
 | |
|             ))
 | |
|             to_skip_download = to_skip_download \
 | |
|                 or not self.should_we_overwrite_existing_file(self.arguments["overwrite"])
 | |
| 
 | |
|         if to_skip_download:
 | |
|             logger.debug("Skip track download.")
 | |
|             return
 | |
| 
 | |
|         if not self.arguments["no_metadata"]:
 | |
|             metadata["lyrics"].start()
 | |
| 
 | |
|         filter_space_chars = self.output_filename_filter(not self.arguments["no_spaces"])
 | |
|         directory = os.path.dirname(
 | |
|             spotdl.metadata.format_string(
 | |
|                 self.arguments["output_file"],
 | |
|                 metadata,
 | |
|                 output_extension=output_extension,
 | |
|                 sanitizer=filter_space_chars
 | |
|             )
 | |
|         )
 | |
|         os.makedirs(directory or ".", exist_ok=True)
 | |
| 
 | |
|         logger.info('Downloading to "{filename}"'.format(filename=filename))
 | |
|         if self.arguments["no_encode"]:
 | |
|             track.download(stream, temp_filename)
 | |
|         else:
 | |
|             encoder = EncoderFFmpeg()
 | |
|             if self.arguments["trim_silence"]:
 | |
|                 encoder.set_trim_silence()
 | |
|             track.download_while_re_encoding(
 | |
|                 stream,
 | |
|                 temp_filename,
 | |
|                 target_encoding=output_extension,
 | |
|                 encoder=encoder,
 | |
|             )
 | |
| 
 | |
|         if not self.arguments["no_metadata"]:
 | |
|             track.metadata["lyrics"] = track.metadata["lyrics"].join()
 | |
|             self.apply_metadata(track, temp_filename, output_extension)
 | |
| 
 | |
|         if not download_to_stdout:
 | |
|             logger.debug("Renaming {temp_filename} to {filename}.".format(
 | |
|                 temp_filename=temp_filename, filename=filename
 | |
|             ))
 | |
|             os.rename(temp_filename, filename)
 | |
| 
 | |
|         return filename
 | |
| 
 | |
|     def apply_metadata(self, track, filename, encoding):
 | |
|         logger.info("Applying metadata")
 | |
|         try:
 | |
|             track.apply_metadata(filename, encoding=encoding)
 | |
|         except TypeError:
 | |
|             logger.warning("Cannot apply metadata on provided output format.")
 | |
| 
 | |
|     def strip_and_filter_duplicates(self, tracks):
 | |
|         filtered_tracks = spotdl.util.remove_duplicates(
 | |
|             tracks,
 | |
|             condition=lambda x: x,
 | |
|             operation=str.strip
 | |
|         )
 | |
|         return filtered_tracks
 | |
| 
 | |
|     def filter_against_skip_file(self, items, skip_file):
 | |
|         skip_items = spotdl.util.readlines_from_nonbinary_file(skip_file)
 | |
|         filtered_skip_items = self.strip_and_filter_duplicates(skip_items)
 | |
|         filtered_items = [item for item in items if not item in filtered_skip_items]
 | |
|         return filtered_items
 | |
| 
 | |
|     def download_tracks_from_file(self, path):
 | |
|         logger.info(
 | |
|             "Checking and removing any duplicate tracks in {}.".format(path)
 | |
|         )
 | |
|         tracks = spotdl.util.readlines_from_nonbinary_file(path)
 | |
|         tracks = self.strip_and_filter_duplicates(tracks)
 | |
| 
 | |
|         if self.arguments["skip_file"]:
 | |
|             len_tracks_before = len(tracks)
 | |
|             tracks = self.filter_against_skip_file(tracks, self.arguments["skip_file"])
 | |
|             logger.info("Skipping {} tracks due to matches in skip file.".format(
 | |
|                 len_tracks_before - len(tracks))
 | |
|             )
 | |
|         # Overwrite file
 | |
|         spotdl.util.writelines_to_nonbinary_file(path, tracks)
 | |
| 
 | |
|         logger.info(
 | |
|             "Downloading {n} tracks.\n".format(n=len(tracks))
 | |
|         )
 | |
| 
 | |
|         for position, track in enumerate(tracks, 1):
 | |
|             search_metadata = MetadataSearch(
 | |
|                 track,
 | |
|                 lyrics=True,
 | |
|                 yt_search_format=self.arguments["search_format"],
 | |
|                 yt_manual=self.arguments["manual"]
 | |
|             )
 | |
|             try:
 | |
|                 log_track_query = '{position}. Downloading "{track}"'.format(
 | |
|                     position=position,
 | |
|                     track=track
 | |
|                 )
 | |
|                 logger.info(log_track_query)
 | |
|                 metadata = search_metadata.on_youtube_and_spotify()
 | |
|                 self.download_track_from_metadata(metadata)
 | |
|             except (urllib.request.URLError, TypeError, IOError) as e:
 | |
|                 logger.exception(e.args[0])
 | |
|                 logger.warning(
 | |
|                     "Failed to download current track due to possible network issue. "
 | |
|                     "Will retry after other songs."
 | |
|                 )
 | |
|                 tracks.append(track)
 | |
|             except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
 | |
|                 logger.error("{err}".format(err=e.args[0]))
 | |
|             else:
 | |
|                 if self.arguments["write_successful_file"]:
 | |
|                     with open(self.arguments["write_successful_file"], "a") as fout:
 | |
|                         fout.write("{}\n".format(track))
 | |
|             finally:
 | |
|                 spotdl.util.writelines_to_nonbinary_file(path, tracks[position:])
 | |
|                 print("", file=sys.stderr)
 | |
| 
 | |
|     """
 | |
|     def download_tracks_from_file_threaded(self, path):
 | |
|         # FIXME: Can we make this function cleaner?
 | |
| 
 | |
|         logger.info(
 | |
|             "Checking and removing any duplicate tracks in {}.\n".format(path)
 | |
|         )
 | |
|         with open(path, "r") as fin:
 | |
|             # Read tracks into a list and remove any duplicates
 | |
|             tracks = fin.read().splitlines()
 | |
| 
 | |
|         # Remove duplicates and empty elements
 | |
|         # Also strip whitespaces from elements (if any)
 | |
|         spotdl.util.remove_duplicates(
 | |
|             tracks,
 | |
|             condition=lambda x: x,
 | |
|             operation=str.strip
 | |
|         )
 | |
| 
 | |
|         # Overwrite file
 | |
|         with open(path, "w") as fout:
 | |
|             fout.writelines(tracks)
 | |
| 
 | |
|         tracks_count = len(tracks)
 | |
|         current_iteration = 1
 | |
| 
 | |
|         next_track = tracks.pop(0)
 | |
|         metadata = {
 | |
|             "current_track": None,
 | |
|             "next_track": spotdl.util.ThreadWithReturnValue(
 | |
|                 target=search_metadata,
 | |
|                 args=(next_track, self.arguments["search_format"])
 | |
|             )
 | |
|         }
 | |
|         metadata["next_track"].start()
 | |
|         while tracks_count > 0:
 | |
|             metadata["current_track"] = metadata["next_track"].join()
 | |
|             metadata["next_track"] = None
 | |
|             try:
 | |
|                 print(tracks_count, file=sys.stderr)
 | |
|                 print(tracks, file=sys.stderr)
 | |
|                 if tracks_count > 1:
 | |
|                     current_track = next_track
 | |
|                     next_track = tracks.pop(0)
 | |
|                     metadata["next_track"] = spotdl.util.ThreadWithReturnValue(
 | |
|                         target=search_metadata,
 | |
|                         args=(next_track, self.arguments["search_format"])
 | |
|                     )
 | |
|                     metadata["next_track"].start()
 | |
| 
 | |
|                 log_track_query = str(current_iteration) + ". {artist} - {track-name}"
 | |
|                 logger.info(log_track_query)
 | |
|                 if metadata["current_track"] is None:
 | |
|                     logger.warning("Something went wrong. Will retry after downloading remaining tracks.")
 | |
|                     pass
 | |
|                 print(metadata["current_track"]["name"], file=sys.stderr)
 | |
|                 # self.download_track_from_metadata(metadata["current_track"])
 | |
|             except (urllib.request.URLError, TypeError, IOError) as e:
 | |
|                 print("", file=sys.stderr)
 | |
|                 logger.exception(e.args[0])
 | |
|                 logger.warning("Failed. Will retry after other songs\n")
 | |
|                 tracks.append(current_track)
 | |
|             else:
 | |
|                 tracks_count -= 1
 | |
|                 if self.arguments["write_sucessful_file"]:
 | |
|                     with open(self.arguments["write_sucessful_file"], "a") as fout:
 | |
|                         fout.write(current_track)
 | |
|             finally:
 | |
|                 current_iteration += 1
 | |
|                 with open(path, "w") as fout:
 | |
|                     fout.writelines(tracks)
 | |
|     """
 | |
| 
 |