Decouple fetching metadata

This commit is contained in:
Ritiek Malhotra
2020-03-22 21:44:04 +05:30
parent dae76a0abb
commit 7413c541d3
12 changed files with 373 additions and 485 deletions

View File

@@ -0,0 +1,2 @@
from spotdl.metadata.metadata_base import MetadataBase
from spotdl.metadata.metadata_base import StreamsBase

View File

@@ -0,0 +1,69 @@
from abc import ABC
from abc import abstractmethod
class StreamsBase(ABC):
    """Interface for a provider-specific collection of audio streams."""

    @abstractmethod
    def __init__(self, streams):
        """
        Store the available audio streams on `self.all`.

        Implementations are expected to convert the provider's
        streams into a list of dictionaries holding the keys
        "bitrate", "download_url", "encoding" and "filesize",
        normally ordered from the highest bitrate down to the
        lowest, and to assign that list to `self.all`.

        This default implementation assigns `streams` unchanged.
        """
        self.all = streams

    @abstractmethod
    def getbest(self):
        """
        Return the audio stream with the highest bitrate.

        The default implementation returns the first entry of
        `self.all`, relying on the descending-bitrate ordering.
        """
        highest = self.all[0]
        return highest

    @abstractmethod
    def getworst(self):
        """
        Return the audio stream with the lowest bitrate.

        The default implementation returns the last entry of
        `self.all`, relying on the descending-bitrate ordering.
        """
        lowest = self.all[-1]
        return lowest
class MetadataBase(ABC):
    """Interface every metadata provider has to implement."""

    def set_credentials(self, client_id, client_secret):
        """
        Hand authentication credentials to the provider.

        Providers that do not need authentication may leave this
        untouched; the default implementation does nothing.
        """

    @abstractmethod
    def from_url(self, url):
        """
        Return the track metadata for the given URL
        (for example a Spotify track URL).
        """

    @abstractmethod
    def from_query(self, query):
        """
        Return the track metadata for the given search query.
        """

    @abstractmethod
    def metadata_to_standard_form(self, metadata):
        """
        Convert the provider's raw metadata into the format shared
        by all metadata providers, so that consumers can treat
        every provider alike.
        """

View File

@@ -0,0 +1,2 @@
from spotdl.metadata.providers.spotify import MetadataSpotify
from spotdl.metadata.providers.youtube import MetadataYouTube

View File

@@ -0,0 +1,66 @@
import spotipy
import spotipy.oauth2 as oauth2
from spotdl.metadata import MetadataBase
class MetadataSpotify(MetadataBase):
    """Metadata provider that reads track details through spotipy."""

    def __init__(self, spotify=None):
        """
        Args:
            spotify: Client object used for all lookups (the same
                kind of object `set_credentials` creates). When
                omitted, `set_credentials` has to be called before
                any metadata is requested.
        """
        self.spotify = spotify

    def set_credentials(self, client_id, client_secret):
        """Create the spotipy client from application credentials."""
        token = self._generate_token(client_id, client_secret)
        self.spotify = spotipy.Spotify(auth=token)

    def from_url(self, url):
        """Return standard-form metadata for the track at `url`."""
        metadata = self.spotify.track(url)
        return self.metadata_to_standard_form(metadata)

    def from_query(self, query):
        """
        Return standard-form metadata for the first track matching
        `query`.

        Raises:
            IndexError: If the search returns an empty item list.
        """
        metadata = self.spotify.search(query, limit=1)["tracks"]["items"][0]
        return self.metadata_to_standard_form(metadata)

    def _generate_token(self, client_id, client_secret):
        """Request an access token via the client-credentials flow."""
        credentials = oauth2.SpotifyClientCredentials(
            client_id=client_id,
            client_secret=client_secret,
        )
        return credentials.get_access_token()

    def _titlecase(self, string):
        """Capitalize every whitespace-separated word of `string`."""
        return " ".join(word.capitalize() for word in string.split())

    def metadata_to_standard_form(self, metadata):
        """
        Enrich a raw track dictionary in place and return it.

        Adds "genre", "copyright", "release_date", "publisher",
        "total_tracks", "year", "duration" (seconds) and "provider",
        guarantees that `metadata["external_ids"]["isrc"]` exists,
        and strips keys that are not needed afterwards.

        Performs two extra lookups (artist and album) through the
        client stored on `self.spotify`.
        """
        artist = self.spotify.artist(metadata["artists"][0]["id"])
        album = self.spotify.album(metadata["album"]["id"])

        try:
            metadata["genre"] = self._titlecase(artist["genres"][0])
        except IndexError:
            metadata["genre"] = None

        try:
            metadata["copyright"] = album["copyrights"][0]["text"]
        except IndexError:
            metadata["copyright"] = None

        # Tolerate a response without "external_ids" altogether; the
        # previous try/except re-indexed that key inside the handler
        # and therefore raised KeyError in exactly that situation.
        metadata.setdefault("external_ids", {}).setdefault("isrc", None)

        metadata["release_date"] = album["release_date"]
        metadata["publisher"] = album["label"]
        metadata["total_tracks"] = album["tracks"]["total"]

        # Some sugar
        metadata["year"], *_ = metadata["release_date"].split("-")
        metadata["duration"] = metadata.pop("duration_ms") / 1000.0
        metadata["provider"] = "spotify"

        # Remove unwanted parameters; they may already be absent, so
        # do not fail when there is nothing to remove.
        metadata.pop("available_markets", None)
        metadata["album"].pop("available_markets", None)

        return metadata

