Very brittle command-line frontend

This commit is contained in:
Ritiek Malhotra
2020-04-11 22:03:47 +05:30
parent 14104e6870
commit 9afd14282a
16 changed files with 167 additions and 220 deletions

View File

@@ -70,5 +70,5 @@ setup(
"Topic :: Multimedia :: Sound/Audio",
"Topic :: Utilities",
],
entry_points={"console_scripts": ["spotdl = spotdl.spotdl:main"]},
entry_points={"console_scripts": ["spotdl = spotdl.command_line.__main__:main"]},
)

View File

@@ -1,3 +1,3 @@
__version__ = "1.2.6"
__version__ = "2.0.0"
from spotdl.track import Track

View File

@@ -4,52 +4,37 @@ from spotdl.authorize.exceptions import SpotifyAuthorizationError
import spotipy
import spotipy.oauth2 as oauth2
# This global_client is used to keep the last logged-in client
# This masterclient is used to keep the last logged-in client
# object in memory for for persistence. If credentials aren't
# provided when creating further objects, the last authenticated
# client object with correct credentials is returned when
# `AuthorizeSpotify().authorize()` is called.
global_client = None
masterclient = None
class AuthorizeSpotify(AuthorizeBase):
def __init__(self):
global global_client
self._client = global_client
class AuthorizeSpotify(spotipy.Spotify):
def __init__(self, client_id=None, client_secret=None):
global masterclient
def _generate_token(self, client_id, client_secret):
""" Generate the token. """
credentials = oauth2.SpotifyClientCredentials(
client_id=client_id,
client_secret=client_secret,
)
token = credentials.get_access_token()
return token
credentials_provided = client_id is not None \
and client_secret is not None
valid_input = credentials_provided or masterclient is not None
def authorize(self, client_id=None, client_secret=None):
no_credentials_provided = client_id is None and client_secret is None
not_valid_input = no_credentials_provided and self._client is None
if not_valid_input:
if not valid_input:
raise SpotifyAuthorizationError(
"You must pass in client_id and client_secret to this method "
"when authenticating for the first time."
)
if no_credentials_provided:
return self._client
try:
token = self._generate_token(client_id, client_secret)
except spotipy.SpotifyOauthError:
raise SpotifyAuthorizeError(
"Failed to retrieve token. Perhaps you provided invalid credentials?"
if masterclient:
# Use cached client instead of authorizing again
# and thus wasting time.
self.__dict__.update(masterclient.__dict__)
else:
credential_manager = oauth2.SpotifyClientCredentials(
client_id=client_id,
client_secret=client_secret
)
spotify = spotipy.Spotify(auth=token)
self._client = spotify
global global_client
global_client = spotify
return spotify
super().__init__(client_credentials_manager=credential_manager)
# Cache current client
masterclient = self

View File

@@ -0,0 +1,3 @@
from spotdl.command_line.arguments import get_arguments
from spotdl.command_line import helpers

View File

@@ -1,55 +1,61 @@
def match_args():
if const.args.song:
for track in const.args.song:
track_dl = downloader.Downloader(raw_song=track)
track_dl.download_single()
elif const.args.list:
if const.args.write_m3u:
from spotdl.authorize.services import AuthorizeSpotify
from spotdl import command_line
def match_arguments(arguments):
if arguments.tracks:
# TODO: Also support reading from stdin for -t parameter
# Also supported writing to stdout for all parameters
if len(arguments.tracks) > 1:
# log.warning("download multiple tracks with optimized list instead")
pass
for track in arguments.tracks:
command_line.helpers.download_track(track, arguments)
elif arguments.list:
if arguments.write_m3u:
youtube_tools.generate_m3u(
track_file=const.args.list
track_file=arguments.list
)
else:
list_dl = downloader.ListDownloader(
tracks_file=const.args.list,
skip_file=const.args.skip,
write_successful_file=const.args.write_successful,
tracks_file=arguments.list,
skip_file=arguments.skip,
write_successful_file=arguments.write_successful,
)
list_dl.download_list()
elif const.args.playlist:
elif arguments.playlist:
spotify_tools.write_playlist(
playlist_url=const.args.playlist, text_file=const.args.write_to
playlist_url=arguments.playlist, text_file=arguments.write_to
)
elif const.args.album:
elif arguments.album:
spotify_tools.write_album(
album_url=const.args.album, text_file=const.args.write_to
album_url=arguments.album, text_file=arguments.write_to
)
elif const.args.all_albums:
elif arguments.all_albums:
spotify_tools.write_all_albums_from_artist(
artist_url=const.args.all_albums, text_file=const.args.write_to
artist_url=arguments.all_albums, text_file=arguments.write_to
)
elif const.args.username:
elif arguments.username:
spotify_tools.write_user_playlist(
username=const.args.username, text_file=const.args.write_to
username=arguments.username, text_file=arguments.write_to
)
def main():
const.args = handle.get_arguments()
arguments = command_line.get_arguments()
internals.filter_path(const.args.folder)
youtube_tools.set_api_key()
AuthorizeSpotify(
client_id=arguments.spotify_client_id,
client_secret=arguments.spotify_client_secret
)
# youtube_tools.set_api_key()
logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level)
# logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level)
try:
match_args()
# actually we don't necessarily need this, but yeah...
# explicit is better than implicit!
sys.exit(0)
match_arguments(arguments)
except KeyboardInterrupt as e:
# log.exception(e)
sys.exit(3)
sys.exit(2)
if __name__ == "__main__":

