mirror of
				https://github.com/KevinMidboe/spotify-downloader.git
				synced 2025-10-29 18:00:15 +00:00 
			
		
		
		
	* Outputs error details when track download fails from list file * Refactored Spotipy token refreshing * Reverted to old refreshing method Kept refresh_token() in spotify_tools.py
		
			
				
	
	
		
			242 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			242 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# -*- coding: UTF-8 -*-
 | 
						|
from spotdl import __version__
 | 
						|
from spotdl import const
 | 
						|
from spotdl import handle
 | 
						|
from spotdl import metadata
 | 
						|
from spotdl import convert
 | 
						|
from spotdl import internals
 | 
						|
from spotdl import spotify_tools
 | 
						|
from spotdl import youtube_tools
 | 
						|
from logzero import logger as log
 | 
						|
from slugify import slugify
 | 
						|
import spotipy
 | 
						|
import urllib.request
 | 
						|
import logzero
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import platform
 | 
						|
import pprint
 | 
						|
 | 
						|
 | 
						|
def check_exists(music_file, raw_song, meta_tags):
 | 
						|
    """ Check if the input song already exists in the given folder. """
 | 
						|
    log.debug(
 | 
						|
        "Cleaning any temp files and checking "
 | 
						|
        'if "{}" already exists'.format(music_file)
 | 
						|
    )
 | 
						|
    songs = os.listdir(const.args.folder)
 | 
						|
    for song in songs:
 | 
						|
        if song.endswith(".temp"):
 | 
						|
            os.remove(os.path.join(const.args.folder, song))
 | 
						|
            continue
 | 
						|
        # check if a song with the same name is already present in the given folder
 | 
						|
        if os.path.splitext(song)[0] == music_file:
 | 
						|
            log.debug('Found an already existing song: "{}"'.format(song))
 | 
						|
            if internals.is_spotify(raw_song):
 | 
						|
                # check if the already downloaded song has correct metadata
 | 
						|
                # if not, remove it and download again without prompt
 | 
						|
                already_tagged = metadata.compare(
 | 
						|
                    os.path.join(const.args.folder, song), meta_tags
 | 
						|
                )
 | 
						|
                log.debug(
 | 
						|
                    "Checking if it is already tagged correctly? {}", already_tagged
 | 
						|
                )
 | 
						|
                if not already_tagged:
 | 
						|
                    os.remove(os.path.join(const.args.folder, song))
 | 
						|
                    return False
 | 
						|
 | 
						|
            log.warning('"{}" already exists'.format(song))
 | 
						|
            if const.args.overwrite == "prompt":
 | 
						|
                log.info(
 | 
						|
                    '"{}" has already been downloaded. '
 | 
						|
                    "Re-download? (y/N): ".format(song)
 | 
						|
                )
 | 
						|
                prompt = input("> ")
 | 
						|
                if prompt.lower() == "y":
 | 
						|
                    os.remove(os.path.join(const.args.folder, song))
 | 
						|
                    return False
 | 
						|
                else:
 | 
						|
                    return True
 | 
						|
            elif const.args.overwrite == "force":
 | 
						|
                os.remove(os.path.join(const.args.folder, song))
 | 
						|
                log.info('Overwriting "{}"'.format(song))
 | 
						|
                return False
 | 
						|
            elif const.args.overwrite == "skip":
 | 
						|
                log.info('Skipping "{}"'.format(song))
 | 
						|
                return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def download_list(tracks_file, skip_file=None, write_successful_file=None):
 | 
						|
    """ Download all songs from the list. """
 | 
						|
 | 
						|
    log.info("Checking and removing any duplicate tracks")
 | 
						|
    tracks = internals.get_unique_tracks(tracks_file)
 | 
						|
 | 
						|
    # override file with unique tracks
 | 
						|
    with open(tracks_file, "w") as f:
 | 
						|
        f.write("\n".join(tracks))
 | 
						|
 | 
						|
    # Remove tracks to skip from tracks list
 | 
						|
    if skip_file is not None:
 | 
						|
        skip_tracks = internals.get_unique_tracks(skip_file)
 | 
						|
        len_before = len(tracks)
 | 
						|
        tracks = [track for track in tracks if track not in skip_tracks]
 | 
						|
        log.info("Skipping {} tracks".format(len_before - len(tracks)))
 | 
						|
 | 
						|
    log.info(u"Preparing to download {} songs".format(len(tracks)))
 | 
						|
    downloaded_songs = []
 | 
						|
 | 
						|
    for number, raw_song in enumerate(tracks, 1):
 | 
						|
        print("")
 | 
						|
        try:
 | 
						|
            download_single(raw_song, number=number)
 | 
						|
        # token expires after 1 hour
 | 
						|
        except spotipy.client.SpotifyException:
 | 
						|
            # refresh token when it expires
 | 
						|
            log.debug("Token expired, generating new one and authorizing")
 | 
						|
            spotify_tools.refresh_token()
 | 
						|
            download_single(raw_song, number=number)
 | 
						|
        # detect network problems
 | 
						|
        except (urllib.request.URLError, TypeError, IOError) as e:
 | 
						|
            tracks.append(raw_song)
 | 
						|
            # remove the downloaded song from file
 | 
						|
            internals.trim_song(tracks_file)
 | 
						|
            # and append it at the end of file
 | 
						|
            with open(tracks_file, "a") as f:
 | 
						|
                f.write("\n" + raw_song)
 | 
						|
            log.exception(e)
 | 
						|
            log.warning("Failed to download song. Will retry after other songs\n")
 | 
						|
            # wait 0.5 sec to avoid infinite looping
 | 
						|
            time.sleep(0.5)
 | 
						|
            continue
 | 
						|
 | 
						|
        downloaded_songs.append(raw_song)
 | 
						|
        # Add track to file of successful downloads
 | 
						|
        log.debug("Adding downloaded song to write successful file")
 | 
						|
        if write_successful_file is not None:
 | 
						|
            with open(write_successful_file, "a") as f:
 | 
						|
                f.write("\n" + raw_song)
 | 
						|
        log.debug("Removing downloaded song from tracks file")
 | 
						|
        internals.trim_song(tracks_file)
 | 
						|
 | 
						|
    return downloaded_songs
 | 
						|
 | 
						|
 | 
						|
