Basic downloading

This commit is contained in:
Ritiek Malhotra
2020-04-08 08:00:43 +05:30
parent 121fcdcdf6
commit 51da0b7a29
22 changed files with 868 additions and 619 deletions

View File

@@ -1,3 +1,3 @@
__version__ = "1.2.6"
from spotdl.download import Track
from spotdl.track import Track

View File

View File

@@ -0,0 +1,57 @@
def match_args():
if const.args.song:
for track in const.args.song:
track_dl = downloader.Downloader(raw_song=track)
track_dl.download_single()
elif const.args.list:
if const.args.write_m3u:
youtube_tools.generate_m3u(
track_file=const.args.list
)
else:
list_dl = downloader.ListDownloader(
tracks_file=const.args.list,
skip_file=const.args.skip,
write_successful_file=const.args.write_successful,
)
list_dl.download_list()
elif const.args.playlist:
spotify_tools.write_playlist(
playlist_url=const.args.playlist, text_file=const.args.write_to
)
elif const.args.album:
spotify_tools.write_album(
album_url=const.args.album, text_file=const.args.write_to
)
elif const.args.all_albums:
spotify_tools.write_all_albums_from_artist(
artist_url=const.args.all_albums, text_file=const.args.write_to
)
elif const.args.username:
spotify_tools.write_user_playlist(
username=const.args.username, text_file=const.args.write_to
)
def main():
const.args = handle.get_arguments()
internals.filter_path(const.args.folder)
youtube_tools.set_api_key()
logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level)
try:
match_args()
# actually we don't necessarily need this, but yeah...
# explicit is better than implicit!
sys.exit(0)
except KeyboardInterrupt as e:
# log.exception(e)
sys.exit(3)
if __name__ == "__main__":
main()

View File

