mirror of
				https://github.com/KevinMidboe/spotify-downloader.git
				synced 2025-10-29 18:00:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			382 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			382 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| # -*- coding: UTF-8 -*-
 | |
| 
 | |
| from core import logger
 | |
| from core import metadata
 | |
| from core import convert
 | |
| from core import internals
 | |
| from core import spotify_tools
 | |
| from slugify import slugify
 | |
| import spotipy
 | |
| import pafy
 | |
| import urllib.request
 | |
| import os
 | |
| import sys
 | |
| import time
 | |
| import sys
 | |
| import platform
 | |
| import pprint
 | |
| 
 | |
| 
 | |
def generate_songname(tags):
    """ Build the '[artist] - [song]' display name from Spotify track metadata.

    Only the first entry of tags['artists'] contributes the artist name.
    """
    primary_artist = tags['artists'][0]['name']
    track_title = tags['name']
    return u'{0} - {1}'.format(primary_artist, track_title)
 | |
| 
 | |
| 
 | |
def is_video(result):
    """ Tell whether a search-result node is a plain video.

    Returns False as soon as the node looks like a channel, a mix/playlist
    or an advertisement; True otherwise.
    """
    # a 'channel' descendant marks a channel result
    if result.find('channel') is not None:
        return False

    # channel markers may sit on the enclosing node or on the node itself
    parent_classes = result.parent.attrs['class']
    if 'yt-lockup-channel' in parent_classes:
        return False
    if 'yt-lockup-channel' in result.attrs['class']:
        return False

    # mixes/playlists are flagged on the enclosing node
    if 'yt-lockup-playlist' in parent_classes:
        return False

    # anything carrying a 'googleads' descendant is an advertisement
    return result.find('googleads') is None
 | |
| 
 | |
| 
 | |
def _filter_by_duration(videos, target_seconds, tolerance=10, max_tolerance=20):
    """ Return the videos whose length is within a tolerance of target_seconds.

    The tolerance starts at `tolerance` seconds and is widened one second at
    a time, up to and including `max_tolerance`, until at least one video
    matches. An empty list is returned when nothing matches even at
    `max_tolerance`.
    """
    while tolerance <= max_tolerance:
        matches = [video for video in videos
                   if abs(video['seconds'] - target_seconds) <= tolerance]
        if matches:
            return matches
        tolerance += 1
    return []


def generate_youtube_url(raw_song, meta_tags, tries_remaining=5):
    """ Search for the song on YouTube and generate a URL to its video.

    raw_song is used as the search query when meta_tags is empty; otherwise
    the query is built from meta_tags via generate_songname().

    Returns a 'http://youtube.com/watch?v=...' URL string, or None when no
    tries are left, the search yields nothing, the user skips the song in
    manual mode, or no result has a duration close enough to the Spotify
    track.
    """
    # prevents an infinite loop but allows for a few retries
    if tries_remaining == 0:
        log.debug('No tries left. I quit.')
        return

    query = {'part': 'snippet',
             'maxResults': 50,
             'type': 'video'}

    if args.music_videos_only:
        query['videoCategoryId'] = '10'

    song = generate_songname(meta_tags) if meta_tags else raw_song
    query['q'] = song
    log.debug('Query: {0}'.format(query))

    data = pafy.call_gdata('search', query)
    video_ids = [item['id']['videoId'] for item in data['items']]
    if not video_ids:
        # nothing to look up; avoid a second API call with an empty id list
        log.debug('Search returned no results')
        return None

    query2 = {'part': 'contentDetails,snippet,statistics',
              'maxResults': 50,
              'id': ','.join(video_ids)}
    log.debug('Query2: {0}'.format(query2))

    vdata = pafy.call_gdata('videos', query2)

    videos = []
    for x in vdata['items']:
        # NOTE: 'parseISO8591' is the spelling this code has always called in pafy
        duration_s = pafy.playlist.parseISO8591(x['contentDetails']['duration'])
        youtubedetails = {'link': x['id'], 'title': x['snippet']['title'],
                          'videotime': internals.videotime_from_seconds(duration_s),
                          'seconds': duration_s}
        videos.append(youtubedetails)
        # without metadata only the first result is ever considered
        if not meta_tags:
            break

    if not videos:
        return None

    log.debug(pprint.pformat(videos))

    if args.manual:
        log.info(song)
        log.info('0. Skip downloading this song.\n')
        # fetch all video links on first page on YouTube
        for i, v in enumerate(videos):
            log.info(u'{0}. {1} {2} {3}'.format(i+1, v['title'], v['videotime'],
                  "http://youtube.com/watch?v="+v['link']))
        # let user select the song to download
        result = internals.input_link(videos)
        if not result:
            return None
    elif not meta_tags:
        # if the metadata could not be acquired, take the first result
        # from Youtube because the proper song length is unknown
        result = videos[0]
        log.debug('Since no metadata found on Spotify, going with the first result')
    else:
        # keep only videos whose length is similar to the Spotify song;
        # a match found at the maximum tolerance is accepted too
        target_seconds = int(meta_tags['duration_ms']) / 1000
        possible_videos_by_duration = _filter_by_duration(videos, target_seconds)
        if not possible_videos_by_duration:
            log.error("{0} by {1} was not found.\n".format(meta_tags['name'],meta_tags['artists'][0]['name']))
            return None
        result = possible_videos_by_duration[0]

    # every path reaching this point holds a selected video entry
    return "http://youtube.com/watch?v=" + result['link']
 | |