View File

@@ -23,7 +23,10 @@ def log_leveller(log_level_str):
def override_config(config_file, parser, argv=None):
""" Override default dict with config dict passed as comamnd line argument. """
config_file = os.path.realpath(config_file)
config = spotdl.util.merge(DEFAULT_CONFIGURATION["spotify-downloader"], spotdl.config.get_config(config_file))
config = spotdl.util.merge(
spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"],
spotdl.config.get_config(config_file)
)
parser.set_defaults(**config)
return parser.parse_args(argv)
@@ -34,8 +37,8 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
if to_merge:
config_file = spotdl.config.default_config_file
if to_merge:
config_dir = os.path.dirname(spotdl.config.default_config_file)
os.makedirs(os.path.dirname(spotdl.config.default_config_file), exist_ok=True)
config = spotdl.util.merge(
@@ -49,7 +52,7 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
group = parser.add_mutually_exclusive_group(required=True)
# TODO: --song is deprecated. Remove in future versions.
# Use --track instead.
# Use --tracks instead.
group.add_argument(
"-s",
"--song",
@@ -58,9 +61,9 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
)
group.add_argument(
"-t",
"--track",
"--tracks",
nargs="+",
help="download track by spotify link or name"
help="download track(s) by spotify link or name"
)
group.add_argument(
"-l",
@@ -143,14 +146,14 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
"-i",
"--input-ext",
default=config["input-ext"],
help="preferred input format .m4a or .webm (Opus)",
choices={".m4a", ".webm"},
help="preferred input format 'm4a' or 'webm' (Opus)",
choices={"m4a", "webm"},
)
parser.add_argument(
"-o",
"--output-ext",
default=config["output-ext"],
help="preferred output format .mp3, .m4a (AAC), .flac, etc.",
help="preferred output format: 'mp3', 'm4a' (AAC), 'flac', etc.",
)
parser.add_argument(
"--write-to",
@@ -163,7 +166,7 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
default=config["file-format"],
help="file format to save the downloaded track with, each tag "
"is surrounded by curly braces. Possible formats: "
"{}".format([spotdl.util.formats[x] for x in spotdl.util.formats]),
# "{}".format([spotdl.util.formats[x] for x in spotdl.util.formats]),
)
parser.add_argument(
"--trim-silence",
@@ -177,7 +180,7 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
default=config["search-format"],
help="search format to search for on YouTube, each tag "
"is surrounded by curly braces. Possible formats: "
"{}".format([spotdl.util.formats[x] for x in spotdl.util.formats]),
# "{}".format([spotdl.util.formats[x] for x in spotdl.util.formats]),
)
parser.add_argument(
"-dm",
@@ -238,19 +241,19 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
parser.add_argument(
"-sci",
"--spotify-client-id",
default=config["spotify_client_id"],
default=config["spotify-client-id"],
help=argparse.SUPPRESS,
)
parser.add_argument(
"-scs",
"--spotify-client-secret",
default=config["spotify_client_secret"],
default=config["spotify-client-secret"],
help=argparse.SUPPRESS,
)
parser.add_argument(
"-c",
"--config",
default=None,
default=config_file,
help="path to custom config.yml file"
)
parser.add_argument(
@@ -289,11 +292,11 @@ def get_arguments(argv=None, to_group=True, to_merge=True):
"--write-to can only be used with --playlist, --album, --all-albums, or --username"
)
song_parameter_passed = parsed.song is not None and parsed.track is None
song_parameter_passed = parsed.song is not None and parsed.tracks is None
if song_parameter_passed:
# log.warn("-s / --song is deprecated and will be removed in future versions. "
# "Use -t / --track instead.")
setattr(parsed, "track", parsed.song)
# "Use -t / --tracks instead.")
setattr(parsed, "tracks", parsed.song)
del parsed.song
parsed.log_level = log_leveller(parsed.log_level)

View File

@@ -36,7 +36,12 @@ def search_metadata(track, lyrics=True):
return metadata
def download_track(metadata, arguments):
def download_track(track, arguments):
metadata = search_metadata(track)
download_track_from_metadata(metadata, arguments)
def download_track_from_metadata(metadata, arguments):
# TODO: CONFIG.YML
# Exit here if config.dry_run
@@ -45,19 +50,31 @@ def download_track(metadata, arguments):
# log.info(log_fmt)
track = Track(metadata, cache_albumart=True)
# TODO: CONFIG.YML
# Download tracks with name config.file_format
# TODO: CONFIG.YML
# Append config.output_ext to config.file_format
track = Track(metadata, cache_albumart=True)
track.download_while_re_encoding("test.mp3")
# TODO: CONFIG.YML
# Check config.overwrite here
filename = spotdl.util.format_string(
arguments.file_format,
metadata,
output_extension=arguments.output_ext
)
track.download_while_re_encoding(
filename,
target_encoding=arguments.output_ext
)
# TODO: CONFIG.YML
# Skip metadata if config.no_metadata
track.apply_metadata("test.mp3")
track.apply_metadata(filename, encoding=arguments.output_ext)
def download_tracks_from_file(path, arguments):
@@ -107,7 +124,10 @@ def download_tracks_from_file(path, arguments):
)
next_track_metadata.start()
download_track(metadata["current_track"], log_fmt=(str(current_iteration) + ". {artist} - {track_name}"))
download_track_from_metadata(
metadata["current_track"],
log_fmt=(str(current_iteration) + ". {artist} - {track_name}")
)
current_iteration += 1
next_track_metadata.join()
except (urllib.request.URLError, TypeError, IOError) as e:

View File

@@ -13,22 +13,22 @@ DEFAULT_CONFIGURATION = {
"avconv": False,
"directory": spotdl.util.get_music_dir(),
"overwrite": "prompt",
"input-ext": ".m4a",
"output-ext": ".mp3",
"input-ext": "m4a",
"output-ext": "mp3",
"write-to": None,
"trim-silence": False,
"download-only-metadata": False,
"dry-run": False,
"music-videos-only": False,
"no-spaces": False,
"file-format": "{artist} - {track_name}",
"file-format": "{artist} - {track_name}.{output_ext}",
"search-format": "{artist} - {track_name} lyrics",
"youtube-api-key": None,
"skip": None,
"write-successful": None,
"log-level": "INFO",
"spotify_client_id": "4fe3fecfe5334023a1472516cc99d805",
"spotify_client_secret": "0f02b7c483c04257984695007a4a8d5c",
"spotify-client-id": "4fe3fecfe5334023a1472516cc99d805",
"spotify-client-secret": "0f02b7c483c04257984695007a4a8d5c",
}
}