@@ -2,112 +2,71 @@ from logzero import logger as log
import appdirs
import logging
import yaml
import argparse
import mimetypes
import os
import sys
import spotdl
from spotdl import internals
import spotdl.util
import spotdl.config
_LOG_LEVELS_STR = ["INFO", "WARNING", "ERROR", "DEBUG"]
default_conf = {
"spotify-downloader": {
"no-remove-original": False,
"manual": False,
"no-metadata": False,
"no-fallback-metadata": False,
"avconv": False,
"folder": internals.get_music_dir(),
"overwrite": "prompt",
"input-ext": ".m4a",
"output-ext": ".mp3",
"write-to": None,
"trim-silence": False,
"download-only-metadata": False,
"dry-run": False,
"music-videos-only": False,
"no-spaces": False,
"file-format": "{artist} - {track_name}",
"search-format": "{artist} - {track_name} lyrics",
"youtube-api-key": None,
"skip": None,
"write-successful": None,
"log-level": "INFO",
"spotify_client_id": "4fe3fecfe5334023a1472516cc99d805",
"spotify_client_secret": "0f02b7c483c04257984695007a4a8d5c",
}
}
_LOG_LEVELS_STR = ("INFO", "WARNING", "ERROR", "DEBUG")
def log_leveller(log_level_str):
loggin_levels = [logging.INFO, logging.WARNING, logging.ERROR, logging.DEBUG]
logging_levels = [logging.INFO, logging.WARNING, logging.ERROR, logging.DEBUG]
log_level_str_index = _LOG_LEVELS_STR.index(log_level_str)
loggin_level = loggin_levels[log_level_str_index]
return loggin_level
logging_level = logging_levels[log_level_str_index]
return logging_level
def merge(default, config):
""" Override default dict with config dict. """
merged = default.copy()
merged.update(config)
return merged
def get_config(config_file):
try:
with open(config_file, "r") as ymlfile:
cfg = yaml.safe_load(ymlfile)
except FileNotFoundError:
log.info("Writing default configuration to {0}:".format(config_file))
with open(config_file, "w") as ymlfile:
yaml.dump(default_conf, ymlfile, default_flow_style=False)
cfg = default_conf
for line in yaml.dump(
default_conf["spotify-downloader"], default_flow_style=False
).split("\n"):
if line.strip():
log.info(line.strip())
log.info(
"Please note that command line arguments have higher priority "
"than their equivalents in the configuration file"
)
return cfg["spotify-downloader"]
def override_config(config_file, parser, raw_args=None):
def override_config(config_file, parser, argv=None):
""" Override default dict with config dict passed as comamnd line argument. """
config_file = os.path.realpath(config_file)
config = merge(default_conf["spotify-downloader"], get_config(config_file))
config = spotdl.util.merge(DEFAULT_CONFIGURATION["spotify-downloader"], spotdl.config.get_config(config_file))
parser.set_defaults(**config)
return parser.parse_args(raw_args)
return parser.parse_args(argv)
def get_arguments(raw_args=None, to_group=True, to_merge=True):
def get_arguments(argv=None, to_group=True, to_merge=True):
parser = argparse.ArgumentParser(
description="Download and convert tracks from Spotify, Youtube etc.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
if to_merge:
config_dir = os.path.join(appdirs.user_config_dir(), "spotdl")
os.makedirs(config_dir, exist_ok=True)
config_file = os.path.join(config_dir, "config.yml")
config = merge(default_conf["spotify-downloader"], get_config(config_file))
config_file = spotdl.config.default_config_file
config_dir = os.path.dirname(spotdl.config.default_config_file)
os.makedirs(os.path.dirname(spotdl.config.default_config_file), exist_ok=True)
config = spotdl.util.merge(
spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"],
spotdl.config.get_config(config_file)
)
else:
config = default_conf["spotify-downloader"]
config = spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"]
if to_group:
group = parser.add_mutually_exclusive_group(required=True)
# TODO: --song is deprecated. Remove in future versions.
# Use --track instead.
group.add_argument(
"-s", "--song", nargs="+", help="download track by spotify link or name"
"-s",
"--song",
nargs="+",
help=argparse.SUPPRESS
)
group.add_argument(
"-t",
"--track",
nargs="+",
help="download track by spotify link or name"
)
group.add_argument(
"-l",
"--list",
help="download tracks from a file"
)
group.add_argument("-l", "--list", help="download tracks from a file")
group.add_argument(
"-p",
"--playlist",
@@ -130,7 +89,7 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
parser.add_argument(
"--write-m3u",
help="generate an .m3u playlist file with youtube links given "
"a text file containing tracks",
"a text file containing tracks",
action="store_true",
)
parser.add_argument(
@@ -170,9 +129,9 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
)
parser.add_argument(
"-f",
"--folder",
default=os.path.abspath(config["folder"]),
help="path to folder where downloaded tracks will be stored in",
"--directory",
default=os.path.abspath(config["directory"]),
help="path to directory where downloaded tracks will be stored in",
)
parser.add_argument(
"--overwrite",
@@ -204,7 +163,7 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
default=config["file-format"],
help="file format to save the downloaded track with, each tag "
"is surrounded by curly braces. Possible formats: "
"{}".format([internals.formats[x] for x in internals.formats]),
"{}".format([spotdl.util.formats[x] for x in spotdl.util.formats]),
)
parser.add_argument(
"--trim-silence",
@@ -218,7 +177,7 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
default=config["search-format"],
help="search format to search for on YouTube, each tag "
"is surrounded by curly braces. Possible formats: "
"{}".format([internals.formats[x] for x in internals.formats]),
"{}".format([spotdl.util.formats[x] for x in spotdl.util.formats]),
)
parser.add_argument(
"-dm",
@@ -289,7 +248,10 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
help=argparse.SUPPRESS,
)
parser.add_argument(
"-c", "--config", default=None, help="path to custom config.yml file"
"-c",
"--config",
default=None,
help="path to custom config.yml file"
)
parser.add_argument(
"-V",
@@ -298,7 +260,7 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
version="%(prog)s {}".format(spotdl.__version__),
)
parsed = parser.parse_args(raw_args)
parsed = parser.parse_args(argv)
if parsed.config is not None and to_merge:
parsed = override_config(parsed.config, parser)
@@ -327,6 +289,13 @@ def get_arguments(raw_args=None, to_group=True, to_merge=True):
"--write-to can only be used with --playlist, --album, --all-albums, or --username"
)
song_parameter_passed = parsed.song is not None and parsed.track is None
if song_parameter_passed:
# log.warn("-s / --song is deprecated and will be removed in future versions. "
# "Use -t / --track instead.")
setattr(parsed, "track", parsed.song)
del parsed.song
parsed.log_level = log_leveller(parsed.log_level)
return parsed

View File

@@ -0,0 +1,117 @@
from spotdl.metadata.providers import ProviderSpotify
from spotdl.metadata.providers import ProviderYouTube
from spotdl.metadata.embedders import EmbedderDefault
from spotdl.track import Track
import spotdl.util
import urllib.request
import threading
def search_metadata(track):
youtube = ProviderYouTube()
if spotdl.util.is_spotify(track):
spotify = ProviderSpotify()
spotify_metadata = spotify.from_url(track)
# TODO: CONFIG.YML
# Generate string in config.search_format
search_query = "{} - {}".format(
spotify_metadata["artists"][0]["name"],
spotify_metadata["name"]
)
youtube_metadata = youtube.from_query(search_query)
metadata = spotdl.util.merge(
youtube_metadata,
spotify_metadata
)
elif spotdl.util.is_youtube(track):
metadata = youtube.from_url(track)
else:
metadata = youtube.from_query(track)
return metadata
def download_track(metadata,
dry_run=False, overwrite="prompt", output_ext="mp3", file_format="{artist} - {track-name}", log_fmt="{artist} - {track_name}"):
# TODO: CONFIG.YML
# Exit here if config.dry_run
# TODO: CONFIG.YML
# Check if test.mp3 already exists here
# log.info(log_fmt)
# TODO: CONFIG.YML
# Download tracks with name config.file_format
# TODO: CONFIG.YML
# Append config.output_ext to config.file_format
track = Track(metadata, cache_albumart=True)
track.download_while_re_encoding("test.mp3")
track.apply_metadata("test.mp3")
def download_tracks_from_file(path):
# log.info(
# "Checking and removing any duplicate tracks "
# "in reading {}".format(path)
# )
with open(path, "r") as fin:
# Read tracks into a list and remove any duplicates
tracks = fin.read().splitlines()
# Remove duplicates and empty elements
# Also strip whitespaces from elements (if any)
spotdl.util.remove_duplicates(tracks, condition=lambda x: x, operation=str.strip)
# Overwrite file
with open(path, "w") as fout:
fout.writelines(tracks)
next_track_metadata = threading.Thread(target=lambda: None)
next_track_metadata.start()
tracks_count = len(tracks)
current_iteration = 1
def mutable_assignment(mutable_resource, track):
mutable_resource["next_track"] = search_metadata(track)
metadata = {
"current_track": None,
"next_track": None,
}
while tracks_count > 0:
current_track = tracks.pop(0)
tracks_count -= 1
metadata["current_track"] = metadata["next_track"]
metadata["next_track"] = None
try:
if metadata["current_track"] is None:
metadata["current_track"] = search_metadata(current_track)
if tracks_count > 0:
next_track = tracks[0]
next_track_metadata = threading.Thread(
target=mutable_assignment,
args=(metadata, next_track)
)
next_track_metadata.start()
download_track(metadata["current_track"], log_fmt=(str(current_iteration) + ". {artist} - {track_name}"))
current_iteration += 1
next_track_metadata.join()
except (urllib.request.URLError, TypeError, IOError) as e:
# log.exception(e.args[0])
# log.warning("Failed. Will retry after other songs\n")
tracks.append(current_track)
else:
# TODO: CONFIG.YML
# Write track to config.write_sucessful
pass
finally:
with open(path, "w") as fout:
fout.writelines(tracks)

View File

@@ -0,0 +1,78 @@
import spotdl.command_line.arguments
import sys
import pytest
def test_log_str_to_int():
expect_levels = [20, 30, 40, 10]
levels = [spotdl.command_line.arguments.log_leveller(level)
for level in spotdl.command_line.arguments._LOG_LEVELS_STR]
assert levels == expect_levels
class TestBadArguments:
def test_error_m3u_without_list(self):
with pytest.raises(SystemExit):
spotdl.command_line.arguments.get_arguments(argv=("-t cool song", "--write-m3u"), to_group=True)
def test_m3u_with_list(self):
spotdl.command_line.arguments.get_arguments(argv=("-l cool_list.txt", "--write-m3u"), to_group=True)
def test_write_to_error(self):
with pytest.raises(SystemExit):
spotdl.command_line.arguments.get_arguments(argv=("-t", "sekai all i had", "--write-to", "output.txt"))
class TestArguments:
def test_general_arguments(self):
arguments = spotdl.command_line.arguments.get_arguments(argv=("-t", "elena coats - one last song"))
arguments = arguments.__dict__
assert isinstance(arguments["spotify_client_id"], str)
assert isinstance(arguments["spotify_client_secret"], str)
arguments["spotify_client_id"] = None
arguments["spotify_client_secret"] = None
expect_arguments = {
"track": ["elena coats - one last song"],
"song": None,
"list": None,
"playlist": None,
"album": None,
"all_albums": None,
"username": None,
"write_m3u": False,
"manual": False,
"no_remove_original": False,
"no_metadata": False,
"no_fallback_metadata": False,
"avconv": False,
"directory": "/home/ritiek/Music",
"overwrite": "prompt",
"input_ext": ".m4a",
"output_ext": ".mp3",
"write_to": None,
"file_format": "{artist} - {track_name}",
"trim_silence": False,
"search_format": "{artist} - {track_name} lyrics",
"download_only_metadata": False,
"dry_run": False,
"music_videos_only": False,
"no_spaces": False,
"log_level": 20,
"youtube_api_key": None,
"skip": None,
"write_successful": None,
"spotify_client_id": None,
"spotify_client_secret": None,
"config": None
}
assert arguments == expect_arguments
def test_grouped_arguments(self):
with pytest.raises(SystemExit):
spotdl.command_line.arguments.get_arguments(to_group=True, to_merge=True)

72
spotdl/config.py Normal file
View File

@@ -0,0 +1,72 @@
import appdirs
import yaml
import os
import spotdl.util
DEFAULT_CONFIGURATION = {
"spotify-downloader": {
"no-remove-original": False,
"manual": False,
"no-metadata": False,
"no-fallback-metadata": False,
"avconv": False,
"directory": spotdl.util.get_music_dir(),
"overwrite": "prompt",
"input-ext": ".m4a",
"output-ext": ".mp3",
"write-to": None,
"trim-silence": False,
"download-only-metadata": False,
"dry-run": False,
"music-videos-only": False,
"no-spaces": False,
"file-format": "{artist} - {track_name}",
"search-format": "{artist} - {track_name} lyrics",
"youtube-api-key": None,
"skip": None,
"write-successful": None,
"log-level": "INFO",
"spotify_client_id": "4fe3fecfe5334023a1472516cc99d805",
"spotify_client_secret": "0f02b7c483c04257984695007a4a8d5c",
}
}
default_config_file = os.path.join(
appdirs.user_config_dir(),
"spotdl",
"config.yml"
)
def read_config(config_file):
with open(config_file, "r") as ymlfile:
config = yaml.safe_load(ymlfile)
return config
def dump_config(config_file, config=DEFAULT_CONFIGURATION):
with open(config_file, "w") as ymlfile:
yaml.dump(DEFAULT_CONFIGURATION, ymlfile, default_flow_style=False)
def get_config(config_file):
if os.path.isfile(config_file):
config = read_config(config_file)
else:
config = DEFAULT_CONFIGURATION
dump_config(config_file, config=DEFAULT_CONFIGURATION)
# log.info("Writing default configuration to {0}:".format(config_file))
# for line in yaml.dump(
# DEFAULT_CONFIGURATION["spotify-downloader"], default_flow_style=False
# ).split("\n"):
# if line.strip():
# log.info(line.strip())
# log.info(
# "Please note that command line arguments have higher priority "
# "than their equivalents in the configuration file"
# )
return config["spotify-downloader"]

View File

@@ -13,30 +13,3 @@ logzero.setup_default_logger(formatter=_formatter, level=_log_level)
# (useful when using spotdl as a library)
args = type("", (), {})()
# Apple has specific tags - see mutagen docs -
# http://mutagen.readthedocs.io/en/latest/api/mp4.html
M4A_TAG_PRESET = {
"album": "\xa9alb",
"artist": "\xa9ART",
"date": "\xa9day",
"title": "\xa9nam",
"year": "\xa9day",
"originaldate": "purd",
"comment": "\xa9cmt",
"group": "\xa9grp",
"writer": "\xa9wrt",
"genre": "\xa9gen",
"tracknumber": "trkn",
"albumartist": "aART",
"discnumber": "disk",
"cpil": "cpil",
"albumart": "covr",
"copyright": "cprt",
"tempo": "tmpo",
"lyrics": "\xa9lyr",
"comment": "\xa9cmt",
}
TAG_PRESET = {}
for key in M4A_TAG_PRESET.keys():
TAG_PRESET[key] = key

View File

@@ -1,20 +0,0 @@
import subprocess
import threading
def download(url, path=".", progress_bar=True, ):
command = ("ffmpeg", "-y", "-i", "-", "output.wav")
content = pytube.YouTube("https://www.youtube.com/watch?v=SE0nYFJ0ZvQ")
stream = content.streams[0]
response = urllib.request.urlopen(stream.url)
process = subprocess.Popen(command, stdin=subprocess.PIPE)
while True:
chunk = response.read(16 * 1024)
if not chunk:
break
process.stdin.write(chunk)
process.stdin.close()

165
spotdl/helpers/spotify.py Normal file
View File

@@ -0,0 +1,165 @@
class SpotifyHelpers:
def __init__(self, spotify):
self.spotify = spotify
def extract_spotify_id(string):
"""
Returns a Spotify ID of a playlist, album, etc. after extracting
it from a given HTTP URL or Spotify URI.
"""
if "/" in string:
# Input string is an HTTP URL
if string.endswith("/"):
string = string[:-1]
splits = string.split("/")
else:
# Input string is a Spotify URI
splits = string.split(":")
spotify_id = splits[-1]
return spotify_id
def prompt_for_user_playlist(self, username):
""" Write user playlists to text_file """
links = fetch_user_playlist_urls(username)
playlist = internals.input_link(links)
return playlist
def fetch_user_playlist_urls(self, username):
""" Fetch user playlists when using the -u option. """
playlists = self.spotify.user_playlists(username)
links = []
check = 1
while True:
for playlist in playlists["items"]:
# in rare cases, playlists may not be found, so playlists['next']
# is None. Skip these. Also see Issue #91.
if playlist["name"] is not None:
# log.info(
# u"{0:>5}. {1:<30} ({2} tracks)".format(
# check, playlist["name"], playlist["tracks"]["total"]
# )
# )
playlist_url = playlist["external_urls"]["spotify"]
# log.debug(playlist_url)
links.append(playlist_url)
check += 1
if playlists["next"]:
playlists = self.spotify.next(playlists)
else:
break
return links
def fetch_playlist(self, playlist_url):
try:
playlist_id = self.extract_spotify_id(playlist_url)
except IndexError:
# Wrong format, in either case
# log.error("The provided playlist URL is not in a recognized format!")
sys.exit(10)
try:
results = self.spotify.user_playlist(
user=None, playlist_id=playlist_id, fields="tracks,next,name"
)
except spotipy.client.SpotifyException:
# log.error("Unable to find playlist")
# log.info("Make sure the playlist is set to publicly visible and then try again")
sys.exit(11)
return results
def write_playlist(self, playlist, text_file=None):
tracks = playlist["tracks"]
if not text_file:
text_file = u"{0}.txt".format(slugify(playlist["name"], ok="-_()[]{}"))
return write_tracks(tracks, text_file)
def fetch_album(self, album_url):
album_id = self.extract_spotify_id(album_url)
album = self.spotify.album(album_id)
return album
def write_album(self, album, text_file=None):
tracks = self.spotify.album_tracks(album["id"])
if not text_file:
text_file = u"{0}.txt".format(slugify(album["name"], ok="-_()[]{}"))
return write_tracks(tracks, text_file)
def fetch_albums_from_artist(self, artist_url, album_type=None):
"""
This function returns all the albums from a give artist_url using the US
market
:param artist_url - spotify artist url
:param album_type - the type of album to fetch (ex: single) the default is
all albums
:param return - the album from the artist
"""
# fetching artist's albums limitting the results to the US to avoid duplicate
# albums from multiple markets
artist_id = self.extract_spotify_id(artist_url)
results = self.spotify.artist_albums(artist_id, album_type=album_type, country="US")
albums = results["items"]
# indexing all pages of results
while results["next"]:
results = self.spotify.next(results)
albums.extend(results["items"])
return albums
def write_all_albums_from_artist(self, albums, text_file=None):
"""
This function gets all albums from an artist and writes it to a file in the
current working directory called [ARTIST].txt, where [ARTIST] is the artist
of the album
:param artist_url - spotify artist url
:param text_file - file to write albums to
"""
album_base_url = "https://open.spotify.com/album/"
# if no file if given, the default save file is in the current working
# directory with the name of the artist
if text_file is None:
text_file = albums[0]["artists"][0]["name"] + ".txt"
for album in albums:
logging album name
log.info("Fetching album: " + album["name"])
write_album(album_base_url + album["id"], text_file=text_file)
def write_tracks(self, tracks, text_file):
# log.info(u"Writing {0} tracks to {1}".format(tracks["total"], text_file))
track_urls = []
with open(text_file, "a") as file_out:
while True:
for item in tracks["items"]:
if "track" in item:
track = item["track"]
else:
track = item
try:
track_url = track["external_urls"]["spotify"]
# log.debug(track_url)
file_out.write(track_url + "\n")
track_urls.append(track_url)
except KeyError:
# log.warning(
u"Skipping track {0} by {1} (local only?)".format(
track["name"], track["artists"][0]["name"]
)
)
# 1 page = 50 results
# check if there are more pages
if tracks["next"]:
tracks = self.spotify.next(tracks)
else:
break
return track_urls