| 
 | |
| 
 | |
def go_pafy(raw_song, meta_tags=None):
    """ Resolve the input to a pafy object for its YouTube video.

    A YouTube input is handed to pafy directly; anything else is first
    turned into a video URL via generate_youtube_url(). Returns None when
    no URL could be generated.
    """
    if internals.is_youtube(raw_song):
        return pafy.new(raw_song)

    track_url = generate_youtube_url(raw_song, meta_tags)
    return pafy.new(track_url) if track_url else None
 | |
| 
 | |
| 
 | |
def get_youtube_title(content, number=None):
    """ Return the video's title, prefixed with '[number]. ' when a number is given. """
    video_title = content.title
    if not number:
        return video_title
    return '{0}. {1}'.format(number, video_title)
 | |
| 
 | |
| 
 | |
def download_song(file_name, content):
    """ Fetch the audio stream of `content` into args.folder.

    Returns True once the download call has completed, False when
    args.input_ext is not '.webm'/'.m4a' or no audio stream was offered.
    """
    if args.input_ext not in (".webm", ".m4a"):
        return False

    # the preferred type is the extension without its leading dot
    link = content.getbestaudio(preftype=args.input_ext[1:])
    if not link:
        return False

    log.debug('Downloading from URL: ' + link.url)
    target_base = os.path.join(args.folder, file_name)
    filepath = '{0}{1}'.format(target_base, args.input_ext)
    log.debug('Saving to: ' + filepath)
    link.download(filepath=filepath)
    return True
 | |
| 
 | |
| 
 | |
