mirror of
https://github.com/KevinMidboe/spotify-downloader.git
synced 2025-10-29 18:00:15 +00:00
Changes for v1.0.0 release (#345)
* Move spotdl.py inside spotdl directory * Fix Dockerfile * Re-upload FFmpeg binary * Small fixes ;)
This commit is contained in:
1
spotdl/__init__.py
Executable file
1
spotdl/__init__.py
Executable file
@@ -0,0 +1 @@
|
||||
__version__ = '1.0.0'
|
||||
33
spotdl/const.py
Normal file
33
spotdl/const.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import logzero
|
||||
|
||||
# Layout of a log line: colourised level name followed by the message.
_log_format = "%(color)s%(levelname)s:%(end_color)s %(message)s"
_formatter = logzero.LogFormatter(fmt=_log_format)

# options
# Shared logger; the entry point replaces it once the log level is known.
log = logzero.setup_logger(formatter=_formatter)
# Parsed command line arguments; assigned by the entry point.
args = None
|
||||
|
||||
# Apple has specific tags - see mutagen docs -
# http://mutagen.readthedocs.io/en/latest/api/mp4.html
# Maps the generic tag names used by spotdl to the MP4 atom names.
M4A_TAG_PRESET = dict(
    album='\xa9alb',
    artist='\xa9ART',
    date='\xa9day',
    title='\xa9nam',
    year='\xa9day',
    originaldate='purd',
    comment='\xa9cmt',
    group='\xa9grp',
    writer='\xa9wrt',
    genre='\xa9gen',
    tracknumber='trkn',
    albumartist='aART',
    discnumber='disk',
    cpil='cpil',
    albumart='covr',
    copyright='cprt',
    tempo='tmpo',
    lyrics='\xa9lyr',
)

# Identity preset: every generic tag name maps to itself.
TAG_PRESET = {tag: tag for tag in M4A_TAG_PRESET}
|
||||
90
spotdl/convert.py
Normal file
90
spotdl/convert.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import subprocess
|
||||
import os
|
||||
from spotdl.const import log
|
||||
|
||||
|
||||
"""What are the differences and similarities between ffmpeg, libav, and avconv?
|
||||
https://stackoverflow.com/questions/9477115
|
||||
|
||||
ffmpeg encoders high to lower quality
|
||||
libopus > libvorbis >= libfdk_aac > aac > libmp3lame
|
||||
|
||||
libfdk_aac due to copyrights needs to be compiled by end user
|
||||
on MacOS brew install ffmpeg --with-fdk-aac will do just that. Other OS?
|
||||
https://trac.ffmpeg.org/wiki/Encode/AAC
|
||||
"""
|
||||
|
||||
|
||||
def song(input_song, output_song, folder, avconv=False, trim_silence=False):
    """Convert ``input_song`` to ``output_song`` inside ``folder``.

    Returns 0 without converting when both file names are identical,
    otherwise the exit code of the encoder (avconv when ``avconv`` is
    true, ffmpeg otherwise).
    """
    if input_song == output_song:
        return 0

    converter = Converter(input_song, output_song, folder, trim_silence)
    target_format = output_song.split('.')[-1]
    log.info('Converting {0} to {1}'.format(input_song, target_format))

    run = converter.with_avconv if avconv else converter.with_ffmpeg
    return run()
|
||||
|
||||
|
||||
class Converter:
    """Build and run the encoder command that converts one audio file."""

    def __init__(self, input_song, output_song, folder, trim_silence=False):
        # both file names are relative to ``folder``
        self.input_file = os.path.join(folder, input_song)
        self.output_file = os.path.join(folder, output_song)
        self.trim_silence = trim_silence

    def with_avconv(self):
        """Convert using avconv and return the process exit code."""
        # 10 is the numeric value of the DEBUG log level
        level = 'debug' if log.level == 10 else '0'

        command = [
            'avconv', '-loglevel', level,
            '-i', self.input_file,
            '-ab', '192k',
            self.output_file, '-y',
        ]

        if self.trim_silence:
            log.warning('--trim-silence not supported with avconv')

        log.debug(command)
        return subprocess.call(command)

    def with_ffmpeg(self):
        """Convert using ffmpeg and return the process exit code."""
        prefix = ['ffmpeg', '-y']
        # keep ffmpeg quiet unless we are logging at DEBUG (10)
        if log.level != 10:
            prefix += ['-hide_banner', '-nostats', '-v', 'panic']
        prefix.append('-i')

        input_ext = os.path.splitext(self.input_file)[1]
        output_ext = os.path.splitext(self.output_file)[1]

        # (input extension, output extension) -> codec specific options
        codec_options = {
            ('.m4a', '.mp3'): ['-codec:v', 'copy', '-codec:a', 'libmp3lame',
                               '-ar', '44100'],
            ('.m4a', '.webm'): ['-codec:a', 'libopus', '-vbr', 'on'],
            ('.webm', '.mp3'): ['-codec:a', 'libmp3lame', '-ar', '44100'],
            ('.webm', '.m4a'): ['-cutoff', '20000', '-codec:a', 'libfdk_aac',
                                '-ar', '44100'],
        }
        if output_ext == '.flac':
            # FLAC output uses the same options whatever the input is
            params = ['-codec:a', 'flac', '-ar', '44100']
        else:
            params = list(codec_options.get((input_ext, output_ext), []))

        # add common params for any of the above combination
        params += ['-b:a', '192k', '-vn']

        if self.trim_silence:
            params += ['-af', 'silenceremove=start_periods=1']

        command = prefix + [self.input_file] + params + [self.output_file]

        log.debug(command)
        return subprocess.call(command)
|
||||
190
spotdl/handle.py
Normal file
190
spotdl/handle.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import appdirs
|
||||
from spotdl import internals, const
|
||||
|
||||
log = const.log
|
||||
|
||||
import logging
|
||||
import yaml
|
||||
import argparse
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
|
||||
# Accepted --log-level names; log_leveller() depends on this exact order.
_LOG_LEVELS_STR = ['INFO', 'WARNING', 'ERROR', 'DEBUG']

# Built-in configuration. It is written to config.yml when that file does
# not exist yet and supplies the defaults of the command line options.
default_conf = {
    'spotify-downloader': {
        'manual': False,
        'no-metadata': False,
        'avconv': False,
        'folder': internals.get_music_dir(),
        'overwrite': 'prompt',
        'input-ext': '.m4a',
        'output-ext': '.mp3',
        'trim-silence': False,
        'download-only-metadata': False,
        'dry-run': False,
        'music-videos-only': False,
        'no-spaces': False,
        'file-format': '{artist} - {track_name}',
        'search-format': '{artist} - {track_name} lyrics',
        'youtube-api-key': None,
        'log-level': 'INFO',
    },
}
|
||||
|
||||
|
||||
def log_leveller(log_level_str):
    """Translate a log level name into the matching ``logging`` constant.

    ``log_level_str`` has to be one of the names in ``_LOG_LEVELS_STR``;
    anything else raises ValueError (from ``list.index``).
    """
    # listed in the same order as _LOG_LEVELS_STR
    levels = (logging.INFO, logging.WARNING, logging.ERROR, logging.DEBUG)
    position = _LOG_LEVELS_STR.index(log_level_str)
    return levels[position]