View File

@@ -3,6 +3,8 @@ import os
from abc import ABC
from abc import abstractmethod
import urllib.request
class EmbedderBase(ABC):
"""
The subclass must define the supported media file encoding
@@ -40,12 +42,16 @@ class EmbedderBase(ABC):
# Ignore the initial dot from file extension
return extension[1:]
def apply_metadata(self, path, metadata, encoding=None):
def apply_metadata(self, path, metadata, cached_albumart=None, encoding=None):
"""
This method must automatically detect the media encoding
format from file path and embed the corresponding metadata
on the given file by calling an appropriate submethod.
"""
if cached_albumart is None:
cached_albumart = urllib.request.urlopen(
metadata["album"]["images"][0]["url"],
).read()
if encoding is None:
encoding = self.get_encoding(path)
if encoding not in self.supported_formats:
@@ -54,9 +60,9 @@ class EmbedderBase(ABC):
encoding,
))
embed_on_given_format = self.targets[encoding]
embed_on_given_format(path, metadata)
embed_on_given_format(path, metadata, cached_albumart=cached_albumart)
def as_mp3(self, path, metadata):
def as_mp3(self, path, metadata, cached_albumart=None):
"""
Method for mp3 support. This method might be defined in
a subclass.
@@ -66,7 +72,7 @@ class EmbedderBase(ABC):
"""
raise NotImplementedError
def as_opus(self, path, metadata):
def as_opus(self, path, metadata, cached_albumart=None):
"""
Method for opus support. This method might be defined in
a subclass.
@@ -76,7 +82,7 @@ class EmbedderBase(ABC):
"""
raise NotImplementedError
def as_flac(self, path, metadata):
def as_flac(self, path, metadata, cached_albumart=None):
"""
Method for flac support. This method might be defined in
a subclass.

