Feed arguments in libary usage mode

This commit is contained in:
Ritiek Malhotra
2020-05-15 00:32:54 +05:30
parent f40f69fdc5
commit e71989a963
6 changed files with 110 additions and 125 deletions

View File

@@ -3,6 +3,10 @@ import coloredlogs
import sys import sys
from spotdl.command_line.lib import Spotdl
from spotdl.command_line.arguments import get_arguments
from spotdl.command_line.exceptions import ArgumentError
# hardcode loglevel for dependencies so that they do not spew generic # hardcode loglevel for dependencies so that they do not spew generic
# log messages along with spotdl. # log messages along with spotdl.
for module in ("urllib3", "spotipy", "pytube"): for module in ("urllib3", "spotipy", "pytube"):
@@ -27,12 +31,13 @@ def set_logger(level):
def main(): def main():
from spotdl.command_line.arguments import get_arguments argument_handler = get_arguments()
arguments = get_arguments() logging_level = argument_handler.get_logging_level()
logger = set_logger(arguments.parsed.log_level) logger = set_logger(logging_level)
arguments = arguments.run_errands() try:
from spotdl.command_line.lib import Spotdl spotdl = Spotdl(argument_handler)
spotdl = Spotdl(arguments) except ArgumentError as e:
argument_handler.parser.error(e.args[0])
try: try:
spotdl.match_arguments() spotdl.match_arguments()
except KeyboardInterrupt as e: except KeyboardInterrupt as e:

View File

