mirror of
https://github.com/KevinMidboe/spotify-downloader.git
synced 2025-10-29 09:50:16 +00:00
Refactor embedding metadata to media
This commit is contained in:
@@ -4,6 +4,7 @@ import subprocess
|
||||
import urllib.request
|
||||
|
||||
from spotdl.encode.encoders import EncoderFFmpeg
|
||||
from spotdl.metadata.embedders import EmbedderDefault
|
||||
|
||||
CHUNK_SIZE= 16 * 1024
|
||||
HEADERS = [('Range', 'bytes=0-'),]
|
||||
@@ -23,13 +24,13 @@ class Track:
|
||||
def _calculate_total_chunks(self, filesize):
|
||||
return (filesize // self._chunksize) + 1
|
||||
|
||||
def download_while_re_encoding(self, path, encoder=EncoderFFmpeg(), show_progress=True):
|
||||
def download_while_re_encoding(self, target_path, encoder=EncoderFFmpeg(), show_progress=True):
|
||||
stream = self.metadata["streams"].getbest()
|
||||
total_chunks = self._calculate_total_chunks(stream["filesize"])
|
||||
response = self._make_request(stream["download_url"])
|
||||
process = encoder.re_encode_from_stdin(
|
||||
stream["encoding"],
|
||||
path
|
||||
target_path
|
||||
)
|
||||
for _ in tqdm.trange(total_chunks):
|
||||
chunk = response.read(self._chunksize)
|
||||
@@ -38,11 +39,11 @@ class Track:
|
||||
process.stdin.close()
|
||||
process.wait()
|
||||
|
||||
def download(self, path, show_progress=True):
|
||||
def download(self, target_path, show_progress=True):
|
||||
stream = self.metadata["streams"].getbest()
|
||||
total_chunks = self._calculate_total_chunks(stream["filesize"])
|
||||
response = self._make_request(stream["download_url"])
|
||||
with open(path, "wb") as fout:
|
||||
with open(target_path, "wb") as fout:
|
||||
for _ in tqdm.trange(total_chunks):
|
||||
chunk = response.read(self._chunksize)
|
||||
fout.write(chunk)
|
||||
@@ -62,5 +63,6 @@ class Track:
|
||||
process.stdin.close()
|
||||
process.wait()
|
||||
|
||||
def apply_metadata(path):
|
||||
pass
|
||||
def apply_metadata(self, input_path, embedder=EmbedderDefault()):
|
||||
embedder.apply_metadata(input_path, self.metadata)
|
||||
|
||||
|
||||
@@ -48,7 +48,6 @@ class EncoderBase(ABC):
|
||||
self._loglevel = loglevel
|
||||
self._additional_arguments = additional_arguments
|
||||
|
||||
@abstractmethod
|
||||
def set_argument(self, argument):
|
||||
"""
|
||||
This method must be used to set any custom functionality
|
||||
@@ -56,13 +55,12 @@ class EncoderBase(ABC):
|
||||
"""
|
||||
self._additional_arguments += argument.split()
|
||||
|
||||
@abstractmethod
|
||||
def get_encoding(self, filename):
|
||||
def get_encoding(self, path):
|
||||
"""
|
||||
This method must determine the encoding for a local
|
||||
audio file. Such as "mp3", "wav", "m4a", etc.
|
||||
"""
|
||||
_, extension = os.path.splitext(filename)
|
||||
_, extension = os.path.splitext(path)
|
||||
# Ignore the initial dot from file extension
|
||||
return extension[1:]
|
||||
|
||||
@@ -75,7 +73,7 @@ class EncoderBase(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _generate_encode_command(self, input_file, output_file):
|
||||
def _generate_encode_command(self, input_path, target_path):
|
||||
"""
|
||||
This method must the complete command for that would be
|
||||
used to invoke the encoder and perform the encoding.
|
||||
@@ -92,9 +90,17 @@ class EncoderBase(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def re_encode(self, input_file, output_file):
|
||||
def re_encode(self, input_path, target_path):
|
||||
"""
|
||||
This method must invoke FFmpeg to encode a given input
|
||||
This method must invoke the encoder to encode a given input
|
||||
file to a specified output file.
|
||||
"""
|
||||
pass
|
||||
|
||||
def re_encode_from_stdin(self, input_encoding, target_path):
|
||||
"""
|
||||
This method must invoke the encoder to encode stdin to a
|
||||
specified output file.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -31,14 +31,11 @@ class EncoderFFmpeg(EncoderBase):
|
||||
raise FFmpegNotFoundError(e.args[0])
|
||||
self._rules = RULES
|
||||
|
||||
def set_argument(self, argument):
|
||||
super().set_argument(argument)
|
||||
|
||||
def set_trim_silence(self):
|
||||
self.set_argument("-af silenceremove=start_periods=1")
|
||||
|
||||
def get_encoding(self, filename):
|
||||
return super().get_encoding(filename)
|
||||
def get_encoding(self, path):
|
||||
return super().get_encoding(path)
|
||||
|
||||
def _generate_encoding_arguments(self, input_encoding, output_encoding):
|
||||
initial_arguments = self._rules.get(input_encoding)
|
||||
@@ -46,26 +43,24 @@ class EncoderFFmpeg(EncoderBase):
|
||||
raise TypeError(
|
||||
'The input format ("{}") is not supported.'.format(
|
||||
input_encoding,
|
||||
)
|
||||
)
|
||||
))
|
||||
arguments = initial_arguments.get(output_encoding)
|
||||
if arguments is None:
|
||||
raise TypeError(
|
||||
'The output format ("{}") is not supported.'.format(
|
||||
output_encoding,
|
||||
)
|
||||
)
|
||||
))
|
||||
return arguments
|
||||
|
||||
def set_debuglog(self):
|
||||
self._loglevel = "-loglevel debug"
|
||||
|
||||
def _generate_encode_command(self, input_file, output_file,
|
||||
def _generate_encode_command(self, input_path, target_path,
|
||||
input_encoding=None, output_encoding=None):
|
||||
if input_encoding is None:
|
||||
input_encoding = self.get_encoding(input_file)
|
||||
input_encoding = self.get_encoding(input_path)
|
||||
if output_encoding is None:
|
||||
output_encoding = self.get_encoding(output_file)
|
||||
output_encoding = self.get_encoding(target_path)
|
||||
arguments = self._generate_encoding_arguments(
|
||||
input_encoding,
|
||||
output_encoding
|
||||
@@ -73,30 +68,30 @@ class EncoderFFmpeg(EncoderBase):
|
||||
command = [self.encoder_path] \
|
||||
+ ["-y", "-nostdin"] \
|
||||
+ self._loglevel.split() \
|
||||
+ ["-i", input_file] \
|
||||
+ ["-i", input_path] \
|
||||
+ arguments.split() \
|
||||
+ self._additional_arguments \
|
||||
+ [output_file]
|
||||
+ [target_path]
|
||||
|
||||
return command
|
||||
|
||||
def re_encode(self, input_file, output_file, delete_original=False):
|
||||
def re_encode(self, input_path, target_path, delete_original=False):
|
||||
encode_command = self._generate_encode_command(
|
||||
input_file,
|
||||
output_file
|
||||
input_path,
|
||||
target_path
|
||||
)
|
||||
process = subprocess.Popen(encode_command)
|
||||
process.wait()
|
||||
encode_successful = process.returncode == 0
|
||||
if encode_successful and delete_original:
|
||||
os.remove(input_file)
|
||||
os.remove(input_path)
|
||||
return process
|
||||
|
||||
def re_encode_from_stdin(self, input_encoding, output_file):
|
||||
output_encoding = self.get_encoding(output_file)
|
||||
def re_encode_from_stdin(self, input_encoding, target_path):
|
||||
output_encoding = self.get_encoding(target_path)
|
||||
encode_command = self._generate_encode_command(
|
||||
"-",
|
||||
output_file,
|
||||
target_path,
|
||||
input_encoding=input_encoding,
|
||||
)
|
||||
process = subprocess.Popen(encode_command, stdin=subprocess.PIPE)
|
||||
|
||||
@@ -15,47 +15,47 @@ class TestEncoderFFmpeg:
|
||||
|
||||
|
||||
class TestEncodingDefaults:
|
||||
def m4a_to_mp3_encoder(input_file, output_file):
|
||||
def m4a_to_mp3_encoder(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:v', 'copy',
|
||||
'-codec:a', 'libmp3lame',
|
||||
'-ar', '48000',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_webm_encoder(input_file, output_file):
|
||||
def m4a_to_webm_encoder(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:a', 'libopus',
|
||||
'-vbr', 'on',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_m4a_encoder(input_file, output_file):
|
||||
def m4a_to_m4a_encoder(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-acodec', 'copy',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_flac_encoder(input_file, output_file):
|
||||
def m4a_to_flac_encoder(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:a', 'flac',
|
||||
'-ar', '48000',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
@@ -71,47 +71,47 @@ class TestEncodingDefaults:
|
||||
|
||||
|
||||
class TestEncodingInDebugMode:
|
||||
def m4a_to_mp3_encoder_with_debug(input_file, output_file):
|
||||
def m4a_to_mp3_encoder_with_debug(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-loglevel', 'debug',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:v', 'copy',
|
||||
'-codec:a', 'libmp3lame',
|
||||
'-ar', '48000',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_webm_encoder_with_debug(input_file, output_file):
|
||||
def m4a_to_webm_encoder_with_debug(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-loglevel', 'debug',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:a', 'libopus',
|
||||
'-vbr', 'on',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_m4a_encoder_with_debug(input_file, output_file):
|
||||
def m4a_to_m4a_encoder_with_debug(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-loglevel', 'debug',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-acodec', 'copy',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_flac_encoder_with_debug(input_file, output_file):
|
||||
def m4a_to_flac_encoder_with_debug(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-loglevel', 'debug',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:a', 'flac',
|
||||
'-ar', '48000',
|
||||
'-b:a', '192k',
|
||||
'-vn', output_file
|
||||
'-vn', target_path
|
||||
]
|
||||
return command
|
||||
|
||||
@@ -128,55 +128,55 @@ class TestEncodingInDebugMode:
|
||||
|
||||
|
||||
class TestEncodingAndTrimSilence:
|
||||
def m4a_to_mp3_encoder_and_trim_silence(input_file, output_file):
|
||||
def m4a_to_mp3_encoder_and_trim_silence(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:v', 'copy',
|
||||
'-codec:a', 'libmp3lame',
|
||||
'-ar', '48000',
|
||||
'-b:a', '192k',
|
||||
'-vn',
|
||||
'-af', 'silenceremove=start_periods=1',
|
||||
output_file
|
||||
target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_webm_encoder_and_trim_silence(input_file, output_file):
|
||||
def m4a_to_webm_encoder_and_trim_silence(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:a', 'libopus',
|
||||
'-vbr', 'on',
|
||||
'-b:a', '192k',
|
||||
'-vn',
|
||||
'-af', 'silenceremove=start_periods=1',
|
||||
output_file
|
||||
target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_m4a_encoder_and_trim_silence(input_file, output_file):
|
||||
def m4a_to_m4a_encoder_and_trim_silence(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-acodec', 'copy',
|
||||
'-b:a', '192k',
|
||||
'-vn',
|
||||
'-af', 'silenceremove=start_periods=1',
|
||||
output_file
|
||||
target_path
|
||||
]
|
||||
return command
|
||||
|
||||
def m4a_to_flac_encoder_and_trim_silence(input_file, output_file):
|
||||
def m4a_to_flac_encoder_and_trim_silence(input_path, target_path):
|
||||
command = [
|
||||
'ffmpeg', '-y', '-nostdin', '-hide_banner', '-nostats', '-v', 'panic',
|
||||
'-i', input_file,
|
||||
'-i', input_path,
|
||||
'-codec:a', 'flac',
|
||||
'-ar', '48000',
|
||||
'-b:a', '192k',
|
||||
'-vn',
|
||||
'-af', 'silenceremove=start_periods=1',
|
||||
output_file
|
||||
target_path
|
||||
]
|
||||
return command
|
||||
|
||||
|
||||
@@ -1,182 +0,0 @@
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.id3 import ID3, TORY, TYER, TPUB, APIC, USLT, COMM
|
||||
from mutagen.mp4 import MP4, MP4Cover
|
||||
from mutagen.flac import Picture, FLAC
|
||||
|
||||
import urllib.request
|
||||
from logzero import logger as log
|
||||
|
||||
from spotdl.const import TAG_PRESET, M4A_TAG_PRESET
|
||||
|
||||
|
||||
def compare(music_file, metadata):
|
||||
"""Check if the input music file title matches the expected title."""
|
||||
already_tagged = False
|
||||
try:
|
||||
if music_file.endswith(".mp3"):
|
||||
audiofile = EasyID3(music_file)
|
||||
already_tagged = audiofile["title"][0] == metadata["name"]
|
||||
elif music_file.endswith(".m4a"):
|
||||
audiofile = MP4(music_file)
|
||||
already_tagged = audiofile["\xa9nam"][0] == metadata["name"]
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
|
||||
return already_tagged
|
||||
|
||||
|
||||
def embed(music_file, meta_tags):
|
||||
""" Embed metadata. """
|
||||
embed = EmbedMetadata(music_file, meta_tags)
|
||||
if music_file.endswith(".m4a"):
|
||||
log.info("Applying metadata")
|
||||
return embed.as_m4a()
|
||||
elif music_file.endswith(".mp3"):
|
||||
log.info("Applying metadata")
|
||||
return embed.as_mp3()
|
||||
elif music_file.endswith(".flac"):
|
||||
log.info("Applying metadata")
|
||||
return embed.as_flac()
|
||||
else:
|
||||
log.warning("Cannot embed metadata into given output extension")
|
||||
return False
|
||||
|
||||
|
||||
class EmbedMetadata:
|
||||
def __init__(self, music_file, meta_tags):
|
||||
self.music_file = music_file
|
||||
self.meta_tags = meta_tags
|
||||
self.spotify_metadata = meta_tags["spotify_metadata"]
|
||||
self.provider = "spotify" if meta_tags["spotify_metadata"] else "youtube"
|
||||
|
||||
def as_mp3(self):
|
||||
""" Embed metadata to MP3 files. """
|
||||
music_file = self.music_file
|
||||
meta_tags = self.meta_tags
|
||||
# EasyID3 is fun to use ;)
|
||||
# For supported easyid3 tags:
|
||||
# https://github.com/quodlibet/mutagen/blob/master/mutagen/easyid3.py
|
||||
# Check out somewhere at end of above linked file
|
||||
audiofile = EasyID3(music_file)
|
||||
self._embed_basic_metadata(audiofile, preset=TAG_PRESET)
|
||||
audiofile["media"] = meta_tags["type"]
|
||||
audiofile["author"] = meta_tags["artists"][0]["name"]
|
||||
audiofile["lyricist"] = meta_tags["artists"][0]["name"]
|
||||
audiofile["arranger"] = meta_tags["artists"][0]["name"]
|
||||
audiofile["performer"] = meta_tags["artists"][0]["name"]
|
||||
audiofile["website"] = meta_tags["external_urls"][self.provider]
|
||||
audiofile["length"] = str(meta_tags["duration"])
|
||||
if meta_tags["publisher"]:
|
||||
audiofile["encodedby"] = meta_tags["publisher"]
|
||||
if meta_tags["external_ids"]["isrc"]:
|
||||
audiofile["isrc"] = meta_tags["external_ids"]["isrc"]
|
||||
audiofile.save(v2_version=3)
|
||||
|
||||
# For supported id3 tags:
|
||||
# https://github.com/quodlibet/mutagen/blob/master/mutagen/id3/_frames.py
|
||||
# Each class represents an id3 tag
|
||||
audiofile = ID3(music_file)
|
||||
if meta_tags["year"]:
|
||||
audiofile["TORY"] = TORY(encoding=3, text=meta_tags["year"])
|
||||
audiofile["TYER"] = TYER(encoding=3, text=meta_tags["year"])
|
||||
if meta_tags["publisher"]:
|
||||
audiofile["TPUB"] = TPUB(encoding=3, text=meta_tags["publisher"])
|
||||
audiofile["COMM"] = COMM(
|
||||
encoding=3, text=meta_tags["external_urls"][self.provider]
|
||||
)
|
||||
if meta_tags["lyrics"]:
|
||||
audiofile["USLT"] = USLT(
|
||||
encoding=3, desc=u"Lyrics", text=meta_tags["lyrics"]
|
||||
)
|
||||
try:
|
||||
albumart = urllib.request.urlopen(meta_tags["album"]["images"][0]["url"])
|
||||
audiofile["APIC"] = APIC(
|
||||
encoding=3,
|
||||
mime="image/jpeg",
|
||||
type=3,
|
||||
desc=u"Cover",
|
||||
data=albumart.read(),
|
||||
)
|
||||
albumart.close()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
audiofile.save(v2_version=3)
|
||||
return True
|
||||
|
||||
def as_m4a(self):
|
||||
""" Embed metadata to M4A files. """
|
||||
music_file = self.music_file
|
||||
meta_tags = self.meta_tags
|
||||
audiofile = MP4(music_file)
|
||||
self._embed_basic_metadata(audiofile, preset=M4A_TAG_PRESET)
|
||||
if meta_tags["year"]:
|
||||
audiofile[M4A_TAG_PRESET["year"]] = meta_tags["year"]
|
||||
audiofile[M4A_TAG_PRESET["comment"]] = meta_tags["external_urls"][self.provider]
|
||||
if meta_tags["lyrics"]:
|
||||
audiofile[M4A_TAG_PRESET["lyrics"]] = meta_tags["lyrics"]
|
||||
try:
|
||||
albumart = urllib.request.urlopen(meta_tags["album"]["images"][0]["url"])
|
||||
audiofile[M4A_TAG_PRESET["albumart"]] = [
|
||||
MP4Cover(albumart.read(), imageformat=MP4Cover.FORMAT_JPEG)
|
||||
]
|
||||
albumart.close()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
audiofile.save()
|
||||
return True
|
||||
|
||||
def as_flac(self):
|
||||
music_file = self.music_file
|
||||
meta_tags = self.meta_tags
|
||||
audiofile = FLAC(music_file)
|
||||
self._embed_basic_metadata(audiofile)
|
||||
if meta_tags["year"]:
|
||||
audiofile["year"] = meta_tags["year"]
|
||||
audiofile["comment"] = meta_tags["external_urls"][self.provider]
|
||||
if meta_tags["lyrics"]:
|
||||
audiofile["lyrics"] = meta_tags["lyrics"]
|
||||
|
||||
image = Picture()
|
||||
image.type = 3
|
||||
image.desc = "Cover"
|
||||
image.mime = "image/jpeg"
|
||||
albumart = urllib.request.urlopen(meta_tags["album"]["images"][0]["url"])
|
||||
image.data = albumart.read()
|
||||
albumart.close()
|
||||
audiofile.add_picture(image)
|
||||
|
||||
audiofile.save()
|
||||
return True
|
||||
|
||||
def _embed_basic_metadata(self, audiofile, preset=TAG_PRESET):
|
||||
meta_tags = self.meta_tags
|
||||
audiofile[preset["artist"]] = meta_tags["artists"][0]["name"]
|
||||
if meta_tags["album"]["artists"][0]["name"]:
|
||||
audiofile[preset["albumartist"]] = meta_tags["album"]["artists"][0]["name"]
|
||||
if meta_tags["album"]["name"]:
|
||||
audiofile[preset["album"]] = meta_tags["album"]["name"]
|
||||
audiofile[preset["title"]] = meta_tags["name"]
|
||||
if meta_tags["release_date"]:
|
||||
audiofile[preset["date"]] = meta_tags["release_date"]
|
||||
audiofile[preset["originaldate"]] = meta_tags["release_date"]
|
||||
if meta_tags["genre"]:
|
||||
audiofile[preset["genre"]] = meta_tags["genre"]
|
||||
if meta_tags["copyright"]:
|
||||
audiofile[preset["copyright"]] = meta_tags["copyright"]
|
||||
if self.music_file.endswith(".flac"):
|
||||
audiofile[preset["discnumber"]] = str(meta_tags["disc_number"])
|
||||
else:
|
||||
audiofile[preset["discnumber"]] = [(meta_tags["disc_number"], 0)]
|
||||
if self.music_file.endswith(".flac"):
|
||||
audiofile[preset["tracknumber"]] = str(meta_tags["track_number"])
|
||||
else:
|
||||
if preset["tracknumber"] == TAG_PRESET["tracknumber"]:
|
||||
audiofile[preset["tracknumber"]] = "{}/{}".format(
|
||||
meta_tags["track_number"], meta_tags["total_tracks"]
|
||||
)
|
||||
else:
|
||||
audiofile[preset["tracknumber"]] = [
|
||||
(meta_tags["track_number"], meta_tags["total_tracks"])
|
||||
]
|
||||
@@ -1,2 +1,5 @@
|
||||
from spotdl.metadata.metadata_base import MetadataBase
|
||||
from spotdl.metadata.metadata_base import StreamsBase
|
||||
from spotdl.metadata.provider_base import ProviderBase
|
||||
from spotdl.metadata.provider_base import StreamsBase
|
||||
|
||||
from spotdl.metadata.embedder_base import EmbedderBase
|
||||
|
||||
|
||||
88
spotdl/metadata/embedder_base.py
Normal file
88
spotdl/metadata/embedder_base.py
Normal file
@@ -0,0 +1,88 @@
|
||||
import os
|
||||
|
||||
from abc import ABC
|
||||
from abc import abstractmethod
|
||||
|
||||
class EmbedderBase(ABC):
|
||||
"""
|
||||
The class must define the supported media file encoding
|
||||
formats here using a static variable - such as:
|
||||
|
||||
>>> supported_formats = ("mp3", "opus", "flac")
|
||||
"""
|
||||
supported_formats = ()
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self):
|
||||
"""
|
||||
For every supported format, there must be a corresponding
|
||||
method that applies metadata on this format.
|
||||
|
||||
Such as if mp3 is supported, there must exist a method named
|
||||
`as_mp3` on this class that applies metadata on mp3 files.
|
||||
"""
|
||||
# self.targets = { fmt: eval(str("self.as_" + fmt))
|
||||
# for fmt in self.supported_formats }
|
||||
#
|
||||
# TODO: The above code seems to fail for some reason
|
||||
# I do not know.
|
||||
self.targets = {}
|
||||
for fmt in self.supported_formats:
|
||||
# FIXME: Calling `eval` is dangerous here!
|
||||
self.targets[fmt] = eval("self.as_" + fmt)
|
||||
|
||||
def get_encoding(self, path):
|
||||
"""
|
||||
This method must determine the encoding for a local
|
||||
audio file. Such as "mp3", "wav", "m4a", etc.
|
||||
"""
|
||||
_, extension = os.path.splitext(path)
|
||||
# Ignore the initial dot from file extension
|
||||
return extension[1:]
|
||||
|
||||
def apply_metadata(self, path, metadata, encoding=None):
|
||||
"""
|
||||
This method must automatically detect the media encoding
|
||||
format from file path and embed the corresponding metadata
|
||||
on the given file by calling an appropriate submethod.
|
||||
"""
|
||||
if encoding is None:
|
||||
encoding = self.get_encoding(path)
|
||||
if encoding not in self.supported_formats:
|
||||
raise TypeError(
|
||||
'The input format ("{}") is not supported.'.format(
|
||||
encoding,
|
||||
))
|
||||
embed_on_given_format = self.targets[encoding]
|
||||
embed_on_given_format(path, metadata)
|
||||
|
||||
def as_mp3(self, path, metadata):
|
||||
"""
|
||||
Method for mp3 support. This method might be defined in
|
||||
a subclass.
|
||||
|
||||
Other methods for additional supported formats must also
|
||||
be declared here.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def as_opus(self, path, metadata):
|
||||
"""
|
||||
Method for opus support. This method might be defined in
|
||||
a subclass.
|
||||
|
||||
Other methods for additional supported formats must also
|
||||
be declared here.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def as_flac(self, path, metadata):
|
||||
"""
|
||||
Method for flac support. This method might be defined in
|
||||
a subclass.
|
||||
|
||||
Other methods for additional supported formats must also
|
||||
be declared here.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
2
spotdl/metadata/embedders/__init__.py
Normal file
2
spotdl/metadata/embedders/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from spotdl.metadata.embedders.default_embedder import EmbedderDefault
|
||||
|
||||
173
spotdl/metadata/embedders/default_embedder.py
Normal file
173
spotdl/metadata/embedders/default_embedder.py
Normal file
@@ -0,0 +1,173 @@
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.id3 import ID3, TORY, TYER, TPUB, APIC, USLT, COMM
|
||||
from mutagen.mp4 import MP4, MP4Cover
|
||||
from mutagen.flac import Picture, FLAC
|
||||
|
||||
import urllib.request
|
||||
|
||||
from spotdl.metadata import EmbedderBase
|
||||
|
||||
# Apple has specific tags - see mutagen docs -
|
||||
# http://mutagen.readthedocs.io/en/latest/api/mp4.html
|
||||
M4A_TAG_PRESET = {
|
||||
"album": "\xa9alb",
|
||||
"artist": "\xa9ART",
|
||||
"date": "\xa9day",
|
||||
"title": "\xa9nam",
|
||||
"year": "\xa9day",
|
||||
"originaldate": "purd",
|
||||
"comment": "\xa9cmt",
|
||||
"group": "\xa9grp",
|
||||
"writer": "\xa9wrt",
|
||||
"genre": "\xa9gen",
|
||||
"tracknumber": "trkn",
|
||||
"albumartist": "aART",
|
||||
"discnumber": "disk",
|
||||
"cpil": "cpil",
|
||||
"albumart": "covr",
|
||||
"copyright": "cprt",
|
||||
"tempo": "tmpo",
|
||||
"lyrics": "\xa9lyr",
|
||||
"comment": "\xa9cmt",
|
||||
}
|
||||
|
||||
TAG_PRESET = {}
|
||||
for key in M4A_TAG_PRESET.keys():
|
||||
TAG_PRESET[key] = key
|
||||
|
||||
|
||||
class EmbedderDefault(EmbedderBase):
|
||||
supported_formats = ("mp3", "opus", "flac")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._m4a_tag_preset = M4A_TAG_PRESET
|
||||
self._tag_preset = TAG_PRESET
|
||||
# self.provider = "spotify" if metadata["spotify_metadata"] else "youtube"
|
||||
|
||||
def as_mp3(self, path, metadata):
|
||||
""" Embed metadata to MP3 files. """
|
||||
# EasyID3 is fun to use ;)
|
||||
# For supported easyid3 tags:
|
||||
# https://github.com/quodlibet/mutagen/blob/master/mutagen/easyid3.py
|
||||
# Check out somewhere at end of above linked file
|
||||
audiofile = EasyID3(path)
|
||||
self._embed_basic_metadata(audiofile, metadata, "mp3", preset=TAG_PRESET)
|
||||
audiofile["media"] = metadata["type"]
|
||||
audiofile["author"] = metadata["artists"][0]["name"]
|
||||
audiofile["lyricist"] = metadata["artists"][0]["name"]
|
||||
audiofile["arranger"] = metadata["artists"][0]["name"]
|
||||
audiofile["performer"] = metadata["artists"][0]["name"]
|
||||
provider = metadata["provider"]
|
||||
audiofile["website"] = metadata["external_urls"][provider]
|
||||
audiofile["length"] = str(metadata["duration"])
|
||||
if metadata["publisher"]:
|
||||
audiofile["encodedby"] = metadata["publisher"]
|
||||
if metadata["external_ids"]["isrc"]:
|
||||
audiofile["isrc"] = metadata["external_ids"]["isrc"]
|
||||
audiofile.save(v2_version=3)
|
||||
|
||||
# For supported id3 tags:
|
||||
# https://github.com/quodlibet/mutagen/blob/master/mutagen/id3/_frames.py
|
||||
# Each class represents an id3 tag
|
||||
audiofile = ID3(path)
|
||||
if metadata["year"]:
|
||||
audiofile["TORY"] = TORY(encoding=3, text=metadata["year"])
|
||||
audiofile["TYER"] = TYER(encoding=3, text=metadata["year"])
|
||||
if metadata["publisher"]:
|
||||
audiofile["TPUB"] = TPUB(encoding=3, text=metadata["publisher"])
|
||||
provider = metadata["provider"]
|
||||
audiofile["COMM"] = COMM(
|
||||
encoding=3, text=metadata["external_urls"][provider]
|
||||
)
|
||||
if metadata["lyrics"]:
|
||||
audiofile["USLT"] = USLT(
|
||||
encoding=3, desc=u"Lyrics", text=metadata["lyrics"]
|
||||
)
|
||||
try:
|
||||
albumart = urllib.request.urlopen(metadata["album"]["images"][0]["url"])
|
||||
audiofile["APIC"] = APIC(
|
||||
encoding=3,
|
||||
mime="image/jpeg",
|
||||
type=3,
|
||||
desc=u"Cover",
|
||||
data=albumart.read(),
|
||||
)
|
||||
albumart.close()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
audiofile.save(v2_version=3)
|
||||
|
||||
def as_opus(self, path):
|
||||
""" Embed metadata to M4A files. """
|
||||
audiofile = MP4(path)
|
||||
self._embed_basic_metadata(audiofile, metadata, "opus", preset=M4A_TAG_PRESET)
|
||||
if metadata["year"]:
|
||||
audiofile[M4A_TAG_PRESET["year"]] = metadata["year"]
|
||||
provider = metadata["provider"]
|
||||
audiofile[M4A_TAG_PRESET["comment"]] = metadata["external_urls"][provider]
|
||||
if metadata["lyrics"]:
|
||||
audiofile[M4A_TAG_PRESET["lyrics"]] = metadata["lyrics"]
|
||||
try:
|
||||
albumart = urllib.request.urlopen(metadata["album"]["images"][0]["url"])
|
||||
audiofile[M4A_TAG_PRESET["albumart"]] = [
|
||||
MP4Cover(albumart.read(), imageformat=MP4Cover.FORMAT_JPEG)
|
||||
]
|
||||
albumart.close()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
audiofile.save()
|
||||
|
||||
def as_flac(self, path, metadata):
|
||||
audiofile = FLAC(path)
|
||||
self._embed_basic_metadata(audiofile, metadata, "flac")
|
||||
if metadata["year"]:
|
||||
audiofile["year"] = metadata["year"]
|
||||
provider = metadata["provider"]
|
||||
audiofile["comment"] = metadata["external_urls"][provider]
|
||||
if metadata["lyrics"]:
|
||||
audiofile["lyrics"] = metadata["lyrics"]
|
||||
|
||||
image = Picture()
|
||||
image.type = 3
|
||||
image.desc = "Cover"
|
||||
image.mime = "image/jpeg"
|
||||
albumart = urllib.request.urlopen(metadata["album"]["images"][0]["url"])
|
||||
image.data = albumart.read()
|
||||
albumart.close()
|
||||
audiofile.add_picture(image)
|
||||
|
||||
audiofile.save()
|
||||
|
||||
def _embed_basic_metadata(self, audiofile, metadata, encoding, preset=TAG_PRESET):
|
||||
audiofile[preset["artist"]] = metadata["artists"][0]["name"]
|
||||
if metadata["album"]["artists"][0]["name"]:
|
||||
audiofile[preset["albumartist"]] = metadata["album"]["artists"][0]["name"]
|
||||
if metadata["album"]["name"]:
|
||||
audiofile[preset["album"]] = metadata["album"]["name"]
|
||||
audiofile[preset["title"]] = metadata["name"]
|
||||
if metadata["release_date"]:
|
||||
audiofile[preset["date"]] = metadata["release_date"]
|
||||
audiofile[preset["originaldate"]] = metadata["release_date"]
|
||||
if metadata["genre"]:
|
||||
audiofile[preset["genre"]] = metadata["genre"]
|
||||
if metadata["copyright"]:
|
||||
audiofile[preset["copyright"]] = metadata["copyright"]
|
||||
if encoding == "flac":
|
||||
audiofile[preset["discnumber"]] = str(metadata["disc_number"])
|
||||
else:
|
||||
audiofile[preset["discnumber"]] = [(metadata["disc_number"], 0)]
|
||||
if encoding == "flac":
|
||||
audiofile[preset["tracknumber"]] = str(metadata["track_number"])
|
||||
else:
|
||||
if preset["tracknumber"] == TAG_PRESET["tracknumber"]:
|
||||
audiofile[preset["tracknumber"]] = "{}/{}".format(
|
||||
metadata["track_number"], metadata["total_tracks"]
|
||||
)
|
||||
else:
|
||||
audiofile[preset["tracknumber"]] = [
|
||||
(metadata["track_number"], metadata["total_tracks"])
|
||||
]
|
||||
|
||||
@@ -34,7 +34,7 @@ class StreamsBase(ABC):
|
||||
return self.all[-1]
|
||||
|
||||
|
||||
class MetadataBase(ABC):
|
||||
class ProviderBase(ABC):
|
||||
def set_credentials(self, client_id, client_secret):
|
||||
"""
|
||||
This method may or not be used depending on
|
||||
@@ -1,2 +1,3 @@
|
||||
from spotdl.metadata.providers.spotify import MetadataSpotify
|
||||
from spotdl.metadata.providers.youtube import MetadataYouTube
|
||||
from spotdl.metadata.providers.spotify import ProviderSpotify
|
||||
from spotdl.metadata.providers.youtube import ProviderYouTube
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import spotipy
|
||||
import spotipy.oauth2 as oauth2
|
||||
|
||||
from spotdl.metadata import MetadataBase
|
||||
from spotdl.metadata import ProviderBase
|
||||
|
||||
|
||||
class MetadataSpotify(MetadataBase):
|
||||
class ProviderSpotify(ProviderBase):
|
||||
def __init__(self, spotify=None):
|
||||
self.spotify = spotify
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ from bs4 import BeautifulSoup
|
||||
import urllib.request
|
||||
|
||||
from spotdl.metadata import StreamsBase
|
||||
from spotdl.metadata import MetadataBase
|
||||
from spotdl.metadata import ProviderBase
|
||||
|
||||
BASE_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}"
|
||||
|
||||
@@ -90,7 +90,7 @@ class YouTubeStreams(StreamsBase):
|
||||
return self.all[-1]
|
||||
|
||||
|
||||
class MetadataYouTube(MetadataBase):
|
||||
class ProviderYouTube(ProviderBase):
|
||||
def from_query(self, query):
|
||||
watch_urls = YouTubeSearch().search(query)
|
||||
return self.from_url(watch_urls[0])
|
||||
|
||||
Reference in New Issue
Block a user