View File

@@ -45,7 +45,7 @@ class EmbedderDefault(EmbedderBase):
self._tag_preset = TAG_PRESET
# self.provider = "spotify" if metadata["spotify_metadata"] else "youtube"
def as_mp3(self, path, metadata):
def as_mp3(self, path, metadata, cached_albumart=None):
""" Embed metadata to MP3 files. """
# EasyID3 is fun to use ;)
# For supported easyid3 tags:
@@ -84,22 +84,25 @@ class EmbedderDefault(EmbedderBase):
audiofile["USLT"] = USLT(
encoding=3, desc=u"Lyrics", text=metadata["lyrics"]
)
if cached_albumart is None:
cached_albumart = urllib.request.urlopen(
metadata["album"]["images"][0]["url"]
).read()
albumart.close()
try:
albumart = urllib.request.urlopen(metadata["album"]["images"][0]["url"])
audiofile["APIC"] = APIC(
encoding=3,
mime="image/jpeg",
type=3,
desc=u"Cover",
data=albumart.read(),
data=cached_albumart,
)
albumart.close()
except IndexError:
pass
audiofile.save(v2_version=3)
def as_opus(self, path):
def as_opus(self, path, cached_albumart=None):
""" Embed metadata to M4A files. """
audiofile = MP4(path)
self._embed_basic_metadata(audiofile, metadata, "opus", preset=M4A_TAG_PRESET)
@@ -110,17 +113,20 @@ class EmbedderDefault(EmbedderBase):
if metadata["lyrics"]:
audiofile[M4A_TAG_PRESET["lyrics"]] = metadata["lyrics"]
try:
albumart = urllib.request.urlopen(metadata["album"]["images"][0]["url"])
if cached_albumart is None:
cached_albumart = urllib.request.urlopen(
metadata["album"]["images"][0]["url"]
).read()
albumart.close()
audiofile[M4A_TAG_PRESET["albumart"]] = [
MP4Cover(albumart.read(), imageformat=MP4Cover.FORMAT_JPEG)
MP4Cover(cached_albumart, imageformat=MP4Cover.FORMAT_JPEG)
]
albumart.close()
except IndexError:
pass
audiofile.save()
def as_flac(self, path, metadata):
def as_flac(self, path, metadata, cached_albumart=None):
audiofile = FLAC(path)
self._embed_basic_metadata(audiofile, metadata, "flac")
if metadata["year"]:
@@ -134,9 +140,12 @@ class EmbedderDefault(EmbedderBase):
image.type = 3
image.desc = "Cover"
image.mime = "image/jpeg"
albumart = urllib.request.urlopen(metadata["album"]["images"][0]["url"])
image.data = albumart.read()
albumart.close()
if cached_albumart is None:
cached_albumart = urllib.request.urlopen(
metadata["album"]["images"][0]["url"]
).read()
albumart.close()
image.data = cached_albumart
audiofile.add_picture(image)
audiofile.save()

View File

@@ -2,10 +2,16 @@ from spotdl.metadata import ProviderBase
from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
from spotdl.metadata.providers import ProviderSpotify
import pytest
class TestProviderSpotify:
def test_subclass(self):
assert issubclass(ProviderSpotify, ProviderBase)
@pytest.mark.xfail
def test_spotify_stuff(self):
raise NotImplementedError
# def test_metadata_not_found_error(self):
# provider = ProviderSpotify(spotify=spotify)
# with pytest.raises(SpotifyMetadataNotFoundError):

View File