def check_exists(music_file, raw_song, meta_tags):
    """ Check if the input song already exists in the given folder.

    Leftover '.temp' files in args.folder are deleted along the way.

    Returns True when an existing file should be kept (the download is
    skipped), and False when the song should be downloaded: nothing
    matching was found, the existing file was removed because its tags did
    not match, or the user/overwrite policy asked for a re-download.
    """
    log.debug('Cleaning any temp files and checking '
              'if "{}" already exists'.format(music_file))
    # the sanitized name does not change between directory entries
    file_name = internals.sanitize_title(music_file)

    for song in os.listdir(args.folder):
        song_path = os.path.join(args.folder, song)
        if song.endswith('.temp'):
            os.remove(song_path)
            continue
        # check if any song with similar name is already present in the given folder
        if not song.startswith(file_name):
            continue

        log.debug('Found an already existing song: "{}"'.format(song))
        if internals.is_spotify(raw_song):
            # check if the already downloaded song has correct metadata
            # if not, remove it and download again without prompt
            already_tagged = metadata.compare(song_path, meta_tags)
            # format explicitly: logging would apply %-formatting to extra
            # arguments, which does not fill a '{}' placeholder
            log.debug('Checking if it is already tagged correctly? {}'.format(
                already_tagged))
            if not already_tagged:
                os.remove(song_path)
                return False

        log.warning('"{}" already exists'.format(song))
        if args.overwrite == 'prompt':
            log.info('"{}" has already been downloaded. '
                     'Re-download? (y/N): '.format(song))
            prompt = input('> ')
            if prompt.lower() == 'y':
                os.remove(song_path)
                return False
            return True
        elif args.overwrite == 'force':
            os.remove(song_path)
            log.info('Overwriting "{}"'.format(song))
            return False
        elif args.overwrite == 'skip':
            log.info('Skipping "{}"'.format(song))
            return True
        # any other overwrite value: keep scanning the remaining files
    return False
 | |
| 
 | |
| 
 | |
def grab_list(text_file):
    """ Download all songs from the list.

    Each non-blank line of text_file is passed to grab_single(). After a
    song has been handled, internals.trim_song() is called on the file.
    A song that fails with a network-type error is re-queued at the end of
    both the in-memory list and the file, so it is retried after the others.
    """
    global spotify

    with open(text_file, 'r') as listed:
        lines = listed.read().splitlines()
    # ignore every blank line in text_file, not just the first one
    # NOTE(review): trim_song() presumably drops the first line of the file;
    # blank lines inside the file may leave it out of step - confirm
    lines = [line for line in lines if line.strip()]
    log.info(u'Preparing to download {} songs'.format(len(lines)))
    number = 1

    # songs appended to `lines` during iteration are picked up by this loop;
    # that is how failed downloads get retried
    for raw_song in lines:
        print('')
        try:
            grab_single(raw_song, number=number)
        # token expires after 1 hour
        except spotipy.client.SpotifyException:
            # refresh token when it expires
            log.debug('Token expired, generating new one and authorizing')
            new_token = spotify_tools.generate_token()
            spotify = spotipy.Spotify(auth=new_token)
            grab_single(raw_song, number=number)
        # detect network problems
        except (urllib.request.URLError, TypeError, IOError):
            lines.append(raw_song)
            # remove the downloaded song from file
            internals.trim_song(text_file)
            # and append it at the end of file
            with open(text_file, 'a') as myfile:
                myfile.write(raw_song + '\n')
            log.warning('Failed to download song. Will retry after other songs\n')
            # wait 0.5 sec to avoid infinite looping
            time.sleep(0.5)
            continue

        log.debug('Removing downloaded song from text file')
        internals.trim_song(text_file)
        number += 1
 | |
| 
 | |
| 
 | |
def grab_playlist(playlist):
    """ Split a playlist URL or URI into username and id, then write the playlist.

    Input containing '/' is split on '/' (after dropping one trailing
    slash); anything else is split on ':'. The username is the third
    component from the end and the playlist id is the last one.

    Exits with status 10 when there are fewer than three components, and
    with status 11 when spotify_tools.write_playlist() raises a
    SpotifyException.
    """
    separator = '/' if '/' in playlist else ':'
    if separator == '/' and playlist.endswith('/'):
        playlist = playlist[:-1]
    parts = playlist.split(separator)

    if len(parts) < 3:
        # Wrong format, in either case
        log.error('The provided playlist URL is not in a recognized format!')
        sys.exit(10)

    username, playlist_id = parts[-3], parts[-1]
    try:
        spotify_tools.write_playlist(username, playlist_id)
    except spotipy.client.SpotifyException:
        log.error('Unable to find playlist')
        log.info('Make sure the playlist is set to publicly visible and then try again')
        sys.exit(11)
 | |
| 
 | |
| 
 | |