View File

@@ -0,0 +1,140 @@
import pytube
from bs4 import BeautifulSoup
import urllib.request
from spotdl.metadata import StreamsBase
from spotdl.metadata import MetadataBase
BASE_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}"
class YouTubeSearch:
    """Scrape YouTube's search result page for matching video URLs."""

    def __init__(self):
        self.base_url = BASE_URL

    def generate_search_url(self, query):
        """Return the search URL for `query`, percent-encoding the query."""
        quoted_query = urllib.request.quote(query)
        return self.base_url.format(quoted_query)

    def _fetch_response_html(self, url):
        """Download `url` and return the parsed document."""
        # Use the response as a context manager so that the underlying
        # connection is closed even if reading fails.
        with urllib.request.urlopen(url) as response:
            markup = response.read()
        return BeautifulSoup(markup, "html.parser")

    def _fetch_search_results(self, html):
        """Return the result tiles found in the parsed search page."""
        return html.find_all(
            "div", {"class": "yt-lockup-dismissable yt-uix-tile"}
        )

    def _is_video(self, result):
        """
        Return True when `result` is a plain video, i.e. neither a
        channel, a mix/playlist nor an advertisement.
        """
        # A tag without a "class" attribute simply matches none of
        # the markers instead of raising KeyError.
        parent_classes = result.parent.attrs.get("class", ())
        own_classes = result.attrs.get("class", ())

        is_channel = (
            result.find("channel") is not None
            or "yt-lockup-channel" in parent_classes
            or "yt-lockup-channel" in own_classes
        )
        is_playlist = "yt-lockup-playlist" in parent_classes
        is_advertisement = result.find("googleads") is not None

        return not (is_channel or is_playlist or is_advertisement)

    def _parse_video_id(self, result):
        """Return the trailing 11 characters of the result's first link."""
        details = result.find("div", class_="yt-lockup-content")
        return details.find("a")["href"][-11:]

    def search(self, query, limit=10, tries_remaining=5):
        """
        Search and scrape YouTube to return a list of matching videos.

        Args:
            query: Free-text search query.
            limit: Maximum number of watch URLs to return.
            tries_remaining: Guard value intended to bound retries.
                This method itself performs no retry; it only
                refuses to search when the value is 0.

        Returns:
            A list of "https://www.youtube.com/watch?v=..." URLs in
            result-page order, or None when `tries_remaining` is 0.
        """
        if tries_remaining == 0:
            return None

        search_url = self.generate_search_url(query)
        html = self._fetch_response_html(search_url)

        videos = []
        for result in self._fetch_search_results(html):
            if len(videos) >= limit:
                break
            if not self._is_video(result):
                continue
            video_id = self._parse_video_id(result)
            videos.append("https://www.youtube.com/watch?v=" + video_id)
        return videos