@@ -42,7 +42,7 @@ def expect_search_results():
"https://www.youtube.com/watch?v=jX0n2rSmDbE",
"https://www.youtube.com/watch?v=nVzA1uWTydQ",
"https://www.youtube.com/watch?v=rQ6jcpwzQZU",
"https://www.youtube.com/watch?v=-grLLLTza6k",
"https://www.youtube.com/watch?v=VY1eFxgRR-k",
"https://www.youtube.com/watch?v=j0AxZ4V5WQw",
"https://www.youtube.com/watch?v=zbWsb36U0uo",
"https://www.youtube.com/watch?v=3B1aY9Ob8r0",
@@ -134,6 +134,13 @@ class MockYouTube:
@property
def streams(self):
# For updating the test data:
# from spotdl.metadata.providers.youtube import YouTubeStreams
# import pytube
# import pickle
# content = pytube.YouTube("https://youtube.com/watch?v=cH4E_t3m3xM")
# with open("streams.dump", "wb") as fout:
# pickle.dump(content.streams, fout)
module_directory = os.path.dirname(__file__)
mock_streams = os.path.join(module_directory, "data", "streams.dump")
with open(mock_streams, "rb") as fin:
@@ -156,10 +163,10 @@ def expect_formatted_streams():
to predict its value before-hand.
"""
return [
{"bitrate": 160, "download_url": None, "encoding": "opus", "filesize": 3614184},
{"bitrate": 128, "download_url": None, "encoding": "mp4a.40.2", "filesize": 3444850},
{"bitrate": 70, "download_url": None, "encoding": "opus", "filesize": 1847626},
{"bitrate": 50, "download_url": None, "encoding": "opus", "filesize": 1407962}
{"bitrate": 160, "content": None, "download_url": None, "encoding": "opus", "filesize": 3614184},
{"bitrate": 128, "content": None, "download_url": None, "encoding": "mp4a.40.2", "filesize": 3444850},
{"bitrate": 70, "content": None, "download_url": None, "encoding": "opus", "filesize": 1847626},
{"bitrate": 50, "content": None, "download_url": None, "encoding": "opus", "filesize": 1407962}
]
@@ -169,50 +176,88 @@ class TestYouTubeStreams:
formatted_streams = YouTubeStreams(content.streams)
for index in range(len(formatted_streams.all)):
assert isinstance(formatted_streams.all[index]["download_url"], str)
assert formatted_streams.all[index]["connection"] is not None
# We `None` the `download_url` since it's impossible to
# predict its value before-hand.
formatted_streams.all[index]["download_url"] = None
formatted_streams.all[index]["connection"] = None
assert formatted_streams.all == expect_formatted_streams
# assert formatted_streams.all == expect_formatted_streams
for f, e in zip(formatted_streams.all, expect_formatted_streams):
assert f["filesize"] == e["filesize"]
class MockHTTPResponse:
"""
This mocks `urllib.request.urlopen` for custom response text.
"""
response_file = ""
def __init__(self, response):
if response._full_url.endswith("ouVRL5arzUg=="):
self.headers = {"Content-Length": 3614184}
elif response._full_url.endswith("egl0iK2D-Bk="):
self.headers = {"Content-Length": 3444850}
elif response._full_url.endswith("J7VXJtoi3as="):
self.headers = {"Content-Length": 1847626}
elif response._full_url.endswith("_d5_ZthQdvtD"):
self.headers = {"Content-Length": 1407962}
def read(self):
module_directory = os.path.dirname(__file__)
mock_html = os.path.join(module_directory, "data", self.response_file)
with open(mock_html, "r") as fin:
html = fin.read()
return html
# @pytest.mark.mock
def test_mock_streams(self, mock_content, expect_formatted_streams):
def test_mock_streams(self, mock_content, expect_formatted_streams, monkeypatch):
monkeypatch.setattr(urllib.request, "urlopen", self.MockHTTPResponse)
self.test_streams(mock_content, expect_formatted_streams)
@pytest.mark.network
def test_getbest(self, content):
formatted_streams = YouTubeStreams(content.streams)
best_stream = formatted_streams.getbest()
assert isinstance(best_stream["download_url"], str)
assert best_stream["connection"] is not None
# We `None` the `download_url` since it's impossible to
# predict its value before-hand.
best_stream["download_url"] = None
best_stream["connection"] = None
assert best_stream == {
"bitrate": 160,
"connection": None,
"download_url": None,
"encoding": "opus",
"filesize": 3614184
}
# @pytest.mark.mock
def test_mock_getbest(self, mock_content):
def test_mock_getbest(self, mock_content, monkeypatch):
monkeypatch.setattr(urllib.request, "urlopen", self.MockHTTPResponse)
self.test_getbest(mock_content)
@pytest.mark.network
def test_getworst(self, content):
formatted_streams = YouTubeStreams(content.streams)
worst_stream = formatted_streams.getworst()
assert isinstance(worst_stream["download_url"], str)
assert worst_stream["connection"] is not None
# We `None` the `download_url` since it's impossible to
# predict its value before-hand.
worst_stream["download_url"] = None
worst_stream["connection"] = None
assert worst_stream == {
"bitrate": 50,
"connection": None,
"download_url": None,
"encoding": 'opus',
"filesize": 1407962
}
# @pytest.mark.mock
def test_mock_getworst(self, mock_content):
def test_mock_getworst(self, mock_content, monkeypatch):
monkeypatch.setattr(urllib.request, "urlopen", self.MockHTTPResponse)
self.test_getworst(mock_content)

View File

@@ -2,13 +2,14 @@ import pytube
from bs4 import BeautifulSoup
import urllib.request
import threading
from spotdl.metadata import StreamsBase
from spotdl.metadata import ProviderBase
from spotdl.metadata.exceptions import YouTubeMetadataNotFoundError
BASE_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}"
HEADERS = [('Range', 'bytes=0-'),]
class YouTubeSearch:
def __init__(self):
@@ -76,16 +77,45 @@ class YouTubeSearch:
class YouTubeStreams(StreamsBase):
def __init__(self, streams):
self.network_headers = HEADERS
audiostreams = streams.filter(only_audio=True).order_by("abr").desc()
self.all = [{
# Store only the integer part. For example the given
# bitrate would be "192kbps", we store only the integer
# part here and drop the rest.
"bitrate": int(stream.abr[:-4]),
"download_url": stream.url,
"encoding": stream.audio_codec,
"filesize": stream.filesize,
} for stream in audiostreams]
thread_pool = []
self.all = []
for stream in audiostreams:
standard_stream = {
# Store only the integer part for bitrate. For example
# the given bitrate would be "192kbps", we store only
# the integer part (192) here and drop the rest.
"bitrate": int(stream.abr[:-4]),
"connection": None,
"download_url": stream.url,
"encoding": stream.audio_codec,
"filesize": None,
}
establish_connection = threading.Thread(
target=self._store_connection,
args=(standard_stream,),
)
thread_pool.append(establish_connection)
establish_connection.start()
self.all.append(standard_stream)
for thread in thread_pool:
thread.join()
def _store_connection(self, stream):
response = self._make_request(stream["download_url"])
stream["connection"] = response
stream["filesize"] = int(response.headers["Content-Length"])
def _make_request(self, url):
request = urllib.request.Request(url)
for header in self.network_headers:
request.add_header(*header)
return urllib.request.urlopen(request)
def getbest(self):
return self.all[0]

View File

@@ -0,0 +1,72 @@
from spotdl.metadata import EmbedderBase
import pytest
class EmbedderKid(EmbedderBase):
def __init__(self):
super().__init__()
class TestEmbedderBaseABC:
def test_error_base_class_embedderbase(self):
with pytest.raises(TypeError):
# This abstract base class must be inherited from
# for instantiation
EmbedderBase()
def test_inherit_abstract_base_class_streamsbase(self):
EmbedderKid()
class TestMethods:
@pytest.fixture(scope="module")
def embedderkid(self):
return EmbedderKid()
def test_target_formats(self, embedderkid):
assert embedderkid.supported_formats == ()
@pytest.mark.parametrize("path, expect_encoding", (
("/a/b/c/file.mp3", "mp3"),
("music/pop/1.wav", "wav"),
("/a path/with spaces/track.m4a", "m4a"),
))
def test_get_encoding(self, embedderkid, path, expect_encoding):
assert embedderkid.get_encoding(path) == expect_encoding
def test_apply_metadata_with_explicit_encoding(self, embedderkid):
with pytest.raises(TypeError):
embedderkid.apply_metadata("/path/to/music.mp3", {}, cached_albumart="imagedata", encoding="mp3")
def test_apply_metadata_with_implicit_encoding(self, embedderkid):
with pytest.raises(TypeError):
embedderkid.apply_metadata("/path/to/music.wav", {}, cached_albumart="imagedata")
class MockHTTPResponse:
"""
This mocks `urllib.request.urlopen` for custom response text.
"""
response_file = ""
def __init__(self, url):
pass
def read(self):
pass
def test_apply_metadata_without_cached_image(self, embedderkid, monkeypatch):
monkeypatch.setattr("urllib.request.urlopen", self.MockHTTPResponse)
metadata = {"album": {"images": [{"url": "http://animageurl.com"},]}}
with pytest.raises(TypeError):
embedderkid.apply_metadata("/path/to/music.wav", metadata, cached_albumart=None)
@pytest.mark.parametrize("fmt_method_suffix", (
"as_mp3",
"as_opus",
"as_flac",
))
def test_embed_formats(self, fmt_method_suffix, embedderkid):
method = eval("embedderkid." + fmt_method_suffix)
with pytest.raises(NotImplementedError):
method("/a/random/path", {})

View File

@@ -21,59 +21,3 @@ def debug_sys_info():
log.debug(pprint.pformat(const.args.__dict__))
def match_args():
if const.args.song:
for track in const.args.song:
track_dl = downloader.Downloader(raw_song=track)
track_dl.download_single()
elif const.args.list:
if const.args.write_m3u:
youtube_tools.generate_m3u(
track_file=const.args.list
)
else:
list_dl = downloader.ListDownloader(
tracks_file=const.args.list,
skip_file=const.args.skip,
write_successful_file=const.args.write_successful,
)
list_dl.download_list()
elif const.args.playlist:
spotify_tools.write_playlist(
playlist_url=const.args.playlist, text_file=const.args.write_to
)
elif const.args.album:
spotify_tools.write_album(
album_url=const.args.album, text_file=const.args.write_to
)
elif const.args.all_albums:
spotify_tools.write_all_albums_from_artist(
artist_url=const.args.all_albums, text_file=const.args.write_to
)
elif const.args.username:
spotify_tools.write_user_playlist(
username=const.args.username, text_file=const.args.write_to
)
def main():
const.args = handle.get_arguments()
internals.filter_path(const.args.folder)
youtube_tools.set_api_key()
logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level)
try:
match_args()
# actually we don't necessarily need this, but yeah...
# explicit is better than implicit!
sys.exit(0)
except KeyboardInterrupt as e:
log.exception(e)
sys.exit(3)
if __name__ == "__main__":
main()

View File

@@ -1,255 +0,0 @@
import spotipy
import spotipy.oauth2 as oauth2
from slugify import slugify
from titlecase import titlecase
from logzero import logger as log
import pprint
import sys
import os
import functools
from spotdl import const
from spotdl import internals
from spotdl.lyrics.providers import LyricClasses
from spotdl.lyrics.exceptions import LyricsNotFoundError
spotify = None
def must_be_authorized(func, spotify=spotify):
def wrapper(*args, **kwargs):
global spotify
try:
assert spotify
return func(*args, **kwargs)
except (AssertionError, spotipy.client.SpotifyException):
token = generate_token()
spotify = spotipy.Spotify(auth=token)
return func(*args, **kwargs)
return wrapper
@must_be_authorized
def generate_metadata(raw_song):
""" Fetch a song's metadata from Spotify. """
if internals.is_spotify(raw_song):
# fetch track information directly if it is spotify link
log.debug("Fetching metadata for given track URL")
meta_tags = spotify.track(raw_song)
else:
# otherwise search on spotify and fetch information from first result
log.debug('Searching for "{}" on Spotify'.format(raw_song))
try:
meta_tags = spotify.search(raw_song, limit=1)["tracks"]["items"][0]
except IndexError:
return None
artist = spotify.artist(meta_tags["artists"][0]["id"])
album = spotify.album(meta_tags["album"]["id"])
try:
meta_tags[u"genre"] = titlecase(artist["genres"][0])
except IndexError:
meta_tags[u"genre"] = None
try:
meta_tags[u"copyright"] = album["copyrights"][0]["text"]
except IndexError:
meta_tags[u"copyright"] = None
try:
meta_tags[u"external_ids"][u"isrc"]
except KeyError:
meta_tags[u"external_ids"][u"isrc"] = None
meta_tags[u"release_date"] = album["release_date"]
meta_tags[u"publisher"] = album["label"]
meta_tags[u"total_tracks"] = album["tracks"]["total"]
log.debug("Fetching lyrics")
meta_tags["lyrics"] = None
for LyricClass in LyricClasses:
track = LyricClass(meta_tags["artists"][0]["name"], meta_tags["name"])
try:
meta_tags["lyrics"] = track.get_lyrics()
except LyricsNotFoundError:
continue
else:
break
# Some sugar
meta_tags["year"], *_ = meta_tags["release_date"].split("-")
meta_tags["duration"] = meta_tags["duration_ms"] / 1000.0
meta_tags["spotify_metadata"] = True
# Remove unwanted parameters
del meta_tags["duration_ms"]
del meta_tags["available_markets"]
del meta_tags["album"]["available_markets"]
log.debug(pprint.pformat(meta_tags))
return meta_tags
@must_be_authorized
def write_user_playlist(username, text_file=None):
""" Write user playlists to text_file """
links = get_playlists(username=username)
playlist = internals.input_link(links)
return write_playlist(playlist, text_file)
@must_be_authorized
def get_playlists(username):
""" Fetch user playlists when using the -u option. """
playlists = spotify.user_playlists(username)
links = []
check = 1
while True:
for playlist in playlists["items"]:
# in rare cases, playlists may not be found, so playlists['next']
# is None. Skip these. Also see Issue #91.
if playlist["name"] is not None:
log.info(
u"{0:>5}. {1:<30} ({2} tracks)".format(
check, playlist["name"], playlist["tracks"]["total"]
)
)
playlist_url = playlist["external_urls"]["spotify"]
log.debug(playlist_url)
links.append(playlist_url)
check += 1
if playlists["next"]:
playlists = spotify.next(playlists)
else:
break
return links
@must_be_authorized
def fetch_playlist(playlist):
try:
playlist_id = internals.extract_spotify_id(playlist)
except IndexError:
# Wrong format, in either case
log.error("The provided playlist URL is not in a recognized format!")
sys.exit(10)
try:
results = spotify.user_playlist(
user=None, playlist_id=playlist_id, fields="tracks,next,name"
)
except spotipy.client.SpotifyException:
log.error("Unable to find playlist")
log.info("Make sure the playlist is set to publicly visible and then try again")
sys.exit(11)
return results
@must_be_authorized
def write_playlist(playlist_url, text_file=None):
playlist = fetch_playlist(playlist_url)
tracks = playlist["tracks"]
if not text_file:
text_file = u"{0}.txt".format(slugify(playlist["name"], ok="-_()[]{}"))
return write_tracks(tracks, text_file)
@must_be_authorized
def fetch_album(album):
album_id = internals.extract_spotify_id(album)
album = spotify.album(album_id)
return album
@must_be_authorized
def fetch_albums_from_artist(artist_url, album_type=None):
"""
This funcction returns all the albums from a give artist_url using the US
market
:param artist_url - spotify artist url
:param album_type - the type of album to fetch (ex: single) the default is
all albums
:param return - the album from the artist
"""
# fetching artist's albums limitting the results to the US to avoid duplicate
# albums from multiple markets
artist_id = internals.extract_spotify_id(artist_url)
results = spotify.artist_albums(artist_id, album_type=album_type, country="US")
albums = results["items"]
# indexing all pages of results
while results["next"]:
results = spotify.next(results)
albums.extend(results["items"])
return albums
@must_be_authorized
def write_all_albums_from_artist(artist_url, text_file=None):
"""
This function gets all albums from an artist and writes it to a file in the
current working directory called [ARTIST].txt, where [ARTIST] is the artist
of the album
:param artist_url - spotify artist url
:param text_file - file to write albums to
"""
album_base_url = "https://open.spotify.com/album/"
# fetching all default albums
albums = fetch_albums_from_artist(artist_url, album_type=None)
# if no file if given, the default save file is in the current working
# directory with the name of the artist
if text_file is None:
text_file = albums[0]["artists"][0]["name"] + ".txt"
for album in albums:
# logging album name
log.info("Fetching album: " + album["name"])
write_album(album_base_url + album["id"], text_file=text_file)
@must_be_authorized
def write_album(album_url, text_file=None):
album = fetch_album(album_url)
tracks = spotify.album_tracks(album["id"])
if not text_file:
text_file = u"{0}.txt".format(slugify(album["name"], ok="-_()[]{}"))
return write_tracks(tracks, text_file)
@must_be_authorized
def write_tracks(tracks, text_file):
log.info(u"Writing {0} tracks to {1}".format(tracks["total"], text_file))
track_urls = []
with open(text_file, "a") as file_out:
while True:
for item in tracks["items"]:
if "track" in item:
track = item["track"]
else:
track = item
try:
track_url = track["external_urls"]["spotify"]
log.debug(track_url)
file_out.write(track_url + "\n")
track_urls.append(track_url)
except KeyError:
log.warning(
u"Skipping track {0} by {1} (local only?)".format(
track["name"], track["artists"][0]["name"]
)
)
# 1 page = 50 results
# check if there are more pages
if tracks["next"]:
tracks = spotify.next(tracks)
else:
break
return track_urls