def grab_single(raw_song, number=None):
    """ Logic behind downloading a song.

    Resolves raw_song to a YouTube video and Spotify metadata, downloads
    the audio into args.folder, converts it from args.input_ext to
    args.output_ext and embeds the metadata unless args.no_metadata is set.
    `number`, when given, only prefixes the logged title.
    Returns None in every case; with args.dry_run nothing is downloaded.
    """
    if internals.is_youtube(raw_song):
        log.debug('Input song is a YouTube URL')
        content = go_pafy(raw_song, meta_tags=None)
        # look the track up on Spotify by the video title instead of the URL
        raw_song = slugify(content.title).replace('-', ' ')
        meta_tags = spotify_tools.generate_metadata(raw_song)
    else:
        meta_tags = spotify_tools.generate_metadata(raw_song)
        content = go_pafy(raw_song, meta_tags)

    if not content:
        log.debug('Found no matching video')
        return

    # "[number]. [artist] - [song]" if downloading from list
    # otherwise "[artist] - [song]"
    youtube_title = get_youtube_title(content, number)
    log.info('{} ({})'.format(youtube_title, content.watchv_url))
    # generate file name of the song to download
    songname = content.title

    if meta_tags:
        refined_songname = generate_songname(meta_tags)
        log.debug('Refining songname from "{0}" to "{1}"'.format(songname, refined_songname))
        # ' - ' means both artist and title were empty; keep the video title
        if refined_songname != ' - ':
            songname = refined_songname

    if args.dry_run:
        return

    file_name = internals.sanitize_title(songname)

    if check_exists(file_name, raw_song, meta_tags):
        return

    if not download_song(file_name, content):
        log.error('No audio streams available')
        return

    input_song = file_name + args.input_ext
    output_song = file_name + args.output_ext
    print('')

    try:
        convert.song(input_song, output_song, args.folder,
                     avconv=args.avconv)
    except FileNotFoundError:
        encoder = 'avconv' if args.avconv else 'ffmpeg'
        log.warning('Could not find {0}, skipping conversion'.format(encoder))
        # keep the downloaded file as the final output from now on
        args.output_ext = args.input_ext
        output_song = file_name + args.output_ext

    if not args.input_ext == args.output_ext:
        os.remove(os.path.join(args.folder, input_song))

    if not args.no_metadata:
        # test the fetched tags, not the imported `metadata` module
        # (a module object is always truthy)
        if meta_tags:
            metadata.embed(os.path.join(args.folder, output_song), meta_tags)
        else:
            log.warning('Could not find metadata')
 | |
| 
 | |
| 
 | |
# Spotify's Web API rejects unauthenticated calls, so a token is required
# https://developer.spotify.com/news-stories/2017/01/27/removing-unauthenticated-calls-to-the-web-api/
token = spotify_tools.generate_token()
spotify = spotipy.Spotify(auth=token)

if __name__ == '__main__':
    args = internals.get_arguments()
    internals.filter_path(args.folder)

    # one logger object, published on the logger module and bound locally
    log = logger.logzero.setup_logger(formatter=logger.formatter,
                                      level=args.log_level)
    logger.log = log

    # record the runtime environment and parsed options for debugging
    log.debug('Python version: {}'.format(sys.version))
    log.debug('Platform: {}'.format(platform.platform()))
    log.debug(pprint.pformat(vars(args)))

    try:
        # the first input option that is set decides what gets downloaded
        if args.song:
            grab_single(raw_song=args.song)
        elif args.list:
            grab_list(text_file=args.list)
        elif args.playlist:
            grab_playlist(playlist=args.playlist)
        elif args.album:
            spotify_tools.grab_album(album=args.album)
        elif args.username:
            spotify_tools.feed_playlist(username=args.username)

        # not strictly required, but the success status is stated explicitly
        sys.exit(0)

    except KeyboardInterrupt as interrupt:
        # Ctrl+C: log it and leave with a distinct exit status
        log.exception(interrupt)
        sys.exit(3)
 |