def download_single(raw_song, number=None):
 | 
						|
    """ Logic behind downloading a song. """
 | 
						|
    content, meta_tags = youtube_tools.match_video_and_metadata(raw_song)
 | 
						|
 | 
						|
    if content is None:
 | 
						|
        log.debug("Found no matching video")
 | 
						|
        return
 | 
						|
 | 
						|
    if const.args.download_only_metadata and meta_tags is None:
 | 
						|
        log.info("Found no metadata. Skipping the download")
 | 
						|
        return
 | 
						|
 | 
						|
    # "[number]. [artist] - [song]" if downloading from list
 | 
						|
    # otherwise "[artist] - [song]"
 | 
						|
    youtube_title = youtube_tools.get_youtube_title(content, number)
 | 
						|
    log.info("{} ({})".format(youtube_title, content.watchv_url))
 | 
						|
 | 
						|
    # generate file name of the song to download
 | 
						|
    songname = content.title
 | 
						|
 | 
						|
    if meta_tags is not None:
 | 
						|
        refined_songname = internals.format_string(
 | 
						|
            const.args.file_format, meta_tags, slugification=True
 | 
						|
        )
 | 
						|
        log.debug(
 | 
						|
            'Refining songname from "{0}" to "{1}"'.format(songname, refined_songname)
 | 
						|
        )
 | 
						|
        if not refined_songname == " - ":
 | 
						|
            songname = refined_songname
 | 
						|
    else:
 | 
						|
        if not const.args.no_metadata:
 | 
						|
            log.warning("Could not find metadata")
 | 
						|
        songname = internals.sanitize_title(songname)
 | 
						|
 | 
						|
    if const.args.dry_run:
 | 
						|
        return
 | 
						|
 | 
						|
    if not check_exists(songname, raw_song, meta_tags):
 | 
						|
        # deal with file formats containing slashes to non-existent directories
 | 
						|
        songpath = os.path.join(const.args.folder, os.path.dirname(songname))
 | 
						|
        os.makedirs(songpath, exist_ok=True)
 | 
						|
        input_song = songname + const.args.input_ext
 | 
						|
        output_song = songname + const.args.output_ext
 | 
						|
        if youtube_tools.download_song(input_song, content):
 | 
						|
            print("")
 | 
						|
            try:
 | 
						|
                convert.song(
 | 
						|
                    input_song,
 | 
						|
                    output_song,
 | 
						|
                    const.args.folder,
 | 
						|
                    avconv=const.args.avconv,
 | 
						|
                    trim_silence=const.args.trim_silence,
 | 
						|
                )
 | 
						|
            except FileNotFoundError:
 | 
						|
                encoder = "avconv" if const.args.avconv else "ffmpeg"
 | 
						|
                log.warning("Could not find {0}, skipping conversion".format(encoder))
 | 
						|
                const.args.output_ext = const.args.input_ext
 | 
						|
                output_song = songname + const.args.output_ext
 | 
						|
 | 
						|
            if not const.args.input_ext == const.args.output_ext:
 | 
						|
                os.remove(os.path.join(const.args.folder, input_song))
 | 
						|
            if not const.args.no_metadata and meta_tags is not None:
 | 
						|
                metadata.embed(os.path.join(const.args.folder, output_song), meta_tags)
 | 
						|
            return True
 | 
						|
 | 
						|
 | 
						|