View File

@@ -0,0 +1,76 @@
import spotdl.config
import argparse
import os
import sys
import yaml
import pytest
@pytest.mark.xfail
@pytest.fixture(scope="module")
def config_path(tmpdir_factory):
config_path = os.path.join(str(tmpdir_factory.mktemp("config")), "config.yml")
return config_path
@pytest.mark.xfail
@pytest.fixture(scope="module")
def modified_config():
modified_config = dict(spotdl.config.DEFAULT_CONFIGURATION)
return modified_config
def test_dump_n_read_config(config_path):
expect_config = spotdl.config.DEFAULT_CONFIGURATION
spotdl.config.dump_config(
config_path,
config=expect_config,
)
config = spotdl.config.read_config(config_path)
assert config == expect_config
class TestDefaultConfigFile:
@pytest.mark.skipif(not sys.platform == "linux", reason="Linux only")
def test_linux_default_config_file(self):
expect_default_config_file = os.path.expanduser("~/.config/spotdl/config.yml")
assert spotdl.config.default_config_file == expect_default_config_file
@pytest.mark.xfail
@pytest.mark.skipif(not sys.platform == "darwin" and not sys.platform == "win32",
reason="Windows only")
def test_windows_default_config_file(self):
raise NotImplementedError
@pytest.mark.xfail
@pytest.mark.skipif(not sys.platform == "darwin",
reason="OS X only")
def test_osx_default_config_file(self):
raise NotImplementedError
class TestConfig:
def test_default_config(self, config_path):
expect_config = spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"]
config = spotdl.config.get_config(config_path)
assert config == expect_config
@pytest.mark.xfail
def test_custom_config_path(self, config_path, modified_config):
parser = argparse.ArgumentParser()
with open(config_path, "w") as config_file:
yaml.dump(modified_config, config_file, default_flow_style=False)
overridden_config = spotdl.config.override_config(
config_path, parser, raw_args=""
)
modified_values = [
str(value)
for value in modified_config["spotify-downloader"].values()
]
overridden_config.folder = os.path.realpath(overridden_config.folder)
overridden_values = [
str(value) for value in overridden_config.__dict__.values()
]
assert sorted(overridden_values) == sorted(modified_values)

