[WIP] Refactor spotdl.py; introduced classes (#410)

* Refactor spotdl.py; introduced classes

* introduce CheckExists class

* Move these classes to download.py

* Fix refresh_token

* Add a changelog entry
This commit is contained in:
Ritiek Malhotra
2018-11-25 17:07:56 +05:30
committed by GitHub
parent 8ced90cb39
commit eae9316cee
13 changed files with 315 additions and 217 deletions

View File

@@ -6,7 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
*Please keep this section up to date when creating a PR!*
### Changed
- Refactored core downloading module ([@ritiek](https://github.com/ritiek)) (#410)
## [1.1.0] - 2018-11-13
### Added

0
LICENSE Executable file → Normal file
View File

0
setup.py Normal file → Executable file
View File

0
spotdl/__init__.py Executable file → Normal file
View File

261
spotdl/downloader.py Normal file
View File

@@ -0,0 +1,261 @@
from spotdl import const
from spotdl import metadata
from spotdl import convert
from spotdl import internals
from spotdl import spotify_tools
from spotdl import youtube_tools
from logzero import logger as log
import os
class CheckExists:
def __init__(self, music_file, meta_tags=None):
self.music_file = music_file
self.meta_tags = meta_tags
def already_exists(self, raw_song):
""" Check if the input song already exists in the given folder. """
log.debug(
"Cleaning any temp files and checking "
'if "{}" already exists'.format(self.music_file)
)
songs = os.listdir(const.args.folder)
self._remove_temp_files(songs)
for song in songs:
# check if a song with the same name is already present in the given folder
if self._match_filenames(song):
if internals.is_spotify(raw_song) and \
not self._has_metadata(song):
return False
log.warning('"{}" already exists'.format(song))
if const.args.overwrite == "prompt":
return self._prompt_song(song)
elif const.args.overwrite == "force":
return self._force_overwrite_song(song)
elif const.args.overwrite == "skip":
return self._skip_song(song)
return False
def _remove_temp_files(self, songs):
for song in songs:
if song.endswith(".temp"):
os.remove(os.path.join(const.args.folder, song))
def _has_metadata(self, song):
# check if the already downloaded song has correct metadata
# if not, remove it and download again without prompt
already_tagged = metadata.compare(
os.path.join(const.args.folder, song), self.meta_tags
)
log.debug(
"Checking if it is already tagged correctly? {}", already_tagged
)
if not already_tagged:
os.remove(os.path.join(const.args.folder, song))
return False
return True
def _prompt_song(self, song):
log.info(
'"{}" has already been downloaded. '
"Re-download? (y/N): ".format(song)
)
prompt = input("> ")
if prompt.lower() == "y":
return self._force_overwrite_song(song)
else:
return self._skip_song(song)
def _force_overwrite_song(self, song):
os.remove(os.path.join(const.args.folder, song))
log.info('Overwriting "{}"'.format(song))
return False
def _skip_song(self, song):
log.info('Skipping "{}"'.format(song))
return True
def _match_filenames(self, song):
if os.path.splitext(song)[0] == self.music_file:
log.debug('Found an already existing song: "{}"'.format(song))
return True
return False
class Downloader:
def __init__(self, raw_song, number=None):
self.raw_song = raw_song
self.number = number
self.content, self.meta_tags = youtube_tools.match_video_and_metadata(raw_song)
def download_single(self):
""" Logic behind downloading a song. """
if self._to_skip():
return
# "[number]. [artist] - [song]" if downloading from list
# otherwise "[artist] - [song]"
youtube_title = youtube_tools.get_youtube_title(self.content, self.number)
log.info("{} ({})".format(youtube_title, self.content.watchv_url))
# generate file name of the song to download
songname = self.refine_songname(self.content.title)
if const.args.dry_run:
return
song_existence = CheckExists(songname, self.meta_tags)
if not song_existence.already_exists(self.raw_song):
return self._download_single(songname)
def _download_single(self, songname):
# deal with file formats containing slashes to non-existent directories
songpath = os.path.join(const.args.folder, os.path.dirname(songname))
os.makedirs(songpath, exist_ok=True)
input_song = songname + const.args.input_ext
output_song = songname + const.args.output_ext
if youtube_tools.download_song(input_song, self.content):
print("")
try:
convert.song(
input_song,
output_song,
const.args.folder,
avconv=const.args.avconv,
trim_silence=const.args.trim_silence,
)
except FileNotFoundError:
output_song = self.unconverted_filename(songname)
if not const.args.input_ext == const.args.output_ext:
os.remove(os.path.join(const.args.folder, input_song))
if not const.args.no_metadata and self.meta_tags is not None:
metadata.embed(
os.path.join(const.args.folder, output_song), self.meta_tags
)
return True
def _to_skip(self):
if self.content is None:
log.debug("Found no matching video")
return True
if const.args.download_only_metadata and self.meta_tags is None:
log.info("Found no metadata. Skipping the download")
return True
def refine_songname(self, songname):
if self.meta_tags is not None:
refined_songname = internals.format_string(
const.args.file_format, self.meta_tags, slugification=True
)
log.debug(
'Refining songname from "{0}" to "{1}"'.format(
songname, refined_songname
)
)
if not refined_songname == " - ":
songname = refined_songname
else:
if not const.args.no_metadata:
log.warning("Could not find metadata")
songname = internals.sanitize_title(songname)
return songname
@staticmethod
def unconverted_filename(songname):
encoder = "avconv" if const.args.avconv else "ffmpeg"
log.warning("Could not find {0}, skipping conversion".format(encoder))
const.args.output_ext = const.args.input_ext
output_song = songname + const.args.output_ext
return output_song
class ListDownloader:
def __init__(self, tracks_file, skip_file=None, write_successful_file=None):
self.tracks_file = tracks_file
self.skip_file = skip_file
self.write_successful_file = write_successful_file
self.tracks = internals.get_unique_tracks(self.tracks_file)
def download_list(self):
""" Download all songs from the list. """
# override file with unique tracks
log.info("Overriding {} with unique tracks".format(self.tracks_file))
self._override_file()
# Remove tracks to skip from tracks list
if self.skip_file is not None:
self.tracks = self._filter_tracks_against_skip_file()
log.info(u"Preparing to download {} songs".format(len(self.tracks)))
return self._download_list()
def _download_list(self):
downloaded_songs = []
for number, raw_song in enumerate(self.tracks, 1):
print("")
try:
track_dl = Downloader(raw_song, number=number)
track_dl.download_single()
except spotipy.client.SpotifyException:
# token expires after 1 hour
self._regenerate_token()
track_dl.download_single()
# detect network problems
except (urllib.request.URLError, TypeError, IOError) as e:
self._cleanup(raw_song, e)
# TODO: remove this sleep once #397 is fixed
# wait 0.5 sec to avoid infinite looping
time.sleep(0.5)
continue
downloaded_songs.append(raw_song)
# Add track to file of successful downloads
if self.write_successful_file is not None:
self._write_successful(raw_song)
log.debug("Removing downloaded song from tracks file")
internals.trim_song(self.tracks_file)
return downloaded_songs
def _override_file(self):
with open(self.tracks_file, "w") as f:
f.write("\n".join(self.tracks))
def _write_successful(self, raw_song):
log.debug("Adding downloaded song to write successful file")
with open(self.write_successful_file, "a") as f:
f.write("\n" + raw_song)
@staticmethod
def _regenerate_token():
log.debug("Token expired, generating new one and authorizing")
spotify_tools.refresh_token()
def _cleanup(self, raw_song, exception):
self.tracks.append(raw_song)
# remove the downloaded song from file
internals.trim_song(self.tracks_file)
# and append it at the end of file
with open(self.tracks_file, "a") as f:
f.write("\n" + raw_song)
log.exception(exception)
log.warning("Failed to download song. Will retry after other songs\n")
def _filter_tracks_against_skip_file(self):
skip_tracks = internals.get_unique_tracks(self.skip_file)
len_before = len(self.tracks)
tracks = [track for track in self.tracks if track not in skip_tracks]
log.info("Skipping {} tracks".format(len_before - len(tracks)))
return tracks

View File

@@ -1,5 +1,5 @@
import appdirs
from spotdl import internals, const
from spotdl import internals
from logzero import logger as log
import logging
@@ -8,7 +8,6 @@ import argparse
import mimetypes
import os
import sys
_LOG_LEVELS_STR = ["INFO", "WARNING", "ERROR", "DEBUG"]

18
spotdl/internals.py Executable file → Normal file
View File

@@ -48,11 +48,12 @@ def input_link(links):
log.warning("Choose a valid number!")
def trim_song(text_file):
def trim_song(tracks_file):
""" Remove the first song from file. """
with open(text_file, "r") as file_in:
log.debug("Removing downloaded song from tracks file")
with open(tracks_file, "r") as file_in:
data = file_in.read().splitlines(True)
with open(text_file, "w") as file_out:
with open(tracks_file, "w") as file_out:
file_out.writelines(data[1:])
return data[0]
@@ -181,20 +182,23 @@ def extract_spotify_id(raw_string):
return spotify_id
def get_unique_tracks(text_file):
def get_unique_tracks(tracks_file):
"""
Returns a list of unique tracks given a path to a
file containing tracks.
"""
with open(text_file, "r") as listed:
log.info(
"Checking and removing any duplicate tracks "
"in reading {}".format(tracks_file)
)
with open(tracks_file, "r") as tracks_in:
# Read tracks into a list and remove any duplicates
lines = listed.read().splitlines()
lines = tracks_in.read().splitlines()
# Remove blank and strip whitespaces from lines (if any)
lines = [line.strip() for line in lines if line.strip()]
lines = remove_duplicates(lines)
return lines

0
spotdl/metadata.py Executable file → Normal file
View File

228
spotdl/spotdl.py Executable file → Normal file
View File

@@ -1,194 +1,47 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from spotdl import __version__
from spotdl import const
from spotdl import handle
from spotdl import metadata
from spotdl import convert
from spotdl import internals
from spotdl import spotify_tools
from spotdl import youtube_tools
from spotdl import downloader
from logzero import logger as log
from slugify import slugify
import spotipy
import urllib.request
import logzero
import os
import sys
import time
import platform
import pprint
def check_exists(music_file, raw_song, meta_tags):
""" Check if the input song already exists in the given folder. """
log.debug(
"Cleaning any temp files and checking "
'if "{}" already exists'.format(music_file)
)
songs = os.listdir(const.args.folder)
for song in songs:
if song.endswith(".temp"):
os.remove(os.path.join(const.args.folder, song))
continue
# check if a song with the same name is already present in the given folder
if os.path.splitext(song)[0] == music_file:
log.debug('Found an already existing song: "{}"'.format(song))
if internals.is_spotify(raw_song):
# check if the already downloaded song has correct metadata
# if not, remove it and download again without prompt
already_tagged = metadata.compare(
os.path.join(const.args.folder, song), meta_tags
)
log.debug(
"Checking if it is already tagged correctly? {}", already_tagged
)
if not already_tagged:
os.remove(os.path.join(const.args.folder, song))
return False
log.warning('"{}" already exists'.format(song))
if const.args.overwrite == "prompt":
log.info(
'"{}" has already been downloaded. '
"Re-download? (y/N): ".format(song)
)
prompt = input("> ")
if prompt.lower() == "y":
os.remove(os.path.join(const.args.folder, song))
return False
else:
return True
elif const.args.overwrite == "force":
os.remove(os.path.join(const.args.folder, song))
log.info('Overwriting "{}"'.format(song))
return False
elif const.args.overwrite == "skip":
log.info('Skipping "{}"'.format(song))
return True
return False
def debug_sys_info():
log.debug("Python version: {}".format(sys.version))
log.debug("Platform: {}".format(platform.platform()))
log.debug(pprint.pformat(const.args.__dict__))
def download_list(tracks_file, skip_file=None, write_successful_file=None):
""" Download all songs from the list. """
log.info("Checking and removing any duplicate tracks")
tracks = internals.get_unique_tracks(tracks_file)
# override file with unique tracks
with open(tracks_file, "w") as f:
f.write("\n".join(tracks))
# Remove tracks to skip from tracks list
if skip_file is not None:
skip_tracks = internals.get_unique_tracks(skip_file)
len_before = len(tracks)
tracks = [track for track in tracks if track not in skip_tracks]
log.info("Skipping {} tracks".format(len_before - len(tracks)))
log.info(u"Preparing to download {} songs".format(len(tracks)))
downloaded_songs = []
for number, raw_song in enumerate(tracks, 1):
print("")
try:
download_single(raw_song, number=number)
# token expires after 1 hour
except spotipy.client.SpotifyException:
# refresh token when it expires
log.debug("Token expired, generating new one and authorizing")
spotify_tools.refresh_token()
download_single(raw_song, number=number)
# detect network problems
except (urllib.request.URLError, TypeError, IOError) as e:
tracks.append(raw_song)
# remove the downloaded song from file
internals.trim_song(tracks_file)
# and append it at the end of file
with open(tracks_file, "a") as f:
f.write("\n" + raw_song)
log.exception(e)
log.warning("Failed to download song. Will retry after other songs\n")
# wait 0.5 sec to avoid infinite looping
time.sleep(0.5)
continue
downloaded_songs.append(raw_song)
# Add track to file of successful downloads
log.debug("Adding downloaded song to write successful file")
if write_successful_file is not None:
with open(write_successful_file, "a") as f:
f.write("\n" + raw_song)
log.debug("Removing downloaded song from tracks file")
internals.trim_song(tracks_file)
return downloaded_songs
def download_single(raw_song, number=None):
""" Logic behind downloading a song. """
content, meta_tags = youtube_tools.match_video_and_metadata(raw_song)
if content is None:
log.debug("Found no matching video")
return
if const.args.download_only_metadata and meta_tags is None:
log.info("Found no metadata. Skipping the download")
return
# "[number]. [artist] - [song]" if downloading from list
# otherwise "[artist] - [song]"
youtube_title = youtube_tools.get_youtube_title(content, number)
log.info("{} ({})".format(youtube_title, content.watchv_url))
# generate file name of the song to download
songname = content.title
if meta_tags is not None:
refined_songname = internals.format_string(
const.args.file_format, meta_tags, slugification=True
)
log.debug(
'Refining songname from "{0}" to "{1}"'.format(songname, refined_songname)
)
if not refined_songname == " - ":
songname = refined_songname
else:
if not const.args.no_metadata:
log.warning("Could not find metadata")
songname = internals.sanitize_title(songname)
if const.args.dry_run:
return
if not check_exists(songname, raw_song, meta_tags):
# deal with file formats containing slashes to non-existent directories
songpath = os.path.join(const.args.folder, os.path.dirname(songname))
os.makedirs(songpath, exist_ok=True)
input_song = songname + const.args.input_ext
output_song = songname + const.args.output_ext
if youtube_tools.download_song(input_song, content):
print("")
try:
convert.song(
input_song,
output_song,
const.args.folder,
avconv=const.args.avconv,
trim_silence=const.args.trim_silence,
)
except FileNotFoundError:
encoder = "avconv" if const.args.avconv else "ffmpeg"
log.warning("Could not find {0}, skipping conversion".format(encoder))
const.args.output_ext = const.args.input_ext
output_song = songname + const.args.output_ext
if not const.args.input_ext == const.args.output_ext:
os.remove(os.path.join(const.args.folder, input_song))
if not const.args.no_metadata and meta_tags is not None:
metadata.embed(os.path.join(const.args.folder, output_song), meta_tags)
return True
def match_args():
if const.args.song:
track_dl = downloader.Downloader(raw_song=const.args.song)
track_dl.download_single()
elif const.args.list:
if const.args.write_m3u:
youtube_tools.generate_m3u(track_file=const.args.list)
else:
list_dl = downloader.ListDownloader(
tracks_file=const.args.list,
skip_file=const.args.skip,
write_successful_file=const.args.write_successful,
)
list_dl.download_list()
elif const.args.playlist:
spotify_tools.write_playlist(playlist_url=const.args.playlist)
elif const.args.album:
spotify_tools.write_album(album_url=const.args.album)
elif const.args.all_albums:
spotify_tools.write_all_albums_from_artist(artist_url=const.args.all_albums)
elif const.args.username:
spotify_tools.write_user_playlist(username=const.args.username)
def main():
@@ -203,31 +56,8 @@ def main():
logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level)
log.debug("Python version: {}".format(sys.version))
log.debug("Platform: {}".format(platform.platform()))
log.debug(pprint.pformat(const.args.__dict__))
try:
if const.args.song:
download_single(raw_song=const.args.song)
elif const.args.list:
if const.args.write_m3u:
youtube_tools.generate_m3u(track_file=const.args.list)
else:
download_list(
tracks_file=const.args.list,
skip_file=const.args.skip,
write_successful_file=const.args.write_successful,
)
elif const.args.playlist:
spotify_tools.write_playlist(playlist_url=const.args.playlist)
elif const.args.album:
spotify_tools.write_album(album_url=const.args.album)
elif const.args.all_albums:
spotify_tools.write_all_albums_from_artist(artist_url=const.args.all_albums)
elif const.args.username:
spotify_tools.write_user_playlist(username=const.args.username)
match_args()
# actually we don't necessarily need this, but yeah...
# explicit is better than implicit!
sys.exit(0)

View File

@@ -9,7 +9,6 @@ from spotdl import internals
from spotdl import const
import os
import pprint
# Fix download speed throttle on short duration tracks
# Read more on mps-youtube/pafy#199

View File

@@ -1,7 +1,7 @@
import os
from spotdl import const
from spotdl import spotdl
from spotdl import downloader
import loader
@@ -16,5 +16,6 @@ def test_dry_download_list(tmpdir):
file_path = os.path.join(const.args.folder, "test_list.txt")
with open(file_path, "w") as f:
f.write(TRACK_URL)
downloaded_song, *_ = spotdl.download_list(file_path)
list_dl = downloader.ListDownloader(file_path)
downloaded_song, *_ = list_dl.download_list()
assert downloaded_song == TRACK_URL

View File

@@ -6,7 +6,7 @@ from spotdl import spotify_tools
from spotdl import youtube_tools
from spotdl import convert
from spotdl import metadata
from spotdl import spotdl
from spotdl import downloader
import loader
@@ -55,7 +55,8 @@ def test_check_track_exists_before_download(tmpdir):
songname = internals.format_string(const.args.file_format, meta_tags)
global file_name
file_name = internals.sanitize_title(songname)
check = spotdl.check_exists(file_name, TRACK_URL, meta_tags)
track_existence = downloader.CheckExists(file_name, meta_tags)
check = track_existence.already_exists(TRACK_URL)
assert check == expect_check
@@ -146,6 +147,7 @@ class TestEmbedMetadata:
def test_check_track_exists_after_download():
expect_check = True
check = spotdl.check_exists(file_name, TRACK_URL, meta_tags)
track_existence = downloader.CheckExists(file_name, meta_tags)
check = track_existence.already_exists(TRACK_URL)
os.remove(track_path + ".mp3")
assert check == expect_check

View File

@@ -5,7 +5,7 @@ from spotdl import const
from spotdl import internals
from spotdl import spotify_tools
from spotdl import youtube_tools
from spotdl import spotdl
from spotdl import downloader
import loader
@@ -112,7 +112,8 @@ def test_check_exists(tmpdir):
# prerequisites for determining filename
global file_name
file_name = internals.sanitize_title(title)
check = spotdl.check_exists(file_name, TRACK_SEARCH, metadata)
track_existence = downloader.CheckExists(file_name, metadata)
check = track_existence.already_exists(TRACK_SEARCH)
assert check == expect_check