@@ -6,6 +6,7 @@ import os
import sys import sys
import shutil import shutil
from spotdl.command_line.exceptions import ArgumentError
import spotdl.util import spotdl.util
import spotdl.config import spotdl.config
@@ -13,45 +14,28 @@ from collections.abc import Sequence
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_LOG_LEVELS_STR = ("INFO", "WARNING", "ERROR", "DEBUG")
_LOG_LEVELS = {
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"DEBUG": logging.DEBUG,
}
def log_leveller(log_level_str): _CONFIG_BASE = spotdl.util.merge(
logging_levels = [logging.INFO, logging.WARNING, logging.ERROR, logging.DEBUG] spotdl.config.get_config(spotdl.config.default_config_file),
log_level_str_index = _LOG_LEVELS_STR.index(log_level_str) spotdl.config.DEFAULT_CONFIGURATION,
logging_level = logging_levels[log_level_str_index]
return logging_level
def override_config(config_file, parser, argv=None):
""" Override default dict with config dict passed as comamnd line argument. """
config_file = os.path.realpath(config_file)
config = spotdl.util.merge(
spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"],
spotdl.config.get_config(config_file)
) )
parser.set_defaults(**config)
return parser.parse_args(argv)
def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file): def get_arguments(config_base=_CONFIG_BASE):
defaults = config_base["spotify-downloader"]
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Download and convert tracks from Spotify, Youtube etc.", description="Download and convert tracks from Spotify, Youtube etc.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
if base_config_file:
config_dir = os.path.dirname(base_config_file)
os.makedirs(os.path.dirname(base_config_file), exist_ok=True)
config = spotdl.util.merge(
spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"],
spotdl.config.get_config(base_config_file)
)
else:
config = spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"]
group = parser.add_mutually_exclusive_group(required=True) group = parser.add_mutually_exclusive_group(required=True)
group.add_argument( group.add_argument(
"-s", "-s",
"--song", "--song",
@@ -97,60 +81,60 @@ def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file)
parser.add_argument( parser.add_argument(
"-m", "-m",
"--manual", "--manual",
default=config["manual"], default=defaults["manual"],
help="choose the track to download manually from a list of matching tracks", help="choose the track to download manually from a list of matching tracks",
action="store_true", action="store_true",
) )
parser.add_argument( parser.add_argument(
"-nm", "-nm",
"--no-metadata", "--no-metadata",
default=config["no_metadata"], default=defaults["no_metadata"],
help="do not embed metadata in tracks", help="do not embed metadata in tracks",
action="store_true", action="store_true",
) )
parser.add_argument( parser.add_argument(
"-ne", "-ne",
"--no-encode", "--no-encode",
default=config["no-encode"], default=defaults["no_encode"],
action="store_true", action="store_true",
help="do not encode media using FFmpeg", help="do not encode media using FFmpeg",
) )
parser.add_argument( parser.add_argument(
"--overwrite", "--overwrite",
default=config["overwrite"], default=defaults["overwrite"],
choices={"prompt", "force", "skip"}, choices={"prompt", "force", "skip"},
help="change the overwrite policy", help="change the overwrite policy",
) )
parser.add_argument( parser.add_argument(
"-q", "-q",
"--quality", "--quality",
default=config["quality"], default=defaults["quality"],
choices={"best", "worst"}, choices={"best", "worst"},
help="preferred audio quality", help="preferred audio quality",
) )
parser.add_argument( parser.add_argument(
"-i", "-i",
"--input-ext", "--input-ext",
default=config["input_ext"], default=defaults["input_ext"],
choices={"automatic", "m4a", "opus"}, choices={"automatic", "m4a", "opus"},
help="preferred input format", help="preferred input format",
) )
parser.add_argument( parser.add_argument(
"-o", "-o",
"--output-ext", "--output-ext",
default=config["output_ext"], default=defaults["output_ext"],
choices={"mp3", "m4a", "flac"}, choices={"mp3", "m4a", "flac"},
help="preferred output format", help="preferred output format",
) )
parser.add_argument( parser.add_argument(
"--write-to", "--write-to",
default=config["write_to"], default=defaults["write_to"],
help="write tracks from Spotify playlist, album, etc. to this file", help="write tracks from Spotify playlist, album, etc. to this file",
) )
parser.add_argument( parser.add_argument(
"-f", "-f",
"--output-file", "--output-file",
default=config["output_file"], default=defaults["output_file"],
help="path where to write the downloaded track to, special tags " help="path where to write the downloaded track to, special tags "
"are to be surrounded by curly braces. Possible tags: " "are to be surrounded by curly braces. Possible tags: "
# TODO: Add possible tags # TODO: Add possible tags
@@ -158,14 +142,14 @@ def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file)
) )
parser.add_argument( parser.add_argument(
"--trim-silence", "--trim-silence",
default=config["trim_silence"], default=defaults["trim_silence"],
help="remove silence from the start of the audio", help="remove silence from the start of the audio",
action="store_true", action="store_true",
) )
parser.add_argument( parser.add_argument(
"-sf", "-sf",
"--search-format", "--search-format",
default=config["search_format"], default=defaults["search_format"],
help="search format to search for on YouTube, special tags " help="search format to search for on YouTube, special tags "
"are to be surrounded by curly braces. Possible tags: " "are to be surrounded by curly braces. Possible tags: "
# TODO: Add possible tags # TODO: Add possible tags
@@ -174,22 +158,14 @@ def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file)
parser.add_argument( parser.add_argument(
"-d", "-d",
"--dry-run", "--dry-run",
default=config["dry_run"], default=defaults["dry_run"],
help="show only track title and YouTube URL, and then skip " help="show only track title and YouTube URL, and then skip "
"to the next track (if any)", "to the next track (if any)",
action="store_true", action="store_true",
) )
parser.add_argument(
"-mo",
"--music-videos-only",
default=config["music_videos_only"],
help="search only for music videos on Youtube (works only "
"when YouTube API key is set)",
action="store_true",
)
# parser.add_argument( # parser.add_argument(
# "--processor", # "--processor",
# default=config["processor"], # default=defaults["processor"],
# choices={"synchronous", "threaded"}, # choices={"synchronous", "threaded"},
# help='list downloading strategy: - "synchronous" downloads ' # help='list downloading strategy: - "synchronous" downloads '
# 'tracks one-by-one. - "threaded" (highly experimental at the ' # 'tracks one-by-one. - "threaded" (highly experimental at the '
@@ -199,7 +175,7 @@ def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file)
parser.add_argument( parser.add_argument(
"-ns", "-ns",
"--no-spaces", "--no-spaces",
default=config["no_spaces"], default=defaults["no_spaces"],
help="replace spaces in metadata values with underscores when " help="replace spaces in metadata values with underscores when "
"generating filenames", "generating filenames",
action="store_true", action="store_true",
@@ -207,45 +183,39 @@ def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file)
parser.add_argument( parser.add_argument(
"-ll", "-ll",
"--log-level", "--log-level",
default=config["log_level"], default=defaults["log_level"],
choices=_LOG_LEVELS_STR, choices=_LOG_LEVELS.keys(),
type=str.upper, type=str.upper,
help="set log verbosity", help="set log verbosity",
) )
parser.add_argument(
"-yk",
"--youtube-api-key",
default=config["youtube_api_key"],
help=argparse.SUPPRESS,
)
parser.add_argument( parser.add_argument(
"-sk", "-sk",
"--skip", "--skip",
default=config["skip"], default=defaults["skip"],
help="path to file containing tracks to skip", help="path to file containing tracks to skip",
) )
parser.add_argument( parser.add_argument(
"-w", "-w",
"--write-successful", "--write-successful",
default=config["write_successful"], default=defaults["write_successful"],
help="path to file to write successful tracks to", help="path to file to write successful tracks to",
) )
parser.add_argument( parser.add_argument(
"-sci", "-sci",
"--spotify-client-id", "--spotify-client-id",
default=config["spotify_client_id"], default=defaults["spotify_client_id"],
help=argparse.SUPPRESS, help=argparse.SUPPRESS,
) )
parser.add_argument( parser.add_argument(
"-scs", "-scs",
"--spotify-client-secret", "--spotify-client-secret",
default=config["spotify_client_secret"], default=defaults["spotify_client_secret"],
help=argparse.SUPPRESS, help=argparse.SUPPRESS,
) )
parser.add_argument( parser.add_argument(
"-c", "-c",
"--config", "--config",
default=base_config_file, default=spotdl.config.default_config_file,
help="path to custom config.yml file" help="path to custom config.yml file"
) )
parser.add_argument( parser.add_argument(
@@ -255,60 +225,76 @@ def get_arguments(argv=None, base_config_file=spotdl.config.default_config_file)
version="%(prog)s {}".format(spotdl.__version__), version="%(prog)s {}".format(spotdl.__version__),
) )
parsed = parser.parse_args(argv) return ArgumentHandler(parser=parser)
if base_config_file and parsed.config is not None:
parsed = override_config(parsed.config, parser)
parsed.log_level = log_leveller(parsed.log_level)
# TODO: Remove this line once we can experiement with other
# download processors (such as "threaded").
parsed.processor = "synchronous"
return Arguments(parser, parsed)
class Arguments: class ArgumentHandler:
def __init__(self, parser, parsed): def __init__(self, args=None, parser=argparse.ArgumentParser(""), config_base=_CONFIG_BASE):
args_were_passed = args is not None
if not args_were_passed:
args = parser.parse_args().__dict__
config_file = args.get("config")
if config_file:
config = spotdl.config.read_config(config_file)
parser.set_defaults(**config["spotify-downloader"])
configured_args = parser.parse_args().__dict__
if args_were_passed:
parser.set_defaults(**args)
configured_args = parser.parse_args().__dict__
defaults = config_base["spotify-downloader"]
args = spotdl.util.merge(defaults, args)
self.parser = parser self.parser = parser
self.parsed = parsed self.args = args
self.configured_args = configured_args
def get_configured_args(self):
return self.configured_args
def get_logging_level(self):
return _LOG_LEVELS[self.args["log_level"]]
def run_errands(self): def run_errands(self):
if (self.parsed.list args = self.get_configured_args()
and not mimetypes.MimeTypes().guess_type(self.parsed.list)[0] == "text/plain" if (args.get("list")
and not mimetypes.MimeTypes().guess_type(args["list"])[0] == "text/plain"
): ):
self.parser.error( raise ArgumentError(
"{0} is not of a valid argument to --list, argument must be plain text file.".format( "{0} is not of a valid argument to --list, argument must be plain text file.".format(
self.parsed.list args["list"]
) )
) )
if self.parsed.write_m3u and not self.parsed.list: if args.get("write_m3u") and not args.get("list"):
self.parser.error("--write-m3u can only be used with --list.") raise ArgumentError("--write-m3u can only be used with --list.")
if self.parsed.write_to and not ( if args["write_to"] and not (
self.parsed.playlist or self.parsed.album or self.parsed.all_albums or self.parsed.username or self.parsed.write_m3u args.get("playlist") or args.get("album") or args.get("all_albums") or args.get("username") or args.get("write_m3u")
): ):
self.parser.error( raise ArgumentError(
"--write-to can only be used with --playlist, --album, --all-albums, --username, or --write-m3u." "--write-to can only be used with --playlist, --album, --all-albums, --username, or --write-m3u."
) )
ffmpeg_exists = shutil.which("fffmpeg") ffmpeg_exists = shutil.which("ffmpeg")
if not ffmpeg_exists: if not ffmpeg_exists:
logger.warn("FFmpeg was not found in PATH. Will not re-encode media to specified output format.") logger.warn("FFmpeg was not found in PATH. Will not re-encode media to specified output format.")
self.parsed.no_encode = True args["no_encode"] = True
if self.parsed.no_encode and self.parsed.trim_silence: if args["no_encode"] and args["trim_silence"]:
logger.warn("--trim-silence can only be used when an encoder is set.") logger.warn("--trim-silence can only be used when an encoder is set.")
if self.parsed.output_file == "-" and self.parsed.no_metadata is False: if args["output_file"] == "-" and args["no_metadata"] is False:
logger.warn( logger.warn(
"Cannot write metadata when target is STDOUT. Pass " "Cannot write metadata when target is STDOUT. Pass "
"--no-metadata explicitly to hide this warning." "--no-metadata explicitly to hide this warning."
) )
self.parsed.no_metadata = True args["no_metadata"] = True
elif os.path.isdir(self.parsed.output_file): elif os.path.isdir(args["output_file"]):
adjusted_output_file = os.path.join( adjusted_output_file = os.path.join(
self.parsed.output_file, args["output_file"],
self.parser.get_default("output_file") self.parser.get_default("output_file")
) )
logger.warn( logger.warn(
@@ -319,10 +305,7 @@ class Arguments:
adjusted_output_file adjusted_output_file
) )
) )
self.parsed.output_file = adjusted_output_file args["output_file"] = adjusted_output_file
# We're done dealing with configuration file here and don't need to use it later return args
del self.parsed.config
return self.parsed.__dict__

