mirror of
https://github.com/KevinMidboe/spotify-downloader.git
synced 2025-12-08 20:39:08 +00:00
Rename spotdl.command_line.lib to spotdl.command_line.core
This commit is contained in:
441
spotdl/command_line/core.py
Normal file
441
spotdl/command_line/core.py
Normal file
@@ -0,0 +1,441 @@
|
||||
from spotdl.metadata.providers import ProviderSpotify
|
||||
from spotdl.metadata.providers import ProviderYouTube
|
||||
from spotdl.metadata.providers import YouTubeSearch
|
||||
from spotdl.metadata.embedders import EmbedderDefault
|
||||
from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
|
||||
import spotdl.metadata
|
||||
|
||||
from spotdl.lyrics.providers import LyricWikia
|
||||
from spotdl.lyrics.providers import Genius
|
||||
from spotdl.lyrics.exceptions import LyricsNotFoundError
|
||||
|
||||
from spotdl.encode.encoders import EncoderFFmpeg
|
||||
|
||||
from spotdl.authorize.services import AuthorizeSpotify
|
||||
|
||||
from spotdl.track import Track
|
||||
import spotdl.util
|
||||
import spotdl.config
|
||||
|
||||
from spotdl.command_line.exceptions import NoYouTubeVideoFoundError
|
||||
from spotdl.command_line.exceptions import NoYouTubeVideoMatchError
|
||||
from spotdl.metadata_search import MetadataSearch
|
||||
|
||||
from spotdl.helpers.spotify import SpotifyHelpers
|
||||
|
||||
import sys
|
||||
import os
|
||||
import urllib.request
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Spotdl:
|
||||
def __init__(self, argument_handler):
|
||||
self.arguments = argument_handler.run_errands()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
del self
|
||||
|
||||
def match_arguments(self):
|
||||
logger.debug("Received arguments:\n{}".format(self.arguments))
|
||||
|
||||
if self.arguments["remove_config"]:
|
||||
self.remove_saved_config()
|
||||
return
|
||||
self.save_default_config()
|
||||
|
||||
AuthorizeSpotify(
|
||||
client_id=self.arguments["spotify_client_id"],
|
||||
client_secret=self.arguments["spotify_client_secret"]
|
||||
)
|
||||
spotify_tools = SpotifyHelpers()
|
||||
if self.arguments["song"]:
|
||||
for track in self.arguments["song"]:
|
||||
if track == "-":
|
||||
for line in sys.stdin:
|
||||
self.download_track(
|
||||
line.strip(),
|
||||
)
|
||||
else:
|
||||
self.download_track(track)
|
||||
elif self.arguments["list"]:
|
||||
if self.arguments["write_m3u"]:
|
||||
self.write_m3u(
|
||||
self.arguments["list"],
|
||||
self.arguments["write_to"]
|
||||
)
|
||||
else:
|
||||
list_download = {
|
||||
"synchronous": self.download_tracks_from_file,
|
||||
# "threaded" : self.download_tracks_from_file_threaded,
|
||||
}[self.arguments["processor"]]
|
||||
|
||||
list_download(
|
||||
self.arguments["list"],
|
||||
)
|
||||
elif self.arguments["playlist"]:
|
||||
playlist = spotify_tools.fetch_playlist(self.arguments["playlist"])
|
||||
spotify_tools.write_playlist_tracks(playlist, self.arguments["write_to"])
|
||||
elif self.arguments["album"]:
|
||||
album = spotify_tools.fetch_album(self.arguments["album"])
|
||||
spotify_tools.write_album_tracks(album, self.arguments["write_to"])
|
||||
elif self.arguments["all_albums"]:
|
||||
albums = spotify_tools.fetch_albums_from_artist(self.arguments["all_albums"])
|
||||
spotify_tools.write_all_albums(albums, self.arguments["write_to"])
|
||||
elif self.arguments["username"]:
|
||||
playlist_url = spotify_tools.prompt_for_user_playlist(self.arguments["username"])
|
||||
playlist = spotify_tools.fetch_playlist(playlist_url)
|
||||
spotify_tools.write_playlist_tracks(playlist, self.arguments["write_to"])
|
||||
|
||||
def save_config(self, config_file=spotdl.config.DEFAULT_CONFIG_FILE, config=spotdl.config.DEFAULT_CONFIGURATION):
|
||||
config_dir = os.path.dirname(config_file)
|
||||
os.makedirs(config_dir, exist_ok=True)
|
||||
logger.info('Writing configuration to "{0}":'.format(config_file))
|
||||
spotdl.config.dump_config(config_file=config_file, config=spotdl.config.DEFAULT_CONFIGURATION)
|
||||
config = spotdl.config.dump_config(config=spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"])
|
||||
for line in config.split("\n"):
|
||||
if line.strip():
|
||||
logger.info(line.strip())
|
||||
logger.info(
|
||||
"Please note that command line arguments have higher priority "
|
||||
"than their equivalents in the configuration file.\n"
|
||||
)
|
||||
|
||||
def save_default_config(self):
|
||||
if not os.path.isfile(spotdl.config.DEFAULT_CONFIG_FILE):
|
||||
self.save_config()
|
||||
|
||||
def remove_saved_config(self, config_file=spotdl.config.DEFAULT_CONFIG_FILE):
|
||||
if os.path.isfile(spotdl.config.DEFAULT_CONFIG_FILE):
|
||||
logger.info('Removing "{}".'.format(spotdl.config.DEFAULT_CONFIG_FILE))
|
||||
os.remove(spotdl.config.DEFAULT_CONFIG_FILE)
|
||||
else:
|
||||
logger.info('File does not exist: "{}".'.format(spotdl.config.DEFAULT_CONFIG_FILE))
|
||||
|
||||
def write_m3u(self, track_file, target_file=None):
|
||||
with open(track_file, "r") as fin:
|
||||
tracks = fin.read().splitlines()
|
||||
|
||||
logger.info(
|
||||
"Checking and removing any duplicate tracks in {}.".format(track_file)
|
||||
)
|
||||
# Remove duplicates and empty elements
|
||||
# Also strip whitespaces from elements (if any)
|
||||
tracks = spotdl.util.remove_duplicates(
|
||||
tracks,
|
||||
condition=lambda x: x,
|
||||
operation=str.strip
|
||||
)
|
||||
|
||||
if target_file is None:
|
||||
target_file = "{}.m3u".format(track_file.split(".")[0])
|
||||
|
||||
total_tracks = len(tracks)
|
||||
logger.info("Generating {0} from {1} YouTube URLs.".format(target_file, total_tracks))
|
||||
write_to_stdout = target_file == "-"
|
||||
m3u_headers = "#EXTM3U\n\n"
|
||||
if write_to_stdout:
|
||||
sys.stdout.write(m3u_headers)
|
||||
else:
|
||||
with open(target_file, "w") as output_file:
|
||||
output_file.write(m3u_headers)
|
||||
|
||||
videos = []
|
||||
for n, track in enumerate(tracks, 1):
|
||||
try:
|
||||
search_metadata = MetadataSearch(
|
||||
track,
|
||||
lyrics=not self.arguments["no_metadata"],
|
||||
yt_search_format=self.arguments["search_format"],
|
||||
yt_manual=self.arguments["manual"]
|
||||
)
|
||||
video = search_metadata.best_on_youtube_search()
|
||||
except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
|
||||
logger.error(e.args[0])
|
||||
else:
|
||||
logger.info(
|
||||
"Matched track {0}/{1} ({2})".format(
|
||||
str(n).zfill(len(str(total_tracks))),
|
||||
total_tracks,
|
||||
video["url"],
|
||||
)
|
||||
)
|
||||
m3u_key = "#EXTINF:{duration},{title}\n{youtube_url}\n".format(
|
||||
duration=spotdl.util.get_sec(video["duration"]),
|
||||
title=video["title"],
|
||||
youtube_url=video["url"],
|
||||
)
|
||||
logger.debug(m3u_key.strip())
|
||||
if write_to_stdout:
|
||||
sys.stdout.write(m3u_key)
|
||||
else:
|
||||
with open(target_file, "a") as output_file:
|
||||
output_file.write(m3u_key)
|
||||
|
||||
def download_track(self, track):
|
||||
logger.info('Downloading "{}"'.format(track))
|
||||
search_metadata = MetadataSearch(
|
||||
track,
|
||||
lyrics=not self.arguments["no_metadata"],
|
||||
yt_search_format=self.arguments["search_format"],
|
||||
yt_manual=self.arguments["manual"]
|
||||
)
|
||||
try:
|
||||
if self.arguments["no_metadata"]:
|
||||
metadata = search_metadata.on_youtube()
|
||||
else:
|
||||
metadata = search_metadata.on_youtube_and_spotify()
|
||||
except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
|
||||
logger.error(e.args[0])
|
||||
else:
|
||||
self.download_track_from_metadata(metadata)
|
||||
|
||||
def should_we_overwrite_existing_file(self, overwrite):
|
||||
if overwrite == "force":
|
||||
logger.info("Forcing overwrite on existing file.")
|
||||
to_overwrite = True
|
||||
elif overwrite == "prompt":
|
||||
to_overwrite = input("Overwrite? (y/N): ").lower() == "y"
|
||||
else:
|
||||
logger.info("Not overwriting existing file.")
|
||||
to_overwrite = False
|
||||
|
||||
return to_overwrite
|
||||
|
||||
def generate_temp_filename(self, filename, for_stdout=False):
|
||||
if for_stdout:
|
||||
return filename
|
||||
return "{filename}.temp".format(filename=filename)
|
||||
|
||||
def output_filename_filter(self, allow_spaces):
|
||||
replace_spaces_with_underscores = not allow_spaces
|
||||
if replace_spaces_with_underscores:
|
||||
return lambda s: s.replace(" ", "_")
|
||||
return lambda s: s
|
||||
|
||||
def download_track_from_metadata(self, metadata):
|
||||
track = Track(metadata, cache_albumart=(not self.arguments["no_metadata"]))
|
||||
stream = metadata["streams"].get(
|
||||
quality=self.arguments["quality"],
|
||||
preftype=self.arguments["input_ext"],
|
||||
)
|
||||
|
||||
if self.arguments["no_encode"]:
|
||||
output_extension = stream["encoding"]
|
||||
else:
|
||||
output_extension = self.arguments["output_ext"]
|
||||
|
||||
filename = spotdl.metadata.format_string(
|
||||
self.arguments["output_file"],
|
||||
metadata,
|
||||
output_extension=output_extension,
|
||||
sanitizer=lambda s: spotdl.util.sanitize(
|
||||
s, spaces_to_underscores=self.arguments["no_spaces"]
|
||||
)
|
||||
)
|
||||
download_to_stdout = filename == "-"
|
||||
temp_filename = self.generate_temp_filename(filename, for_stdout=download_to_stdout)
|
||||
|
||||
to_skip_download = self.arguments["dry_run"]
|
||||
if os.path.isfile(filename):
|
||||
logger.info('A file with name "{filename}" already exists.'.format(
|
||||
filename=filename
|
||||
))
|
||||
to_skip_download = to_skip_download \
|
||||
or not self.should_we_overwrite_existing_file(self.arguments["overwrite"])
|
||||
|
||||
if to_skip_download:
|
||||
logger.debug("Skip track download.")
|
||||
return
|
||||
|
||||
if not self.arguments["no_metadata"]:
|
||||
metadata["lyrics"].start()
|
||||
|
||||
filter_space_chars = self.output_filename_filter(not self.arguments["no_spaces"])
|
||||
directory = os.path.dirname(
|
||||
spotdl.metadata.format_string(
|
||||
self.arguments["output_file"],
|
||||
metadata,
|
||||
output_extension=output_extension,
|
||||
sanitizer=filter_space_chars
|
||||
)
|
||||
)
|
||||
os.makedirs(directory or ".", exist_ok=True)
|
||||
|
||||
logger.info('Downloading to "{filename}"'.format(filename=filename))
|
||||
if self.arguments["no_encode"]:
|
||||
track.download(stream, temp_filename)
|
||||
else:
|
||||
encoder = EncoderFFmpeg()
|
||||
if self.arguments["trim_silence"]:
|
||||
encoder.set_trim_silence()
|
||||
track.download_while_re_encoding(
|
||||
stream,
|
||||
temp_filename,
|
||||
target_encoding=output_extension,
|
||||
encoder=encoder,
|
||||
)
|
||||
|
||||
if not self.arguments["no_metadata"]:
|
||||
track.metadata["lyrics"] = track.metadata["lyrics"].join()
|
||||
self.apply_metadata(track, temp_filename, output_extension)
|
||||
|
||||
if not download_to_stdout:
|
||||
logger.debug("Renaming {temp_filename} to {filename}.".format(
|
||||
temp_filename=temp_filename, filename=filename
|
||||
))
|
||||
os.rename(temp_filename, filename)
|
||||
|
||||
return filename
|
||||
|
||||
def apply_metadata(self, track, filename, encoding):
|
||||
logger.info("Applying metadata")
|
||||
try:
|
||||
track.apply_metadata(filename, encoding=encoding)
|
||||
except TypeError:
|
||||
logger.warning("Cannot apply metadata on provided output format.")
|
||||
|
||||
def strip_and_filter_duplicates(self, tracks):
|
||||
filtered_tracks = spotdl.util.remove_duplicates(
|
||||
tracks,
|
||||
condition=lambda x: x,
|
||||
operation=str.strip
|
||||
)
|
||||
return filtered_tracks
|
||||
|
||||
def filter_against_skip_file(self, items, skip_file):
|
||||
skip_items = spotdl.util.readlines_from_nonbinary_file(skip_file)
|
||||
filtered_skip_items = self.strip_and_filter_duplicates(skip_items)
|
||||
filtered_items = [item for item in items if not item in filtered_skip_items]
|
||||
return filtered_items
|
||||
|
||||
def download_tracks_from_file(self, path):
|
||||
logger.info(
|
||||
"Checking and removing any duplicate tracks in {}.".format(path)
|
||||
)
|
||||
tracks = spotdl.util.readlines_from_nonbinary_file(path)
|
||||
tracks = self.strip_and_filter_duplicates(tracks)
|
||||
|
||||
if self.arguments["skip_file"]:
|
||||
len_tracks_before = len(tracks)
|
||||
tracks = self.filter_against_skip_file(tracks, self.arguments["skip_file"])
|
||||
logger.info("Skipping {} tracks due to matches in skip file.".format(
|
||||
len_tracks_before - len(tracks))
|
||||
)
|
||||
# Overwrite file
|
||||
spotdl.util.writelines_to_nonbinary_file(path, tracks)
|
||||
|
||||
logger.info(
|
||||
"Downloading {n} tracks.\n".format(n=len(tracks))
|
||||
)
|
||||
|
||||
for position, track in enumerate(tracks, 1):
|
||||
search_metadata = MetadataSearch(
|
||||
track,
|
||||
lyrics=True,
|
||||
yt_search_format=self.arguments["search_format"],
|
||||
yt_manual=self.arguments["manual"]
|
||||
)
|
||||
try:
|
||||
log_track_query = '{position}. Downloading "{track}"'.format(
|
||||
position=position,
|
||||
track=track
|
||||
)
|
||||
logger.info(log_track_query)
|
||||
metadata = search_metadata.on_youtube_and_spotify()
|
||||
self.download_track_from_metadata(metadata)
|
||||
except (urllib.request.URLError, TypeError, IOError) as e:
|
||||
logger.exception(e.args[0])
|
||||
logger.warning(
|
||||
"Failed to download current track due to possible network issue. "
|
||||
"Will retry after other songs."
|
||||
)
|
||||
tracks.append(track)
|
||||
except (NoYouTubeVideoFoundError, NoYouTubeVideoMatchError) as e:
|
||||
logger.error("{err}".format(err=e.args[0]))
|
||||
else:
|
||||
if self.arguments["write_successful_file"]:
|
||||
with open(self.arguments["write_successful_file"], "a") as fout:
|
||||
fout.write("{}\n".format(track))
|
||||
finally:
|
||||
spotdl.util.writelines_to_nonbinary_file(path, tracks[position:])
|
||||
print("", file=sys.stderr)
|
||||
|
||||
"""
|
||||
def download_tracks_from_file_threaded(self, path):
|
||||
# FIXME: Can we make this function cleaner?
|
||||
|
||||
logger.info(
|
||||
"Checking and removing any duplicate tracks in {}.\n".format(path)
|
||||
)
|
||||
with open(path, "r") as fin:
|
||||
# Read tracks into a list and remove any duplicates
|
||||
tracks = fin.read().splitlines()
|
||||
|
||||
# Remove duplicates and empty elements
|
||||
# Also strip whitespaces from elements (if any)
|
||||
spotdl.util.remove_duplicates(
|
||||
tracks,
|
||||
condition=lambda x: x,
|
||||
operation=str.strip
|
||||
)
|
||||
|
||||
# Overwrite file
|
||||
with open(path, "w") as fout:
|
||||
fout.writelines(tracks)
|
||||
|
||||
tracks_count = len(tracks)
|
||||
current_iteration = 1
|
||||
|
||||
next_track = tracks.pop(0)
|
||||
metadata = {
|
||||
"current_track": None,
|
||||
"next_track": spotdl.util.ThreadWithReturnValue(
|
||||
target=search_metadata,
|
||||
args=(next_track, self.arguments["search_format"])
|
||||
)
|
||||
}
|
||||
metadata["next_track"].start()
|
||||
while tracks_count > 0:
|
||||
metadata["current_track"] = metadata["next_track"].join()
|
||||
metadata["next_track"] = None
|
||||
try:
|
||||
print(tracks_count, file=sys.stderr)
|
||||
print(tracks, file=sys.stderr)
|
||||
if tracks_count > 1:
|
||||
current_track = next_track
|
||||
next_track = tracks.pop(0)
|
||||
metadata["next_track"] = spotdl.util.ThreadWithReturnValue(
|
||||
target=search_metadata,
|
||||
args=(next_track, self.arguments["search_format"])
|
||||
)
|
||||
metadata["next_track"].start()
|
||||
|
||||
log_track_query = str(current_iteration) + ". {artist} - {track-name}"
|
||||
logger.info(log_track_query)
|
||||
if metadata["current_track"] is None:
|
||||
logger.warning("Something went wrong. Will retry after downloading remaining tracks.")
|
||||
pass
|
||||
print(metadata["current_track"]["name"], file=sys.stderr)
|
||||
# self.download_track_from_metadata(metadata["current_track"])
|
||||
except (urllib.request.URLError, TypeError, IOError) as e:
|
||||
print("", file=sys.stderr)
|
||||
logger.exception(e.args[0])
|
||||
logger.warning("Failed. Will retry after other songs\n")
|
||||
tracks.append(current_track)
|
||||
else:
|
||||
tracks_count -= 1
|
||||
if self.arguments["write_sucessful_file"]:
|
||||
with open(self.arguments["write_sucessful_file"], "a") as fout:
|
||||
fout.write(current_track)
|
||||
finally:
|
||||
current_iteration += 1
|
||||
with open(path, "w") as fout:
|
||||
fout.writelines(tracks)
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user