View File

@@ -71,14 +71,16 @@ class EncoderFFmpeg(EncoderBase):
+ ["-i", input_path] \
+ arguments.split() \
+ self._additional_arguments \
+ ["-f", target_encoding] \
+ [target_path]
return command
def re_encode(self, input_path, target_path, delete_original=False):
def re_encode(self, input_path, target_path, target_encoding=None, delete_original=False):
encode_command = self._generate_encode_command(
input_path,
target_path
target_path,
target_encoding=target_encoding
)
process = subprocess.Popen(encode_command)
process.wait()
@@ -87,12 +89,12 @@ class EncoderFFmpeg(EncoderBase):
os.remove(input_path)
return process
def re_encode_from_stdin(self, input_encoding, target_path):
target_encoding = self.get_encoding(target_path)
def re_encode_from_stdin(self, input_encoding, target_path, target_encoding=None):
encode_command = self._generate_encode_command(
"-",
target_path,
input_encoding=input_encoding,
target_encoding=target_encoding,
)
process = subprocess.Popen(encode_command, stdin=subprocess.PIPE)
return process

View File

@@ -3,8 +3,12 @@
# Need to confirm this and if so, remove the calls
# to `spotify._get_id` in below methods.
from spotdl.authorize.services import AuthorizeSpotify
class SpotifyHelpers:
def __init__(self, spotify):
def __init__(self, spotify=None):
if spotify is None:
spotify = AuthorizeSpotify()
self.spotify = spotify
def prompt_for_user_playlist(self, username):