|
||||
|
||||
|
||||
def merge(default, config):
    """ Return a new dict holding ``default`` overridden by ``config``.

    Neither argument is modified.
    """
    combined = dict(default)
    combined.update(config)
    return combined
|
||||
|
||||
|
||||
def get_config(config_file):
    """Return the ``spotify-downloader`` section of a YAML config file.

    When ``config_file`` does not exist it is created from ``default_conf``,
    the default values are logged and the defaults are returned.
    """
    try:
        with open(config_file, 'r') as ymlfile:
            # safe_load only builds plain Python types, which is all the
            # configuration needs; yaml.load() without a Loader can
            # construct arbitrary objects and is rejected by newer PyYAML.
            cfg = yaml.safe_load(ymlfile)
    except FileNotFoundError:
        log.info('Writing default configuration to {0}:'.format(config_file))
        with open(config_file, 'w') as ymlfile:
            yaml.dump(default_conf, ymlfile, default_flow_style=False)
            cfg = default_conf

        # show the freshly written defaults, one option per log line
        for line in yaml.dump(default_conf['spotify-downloader'], default_flow_style=False).split('\n'):
            if line.strip():
                log.info(line.strip())
        log.info('Please note that command line arguments have higher priority '
                 'than their equivalents in the configuration file')

    return cfg['spotify-downloader']
|
||||
|
||||
|
||||
def override_config(config_file, parser, raw_args=None):
    """ Override default dict with config dict passed as command line argument.

    The options read from ``config_file`` (merged over the built-in
    defaults) become the new defaults of ``parser``; the arguments are then
    parsed again and the resulting namespace is returned.
    """
    config_file = os.path.realpath(config_file)
    config = merge(default_conf['spotify-downloader'], get_config(config_file))
    # set_defaults() matches on the argparse dest, where '--no-metadata' is
    # stored as 'no_metadata'. Hyphenated config keys would never match an
    # option and the custom config would be silently ignored for them.
    defaults = {key.replace('-', '_'): value for key, value in config.items()}
    parser.set_defaults(**defaults)
    return parser.parse_args(raw_args)