def main():
 | 
						|
    const.args = handle.get_arguments()
 | 
						|
 | 
						|
    if const.args.version:
 | 
						|
        print("spotdl {version}".format(version=__version__))
 | 
						|
        sys.exit()
 | 
						|
 | 
						|
    internals.filter_path(const.args.folder)
 | 
						|
    youtube_tools.set_api_key()
 | 
						|
 | 
						|
    logzero.setup_default_logger(formatter=const._formatter, level=const.args.log_level)
 | 
						|
 | 
						|
    log.debug("Python version: {}".format(sys.version))
 | 
						|
    log.debug("Platform: {}".format(platform.platform()))
 | 
						|
    log.debug(pprint.pformat(const.args.__dict__))
 | 
						|
 | 
						|
    try:
 | 
						|
        if const.args.song:
 | 
						|
            download_single(raw_song=const.args.song)
 | 
						|
        elif const.args.list:
 | 
						|
            if const.args.write_m3u:
 | 
						|
                youtube_tools.generate_m3u(track_file=const.args.list)
 | 
						|
            else:
 | 
						|
                download_list(
 | 
						|
                    tracks_file=const.args.list,
 | 
						|
                    skip_file=const.args.skip,
 | 
						|
                    write_successful_file=const.args.write_successful,
 | 
						|
                )
 | 
						|
        elif const.args.playlist:
 | 
						|
            spotify_tools.write_playlist(playlist_url=const.args.playlist)
 | 
						|
        elif const.args.album:
 | 
						|
            spotify_tools.write_album(album_url=const.args.album)
 | 
						|
        elif const.args.all_albums:
 | 
						|
            spotify_tools.write_all_albums_from_artist(artist_url=const.args.all_albums)
 | 
						|
        elif const.args.username:
 | 
						|
            spotify_tools.write_user_playlist(username=const.args.username)
 | 
						|
 | 
						|
        # actually we don't necessarily need this, but yeah...
 | 
						|
        # explicit is better than implicit!
 | 
						|
        sys.exit(0)
 | 
						|
 | 
						|
    except KeyboardInterrupt as e:
 | 
						|
        log.exception(e)
 | 
						|
        sys.exit(3)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    main()
 |