View File

@@ -0,0 +1,9 @@
from spotdl.metadata.embedders import EmbedderDefault
import pytest
@pytest.mark.xfail
def test_embedder():
# Do not forget to Write tests for this!
raise NotImplementedError

View File

@@ -4,8 +4,12 @@ import spotipy.oauth2 as oauth2
from spotdl.metadata import ProviderBase
from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
from spotdl.authorize.services import AuthorizeSpotify
class ProviderSpotify(ProviderBase):
def __init__(self, spotify=None):
if spotify is None:
spotify = AuthorizeSpotify()
self.spotify = spotify
def set_credentials(self, client_id, client_secret):

View File

@@ -37,12 +37,14 @@ class Track:
def _calculate_total_chunks(self, filesize):
return (filesize // self._chunksize) + 1
def download_while_re_encoding(self, target_path, encoder=EncoderFFmpeg(), show_progress=True):
def download_while_re_encoding(self, target_path, target_encoding=None,
encoder=EncoderFFmpeg(), show_progress=True):
stream = self.metadata["streams"].getbest()
total_chunks = self._calculate_total_chunks(stream["filesize"])
process = encoder.re_encode_from_stdin(
stream["encoding"],
target_path
target_path,
target_encoding=target_encoding
)
response = stream["connection"]
for _ in tqdm.trange(total_chunks):
@@ -61,12 +63,14 @@ class Track:
chunk = response.read(self._chunksize)
fout.write(chunk)
def re_encode(self, input_path, target_path, encoder=EncoderFFmpeg(), show_progress=True):
def re_encode(self, input_path, target_path, target_encoding=None,
encoder=EncoderFFmpeg(), show_progress=True):
stream = self.metadata["streams"].getbest()
total_chunks = self._calculate_total_chunks(stream["filesize"])
process = encoder.re_encode_from_stdin(
stream["encoding"],
target_path
target_path,
target_encoding=target_encoding
)
with open(input_path, "rb") as fin:
for _ in tqdm.trange(total_chunks):
@@ -76,10 +80,15 @@ class Track:
process.stdin.close()
process.wait()
def apply_metadata(self, input_path, embedder=EmbedderDefault()):
def apply_metadata(self, input_path, encoding=None, embedder=EmbedderDefault()):
albumart = self._cache_resources["albumart"]
if albumart["threadinstance"]:
albumart["threadinstance"].join()
embedder.apply_metadata(input_path, self.metadata, cached_albumart=albumart["content"])
embedder.apply_metadata(
input_path,
self.metadata,
cached_albumart=albumart["content"],
encoding=encoding,
)

View File

@@ -17,22 +17,6 @@ except ImportError:
log.info("Please remove any other slugify library and install `unicode-slugify`")
sys.exit(5)
formats = {
0: "track_name",
1: "artist",
2: "album",
3: "album_artist",
4: "genre",
5: "disc_number",
6: "duration",
7: "year",
8: "original_date",
9: "track_number",
10: "total_tracks",
11: "isrc",
12: "track_id",
}
def merge(base, overrider):
""" Override default dict with config dict. """
@@ -72,48 +56,28 @@ def is_youtube(raw_song):
return status
def format_string(
string_format, tags, slugification=False, force_spaces=False, total_songs=0
):
""" Generate a string of the format '[artist] - [song]' for the given spotify song. """
format_tags = dict(formats)
format_tags[0] = tags["name"]
format_tags[1] = tags["artists"][0]["name"]
format_tags[2] = tags["album"]["name"]
format_tags[3] = tags["artists"][0]["name"]
format_tags[4] = tags["genre"]
format_tags[5] = tags["disc_number"]
format_tags[6] = tags["duration"]
format_tags[7] = tags["year"]
format_tags[8] = tags["release_date"]
format_tags[9] = tags["track_number"]
format_tags[10] = tags["total_tracks"]
format_tags[11] = tags["external_ids"]["isrc"]
try:
format_tags[12] = tags["id"]
except KeyError:
pass
format_tags_sanitized = {
k: sanitize_title(str(v), ok="'-_()[]{}") if slugification else str(v)
for k, v in format_tags.items()
def format_string(string, metadata, output_extension=""):
formats = {
"{track_name}" : metadata["name"],
"{artist}" : metadata["artists"][0]["name"],
"{album}" : metadata["album"]["name"],
"{album_artist}" : metadata["artists"][0]["name"],
"{genre}" : metadata["genre"],
"{disc_number}" : metadata["disc_number"],
"{duration}" : metadata["duration"],
"{year}" : metadata["year"],
"{original_date}": metadata["release_date"],
"{track_number}" : metadata["track_number"],
"{total_tracks}" : metadata["total_tracks"],
"{isrc}" : metadata["external_ids"]["isrc"],
"{track_id}" : metadata.get("id", ""),
"{output_ext}" : output_extension,
}
# calculating total digits presnet in total_songs to prepare a zfill.
total_digits = 0 if total_songs == 0 else int(math.log10(total_songs)) + 1
for x in formats:
format_tag = "{" + formats[x] + "}"
# Making consistent track number by prepending zero
# on it according to number of digits in total songs
if format_tag == "{track_number}":
format_tags_sanitized[x] = format_tags_sanitized[x].zfill(total_digits)
for key, value in formats.items():
string = string.replace(key, str(value))
string_format = string_format.replace(format_tag, format_tags_sanitized[x])
if const.args.no_spaces and not force_spaces:
string_format = string_format.replace(" ", "_")
return string_format
return string
def sanitize_title(title, ok="-_()[]{}"):

View File

@@ -1,26 +0,0 @@
from spotdl import const
from spotdl import handle
from spotdl import spotdl
import urllib
import pytest
def load_defaults():
const.args = handle.get_arguments(raw_args="", to_group=False, to_merge=False)
const.args.overwrite = "skip"
spotdl.args = const.args
spotdl.log = const.logzero.setup_logger(
formatter=const._formatter, level=const.args.log_level
)
# GIST_URL is the monkeypatched version of: https://www.youtube.com/results?search_query=janji+-+heroes
# so that we get same results even if YouTube changes the list/order of videos on their page.
GIST_URL = "https://gist.githubusercontent.com/ritiek/e731338e9810e31c2f00f13c249a45f5/raw/c11a27f3b5d11a8d082976f1cdd237bd605ec2c2/search_results.html"
def monkeypatch_youtube_search_page(*args, **kwargs):
fake_urlopen = urllib.request.urlopen(GIST_URL)
return fake_urlopen

View File

@@ -1,36 +0,0 @@
from spotdl import patcher
import pafy
import pytest
pafy_patcher = patcher.PatchPafy()
pafy_patcher.patch_getbestthumb()
class TestPafyContentAvailable:
pass
class TestMethodAssignment:
def test_pafy_getbestthumb(self):
pafy.backend_shared.BasePafy.getbestthumb == patcher._getbestthumb
class TestMethodCalls:
@pytest.fixture(scope="module")
def content_fixture(self):
content = pafy.new("http://youtube.com/watch?v=3nQNiWdeH2Q")
return content
def test_pafy_getbestthumb(self, content_fixture):
thumbnail = patcher._getbestthumb(content_fixture)
assert thumbnail == "https://i.ytimg.com/vi/3nQNiWdeH2Q/hqdefault.jpg"
def test_pafy_getbestthumb_without_ytdl(self, content_fixture):
content_fixture._ydl_info["thumbnails"][0]["url"] = None
thumbnail = patcher._getbestthumb(content_fixture)
assert thumbnail == "https://i.ytimg.com/vi/3nQNiWdeH2Q/sddefault.jpg"
def test_pafy_content_available(self):
TestPafyContentAvailable._content_available = patcher._content_available
assert TestPafyContentAvailable()._content_available("https://youtube.com/")