class YouTubeStreams(StreamsBase):
    """Audio-only streams of a video, expressed as plain dictionaries."""

    def __init__(self, streams):
        """
        Build `self.all` from the given stream query object.

        Only audio streams are kept, ordered by "abr" descending.
        Each entry holds "bitrate", "download_url", "encoding" and
        "filesize".
        """
        audio_only = streams.filter(only_audio=True)
        ordered = audio_only.order_by("abr").desc()

        parsed = []
        for stream in ordered:
            # The last four characters of `abr` are cut off before the
            # integer conversion (presumably a unit suffix such as
            # "kbps" -- confirm against the pytube version in use).
            entry = {
                "bitrate": int(stream.abr[:-4]),
                "download_url": stream.url,
                "encoding": stream.audio_codec,
                "filesize": stream.filesize,
            }
            parsed.append(entry)
        self.all = parsed

    def getbest(self):
        """Return the first entry of `self.all` (highest bitrate)."""
        best = self.all[0]
        return best

    def getworst(self):
        """Return the last entry of `self.all` (lowest bitrate)."""
        worst = self.all[-1]
        return worst
class MetadataYouTube(MetadataBase):
    """Metadata provider that derives track details from a YouTube video."""

    def from_query(self, query):
        """
        Return metadata for the first video found for `query`.

        Raises:
            IndexError: If the search returns an empty list.
        """
        watch_urls = YouTubeSearch().search(query)
        return self.from_url(watch_urls[0])

    def from_url(self, url):
        """Return metadata for the video at the given watch URL."""
        content = pytube.YouTube(url)
        return self.from_pytube_object(content)

    def from_pytube_object(self, content):
        """Return metadata for an already constructed pytube object."""
        return self.metadata_to_standard_form(content)

    def _fetch_publish_date(self, content):
        """
        Extract the video's publish date from the watch page HTML.

        Returns:
            The date as a "YYYY-MM-DD" string, or None when the page
            contains no recognisable "publishDate" entry.
        """
        # FIXME: This needs to be supported in PyTube itself
        # See https://github.com/nficano/pytube/issues/595
        import re  # local import: only this helper needs it

        # The previous fixed-offset slice was 9 characters wide and so
        # always cut off the last digit of the 10-character date; it
        # also returned unrelated text when "publishDate" was missing.
        # Match the date itself, allowing any short run of punctuation
        # (quotes, backslashes, colon, spaces) after the key.
        match = re.search(
            r"publishDate\W{1,8}(\d{4}-\d{2}-\d{2})",
            content.watch_html,
        )
        return match.group(1) if match else None

    def metadata_to_standard_form(self, content):
        """
        Fetch a song's metadata from YouTube.

        Args:
            content: Object exposing `title`, `author`, `length`,
                `watch_url`, `thumbnail_url`, `watch_html` and
                `streams` (as created in `from_url`).

        Returns:
            A dictionary in the provider-independent layout. Fields
            YouTube cannot supply are None; "year" and "release_date"
            are None when no publish date could be extracted.
        """
        publish_date = self._fetch_publish_date(content)
        year = publish_date.split("-")[0] if publish_date else None

        metadata = {
            "name": content.title,
            "artists": [{"name": content.author}],
            "duration": content.length,
            "external_urls": {"youtube": content.watch_url},
            "album": {
                "images": [{"url": content.thumbnail_url}],
                "artists": [{"name": None}],
                "name": None,
            },
            "year": year,
            "release_date": publish_date,
            "type": "track",
            "disc_number": 1,
            "track_number": 1,
            "total_tracks": 1,
            "publisher": None,
            "external_ids": {"isrc": None},
            "lyrics": None,
            "copyright": None,
            "genre": None,
            "streams": YouTubeStreams(content.streams),
            "provider": "youtube",
        }
        return metadata