View File

@@ -11,3 +11,10 @@ class NoYouTubeVideoMatchError(Exception):
def __init__(self, message=None): def __init__(self, message=None):
super().__init__(message) super().__init__(message)
class ArgumentError(Exception):
__module__ = Exception.__module__
def __init__(self, message=None):
super().__init__(message)

View File

@@ -32,20 +32,8 @@ logger = logging.getLogger(__name__)
class Spotdl: class Spotdl:
def __init__(self, arguments): def __init__(self, argument_handler):
if "config" in arguments: self.arguments = argument_handler.run_errands()
# Make sure we set the base configuration from the config file if
# the config file has been passed.
config = spotdl.util.merge(
spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"],
spotdl.config.get_config(arguments["config"])
)
else:
# If config file has not been passed, set the base configuration
# to the default confguration.
config = spotdl.config.DEFAULT_CONFIGURATION["spotify-downloader"]
self.arguments = spotdl.util.merge(config, arguments)
def __enter__(self): def __enter__(self):
return self return self
@@ -255,6 +243,8 @@ class Spotdl:
)) ))
os.rename(temp_filename, filename) os.rename(temp_filename, filename)
return filename
def apply_metadata(self, track, filename, encoding): def apply_metadata(self, track, filename, encoding):
logger.info("Applying metadata") logger.info("Applying metadata")
try: try:

View File

@@ -11,7 +11,7 @@ DEFAULT_CONFIGURATION = {
"manual": False, "manual": False,
"no_metadata": False, "no_metadata": False,
"no_fallback_metadata": False, "no_fallback_metadata": False,
"no-encode": False, "no_encode": False,
"overwrite": "prompt", "overwrite": "prompt",
"quality": "best", "quality": "best",
"input_ext": "automatic", "input_ext": "automatic",
@@ -58,7 +58,7 @@ def get_config(config_file):
config = DEFAULT_CONFIGURATION config = DEFAULT_CONFIGURATION
dump_config(config_file, config=DEFAULT_CONFIGURATION) dump_config(config_file, config=DEFAULT_CONFIGURATION)
logger.info("Writing default configuration to {0}:".format(config_file)) logger.info("Writing default configuration to {0}.".format(config_file))
for line in yaml.dump( for line in yaml.dump(
DEFAULT_CONFIGURATION["spotify-downloader"], default_flow_style=False DEFAULT_CONFIGURATION["spotify-downloader"], default_flow_style=False
@@ -67,8 +67,8 @@ def get_config(config_file):
logger.info(line.strip()) logger.info(line.strip())
logger.info( logger.info(
"Please note that command line arguments have higher priority " "Please note that command line arguments have higher priority "
"than their equivalents in the configuration file" "than their equivalents in the configuration file."
) )
return config["spotify-downloader"] return config

View File

@@ -244,7 +244,7 @@ class ProviderYouTube(ProviderBase):
return YouTubeSearch().search(query) return YouTubeSearch().search(query)
def _fetch_publish_date(self, content): def _fetch_publish_date(self, content):
# FIXME: This needs to be supported in PyTube itself # XXX: This needs to be supported in PyTube itself
# See https://github.com/nficano/pytube/issues/595 # See https://github.com/nficano/pytube/issues/595
position = content.watch_html.find("publishDate") position = content.watch_html.find("publishDate")
publish_date = content.watch_html[position+16:position+25] publish_date = content.watch_html[position+16:position+25]