|
||||
|
||||
|
||||
def get_arguments(raw_args=None, to_group=True, to_merge=True):
    """Build the command line parser and return the parsed arguments.

    raw_args -- list of arguments to parse; None lets argparse use sys.argv.
    to_group -- register the required, mutually exclusive action options
                (--song, --list, --playlist, --album, --username, --version).
    to_merge -- take option defaults from the user's config.yml (created
                when missing) instead of the built-in defaults only; also
                enables the --config override.

    Returns the argparse namespace, with ``log_level`` converted to the
    corresponding ``logging`` constant.
    """
    parser = argparse.ArgumentParser(
        description='Download and convert tracks from Spotify, Youtube etc.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    if to_merge:
        config_dir = os.path.join(appdirs.user_config_dir(), 'spotdl')
        os.makedirs(config_dir, exist_ok=True)
        config_file = os.path.join(config_dir, 'config.yml')
        config = merge(default_conf['spotify-downloader'], get_config(config_file))
    else:
        config = default_conf['spotify-downloader']

    if to_group:
        group = parser.add_mutually_exclusive_group(required=True)

        group.add_argument(
            '-s', '--song',
            help='download track by spotify link or name')
        group.add_argument(
            '-l', '--list',
            help='download tracks from a file')
        group.add_argument(
            '-p', '--playlist',
            help='load tracks from playlist URL into <playlist_name>.txt')
        group.add_argument(
            '-b', '--album',
            help='load tracks from album URL into <album_name>.txt')
        group.add_argument(
            '-u', '--username',
            help="load tracks from user's playlist into <playlist_name>.txt")
        group.add_argument(
            '-V', '--version',
            help="show version and exit",
            action='store_true')

    parser.add_argument(
        '-m', '--manual', default=config['manual'],
        help='choose the track to download manually from a list '
             'of matching tracks',
        action='store_true')
    parser.add_argument(
        '-nm', '--no-metadata', default=config['no-metadata'],
        help='do not embed metadata in tracks', action='store_true')
    parser.add_argument(
        '-a', '--avconv', default=config['avconv'],
        help='use avconv for conversion (otherwise defaults to ffmpeg)',
        action='store_true')
    parser.add_argument(
        '-f', '--folder', default=os.path.abspath(config['folder']),
        help='path to folder where downloaded tracks will be stored in')
    parser.add_argument(
        '--overwrite', default=config['overwrite'],
        help='change the overwrite policy',
        choices={'prompt', 'force', 'skip'})
    parser.add_argument(
        '-i', '--input-ext', default=config['input-ext'],
        help='preferred input format .m4a or .webm (Opus)',
        choices={'.m4a', '.webm'})
    parser.add_argument(
        '-o', '--output-ext', default=config['output-ext'],
        help='preferred output format .mp3, .m4a (AAC), .flac, etc.')
    parser.add_argument(
        '-ff', '--file-format', default=config['file-format'],
        help='file format to save the downloaded track with, each tag '
             'is surrounded by curly braces. Possible formats: '
             '{}'.format([internals.formats[x] for x in internals.formats]))
    parser.add_argument(
        '--trim-silence', default=config['trim-silence'],
        help='remove silence from the start of the audio',
        action='store_true')
    parser.add_argument(
        '-sf', '--search-format', default=config['search-format'],
        help='search format to search for on YouTube, each tag '
             'is surrounded by curly braces. Possible formats: '
             '{}'.format([internals.formats[x] for x in internals.formats]))
    parser.add_argument(
        '-dm', '--download-only-metadata', default=config['download-only-metadata'],
        help='download tracks only whose metadata is found',
        action='store_true')
    parser.add_argument(
        '-d', '--dry-run', default=config['dry-run'],
        help='show only track title and YouTube URL, and then skip '
             'to the next track (if any)',
        action='store_true')
    parser.add_argument(
        '-mo', '--music-videos-only', default=config['music-videos-only'],
        help='search only for music videos on Youtube (works only '
             'when YouTube API key is set',
        action='store_true')
    parser.add_argument(
        '-ns', '--no-spaces', default=config['no-spaces'],
        help='replace spaces with underscores in file names',
        action='store_true')
    parser.add_argument(
        '-ll', '--log-level', default=config['log-level'],
        choices=_LOG_LEVELS_STR,
        type=str.upper,
        help='set log verbosity')
    parser.add_argument(
        '-yk', '--youtube-api-key', default=config['youtube-api-key'],
        help=argparse.SUPPRESS)
    parser.add_argument(
        '-c', '--config', default=None,
        help='path to custom config.yml file')

    parsed = parser.parse_args(raw_args)

    if parsed.config is not None and to_merge:
        # Re-parse the SAME argument list with the custom config as
        # defaults; without raw_args the second pass would read sys.argv.
        parsed = override_config(parsed.config, parser, raw_args)

    parsed.log_level = log_leveller(parsed.log_level)

    return parsed
|
||||
182
spotdl/internals.py
Executable file
182
spotdl/internals.py
Executable file
@@ -0,0 +1,182 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
from spotdl import const
|
||||
|
||||
log = const.log
|
||||
|
||||
try:
|
||||
from slugify import SLUG_OK, slugify
|
||||
except ImportError:
|
||||
log.error('Oops! `unicode-slugify` was not found.')
|
||||
log.info('Please remove any other slugify library and install `unicode-slugify`')
|
||||
sys.exit(5)
|
||||
|
||||
# Placeholder names usable in --file-format / --search-format, keyed by a
# fixed index (0..11) that format_string() uses to attach the values.
formats = dict(enumerate((
    'track_name',
    'artist',
    'album',
    'album_artist',
    'genre',
    'disc_number',
    'duration',
    'year',
    'original_date',
    'track_number',
    'total_tracks',
    'isrc',
)))
|
||||
|
||||
|
||||
def input_link(links):
    """ Ask the user to pick one entry of ``links`` by its 1-based number.

    Keeps prompting until the input is valid. Returns the chosen entry,
    or None when the user enters 0.
    """
    while True:
        log.info('Choose your number:')
        try:
            choice = int(input('> '))
        except ValueError:
            # not a number at all
            log.warning('Choose a valid number!')
            continue

        if choice == 0:
            return None
        if 1 <= choice <= len(links):
            return links[choice - 1]
        log.warning('Choose a valid number!')
|
||||
|
||||
|
||||
def trim_song(text_file):
    """ Remove the first line from ``text_file`` and return it.

    The returned line keeps its line ending. The file is rewritten with
    the remaining lines.
    """
    with open(text_file, 'r') as source:
        # keepends=True so the remaining lines are written back unchanged
        entries = source.read().splitlines(True)

    remaining = entries[1:]
    with open(text_file, 'w') as target:
        target.writelines(remaining)

    return entries[0]
|
||||
|
||||
|
||||
def is_spotify(raw_song):
    """ Check if the input song is a Spotify link.

    True for a 22 character string without spaces, or for any string
    containing 'spotify'.
    """
    looks_like_id = len(raw_song) == 22 and ' ' not in raw_song
    return looks_like_id or 'spotify' in raw_song
|
||||
|
||||
|
||||
def is_youtube(raw_song):
    """ Check if the input song is a YouTube link.

    True for an 11 character string without spaces that is not entirely
    lower case, or for any string containing 'youtube.com/watch?v='.
    """
    looks_like_id = (
        len(raw_song) == 11
        and ' ' not in raw_song
        and raw_song.lower() != raw_song
    )
    return looks_like_id or 'youtube.com/watch?v=' in raw_song
|
||||
|
||||
|
||||
def format_string(string_format, tags, slugification=False, force_spaces=False):
    """ Fill the '{placeholder}' tags of ``string_format`` from ``tags``.

    string_format -- template, e.g. '{artist} - {track_name}'.
    tags          -- song metadata dict; read with the keys used below.
    slugification -- pass every value through sanitize_title() instead of
                     plain str().
    force_spaces  -- keep spaces even when the --no-spaces option is set.

    Returns the formatted string.
    """
    # values are stored under the same indexes as the names in ``formats``
    values = dict(formats)
    values.update({
        0: tags['name'],
        1: tags['artists'][0]['name'],
        2: tags['album']['name'],
        3: tags['artists'][0]['name'],
        4: tags['genre'],
        5: tags['disc_number'],
        6: tags['duration'],
        7: tags['year'],
        8: tags['release_date'],
        9: tags['track_number'],
        10: tags['total_tracks'],
        11: tags['external_ids']['isrc'],
    })

    if slugification:
        values = {index: sanitize_title(value, ok='-_()[]{}')
                  for index, value in values.items()}
    else:
        values = {index: str(value) for index, value in values.items()}

    for index, name in formats.items():
        placeholder = '{' + name + '}'
        string_format = string_format.replace(placeholder, values[index])

    if const.args.no_spaces and not force_spaces:
        string_format = string_format.replace(' ', '_')

    return string_format
|
||||
|
||||
|
||||
def sanitize_title(title, ok='-_()[]{}\\/'):
    """ Generate filename of the song to be downloaded.

    title -- text to clean up; format_string() also passes non-string tag
             values such as the disc number.
    ok    -- characters slugify must keep. The default is the same value
             as before, spelled with an escaped backslash because '\\/'
             written with a single backslash is an invalid escape sequence.

    Returns the slugified title.
    """
    if const.args.no_spaces:
        # coerce first: numeric tags have no .replace()
        title = str(title).replace(' ', '_')

    # slugify removes any special characters
    title = slugify(title, ok=ok, lower=False, spaces=True)
    return title
|
||||
|
||||
|
||||
def filter_path(path):
    """Create ``path`` if it is missing and delete the '.temp' files in it."""
    if not os.path.exists(path):
        os.makedirs(path)

    leftovers = [entry for entry in os.listdir(path) if entry.endswith('.temp')]
    for leftover in leftovers:
        os.remove(os.path.join(path, leftover))
|
||||
|
||||
|
||||
def videotime_from_seconds(time):
    """Format a number of seconds as 'S', 'M:SS' or 'H:MM:SS'."""
    if time < 60:
        return str(time)

    minutes, seconds = divmod(time, 60)
    if time < 3600:
        return '{0}:{1:02}'.format(minutes, seconds)

    hours, minutes = divmod(minutes, 60)
    return '{0}:{1:02}:{2:02}'.format(hours, minutes, seconds)
|
||||
|
||||
|
||||
def get_sec(time_str):
    """Convert a 'H:M:S' / 'M:S' time string (':' or '.' separated) to seconds.

    Raises ValueError when neither separator is present or a field is not
    an integer.
    """
    for splitter in (':', '.'):
        if splitter in time_str:
            break
    else:
        raise ValueError("No expected character found in {} to split"
                         "time values.".format(time_str))

    # reversed so the fields read seconds, minutes, hours
    fields = time_str.split(splitter, 3)[::-1]
    weights = (1, 60, 3600)
    return sum(int(field) * weight for field, weight in zip(fields, weights))
|
||||
|
||||
|
||||
def get_splits(url):
    """Split ``url`` on '/' (ignoring one trailing slash), or on ':' when it
    contains no slash at all."""
    if '/' not in url:
        return url.split(':')

    trimmed = url[:-1] if url.endswith('/') else url
    return trimmed.split('/')
|
||||
|
||||
|
||||
# a hacky way to get user's localized music directory
|
||||
# (thanks @linusg, issue #203)
|
||||
def get_music_dir():
    """Return the path of the user's music directory.

    On Linux the XDG_MUSIC_DIR entry of the user's ``user-dirs.dirs`` file
    is used when one is found; in every other case the result is
    ``<home>/Music``.
    """
    home = os.path.expanduser('~')

    # On Linux, the localized folder names are the actual ones.
    # It's a freedesktop standard though.
    if sys.platform.startswith('linux'):
        for file_item in ('.config/user-dirs.dirs', 'user-dirs.dirs'):
            path = os.path.join(home, file_item)
            if not os.path.isfile(path):
                continue
            with open(path, 'r') as f:
                for line in f:
                    if not line.startswith('XDG_MUSIC_DIR'):
                        continue
                    # Split on the first '=' only, so a directory name that
                    # itself contains '=' is not truncated.
                    _, separator, value = line.strip().partition('=')
                    if separator:
                        return os.path.expandvars(value.strip('"'))

    # On both Windows and macOS, the localized folder names you see in
    # Explorer and Finder are actually in English on the file system.
    # So, defaulting to C:\Users\<user>\Music or /Users/<user>/Music
    # respectively is sufficient.
    # On Linux, default to /home/<user>/Music if the above method failed.
    return os.path.join(home, 'Music')
|
||||
159
spotdl/metadata.py
Executable file
159
spotdl/metadata.py
Executable file
@@ -0,0 +1,159 @@
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.id3 import ID3, TORY, TYER, TPUB, APIC, USLT, COMM
|
||||
from mutagen.mp4 import MP4, MP4Cover
|
||||
from mutagen.flac import Picture, FLAC
|
||||
from spotdl.const import log, TAG_PRESET, M4A_TAG_PRESET
|
||||
|
||||
import urllib.request
|
||||
|
||||
|
||||
def compare(music_file, metadata):
    """Check if the input music file title matches the expected title.

    Only '.mp3' and '.m4a' files are inspected; the stored title is
    compared with ``metadata['name']``. Returns False for other
    extensions or when the lookup fails with KeyError/TypeError.
    """
    try:
        if music_file.endswith('.mp3'):
            return EasyID3(music_file)['title'][0] == metadata['name']
        if music_file.endswith('.m4a'):
            return MP4(music_file)['\xa9nam'][0] == metadata['name']
    except (KeyError, TypeError):
        # no title tag in the file, or metadata that cannot be indexed
        pass

    return False
|
||||
|
||||
|
||||
def embed(music_file, meta_tags):
    """ Embed metadata into ``music_file`` according to its extension.

    Supports .m4a, .mp3 and .flac and returns what the matching
    EmbedMetadata method returns; any other extension logs a warning and
    returns False.
    """
    embedder = EmbedMetadata(music_file, meta_tags)
    writers = (
        ('.m4a', embedder.as_m4a),
        ('.mp3', embedder.as_mp3),
        ('.flac', embedder.as_flac),
    )

    for extension, write_tags in writers:
        if music_file.endswith(extension):
            log.info('Applying metadata')
            return write_tags()

    log.warning('Cannot embed metadata into given output extension')
    return False
|
||||
|
||||
|
||||
class EmbedMetadata:
    """Write the metadata in ``meta_tags`` into ``music_file`` with mutagen."""

    def __init__(self, music_file, meta_tags):
        self.music_file = music_file
        self.meta_tags = meta_tags

    def as_mp3(self):
        """ Embed metadata to MP3 files. Returns True. """
        music_file = self.music_file
        meta_tags = self.meta_tags
        # EasyID3 is fun to use ;)
        # For supported easyid3 tags:
        # https://github.com/quodlibet/mutagen/blob/master/mutagen/easyid3.py
        # Check out somewhere at end of above linked file
        audiofile = EasyID3(music_file)
        self._embed_basic_metadata(audiofile, preset=TAG_PRESET)
        audiofile['media'] = meta_tags['type']
        audiofile['author'] = meta_tags['artists'][0]['name']
        audiofile['lyricist'] = meta_tags['artists'][0]['name']
        audiofile['arranger'] = meta_tags['artists'][0]['name']
        audiofile['performer'] = meta_tags['artists'][0]['name']
        audiofile['website'] = meta_tags['external_urls']['spotify']
        audiofile['length'] = str(meta_tags['duration'])
        if meta_tags['publisher']:
            audiofile['encodedby'] = meta_tags['publisher']
        if meta_tags['external_ids']['isrc']:
            audiofile['isrc'] = meta_tags['external_ids']['isrc']
        audiofile.save(v2_version=3)

        # For supported id3 tags:
        # https://github.com/quodlibet/mutagen/blob/master/mutagen/id3/_frames.py
        # Each class represents an id3 tag
        audiofile = ID3(music_file)
        audiofile['TORY'] = TORY(encoding=3, text=meta_tags['year'])
        audiofile['TYER'] = TYER(encoding=3, text=meta_tags['year'])
        audiofile['TPUB'] = TPUB(encoding=3, text=meta_tags['publisher'])
        audiofile['COMM'] = COMM(encoding=3, text=meta_tags['external_urls']['spotify'])
        if meta_tags['lyrics']:
            audiofile['USLT'] = USLT(encoding=3, desc=u'Lyrics', text=meta_tags['lyrics'])
        albumart = self._download_albumart()
        if albumart is not None:
            audiofile['APIC'] = APIC(encoding=3, mime='image/jpeg', type=3,
                                     desc=u'Cover', data=albumart)

        audiofile.save(v2_version=3)
        return True

    def as_m4a(self):
        """ Embed metadata to M4A files. Returns True. """
        music_file = self.music_file
        meta_tags = self.meta_tags
        audiofile = MP4(music_file)
        self._embed_basic_metadata(audiofile, preset=M4A_TAG_PRESET)
        audiofile[M4A_TAG_PRESET['year']] = meta_tags['year']
        if meta_tags['lyrics']:
            # use the MP4 atom name from the preset, like every other M4A
            # tag written here, rather than the generic name 'lyrics'
            audiofile[M4A_TAG_PRESET['lyrics']] = meta_tags['lyrics']
        albumart = self._download_albumart()
        if albumart is not None:
            audiofile[M4A_TAG_PRESET['albumart']] = [MP4Cover(
                albumart, imageformat=MP4Cover.FORMAT_JPEG)]

        audiofile.save()
        return True

    def as_flac(self):
        """ Embed metadata to FLAC files. Returns True. """
        music_file = self.music_file
        meta_tags = self.meta_tags
        audiofile = FLAC(music_file)
        self._embed_basic_metadata(audiofile)
        audiofile['year'] = meta_tags['year']
        audiofile['comment'] = meta_tags['external_urls']['spotify']
        if meta_tags['lyrics']:
            audiofile['lyrics'] = meta_tags['lyrics']

        # an album without images is skipped, as for MP3 and M4A
        albumart = self._download_albumart()
        if albumart is not None:
            image = Picture()
            image.type = 3
            image.desc = 'Cover'
            image.mime = 'image/jpeg'
            image.data = albumart
            audiofile.add_picture(image)

        audiofile.save()
        return True

    def _download_albumart(self):
        """Return the bytes of the first album image, or None if the album
        lists no images."""
        try:
            url = self.meta_tags['album']['images'][0]['url']
        except IndexError:
            return None
        # the context manager closes the connection even if read() fails
        with urllib.request.urlopen(url) as albumart:
            return albumart.read()

    def _embed_basic_metadata(self, audiofile, preset=TAG_PRESET):
        """Set the tags shared by all formats on ``audiofile``.

        ``preset`` maps generic tag names to the keys of the target
        container (identity mapping by default).
        """
        meta_tags = self.meta_tags
        is_flac = self.music_file.endswith('.flac')

        audiofile[preset['artist']] = meta_tags['artists'][0]['name']
        audiofile[preset['albumartist']] = meta_tags['artists'][0]['name']
        audiofile[preset['album']] = meta_tags['album']['name']
        audiofile[preset['title']] = meta_tags['name']
        audiofile[preset['date']] = meta_tags['release_date']
        audiofile[preset['originaldate']] = meta_tags['release_date']
        if meta_tags['genre']:
            audiofile[preset['genre']] = meta_tags['genre']
        if meta_tags['copyright']:
            audiofile[preset['copyright']] = meta_tags['copyright']

        if is_flac:
            audiofile[preset['discnumber']] = str(meta_tags['disc_number'])
        else:
            audiofile[preset['discnumber']] = [(meta_tags['disc_number'], 0)]

        if is_flac:
            audiofile[preset['tracknumber']] = str(meta_tags['track_number'])
        elif preset['tracknumber'] == TAG_PRESET['tracknumber']:
            # generic preset: "<track>/<total>" as a single string
            audiofile[preset['tracknumber']] = '{}/{}'.format(meta_tags['track_number'],
                                                              meta_tags['total_tracks'])
        else:
            # M4A preset: a list holding one (track, total) tuple
            audiofile[preset['tracknumber']] = [
                (meta_tags['track_number'], meta_tags['total_tracks'])
            ]
|
||||
214
spotdl/spotdl.py
Executable file
214
spotdl/spotdl.py
Executable file
@@ -0,0 +1,214 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: UTF-8 -*-
|
||||
from spotdl import const
|
||||
from spotdl import handle
|
||||
from spotdl import metadata
|
||||
from spotdl import convert
|
||||
from spotdl import internals
|
||||
from spotdl import spotify_tools
|
||||
from spotdl import youtube_tools
|
||||
from slugify import slugify
|
||||
import spotipy
|
||||
import urllib.request
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import platform
|
||||
import pprint
|
||||
|
||||
|
||||
def check_exists(music_file, raw_song, meta_tags):
    """ Check if the input song already exists in the given folder.

    Also deletes any '.temp' file found in the download folder.

    music_file -- file name of the song without extension.
    raw_song   -- the user's input; when it is a Spotify link the existing
                  file's title tag is verified against ``meta_tags``.
    meta_tags  -- metadata of the song.

    Returns True when an existing file is kept (the download must be
    skipped), False when the song has to be downloaded.
    """
    log.debug('Cleaning any temp files and checking '
              'if "{}" already exists'.format(music_file))
    songs = os.listdir(const.args.folder)
    for song in songs:
        if song.endswith('.temp'):
            os.remove(os.path.join(const.args.folder, song))
            continue
        # check if a song with the same name is already present in the given folder
        if os.path.splitext(song)[0] == music_file:
            log.debug('Found an already existing song: "{}"'.format(song))
            if internals.is_spotify(raw_song):
                # check if the already downloaded song has correct metadata
                # if not, remove it and download again without prompt
                already_tagged = metadata.compare(os.path.join(const.args.folder, song),
                                                  meta_tags)
                # format explicitly: the logger applies %-formatting to
                # extra arguments, which does not fill a '{}' placeholder
                log.debug('Checking if it is already tagged correctly? {}'.format(
                    already_tagged))
                if not already_tagged:
                    os.remove(os.path.join(const.args.folder, song))
                    return False

            log.warning('"{}" already exists'.format(song))
            if const.args.overwrite == 'prompt':
                log.info('"{}" has already been downloaded. '
                         'Re-download? (y/N): '.format(song))
                prompt = input('> ')
                if prompt.lower() == 'y':
                    os.remove(os.path.join(const.args.folder, song))
                    return False
                else:
                    return True
            elif const.args.overwrite == 'force':
                os.remove(os.path.join(const.args.folder, song))
                log.info('Overwriting "{}"'.format(song))
                return False
            elif const.args.overwrite == 'skip':
                log.info('Skipping "{}"'.format(song))
                return True
    return False
|
||||
|
||||
|
||||
def download_list(text_file):
    """ Download all songs from the list.

    ``text_file`` holds one song per line. After each processed song the
    first line of the file is removed; a song that fails with a network
    error is moved to the end of the file and retried after the others.

    Returns the list of songs that were processed without such an error.
    """
    with open(text_file, 'r') as listed:
        entries = listed.read().splitlines()

    # Drop blank lines and duplicates while KEEPING the order of the file:
    # trim_song() removes the first line of the file after each song, so
    # the songs should be handled top to bottom (a set would shuffle them).
    lines = []
    seen = set()
    for entry in entries:
        if entry and entry not in seen:
            seen.add(entry)
            lines.append(entry)

    log.info(u'Preparing to download {} songs'.format(len(lines)))
    downloaded_songs = []

    # NOTE: ``lines`` grows while being iterated; appended songs are the
    # retries and are reached at the end of this same loop.
    for number, raw_song in enumerate(lines, 1):
        print('')
        try:
            download_single(raw_song, number=number)
        # token expires after 1 hour
        except spotipy.client.SpotifyException:
            # refresh token when it expires
            log.debug('Token expired, generating new one and authorizing')
            new_token = spotify_tools.generate_token()
            spotify_tools.spotify = spotipy.Spotify(auth=new_token)
            download_single(raw_song, number=number)
        # detect network problems
        except (urllib.request.URLError, TypeError, IOError):
            lines.append(raw_song)
            # remove the downloaded song from file
            internals.trim_song(text_file)
            # and append it at the end of file
            with open(text_file, 'a') as myfile:
                myfile.write(raw_song + '\n')
            log.warning('Failed to download song. Will retry after other songs\n')
            # wait 0.5 sec to avoid infinite looping
            time.sleep(0.5)
            continue

        downloaded_songs.append(raw_song)
        log.debug('Removing downloaded song from text file')
        internals.trim_song(text_file)

    return downloaded_songs
|
||||
|
||||
|
||||
def download_single(raw_song, number=None):
    """ Logic behind downloading a song.

    raw_song -- YouTube link, Spotify link or free text search.
    number   -- position of the song when downloading from a list.

    Returns True once a song was downloaded and post-processed, None in
    every other case (no video, no metadata, dry run, kept existing file,
    failed download).
    """
    if internals.is_youtube(raw_song):
        log.debug('Input song is a YouTube URL')
        content = youtube_tools.go_pafy(raw_song, meta_tags=None)
        # search the metadata with the video title
        raw_song = slugify(content.title).replace('-', ' ')
        meta_tags = spotify_tools.generate_metadata(raw_song)
    else:
        meta_tags = spotify_tools.generate_metadata(raw_song)
        content = youtube_tools.go_pafy(raw_song, meta_tags)

    if content is None:
        log.debug('Found no matching video')
        return

    if const.args.download_only_metadata and meta_tags is None:
        log.info('Found no metadata. Skipping the download')
        return

    # "[number]. [artist] - [song]" if downloading from list
    # otherwise "[artist] - [song]"
    youtube_title = youtube_tools.get_youtube_title(content, number)
    log.info('{} ({})'.format(youtube_title, content.watchv_url))

    # generate file name of the song to download
    songname = content.title

    if meta_tags is None:
        log.warning('Could not find metadata')
        songname = internals.sanitize_title(songname)
    else:
        refined_songname = internals.format_string(const.args.file_format,
                                                   meta_tags,
                                                   slugification=True)
        log.debug('Refining songname from "{0}" to "{1}"'.format(songname, refined_songname))
        if refined_songname != ' - ':
            songname = refined_songname

    if const.args.dry_run:
        return

    if check_exists(songname, raw_song, meta_tags):
        return

    # deal with file formats containing slashes to non-existent directories
    songpath = os.path.join(const.args.folder, os.path.dirname(songname))
    os.makedirs(songpath, exist_ok=True)
    input_song = songname + const.args.input_ext
    output_song = songname + const.args.output_ext

    if not youtube_tools.download_song(input_song, content):
        return

    print('')
    try:
        convert.song(input_song, output_song, const.args.folder,
                     avconv=const.args.avconv, trim_silence=const.args.trim_silence)
    except FileNotFoundError:
        # encoder binary missing: keep the downloaded file as it is
        encoder = 'avconv' if const.args.avconv else 'ffmpeg'
        log.warning('Could not find {0}, skipping conversion'.format(encoder))
        const.args.output_ext = const.args.input_ext
        output_song = songname + const.args.output_ext

    if const.args.input_ext != const.args.output_ext:
        os.remove(os.path.join(const.args.folder, input_song))

    if not const.args.no_metadata and meta_tags is not None:
        metadata.embed(os.path.join(const.args.folder, output_song), meta_tags)

    return True
|
||||
|
||||
|
||||
def main():
    """ Parse the command-line arguments and run the requested action. """
    global log

    const.args = handle.get_arguments()

    if const.args.version:
        print('spotdl {version}'.format(version=__version__))
        sys.exit()

    internals.filter_path(const.args.folder)
    youtube_tools.set_api_key()

    # rebuild the shared logger now that the requested log level is known
    log = const.log = const.logzero.setup_logger(formatter=const._formatter,
                                                 level=const.args.log_level)

    log.debug('Python version: {}'.format(sys.version))
    log.debug('Platform: {}'.format(platform.platform()))
    log.debug(pprint.pformat(const.args.__dict__))

    try:
        # only the first option that was supplied is acted upon
        if const.args.song:
            download_single(raw_song=const.args.song)
        elif const.args.list:
            download_list(text_file=const.args.list)
        elif const.args.playlist:
            spotify_tools.write_playlist(playlist_url=const.args.playlist)
        elif const.args.album:
            spotify_tools.write_album(album_url=const.args.album)
        elif const.args.username:
            spotify_tools.write_user_playlist(username=const.args.username)
    except KeyboardInterrupt as e:
        log.exception(e)
        sys.exit(3)
    else:
        # not strictly required, but an explicit exit status is clearer
        sys.exit(0)


if __name__ == '__main__':
    main()
|
||||
182
spotdl/spotify_tools.py
Normal file
182
spotdl/spotify_tools.py
Normal file
@@ -0,0 +1,182 @@
|
||||
import spotipy
|
||||
import spotipy.oauth2 as oauth2
|
||||
import lyricwikia
|
||||
|
||||
from spotdl import internals
|
||||
from spotdl.const import log
|
||||
|
||||
from slugify import slugify
|
||||
from titlecase import titlecase
|
||||
import pprint
|
||||
import sys
|
||||
|
||||
|
||||
def generate_token():
    """ Return a Spotify access token. Please respect these credentials :) """
    client_credentials = oauth2.SpotifyClientCredentials(
        client_id='4fe3fecfe5334023a1472516cc99d805',
        client_secret='0f02b7c483c04257984695007a4a8d5c')
    return client_credentials.get_access_token()


# Spotify's Web API no longer accepts unauthenticated calls, so a token
# is created once when this module is imported
# https://developer.spotify.com/news-stories/2017/01/27/removing-unauthenticated-calls-to-the-web-api/
token = generate_token()
spotify = spotipy.Spotify(auth=token)
|
||||
|
||||
|
||||
def generate_metadata(raw_song):
    """ Build the metadata dictionary of a song from Spotify.

    `raw_song` is either a Spotify track link or a free-text search
    query. Returns None when a search yields no track at all.
    """
    if not internals.is_spotify(raw_song):
        # not a Spotify link: search for it and keep the first hit
        log.debug('Searching for "{}" on Spotify'.format(raw_song))
        try:
            meta_tags = spotify.search(raw_song, limit=1)['tracks']['items'][0]
        except IndexError:
            return None
    else:
        # a Spotify link can be looked up directly
        log.debug('Fetching metadata for given track URL')
        meta_tags = spotify.track(raw_song)

    artist = spotify.artist(meta_tags['artists'][0]['id'])
    album = spotify.album(meta_tags['album']['id'])

    # genre and copyright lists may be empty; fall back to None
    try:
        genre = titlecase(artist['genres'][0])
    except IndexError:
        genre = None
    meta_tags[u'genre'] = genre

    try:
        copyright_text = album['copyrights'][0]['text']
    except IndexError:
        copyright_text = None
    meta_tags[u'copyright'] = copyright_text

    # make sure the ISRC entry always exists
    external_ids = meta_tags[u'external_ids']
    if u'isrc' not in external_ids:
        external_ids[u'isrc'] = None

    meta_tags.update(release_date=album['release_date'],
                     publisher=album['label'],
                     total_tracks=album['tracks']['total'])

    log.debug('Fetching lyrics')

    try:
        lyrics = lyricwikia.get_lyrics(meta_tags['artists'][0]['name'],
                                       meta_tags['name'])
    except lyricwikia.LyricsNotFound:
        lyrics = None
    meta_tags['lyrics'] = lyrics

    # convenience values derived from the raw fields
    meta_tags['year'] = meta_tags['release_date'].split('-')[0]
    meta_tags['duration'] = meta_tags.pop('duration_ms') / 1000.0

    # drop the bulky market lists
    del meta_tags['available_markets']
    del meta_tags['album']['available_markets']

    log.debug(pprint.pformat(meta_tags))
    return meta_tags
|
||||
|
||||
|
||||
def write_user_playlist(username, text_file=None):
    """ List a user's playlists, let one be picked and write its tracks. """
    chosen_playlist = internals.input_link(get_playlists(username=username))
    return write_playlist(chosen_playlist, text_file)
|
||||
|
||||
|
||||
def get_playlists(username):
    """ Log the playlists of `username` (the -u option) and return their URLs. """
    page = spotify.user_playlists(username)
    links = []

    while True:
        for entry in page['items']:
            # in rare cases a playlist comes back without a name;
            # skip those. Also see Issue #91.
            if entry['name'] is None:
                continue

            # the displayed number is the 1-based position in `links`
            position = len(links) + 1
            log.info(u'{0:>5}. {1:<30} ({2} tracks)'.format(
                position, entry['name'], entry['tracks']['total']))
            playlist_url = entry['external_urls']['spotify']
            log.debug(playlist_url)
            links.append(playlist_url)

        # keep going for as long as another page is advertised
        if not page['next']:
            break
        page = spotify.next(page)

    return links
|
||||
|
||||
|
||||
def fetch_playlist(playlist):
    """ Fetch a playlist from Spotify given its URL.

    Exits with status 10 when the URL has too few parts and with
    status 11 when Spotify cannot return the playlist.
    """
    url_parts = internals.get_splits(playlist)

    try:
        username = url_parts[-3]
    except IndexError:
        # Wrong format, in either case
        log.error('The provided playlist URL is not in a recognized format!')
        sys.exit(10)

    playlist_id = url_parts[-1]

    try:
        return spotify.user_playlist(username, playlist_id,
                                     fields='tracks,next,name')
    except spotipy.client.SpotifyException:
        log.error('Unable to find playlist')
        log.info('Make sure the playlist is set to publicly visible and then try again')
        sys.exit(11)
|
||||
|
||||
|
||||
def write_playlist(playlist_url, text_file=None):
    """ Write the track URLs of a playlist to a text file.

    When `text_file` is not given, the file is named after the
    slugified playlist name.
    """
    playlist = fetch_playlist(playlist_url)
    tracks = playlist['tracks']
    text_file = text_file or u'{0}.txt'.format(slugify(playlist['name'], ok='-_()[]{}'))
    return write_tracks(tracks, text_file)
|
||||
|
||||
|
||||
def fetch_album(album):
    """ Fetch an album from Spotify; the ID is the last part of its URL. """
    album_id = internals.get_splits(album)[-1]
    return spotify.album(album_id)
|
||||
|
||||
|
||||
def write_album(album_url, text_file=None):
    """ Write the track URLs of an album to a text file.

    When `text_file` is not given, the file is named after the
    slugified album name.
    """
    album = fetch_album(album_url)
    tracks = spotify.album_tracks(album['id'])
    text_file = text_file or u'{0}.txt'.format(slugify(album['name'], ok='-_()[]{}'))
    return write_tracks(tracks, text_file)
|
||||
|
||||
|
||||
def write_tracks(tracks, text_file):
    """ Append the Spotify URL of every track to `text_file`, one per line.

    All result pages are followed. Returns the list of URLs written.
    """
    log.info(u'Writing {0} tracks to {1}'.format(tracks['total'], text_file))
    track_urls = []

    with open(text_file, 'a') as file_out:
        page = tracks
        while True:
            for item in page['items']:
                # an item either wraps the track under a 'track' key
                # or is the track itself
                track = item['track'] if 'track' in item else item
                try:
                    track_url = track['external_urls']['spotify']
                    log.debug(track_url)
                    file_out.write(track_url + '\n')
                    track_urls.append(track_url)
                except KeyError:
                    log.warning(u'Skipping track {0} by {1} (local only?)'.format(
                        track['name'], track['artists'][0]['name']))

            # stop once no further page is advertised
            if not page['next']:
                break
            page = spotify.next(page)

    return track_urls
|
||||
240
spotdl/youtube_tools.py
Normal file
240
spotdl/youtube_tools.py
Normal file
@@ -0,0 +1,240 @@
|
||||
from bs4 import BeautifulSoup
|
||||
import urllib
|
||||
import pafy
|
||||
|
||||
from spotdl import internals
|
||||
from spotdl import const
|
||||
|
||||
import os
|
||||
import pprint
|
||||
|
||||
# logger shared with the rest of the package
log = const.log

# Work around the download speed throttle on short duration tracks by
# always sending a Range header. Read more on mps-youtube/pafy#199
_RANGE_HEADER = ('Range', 'bytes=0-')
pafy.g.opener.addheaders.append(_RANGE_HEADER)
|
||||
|
||||
|
||||
def set_api_key():
    """ Hand pafy the user's YouTube API key, or the bundled one if none was given. """
    # Please respect this YouTube token :)
    key = const.args.youtube_api_key or 'AIzaSyC6cEeKlxtOPybk9sEe5ksFN5sB-7wzYp0'
    pafy.set_api_key(key)
|
||||
|
||||
|
||||
def go_pafy(raw_song, meta_tags=None):
    """ Return the pafy object for a song, or None if no video URL was found. """
    if internals.is_youtube(raw_song):
        # already a YouTube link, no search required
        return pafy.new(raw_song)

    track_url = generate_youtube_url(raw_song, meta_tags)
    if not track_url:
        return None
    return pafy.new(track_url)
|
||||
|
||||
|
||||
def get_youtube_title(content, number=None):
    """ Return the video's title, prefixed with "[number]. " when a number is given. """
    title = content.title
    return '{0}. {1}'.format(number, title) if number else title
|
||||
|
||||
|
||||
def download_song(file_name, content):
    """ Download the audio stream of `content` into the output folder.

    The extension of `file_name` selects the stream type; only .webm
    and .m4a are accepted. Returns True after a download and False
    when no suitable stream exists.
    """
    extension = os.path.splitext(file_name)[1]
    if extension not in ('.webm', '.m4a'):
        log.debug('No audio streams available for {} type'.format(extension))
        return False

    # strip the leading dot to get the stream type
    link = content.getbestaudio(preftype=extension[1:])
    if not link:
        log.debug('No audio streams available')
        return False

    log.debug('Downloading from URL: ' + link.url)
    filepath = os.path.join(const.args.folder, file_name)
    log.debug('Saving to: ' + filepath)
    link.download(filepath=filepath)
    return True
|
||||
|
||||
|
||||
def generate_search_url(query):
    """ Generate YouTube search URL for the given song.

    `query` is percent-encoded and appended to a search URL whose `sp`
    filter restricts the results to videos. Returns the URL as a string.
    """
    # quote() lives in urllib.parse. Import it explicitly: a bare
    # `import urllib` does not load submodules, so `urllib.request.quote`
    # only works when another module happens to have imported it already.
    from urllib.parse import quote

    # encode special characters so the query is safe inside a URL
    quoted_query = quote(query)
    # Special YouTube URL filter to search only for videos
    return 'https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={0}'.format(quoted_query)
|
||||
|
||||
|
||||
def is_video(result):
    """ Return True if a scraped search result is a plain video.

    Channels, mixes/playlists and advertisements are rejected.
    """
    # ensure result is not a channel
    if result.find('channel') is not None:
        return False
    parent_classes = result.parent.attrs['class']
    if 'yt-lockup-channel' in parent_classes:
        return False
    if 'yt-lockup-channel' in result.attrs['class']:
        return False

    # ensure result is not a mix/playlist
    if 'yt-lockup-playlist' in parent_classes:
        return False

    # ensure video result is not an advertisement
    return result.find('googleads') is None
|
||||
|
||||
|
||||
def generate_youtube_url(raw_song, meta_tags):
    """ Return the YouTube URL for a song.

    The YouTube API is queried when the user supplied an API key;
    otherwise the search page is scraped.
    """
    url_fetch = GenerateYouTubeURL(raw_song, meta_tags)
    lookup = url_fetch.api if const.args.youtube_api_key else url_fetch.scrape
    return lookup()
|
||||
|
||||
|
||||
class GenerateYouTubeURL:
    """ Find the YouTube video matching a song.

    `raw_song` is the user's input and `meta_tags` the Spotify metadata
    (or None). With metadata the search query is built from
    `const.args.search_format`; without it the raw input is searched.
    """

    def __init__(self, raw_song, meta_tags):
        self.raw_song = raw_song
        self.meta_tags = meta_tags

        if meta_tags is None:
            self.search_query = raw_song
        else:
            self.search_query = internals.format_string(const.args.search_format,
                                                        meta_tags, force_spaces=True)

    def _best_match(self, videos):
        """ Select the best matching video from a list of videos.

        Each video is a dict with 'link', 'title', 'videotime' and
        'seconds'. Returns the watch URL of the chosen video, or None
        when nothing suitable is found or the user skips the song.
        """
        if const.args.manual:
            log.info(self.raw_song)
            log.info('0. Skip downloading this song.\n')
            # fetch all video links on first page on YouTube
            for i, v in enumerate(videos):
                log.info(u'{0}. {1} {2} {3}'.format(i+1, v['title'], v['videotime'],
                         "http://youtube.com/watch?v="+v['link']))
            # let user select the song to download
            result = internals.input_link(videos)
            if result is None:
                return None
        elif not self.meta_tags:
            # without metadata the proper song length is unknown, so the
            # first YouTube result is the best guess available
            if not videos:
                # nothing to pick from; report "no match" instead of
                # raising IndexError on videos[0]
                log.debug('No videos found on YouTube')
                return None
            result = videos[0]
            log.debug('Since no metadata found on Spotify, going with the first result')
        else:
            # filter out videos that do not have a similar length to the Spotify song
            duration_tolerance = 10
            max_duration_tolerance = 20
            target_duration = self.meta_tags['duration']
            possible_videos_by_duration = []

            # start with a reasonable duration_tolerance and widen it one
            # second at a time until a video falls within it or
            # max_duration_tolerance (inclusive) has been tried
            while not possible_videos_by_duration and \
                    duration_tolerance <= max_duration_tolerance:
                possible_videos_by_duration = [
                    v for v in videos
                    if abs(v['seconds'] - target_duration) <= duration_tolerance]
                duration_tolerance += 1

            # decide on the filter result rather than on the counter, so
            # a match found at the maximum tolerance is not thrown away
            if not possible_videos_by_duration:
                log.error("{0} by {1} was not found.\n".format(self.meta_tags['name'], self.meta_tags['artists'][0]['name']))
                return None

            result = possible_videos_by_duration[0]

        if result:
            url = "http://youtube.com/watch?v={0}".format(result['link'])
        else:
            url = None

        return url

    def scrape(self, bestmatch=True, tries_remaining=5):
        """ Search and scrape YouTube to return a list of matching videos.

        With `bestmatch` the URL chosen by `_best_match` is returned
        instead of the list. The scrape is retried, at most
        `tries_remaining` times, when a result lacks its duration;
        None is returned once the tries are used up.
        """
        # a bare `import urllib` does not load the request submodule
        from urllib.request import urlopen

        # prevents an infinite loop but allows for a few retries
        if tries_remaining == 0:
            log.debug('No tries left. I quit.')
            return

        search_url = generate_search_url(self.search_query)
        log.debug('Opening URL: {0}'.format(search_url))

        item = urlopen(search_url).read()
        items_parse = BeautifulSoup(item, "html.parser")

        videos = []
        for x in items_parse.find_all('div', {'class': 'yt-lockup-dismissable yt-uix-tile'}):

            if not is_video(x):
                continue

            y = x.find('div', class_='yt-lockup-content')
            # the last 11 characters of the href are used as the video ID
            link = y.find('a')['href'][-11:]
            title = y.find('a')['title']

            try:
                videotime = x.find('span', class_="video-time").get_text()
            except AttributeError:
                log.debug('Could not find video duration on YouTube, retrying..')
                return self.scrape(bestmatch=bestmatch, tries_remaining=tries_remaining-1)

            youtubedetails = {'link': link, 'title': title, 'videotime': videotime,
                              'seconds': internals.get_sec(videotime)}
            videos.append(youtubedetails)

        if bestmatch:
            return self._best_match(videos)

        return videos

    def api(self, bestmatch=True):
        """ Use YouTube API to search and return a list of matching videos.

        With `bestmatch` the URL chosen by `_best_match` is returned
        instead of the list.
        """
        query = {'part': 'snippet',
                 'maxResults': 50,
                 'type': 'video'}

        if const.args.music_videos_only:
            query['videoCategoryId'] = '10'

        if not self.meta_tags:
            song = self.raw_song
            query['q'] = song
        else:
            query['q'] = self.search_query
        log.debug('query: {0}'.format(query))

        data = pafy.call_gdata('search', query)
        # keep only the results that carry a video ID
        data['items'] = list(filter(lambda x: x['id'].get('videoId') is not None,
                                    data['items']))
        # second request: fetch the details (incl. duration) of those videos
        query_results = {'part': 'contentDetails,snippet,statistics',
                         'maxResults': 50,
                         'id': ','.join(i['id']['videoId'] for i in data['items'])}
        log.debug('query_results: {0}'.format(query_results))

        vdata = pafy.call_gdata('videos', query_results)

        videos = []
        for x in vdata['items']:
            duration_s = pafy.playlist.parseISO8591(x['contentDetails']['duration'])
            youtubedetails = {'link': x['id'], 'title': x['snippet']['title'],
                              'videotime': internals.videotime_from_seconds(duration_s),
                              'seconds': duration_s}
            videos.append(youtubedetails)

        if bestmatch:
            return self._best_match(videos)

        return videos
|
||||
Reference in New Issue
Block a user