mirror of
				https://github.com/KevinMidboe/spotify-downloader.git
				synced 2025-10-29 18:00:15 +00:00 
			
		
		
		
	Replace class SpotifyAuthorize with @must_be_authorized (#506)
				
					
				
			* @must_be_authorized decorator for functions in spotify_tools.py * We don't need this * Add tests * Update CHANGELOG.md
This commit is contained in:
		| @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. | |||||||
| - | - | ||||||
|  |  | ||||||
| ### Fixed | ### Fixed | ||||||
| - | - Fix writing playlist tracks to file ([@ritiek](https://github.com/ritiek)) (#506) | ||||||
|  |  | ||||||
| ## [1.1.2] - 2019-02-10 | ## [1.1.2] - 2019-02-10 | ||||||
| ### Changed | ### Changed | ||||||
|   | |||||||
| @@ -202,12 +202,8 @@ class ListDownloader: | |||||||
|             try: |             try: | ||||||
|                 track_dl = Downloader(raw_song, number=number) |                 track_dl = Downloader(raw_song, number=number) | ||||||
|                 track_dl.download_single() |                 track_dl.download_single() | ||||||
|             except spotipy.client.SpotifyException: |  | ||||||
|                 # token expires after 1 hour |  | ||||||
|                 self._regenerate_token() |  | ||||||
|                 track_dl.download_single() |  | ||||||
|             # detect network problems |  | ||||||
|             except (urllib.request.URLError, TypeError, IOError) as e: |             except (urllib.request.URLError, TypeError, IOError) as e: | ||||||
|  |                 # detect network problems | ||||||
|                 self._cleanup(raw_song, e) |                 self._cleanup(raw_song, e) | ||||||
|                 # TODO: remove this sleep once #397 is fixed |                 # TODO: remove this sleep once #397 is fixed | ||||||
|                 # wait 0.5 sec to avoid infinite looping |                 # wait 0.5 sec to avoid infinite looping | ||||||
| @@ -233,11 +229,6 @@ class ListDownloader: | |||||||
|         with open(self.write_successful_file, "a") as f: |         with open(self.write_successful_file, "a") as f: | ||||||
|             f.write("\n" + raw_song) |             f.write("\n" + raw_song) | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def _regenerate_token(): |  | ||||||
|         log.debug("Token expired, generating new one and authorizing") |  | ||||||
|         spotify_tools.refresh_token() |  | ||||||
|  |  | ||||||
|     def _cleanup(self, raw_song, exception): |     def _cleanup(self, raw_song, exception): | ||||||
|         self.tracks.append(raw_song) |         self.tracks.append(raw_song) | ||||||
|         # remove the downloaded song from file |         # remove the downloaded song from file | ||||||
|   | |||||||
| @@ -51,7 +51,6 @@ def main(): | |||||||
|  |  | ||||||
|     internals.filter_path(const.args.folder) |     internals.filter_path(const.args.folder) | ||||||
|     youtube_tools.set_api_key() |     youtube_tools.set_api_key() | ||||||
|     spotify = spotify_tools.SpotifyAuthorize() |  | ||||||
|  |  | ||||||
|     logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level) |     logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -8,234 +8,253 @@ from logzero import logger as log | |||||||
| import pprint | import pprint | ||||||
| import sys | import sys | ||||||
| import os | import os | ||||||
|  | import functools | ||||||
|  |  | ||||||
| from spotdl import const | from spotdl import const | ||||||
| from spotdl import internals | from spotdl import internals | ||||||
|  |  | ||||||
| # token = generate_token() | spotify = None | ||||||
| # spotify = spotipy.Spotify(auth=token) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class SpotifyAuthorize: | def generate_token(): | ||||||
|     """ Class to handle all interactions with spotipy instance. """ |     """ Generate the token. """ | ||||||
|     def __init__(self): |     credentials = oauth2.SpotifyClientCredentials( | ||||||
|         self.client_id = const.args.spotify_client_id |         client_id=const.args.spotify_client_id, | ||||||
|         self.client_secret = const.args.spotify_client_secret |         client_secret=const.args.spotify_client_secret, | ||||||
|         token = self.generate_token() |     ) | ||||||
|         self.spotify = spotipy.Spotify(auth=token) |     token = credentials.get_access_token() | ||||||
|  |     return token | ||||||
|  |  | ||||||
|     def generate_token(self): |  | ||||||
|         """ Generate the token. """ | def must_be_authorized(func, spotify=spotify): | ||||||
|         credentials = oauth2.SpotifyClientCredentials( |     def wrapper(*args, **kwargs): | ||||||
|             client_id=self.client_id, |         global spotify | ||||||
|             client_secret=self.client_secret, |         try: | ||||||
|  |             assert spotify | ||||||
|  |             return func(*args, **kwargs) | ||||||
|  |         except (AssertionError, spotipy.client.SpotifyException): | ||||||
|  |             token = generate_token() | ||||||
|  |             spotify = spotipy.Spotify(auth=token) | ||||||
|  |             return func(*args, **kwargs) | ||||||
|  |     return wrapper | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @must_be_authorized | ||||||
|  | def generate_metadata(raw_song): | ||||||
|  |     """ Fetch a song's metadata from Spotify. """ | ||||||
|  |     if internals.is_spotify(raw_song): | ||||||
|  |         # fetch track information directly if it is spotify link | ||||||
|  |         log.debug("Fetching metadata for given track URL") | ||||||
|  |         meta_tags = spotify.track(raw_song) | ||||||
|  |     else: | ||||||
|  |         # otherwise search on spotify and fetch information from first result | ||||||
|  |         log.debug('Searching for "{}" on Spotify'.format(raw_song)) | ||||||
|  |         try: | ||||||
|  |             meta_tags = spotify.search(raw_song, limit=1)["tracks"]["items"][0] | ||||||
|  |         except IndexError: | ||||||
|  |             return None | ||||||
|  |     artist = spotify.artist(meta_tags["artists"][0]["id"]) | ||||||
|  |     album = spotify.album(meta_tags["album"]["id"]) | ||||||
|  |  | ||||||
|  |     try: | ||||||
|  |         meta_tags[u"genre"] = titlecase(artist["genres"][0]) | ||||||
|  |     except IndexError: | ||||||
|  |         meta_tags[u"genre"] = None | ||||||
|  |     try: | ||||||
|  |         meta_tags[u"copyright"] = album["copyrights"][0]["text"] | ||||||
|  |     except IndexError: | ||||||
|  |         meta_tags[u"copyright"] = None | ||||||
|  |     try: | ||||||
|  |         meta_tags[u"external_ids"][u"isrc"] | ||||||
|  |     except KeyError: | ||||||
|  |         meta_tags[u"external_ids"][u"isrc"] = None | ||||||
|  |  | ||||||
|  |     meta_tags[u"release_date"] = album["release_date"] | ||||||
|  |     meta_tags[u"publisher"] = album["label"] | ||||||
|  |     meta_tags[u"total_tracks"] = album["tracks"]["total"] | ||||||
|  |  | ||||||
|  |     log.debug("Fetching lyrics") | ||||||
|  |  | ||||||
|  |     try: | ||||||
|  |         meta_tags["lyrics"] = lyricwikia.get_lyrics( | ||||||
|  |             meta_tags["artists"][0]["name"], meta_tags["name"] | ||||||
|         ) |         ) | ||||||
|         token = credentials.get_access_token() |     except lyricwikia.LyricsNotFound: | ||||||
|         return token |         meta_tags["lyrics"] = None | ||||||
|  |  | ||||||
|     def refresh_token(self): |     # Some sugar | ||||||
|         """ Refresh expired token. """ |     meta_tags["year"], *_ = meta_tags["release_date"].split("-") | ||||||
|         new_token = self.generate_token() |     meta_tags["duration"] = meta_tags["duration_ms"] / 1000.0 | ||||||
|         self.spotify = spotipy.Spotify(auth=new_token) |     meta_tags["spotify_metadata"] = True | ||||||
|  |     # Remove unwanted parameters | ||||||
|  |     del meta_tags["duration_ms"] | ||||||
|  |     del meta_tags["available_markets"] | ||||||
|  |     del meta_tags["album"]["available_markets"] | ||||||
|  |  | ||||||
|     def generate_metadata(self, raw_song): |     log.debug(pprint.pformat(meta_tags)) | ||||||
|         """ Fetch a song's metadata from Spotify. """ |     return meta_tags | ||||||
|         if internals.is_spotify(raw_song): |  | ||||||
|             # fetch track information directly if it is spotify link |  | ||||||
|             log.debug("Fetching metadata for given track URL") | @must_be_authorized | ||||||
|             meta_tags = self.spotify.track(raw_song) | def write_user_playlist(username, text_file=None): | ||||||
|  |     """ Write user playlists to text_file """ | ||||||
|  |     links = get_playlists(username=username) | ||||||
|  |     playlist = internals.input_link(links) | ||||||
|  |     return write_playlist(playlist, text_file) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @must_be_authorized | ||||||
|  | def get_playlists(username): | ||||||
|  |     """ Fetch user playlists when using the -u option. """ | ||||||
|  |     playlists = spotify.user_playlists(username) | ||||||
|  |     links = [] | ||||||
|  |     check = 1 | ||||||
|  |  | ||||||
|  |     while True: | ||||||
|  |         for playlist in playlists["items"]: | ||||||
|  |             # in rare cases, playlists may not be found, so playlists['next'] | ||||||
|  |             # is None. Skip these. Also see Issue #91. | ||||||
|  |             if playlist["name"] is not None: | ||||||
|  |                 log.info( | ||||||
|  |                     u"{0:>5}. {1:<30}  ({2} tracks)".format( | ||||||
|  |                         check, playlist["name"], playlist["tracks"]["total"] | ||||||
|  |                     ) | ||||||
|  |                 ) | ||||||
|  |                 playlist_url = playlist["external_urls"]["spotify"] | ||||||
|  |                 log.debug(playlist_url) | ||||||
|  |                 links.append(playlist_url) | ||||||
|  |                 check += 1 | ||||||
|  |         if playlists["next"]: | ||||||
|  |             playlists = spotify.next(playlists) | ||||||
|         else: |         else: | ||||||
|             # otherwise search on spotify and fetch information from first result |             break | ||||||
|             log.debug('Searching for "{}" on Spotify'.format(raw_song)) |  | ||||||
|             try: |  | ||||||
|                 meta_tags = self.spotify.search(raw_song, limit=1)["tracks"]["items"][0] |  | ||||||
|             except IndexError: |  | ||||||
|                 return None |  | ||||||
|         artist = self.spotify.artist(meta_tags["artists"][0]["id"]) |  | ||||||
|         album = self.spotify.album(meta_tags["album"]["id"]) |  | ||||||
|  |  | ||||||
|         try: |     return links | ||||||
|             meta_tags[u"genre"] = titlecase(artist["genres"][0]) |  | ||||||
|         except IndexError: |  | ||||||
|             meta_tags[u"genre"] = None |  | ||||||
|         try: |  | ||||||
|             meta_tags[u"copyright"] = album["copyrights"][0]["text"] |  | ||||||
|         except IndexError: |  | ||||||
|             meta_tags[u"copyright"] = None |  | ||||||
|         try: |  | ||||||
|             meta_tags[u"external_ids"][u"isrc"] |  | ||||||
|         except KeyError: |  | ||||||
|             meta_tags[u"external_ids"][u"isrc"] = None |  | ||||||
|  |  | ||||||
|         meta_tags[u"release_date"] = album["release_date"] |  | ||||||
|         meta_tags[u"publisher"] = album["label"] |  | ||||||
|         meta_tags[u"total_tracks"] = album["tracks"]["total"] |  | ||||||
|  |  | ||||||
|         log.debug("Fetching lyrics") | @must_be_authorized | ||||||
|  | def fetch_playlist(playlist): | ||||||
|  |     try: | ||||||
|  |         playlist_id = internals.extract_spotify_id(playlist) | ||||||
|  |     except IndexError: | ||||||
|  |         # Wrong format, in either case | ||||||
|  |         log.error("The provided playlist URL is not in a recognized format!") | ||||||
|  |         sys.exit(10) | ||||||
|  |     try: | ||||||
|  |         results = spotify.user_playlist( | ||||||
|  |             user=None, playlist_id=playlist_id, fields="tracks,next,name" | ||||||
|  |         ) | ||||||
|  |     except spotipy.client.SpotifyException: | ||||||
|  |         log.error("Unable to find playlist") | ||||||
|  |         log.info("Make sure the playlist is set to publicly visible and then try again") | ||||||
|  |         sys.exit(11) | ||||||
|  |  | ||||||
|         try: |     return results | ||||||
|             meta_tags["lyrics"] = lyricwikia.get_lyrics( |  | ||||||
|                 meta_tags["artists"][0]["name"], meta_tags["name"] |  | ||||||
|             ) |  | ||||||
|         except lyricwikia.LyricsNotFound: |  | ||||||
|             meta_tags["lyrics"] = None |  | ||||||
|  |  | ||||||
|         # Some sugar |  | ||||||
|         meta_tags["year"], *_ = meta_tags["release_date"].split("-") |  | ||||||
|         meta_tags["duration"] = meta_tags["duration_ms"] / 1000.0 |  | ||||||
|         meta_tags["spotify_metadata"] = True |  | ||||||
|         # Remove unwanted parameters |  | ||||||
|         del meta_tags["duration_ms"] |  | ||||||
|         del meta_tags["available_markets"] |  | ||||||
|         del meta_tags["album"]["available_markets"] |  | ||||||
|  |  | ||||||
|         log.debug(pprint.pformat(meta_tags)) | @must_be_authorized | ||||||
|         return meta_tags | def write_playlist(playlist_url, text_file=None): | ||||||
|  |     playlist = fetch_playlist(playlist_url) | ||||||
|  |     tracks = playlist["tracks"] | ||||||
|  |     if not text_file: | ||||||
|  |         text_file = u"{0}.txt".format(slugify(playlist["name"], ok="-_()[]{}")) | ||||||
|  |     return write_tracks(tracks, text_file) | ||||||
|  |  | ||||||
|     def write_user_playlist(self, username, text_file=None): |  | ||||||
|         """ Write user playlists to text_file """ |  | ||||||
|         links = self.get_playlists(username=username) |  | ||||||
|         playlist = internals.input_link(links) |  | ||||||
|         return self.write_playlist(playlist, text_file) |  | ||||||
|  |  | ||||||
|     def get_playlists(self, username): | @must_be_authorized | ||||||
|         """ Fetch user playlists when using the -u option. """ | def fetch_album(album): | ||||||
|         playlists = self.spotify.user_playlists(username) |     album_id = internals.extract_spotify_id(album) | ||||||
|         links = [] |     album = spotify.album(album_id) | ||||||
|         check = 1 |     return album | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @must_be_authorized | ||||||
|  | def fetch_albums_from_artist(artist_url, album_type=None): | ||||||
|  |     """ | ||||||
|  |     This funcction returns all the albums from a give artist_url using the US | ||||||
|  |     market | ||||||
|  |     :param artist_url - spotify artist url | ||||||
|  |     :param album_type - the type of album to fetch (ex: single) the default is | ||||||
|  |                         all albums | ||||||
|  |     :param return - the album from the artist | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |     # fetching artist's albums limitting the results to the US to avoid duplicate | ||||||
|  |     # albums from multiple markets | ||||||
|  |     artist_id = internals.extract_spotify_id(artist_url) | ||||||
|  |     results = spotify.artist_albums(artist_id, album_type=album_type, country="US") | ||||||
|  |  | ||||||
|  |     albums = results["items"] | ||||||
|  |  | ||||||
|  |     # indexing all pages of results | ||||||
|  |     while results["next"]: | ||||||
|  |         results = spotify.next(results) | ||||||
|  |         albums.extend(results["items"]) | ||||||
|  |  | ||||||
|  |     return albums | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @must_be_authorized | ||||||
|  | def write_all_albums_from_artist(artist_url, text_file=None): | ||||||
|  |     """ | ||||||
|  |     This function gets all albums from an artist and writes it to a file in the | ||||||
|  |     current working directory called [ARTIST].txt, where [ARTIST] is the artist | ||||||
|  |     of the album | ||||||
|  |     :param artist_url - spotify artist url | ||||||
|  |     :param text_file - file to write albums to | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |     album_base_url = "https://open.spotify.com/album/" | ||||||
|  |  | ||||||
|  |     # fetching all default albums | ||||||
|  |     albums = fetch_albums_from_artist(artist_url, album_type=None) | ||||||
|  |  | ||||||
|  |     # if no file if given, the default save file is in the current working | ||||||
|  |     # directory with the name of the artist | ||||||
|  |     if text_file is None: | ||||||
|  |         text_file = albums[0]["artists"][0]["name"] + ".txt" | ||||||
|  |  | ||||||
|  |     for album in albums: | ||||||
|  |         # logging album name | ||||||
|  |         log.info("Fetching album: " + album["name"]) | ||||||
|  |         write_album(album_base_url + album["id"], text_file=text_file) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @must_be_authorized | ||||||
|  | def write_album(album_url, text_file=None): | ||||||
|  |     album = fetch_album(album_url) | ||||||
|  |     tracks = spotify.album_tracks(album["id"]) | ||||||
|  |     if not text_file: | ||||||
|  |         text_file = u"{0}.txt".format(slugify(album["name"], ok="-_()[]{}")) | ||||||
|  |     return write_tracks(tracks, text_file) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @must_be_authorized | ||||||
|  | def write_tracks(tracks, text_file): | ||||||
|  |     log.info(u"Writing {0} tracks to {1}".format(tracks["total"], text_file)) | ||||||
|  |     track_urls = [] | ||||||
|  |     with open(text_file, "a") as file_out: | ||||||
|         while True: |         while True: | ||||||
|             for playlist in playlists["items"]: |             for item in tracks["items"]: | ||||||
|                 # in rare cases, playlists may not be found, so playlists['next'] |                 if "track" in item: | ||||||
|                 # is None. Skip these. Also see Issue #91. |                     track = item["track"] | ||||||
|                 if playlist["name"] is not None: |                 else: | ||||||
|                     log.info( |                     track = item | ||||||
|                         u"{0:>5}. {1:<30}  ({2} tracks)".format( |                 try: | ||||||
|                             check, playlist["name"], playlist["tracks"]["total"] |                     track_url = track["external_urls"]["spotify"] | ||||||
|  |                     log.debug(track_url) | ||||||
|  |                     file_out.write(track_url + "\n") | ||||||
|  |                     track_urls.append(track_url) | ||||||
|  |                 except KeyError: | ||||||
|  |                     log.warning( | ||||||
|  |                         u"Skipping track {0} by {1} (local only?)".format( | ||||||
|  |                             track["name"], track["artists"][0]["name"] | ||||||
|                         ) |                         ) | ||||||
|                     ) |                     ) | ||||||
|                     playlist_url = playlist["external_urls"]["spotify"] |             # 1 page = 50 results | ||||||
|                     log.debug(playlist_url) |             # check if there are more pages | ||||||
|                     links.append(playlist_url) |             if tracks["next"]: | ||||||
|                     check += 1 |                 tracks = spotify.next(tracks) | ||||||
|             if playlists["next"]: |  | ||||||
|                 playlists = self.spotify.next(playlists) |  | ||||||
|             else: |             else: | ||||||
|                 break |                 break | ||||||
|  |     return track_urls | ||||||
|         return links |  | ||||||
|  |  | ||||||
|     def fetch_playlist(self, playlist): |  | ||||||
|         try: |  | ||||||
|             playlist_id = internals.extract_spotify_id(playlist) |  | ||||||
|         except IndexError: |  | ||||||
|             # Wrong format, in either case |  | ||||||
|             log.error("The provided playlist URL is not in a recognized format!") |  | ||||||
|             sys.exit(10) |  | ||||||
|         try: |  | ||||||
|             results = self.spotify.user_playlist( |  | ||||||
|                 user=None, playlist_id=playlist_id, fields="tracks,next,name" |  | ||||||
|             ) |  | ||||||
|         except spotipy.client.SpotifyException: |  | ||||||
|             log.error("Unable to find playlist") |  | ||||||
|             log.info("Make sure the playlist is set to publicly visible and then try again") |  | ||||||
|             sys.exit(11) |  | ||||||
|  |  | ||||||
|         return results |  | ||||||
|  |  | ||||||
|     def write_playlist(self, playlist_url, text_file=None): |  | ||||||
|         playlist = self.fetch_playlist(playlist_url) |  | ||||||
|         tracks = playlist["tracks"] |  | ||||||
|         if not text_file: |  | ||||||
|             text_file = u"{0}.txt".format(slugify(playlist["name"], ok="-_()[]{}")) |  | ||||||
|         return self.write_tracks(tracks, text_file) |  | ||||||
|  |  | ||||||
|     def fetch_album(self, album): |  | ||||||
|         album_id = internals.extract_spotify_id(album) |  | ||||||
|         album = self.spotify.album(album_id) |  | ||||||
|         return album |  | ||||||
|  |  | ||||||
|     def fetch_albums_from_artist(self, artist_url, album_type=None): |  | ||||||
|         """ |  | ||||||
|         This funcction returns all the albums from a give artist_url using the US |  | ||||||
|         market |  | ||||||
|         :param artist_url - spotify artist url |  | ||||||
|         :param album_type - the type of album to fetch (ex: single) the default is |  | ||||||
|                             all albums |  | ||||||
|         :param return - the album from the artist |  | ||||||
|         """ |  | ||||||
|  |  | ||||||
|         # fetching artist's albums limitting the results to the US to avoid duplicate |  | ||||||
|         # albums from multiple markets |  | ||||||
|         artist_id = internals.extract_spotify_id(artist_url) |  | ||||||
|         results = self.spotify.artist_albums(artist_id, album_type=album_type, country="US") |  | ||||||
|  |  | ||||||
|         albums = results["items"] |  | ||||||
|  |  | ||||||
|         # indexing all pages of results |  | ||||||
|         while results["next"]: |  | ||||||
|             results = self.spotify.next(results) |  | ||||||
|             albums.extend(results["items"]) |  | ||||||
|  |  | ||||||
|         return albums |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     def write_all_albums_from_artist(self, artist_url, text_file=None): |  | ||||||
|         """ |  | ||||||
|         This function gets all albums from an artist and writes it to a file in the |  | ||||||
|         current working directory called [ARTIST].txt, where [ARTIST] is the artist |  | ||||||
|         of the album |  | ||||||
|         :param artist_url - spotify artist url |  | ||||||
|         :param text_file - file to write albums to |  | ||||||
|         """ |  | ||||||
|  |  | ||||||
|         album_base_url = "https://open.spotify.com/album/" |  | ||||||
|  |  | ||||||
|         # fetching all default albums |  | ||||||
|         albums = self.fetch_albums_from_artist(artist_url, album_type=None) |  | ||||||
|  |  | ||||||
|         # if no file if given, the default save file is in the current working |  | ||||||
|         # directory with the name of the artist |  | ||||||
|         if text_file is None: |  | ||||||
|             text_file = albums[0]["artists"][0]["name"] + ".txt" |  | ||||||
|  |  | ||||||
|         for album in albums: |  | ||||||
|             # logging album name |  | ||||||
|             log.info("Fetching album: " + album["name"]) |  | ||||||
|             self.write_album(album_base_url + album["id"], text_file=text_file) |  | ||||||
|  |  | ||||||
|     def write_album(self, album_url, text_file=None): |  | ||||||
|         album = self.fetch_album(album_url) |  | ||||||
|         tracks = self.spotify.album_tracks(album["id"]) |  | ||||||
|         if not text_file: |  | ||||||
|             text_file = u"{0}.txt".format(slugify(album["name"], ok="-_()[]{}")) |  | ||||||
|         return self.write_tracks(tracks, text_file) |  | ||||||
|  |  | ||||||
|     def write_tracks(self, tracks, text_file): |  | ||||||
|         log.info(u"Writing {0} tracks to {1}".format(tracks["total"], text_file)) |  | ||||||
|         track_urls = [] |  | ||||||
|         with open(text_file, "a") as file_out: |  | ||||||
|             while True: |  | ||||||
|                 for item in tracks["items"]: |  | ||||||
|                     if "track" in item: |  | ||||||
|                         track = item["track"] |  | ||||||
|                     else: |  | ||||||
|                         track = item |  | ||||||
|                     try: |  | ||||||
|                         track_url = track["external_urls"]["spotify"] |  | ||||||
|                         log.debug(track_url) |  | ||||||
|                         file_out.write(track_url + "\n") |  | ||||||
|                         track_urls.append(track_url) |  | ||||||
|                     except KeyError: |  | ||||||
|                         log.warning( |  | ||||||
|                             u"Skipping track {0} by {1} (local only?)".format( |  | ||||||
|                                 track["name"], track["artists"][0]["name"] |  | ||||||
|                             ) |  | ||||||
|                         ) |  | ||||||
|                 # 1 page = 50 results |  | ||||||
|                 # check if there are more pages |  | ||||||
|                 if tracks["next"]: |  | ||||||
|                     tracks = self.spotify.next(tracks) |  | ||||||
|                 else: |  | ||||||
|                     break |  | ||||||
|         return track_urls |  | ||||||
|   | |||||||
| @@ -48,7 +48,6 @@ def go_pafy(raw_song, meta_tags=None): | |||||||
| def match_video_and_metadata(track): | def match_video_and_metadata(track): | ||||||
|     """ Get and match track data from YouTube and Spotify. """ |     """ Get and match track data from YouTube and Spotify. """ | ||||||
|     meta_tags = None |     meta_tags = None | ||||||
|     spotify = spotify_tools.SpotifyAuthorize() |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     def fallback_metadata(meta_tags): |     def fallback_metadata(meta_tags): | ||||||
| @@ -68,13 +67,13 @@ def match_video_and_metadata(track): | |||||||
|         content = go_pafy(track, meta_tags=None) |         content = go_pafy(track, meta_tags=None) | ||||||
|         track = slugify(content.title).replace("-", " ") |         track = slugify(content.title).replace("-", " ") | ||||||
|         if not const.args.no_metadata: |         if not const.args.no_metadata: | ||||||
|             meta_tags = spotify.generate_metadata(track) |             meta_tags = spotify_tools.generate_metadata(track) | ||||||
|             meta_tags = fallback_metadata(meta_tags) |             meta_tags = fallback_metadata(meta_tags) | ||||||
|  |  | ||||||
|     elif internals.is_spotify(track): |     elif internals.is_spotify(track): | ||||||
|         log.debug("Input song is a Spotify URL") |         log.debug("Input song is a Spotify URL") | ||||||
|         # Let it generate metadata, YouTube doesn't know Spotify slang |         # Let it generate metadata, YouTube doesn't know Spotify slang | ||||||
|         meta_tags = spotify.generate_metadata(track) |         meta_tags = spotify_tools.generate_metadata(track) | ||||||
|         content = go_pafy(track, meta_tags) |         content = go_pafy(track, meta_tags) | ||||||
|         if const.args.no_metadata: |         if const.args.no_metadata: | ||||||
|             meta_tags = None |             meta_tags = None | ||||||
| @@ -84,7 +83,7 @@ def match_video_and_metadata(track): | |||||||
|         if const.args.no_metadata: |         if const.args.no_metadata: | ||||||
|             content = go_pafy(track, meta_tags=None) |             content = go_pafy(track, meta_tags=None) | ||||||
|         else: |         else: | ||||||
|             meta_tags = spotify.generate_metadata(track) |             meta_tags = spotify_tools.generate_metadata(track) | ||||||
|             content = go_pafy(track, meta_tags=meta_tags) |             content = go_pafy(track, meta_tags=meta_tags) | ||||||
|             meta_tags = fallback_metadata(meta_tags) |             meta_tags = fallback_metadata(meta_tags) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -33,7 +33,7 @@ def pytest_namespace(): | |||||||
|  |  | ||||||
| @pytest.fixture(scope="module") | @pytest.fixture(scope="module") | ||||||
| def metadata_fixture(): | def metadata_fixture(): | ||||||
|     meta_tags = spotify_tools.SpotifyAuthorize().generate_metadata(SPOTIFY_TRACK_URL) |     meta_tags = spotify_tools.generate_metadata(SPOTIFY_TRACK_URL) | ||||||
|     return meta_tags |     return meta_tags | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,4 +1,7 @@ | |||||||
| from spotdl import spotify_tools | from spotdl import spotify_tools | ||||||
|  | from spotdl import const | ||||||
|  |  | ||||||
|  | import spotipy | ||||||
|  |  | ||||||
| import os | import os | ||||||
| import pytest | import pytest | ||||||
| @@ -6,26 +9,45 @@ import loader | |||||||
|  |  | ||||||
| loader.load_defaults() | loader.load_defaults() | ||||||
|  |  | ||||||
| @pytest.fixture(scope="module") |  | ||||||
| def spotify(): |  | ||||||
|     return spotify_tools.SpotifyAuthorize() |  | ||||||
|  |  | ||||||
| def test_generate_token(spotify): | def test_generate_token(): | ||||||
|     token = spotify.generate_token() |     token = spotify_tools.generate_token() | ||||||
|     assert len(token) == 83 |     assert len(token) == 83 | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_refresh_token(spotify): | class TestMustBeAuthorizedDecorator: | ||||||
|     old_instance = spotify.spotify |     def test_spotify_instance_is_unset(self): | ||||||
|     spotify.refresh_token() |         spotify_tools.spotify = None | ||||||
|     new_instance = spotify.spotify |  | ||||||
|     assert not old_instance == new_instance |         @spotify_tools.must_be_authorized | ||||||
|  |         def sample_func(): | ||||||
|  |             return True | ||||||
|  |  | ||||||
|  |         assert sample_func() | ||||||
|  |  | ||||||
|  |     def test_spotify_instance_forces_assertion_error(self): | ||||||
|  |         @spotify_tools.must_be_authorized | ||||||
|  |         def sample_func(): | ||||||
|  |             raise AssertionError | ||||||
|  |  | ||||||
|  |         with pytest.raises(AssertionError): | ||||||
|  |             sample_func() | ||||||
|  |  | ||||||
|  |     def test_fake_token_generator(self, monkeypatch): | ||||||
|  |         spotify_tools.spotify = None | ||||||
|  |         monkeypatch.setattr(spotify_tools, "generate_token", lambda: 123123) | ||||||
|  |  | ||||||
|  |         with pytest.raises(spotipy.client.SpotifyException): | ||||||
|  |             spotify_tools.generate_metadata("ncs - spectre") | ||||||
|  |  | ||||||
|  |     def test_correct_token(self): | ||||||
|  |         assert spotify_tools.generate_metadata("ncs - spectre") | ||||||
|  |  | ||||||
|  |  | ||||||
| class TestGenerateMetadata: | class TestGenerateMetadata: | ||||||
|     @pytest.fixture(scope="module") |     @pytest.fixture(scope="module") | ||||||
|     def metadata_fixture(self, spotify): |     def metadata_fixture(self): | ||||||
|         metadata = spotify.generate_metadata("ncs - spectre") |         metadata = spotify_tools.generate_metadata("ncs - spectre") | ||||||
|         return metadata |         return metadata | ||||||
|  |  | ||||||
|     def test_len(self, metadata_fixture): |     def test_len(self, metadata_fixture): | ||||||
| @@ -41,7 +63,7 @@ class TestGenerateMetadata: | |||||||
|         assert metadata_fixture["duration"] == 230.634 |         assert metadata_fixture["duration"] == 230.634 | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_get_playlists(spotify): | def test_get_playlists(): | ||||||
|     expect_playlist_ids = [ |     expect_playlist_ids = [ | ||||||
|         "34gWCK8gVeYDPKcctB6BQJ", |         "34gWCK8gVeYDPKcctB6BQJ", | ||||||
|         "04wTU2c2WNQG9XE5oSLYfj", |         "04wTU2c2WNQG9XE5oSLYfj", | ||||||
| @@ -53,15 +75,15 @@ def test_get_playlists(spotify): | |||||||
|         for playlist_id in expect_playlist_ids |         for playlist_id in expect_playlist_ids | ||||||
|     ] |     ] | ||||||
|  |  | ||||||
|     playlists = spotify.get_playlists("uqlakumu7wslkoen46s5bulq0") |     playlists = spotify_tools.get_playlists("uqlakumu7wslkoen46s5bulq0") | ||||||
|     assert playlists == expect_playlists |     assert playlists == expect_playlists | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_write_user_playlist(tmpdir, spotify, monkeypatch): | def test_write_user_playlist(tmpdir, monkeypatch): | ||||||
|     expect_tracks = 17 |     expect_tracks = 17 | ||||||
|     text_file = os.path.join(str(tmpdir), "test_us.txt") |     text_file = os.path.join(str(tmpdir), "test_us.txt") | ||||||
|     monkeypatch.setattr("builtins.input", lambda x: 1) |     monkeypatch.setattr("builtins.input", lambda x: 1) | ||||||
|     spotify.write_user_playlist("uqlakumu7wslkoen46s5bulq0", text_file) |     spotify_tools.write_user_playlist("uqlakumu7wslkoen46s5bulq0", text_file) | ||||||
|     with open(text_file, "r") as f: |     with open(text_file, "r") as f: | ||||||
|         tracks = len(f.readlines()) |         tracks = len(f.readlines()) | ||||||
|     assert tracks == expect_tracks |     assert tracks == expect_tracks | ||||||
| @@ -69,8 +91,8 @@ def test_write_user_playlist(tmpdir, spotify, monkeypatch): | |||||||
|  |  | ||||||
| class TestFetchPlaylist: | class TestFetchPlaylist: | ||||||
|     @pytest.fixture(scope="module") |     @pytest.fixture(scope="module") | ||||||
|     def playlist_fixture(self, spotify): |     def playlist_fixture(self): | ||||||
|         playlist = spotify.fetch_playlist( |         playlist = spotify_tools.fetch_playlist( | ||||||
|             "https://open.spotify.com/playlist/0fWBMhGh38y0wsYWwmM9Kt" |             "https://open.spotify.com/playlist/0fWBMhGh38y0wsYWwmM9Kt" | ||||||
|         ) |         ) | ||||||
|         return playlist |         return playlist | ||||||
| @@ -82,10 +104,10 @@ class TestFetchPlaylist: | |||||||
|         assert playlist_fixture["tracks"]["total"] == 14 |         assert playlist_fixture["tracks"]["total"] == 14 | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_write_playlist(tmpdir, spotify): | def test_write_playlist(tmpdir): | ||||||
|     expect_tracks = 14 |     expect_tracks = 14 | ||||||
|     text_file = os.path.join(str(tmpdir), "test_pl.txt") |     text_file = os.path.join(str(tmpdir), "test_pl.txt") | ||||||
|     spotify.write_playlist( |     spotify_tools.write_playlist( | ||||||
|         "https://open.spotify.com/playlist/0fWBMhGh38y0wsYWwmM9Kt", text_file |         "https://open.spotify.com/playlist/0fWBMhGh38y0wsYWwmM9Kt", text_file | ||||||
|     ) |     ) | ||||||
|     with open(text_file, "r") as f: |     with open(text_file, "r") as f: | ||||||
| @@ -96,8 +118,8 @@ def test_write_playlist(tmpdir, spotify): | |||||||
| # XXX: Mock this test off if it fails in future | # XXX: Mock this test off if it fails in future | ||||||
| class TestFetchAlbum: | class TestFetchAlbum: | ||||||
|     @pytest.fixture(scope="module") |     @pytest.fixture(scope="module") | ||||||
|     def album_fixture(self, spotify): |     def album_fixture(self): | ||||||
|         album = spotify.fetch_album( |         album = spotify_tools.fetch_album( | ||||||
|             "https://open.spotify.com/album/499J8bIsEnU7DSrosFDJJg" |             "https://open.spotify.com/album/499J8bIsEnU7DSrosFDJJg" | ||||||
|         ) |         ) | ||||||
|         return album |         return album | ||||||
| @@ -112,8 +134,8 @@ class TestFetchAlbum: | |||||||
| # XXX: Mock this test off if it fails in future | # XXX: Mock this test off if it fails in future | ||||||
| class TestFetchAlbumsFromArtist: | class TestFetchAlbumsFromArtist: | ||||||
|     @pytest.fixture(scope="module") |     @pytest.fixture(scope="module") | ||||||
|     def albums_from_artist_fixture(self, spotify): |     def albums_from_artist_fixture(self): | ||||||
|         albums = spotify.fetch_albums_from_artist( |         albums = spotify_tools.fetch_albums_from_artist( | ||||||
|             "https://open.spotify.com/artist/7oPftvlwr6VrsViSDV7fJY" |             "https://open.spotify.com/artist/7oPftvlwr6VrsViSDV7fJY" | ||||||
|         ) |         ) | ||||||
|         return albums |         return albums | ||||||
| @@ -134,10 +156,10 @@ class TestFetchAlbumsFromArtist: | |||||||
|         assert albums_from_artist_fixture[0]["total_tracks"] == 12 |         assert albums_from_artist_fixture[0]["total_tracks"] == 12 | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_write_all_albums_from_artist(tmpdir, spotify): | def test_write_all_albums_from_artist(tmpdir): | ||||||
|     expect_tracks = 282 |     expect_tracks = 282 | ||||||
|     text_file = os.path.join(str(tmpdir), "test_ab.txt") |     text_file = os.path.join(str(tmpdir), "test_ab.txt") | ||||||
|     spotify.write_all_albums_from_artist( |     spotify_tools.write_all_albums_from_artist( | ||||||
|         "https://open.spotify.com/artist/4dpARuHxo51G3z768sgnrY", text_file |         "https://open.spotify.com/artist/4dpARuHxo51G3z768sgnrY", text_file | ||||||
|     ) |     ) | ||||||
|     with open(text_file, "r") as f: |     with open(text_file, "r") as f: | ||||||
| @@ -145,10 +167,10 @@ def test_write_all_albums_from_artist(tmpdir, spotify): | |||||||
|     assert tracks == expect_tracks |     assert tracks == expect_tracks | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_write_album(tmpdir, spotify): | def test_write_album(tmpdir): | ||||||
|     expect_tracks = 15 |     expect_tracks = 15 | ||||||
|     text_file = os.path.join(str(tmpdir), "test_al.txt") |     text_file = os.path.join(str(tmpdir), "test_al.txt") | ||||||
|     spotify.write_album( |     spotify_tools.write_album( | ||||||
|         "https://open.spotify.com/album/499J8bIsEnU7DSrosFDJJg", text_file |         "https://open.spotify.com/album/499J8bIsEnU7DSrosFDJJg", text_file | ||||||
|     ) |     ) | ||||||
|     with open(text_file, "r") as f: |     with open(text_file, "r") as f: | ||||||
|   | |||||||
| @@ -40,7 +40,7 @@ class TestYouTubeAPIKeys: | |||||||
|  |  | ||||||
| @pytest.fixture(scope="module") | @pytest.fixture(scope="module") | ||||||
| def metadata_fixture(): | def metadata_fixture(): | ||||||
|     metadata = spotify_tools.SpotifyAuthorize().generate_metadata(TRACK_SEARCH) |     metadata = spotify_tools.generate_metadata(TRACK_SEARCH) | ||||||
|     return metadata |     return metadata | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user