View File

@@ -1,25 +1,38 @@
import tqdm
import subprocess
import urllib.request
import subprocess
import threading
from spotdl.encode.encoders import EncoderFFmpeg
from spotdl.metadata.embedders import EmbedderDefault
CHUNK_SIZE= 16 * 1024
HEADERS = [('Range', 'bytes=0-'),]
class Track:
def __init__(self, metadata):
def __init__(self, metadata, cache_albumart=False):
self.metadata = metadata
self.network_headers = HEADERS
self._chunksize = CHUNK_SIZE
def _make_request(self, url):
request = urllib.request.Request(url)
for header in self.network_headers:
request.add_header(*header)
return urllib.request.urlopen(request)
self._cache_resources = {
"albumart": {"content": None, "threadinstance": None }
}
if cache_albumart:
self._albumart_thread = self._cache_albumart()
def _fetch_response_content_threaded(self, mutable_resource, url):
content = urllib.request.urlopen(url).read()
mutable_resource["content"] = content
def _cache_albumart(self):
# A hack to get a thread's return value
albumart_thread = threading.Thread(
target=self._fetch_response_content_threaded,
args=(self._cache_resources["albumart"],
self.metadata["album"]["images"][0]["url"]),
)
albumart_thread.start()
self._cache_resources["albumart"]["threadinstance"] = albumart_thread
def _calculate_total_chunks(self, filesize):
return (filesize // self._chunksize) + 1
@@ -27,11 +40,11 @@ class Track:
def download_while_re_encoding(self, target_path, encoder=EncoderFFmpeg(), show_progress=True):
stream = self.metadata["streams"].getbest()
total_chunks = self._calculate_total_chunks(stream["filesize"])
response = self._make_request(stream["download_url"])
process = encoder.re_encode_from_stdin(
stream["encoding"],
target_path
)
response = stream["connection"]
for _ in tqdm.trange(total_chunks):
chunk = response.read(self._chunksize)
process.stdin.write(chunk)
@@ -42,7 +55,7 @@ class Track:
def download(self, target_path, show_progress=True):
stream = self.metadata["streams"].getbest()
total_chunks = self._calculate_total_chunks(stream["filesize"])
response = self._make_request(stream["download_url"])
response = stream["connection"]
with open(target_path, "wb") as fout:
for _ in tqdm.trange(total_chunks):
chunk = response.read(self._chunksize)
@@ -64,5 +77,9 @@ class Track:
process.wait()
def apply_metadata(self, input_path, embedder=EmbedderDefault()):
embedder.apply_metadata(input_path, self.metadata)
albumart = self._cache_resources["albumart"]
if albumart["threadinstance"]:
albumart["threadinstance"].join()
embedder.apply_metadata(input_path, self.metadata, cached_albumart=albumart["content"])

View File

@@ -36,6 +36,13 @@ formats = {
}
def merge(base, overrider):
""" Override default dict with config dict. """
merger = base.copy()
merger.update(overrider)
return merger
def input_link(links):
""" Let the user input a choice. """
while True:
@@ -52,15 +59,6 @@ def input_link(links):
log.warning("Choose a valid number!")
def trim_song(tracks_file):
""" Remove the first song from file. """
with open(tracks_file, "r") as file_in:
data = file_in.read().splitlines(True)
with open(tracks_file, "w") as file_out:
file_out.writelines(data[1:])
return data[0]
def is_spotify(raw_song):
""" Check if the input song is a Spotify link. """
status = len(raw_song) == 22 and raw_song.replace(" ", "%20") == raw_song
@@ -172,52 +170,6 @@ def get_sec(time_str):
return sec
def extract_spotify_id(raw_string):
"""
Returns a Spotify ID of a playlist, album, etc. after extracting
it from a given HTTP URL or Spotify URI.
"""
if "/" in raw_string:
# Input string is an HTTP URL
if raw_string.endswith("/"):
raw_string = raw_string[:-1]
# We need to manually trim additional text from HTTP URLs
# We could skip this if https://github.com/plamere/spotipy/pull/324
# gets merged,
to_trim = raw_string.find("?")
if not to_trim == -1:
raw_string = raw_string[:to_trim]
splits = raw_string.split("/")
else:
# Input string is a Spotify URI
splits = raw_string.split(":")
spotify_id = splits[-1]
return spotify_id
def get_unique_tracks(tracks_file):
"""
Returns a list of unique tracks given a path to a
file containing tracks.
"""
log.info(
"Checking and removing any duplicate tracks "
"in reading {}".format(tracks_file)
)
with open(tracks_file, "r") as tracks_in:
# Read tracks into a list and remove any duplicates
lines = tracks_in.read().splitlines()
# Remove blank and strip whitespaces from lines (if any)
lines = [line.strip() for line in lines if line.strip()]
lines = remove_duplicates(lines)
return lines
# a hacky way to get user's localized music directory
# (thanks @linusg, issue #203)
def get_music_dir():
@@ -258,7 +210,7 @@ def get_music_dir():
return os.path.join(home, "Music")
def remove_duplicates(tracks):
def remove_duplicates(elements, condition=lambda _: True, operation=lambda x: x):
"""
Removes duplicates from a list whilst preserving order.
@@ -268,7 +220,12 @@ def remove_duplicates(tracks):
local_set = set()
local_set_add = local_set.add
return [x for x in tracks if not (x in local_set or local_set_add(x))]
filtered_list = []
for x in elements:
if not local_set and condition(x):
filtered_list.append(operation(x))
local_set_add(x)
return filtered_list
def content_available(url):
@@ -278,3 +235,4 @@ def content_available(url):
return False
else:
return response.getcode() < 300

View File

@@ -1,70 +0,0 @@
import os
import sys
import argparse
from spotdl import handle
import pytest
import yaml
def test_error_m3u_without_list():
with pytest.raises(SystemExit):
handle.get_arguments(raw_args=("-s cool song", "--write-m3u"), to_group=True)
def test_m3u_with_list():
handle.get_arguments(raw_args=("-l cool_list.txt", "--write-m3u"), to_group=True)
def test_log_str_to_int():
expect_levels = [20, 30, 40, 10]
levels = [handle.log_leveller(level) for level in handle._LOG_LEVELS_STR]
assert levels == expect_levels
@pytest.fixture(scope="module")
def config_path_fixture(tmpdir_factory):
config_path = os.path.join(str(tmpdir_factory.mktemp("config")), "config.yml")
return config_path
@pytest.fixture(scope="module")
def modified_config_fixture():
modified_config = dict(handle.default_conf)
return modified_config
class TestConfig:
def test_default_config(self, config_path_fixture):
expect_config = handle.default_conf["spotify-downloader"]
config = handle.get_config(config_path_fixture)
assert config == expect_config
def test_modified_config(self, modified_config_fixture):
modified_config_fixture["spotify-downloader"]["file-format"] = "just_a_test"
merged_config = handle.merge(handle.default_conf, modified_config_fixture)
assert merged_config == modified_config_fixture
def test_custom_config_path(self, config_path_fixture, modified_config_fixture):
parser = argparse.ArgumentParser()
with open(config_path_fixture, "w") as config_file:
yaml.dump(modified_config_fixture, config_file, default_flow_style=False)
overridden_config = handle.override_config(
config_path_fixture, parser, raw_args=""
)
modified_values = [
str(value)
for value in modified_config_fixture["spotify-downloader"].values()
]
overridden_config.folder = os.path.realpath(overridden_config.folder)
overridden_values = [
str(value) for value in overridden_config.__dict__.values()
]
assert sorted(overridden_values) == sorted(modified_values)
def test_grouped_arguments(tmpdir):
sys.path[0] = str(tmpdir)
with pytest.raises(SystemExit):
handle.get_arguments(to_group=True, to_merge=True)