CLI optimizations

This commit is contained in:
Ritiek Malhotra
2020-04-17 14:32:57 +05:30
parent 164f342262
commit 42dd650ed8
6 changed files with 122 additions and 35 deletions

View File

@@ -0,0 +1,6 @@
class NoYouTubeVideoError(Exception):
    """Signal that no YouTube video could be found for a search query."""

    # Report the class under the built-in exceptions module so tracebacks
    # print the bare class name rather than a package-qualified path.
    __module__ = Exception.__module__

    def __init__(self, message=None):
        """Create the error.

        Args:
            message: Optional human-readable description. When omitted the
                exception carries no arguments, so ``str(error)`` is empty
                instead of the literal text "None".
        """
        if message is None:
            super().__init__()
        else:
            super().__init__(message)

View File

@@ -1,6 +1,8 @@
from spotdl.metadata.providers import ProviderSpotify
from spotdl.metadata.providers import ProviderYouTube
from spotdl.metadata.providers import YouTubeSearch
from spotdl.metadata.embedders import EmbedderDefault
from spotdl.metadata.exceptions import SpotifyMetadataNotFoundError
from spotdl.encode.encoders import EncoderFFmpeg
from spotdl.encode.encoders import EncoderAvconv
@@ -9,6 +11,8 @@ from spotdl.lyrics.providers import LyricWikia
from spotdl.lyrics.providers import Genius
from spotdl.lyrics.exceptions import LyricsNotFoundError
from spotdl.command_line.exceptions import NoYouTubeVideoError
from spotdl.track import Track
import spotdl.util
@@ -29,8 +33,19 @@ def search_lyrics(query):
return lyrics
def search_metadata(track, search_format="{artist} - {track-name} lyrics"):
def search_metadata_on_spotify(query):
    """Search Spotify for `query` and return whatever metadata it yields.

    Returns the result of ``ProviderSpotify.from_query`` or, when that
    raises ``SpotifyMetadataNotFoundError``, an empty dict.
    """
    spotify = ProviderSpotify()
    try:
        return spotify.from_query(query)
    except SpotifyMetadataNotFoundError:
        # A missing match is not an error for callers; hand back no metadata.
        return {}
def search_metadata(track, search_format="{artist} - {track-name} lyrics", manual=False):
youtube = ProviderYouTube()
youtube_searcher = YouTubeSearch()
if spotdl.util.is_spotify(track):
spotify = ProviderSpotify()
spotify_metadata = spotify.from_url(track)
@@ -39,20 +54,51 @@ def search_metadata(track, search_format="{artist} - {track-name} lyrics"):
spotify_metadata,
)
search_query = spotdl.util.format_string(search_format, spotify_metadata)
youtube_metadata = youtube.from_query(search_query)
youtube_urls = youtube_searcher.search(search_query)
if not youtube_urls:
# raise NoYouTubeVideoError(
# 'No videos found for the search query: "{}"'.format(search_query)
# )
return
if manual:
pass
else:
youtube_url = youtube_urls.bestmatch()["url"]
youtube_metadata = youtube.from_url(youtube_url)
metadata = spotdl.util.merge(
youtube_metadata,
spotify_metadata
)
elif spotdl.util.is_youtube(track):
metadata = youtube.from_url(track)
lyric_query = spotdl.util.format_string(
"{artist} - {track-name}",
metadata,
)
else:
metadata = youtube.from_query(track)
lyric_query = track
spotify_metadata = spotdl.util.ThreadWithReturnValue(
target=search_metadata_on_spotify,
args=(track,)
)
spotify_metadata.start()
youtube_urls = youtube_searcher.search(track)
if not youtube_urls:
# raise NoYouTubeVideoError(
# 'No videos found for the search query: "{}"'.format(track)
# )
return
if manual:
pass
else:
youtube_url = youtube_urls.bestmatch()["url"]
youtube_metadata = youtube.from_url(youtube_url)
metadata = spotdl.util.merge(
youtube_metadata,
spotify_metadata.join()
)
metadata["lyrics"] = spotdl.util.ThreadWithReturnValue(
target=search_lyrics,
@@ -170,10 +216,11 @@ def download_tracks_from_file(path, arguments):
}
metadata["next_track"].start()
while tracks_count > 0:
tracks_count -= 1
metadata["current_track"] = metadata["next_track"].join()
metadata["next_track"] = None
try:
print(tracks_count)
print(tracks)
if tracks_count > 0:
current_track = next_track
next_track = tracks.pop(0)
@@ -185,20 +232,26 @@ def download_tracks_from_file(path, arguments):
log_fmt=(str(current_iteration) + ". {artist} - {track-name}")
# log.info(log_fmt)
if metadata["current_track"] is None:
# log.warning("Something went wrong. Will retry after downloading remaining tracks")
print("FUCK")
raise TypeError("fuck.")
print(metadata["current_track"]["name"])
download_track_from_metadata(
metadata["current_track"],
arguments
)
current_iteration += 1
except (urllib.request.URLError, TypeError, IOError) as e:
# log.exception(e.args[0])
# log.warning("Failed. Will retry after other songs\n")
tracks.append(current_track)
else:
tracks_count -= 1
if arguments.write_successful:
with open(arguments.write_successful, "a") as fout:
fout.write(current_track)
finally:
current_iteration += 1
with open(path, "w") as fout:
fout.writelines(tracks)

View File

@@ -1,3 +1,4 @@
from spotdl.metadata.providers.spotify import ProviderSpotify
from spotdl.metadata.providers.youtube import ProviderYouTube
from spotdl.metadata.providers.youtube import YouTubeSearch

View File

@@ -22,7 +22,7 @@ class ProviderSpotify(ProviderBase):
def from_query(self, query):
tracks = self.spotify.search(query, limit=1)["tracks"]["items"]
if tracks is None:
if not tracks:
raise SpotifyMetadataNotFoundError(
'Could not find any tracks matching the given search query ("{}")'.format(
query,

View File

@@ -3,32 +3,75 @@ from bs4 import BeautifulSoup
import urllib.request
import threading
from collections.abc import Sequence
from spotdl.metadata import StreamsBase
from spotdl.metadata import ProviderBase
from spotdl.metadata.exceptions import YouTubeMetadataNotFoundError
BASE_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}"
import spotdl.util
BASE_SEARCH_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}"
HEADERS = [('Range', 'bytes=0-'),]
class YouTubeVideos(Sequence):
    """Sequence view over a collection of video entries.

    Indexing and length are delegated to the wrapped ``videos`` object;
    the remaining sequence behaviour comes from ``collections.abc.Sequence``.
    """

    def __init__(self, videos):
        """Wrap `videos` (kept by reference, not copied)."""
        super().__init__()
        self.videos = videos

    def __getitem__(self, index):
        """Return the entry (or slice) at `index` from the wrapped collection."""
        wrapped = self.videos
        return wrapped[index]

    def __len__(self):
        """Return how many entries are wrapped."""
        wrapped = self.videos
        return len(wrapped)

    def bestmatch(self):
        """Return the first wrapped entry; raises IndexError when empty."""
        wrapped = self.videos
        return wrapped[0]
class YouTubeSearch:
def __init__(self):
self.base_url = BASE_URL
self.base_search_url = BASE_SEARCH_URL
def generate_search_url(self, query):
quoted_query = urllib.request.quote(query)
return self.base_url.format(quoted_query)
return self.base_search_url.format(quoted_query)
def _fetch_response_html(self, url):
    """Download `url` and return its body parsed by BeautifulSoup.

    The HTTP response is used as a context manager so the underlying
    connection is closed as soon as the body has been read, rather than
    being left open until garbage collection.

    Args:
        url: Address to fetch with ``urllib.request.urlopen``.

    Returns:
        A ``BeautifulSoup`` document built with the "html.parser" backend.
    """
    with urllib.request.urlopen(url) as response:
        markup = response.read()
    return BeautifulSoup(markup, "html.parser")
def _fetch_search_results(self, html):
results = html.find_all(
def _extract_video_details_from_result(self, html):
video_time = html.find("span", class_="video-time").get_text()
inner_html = html.find("div", class_="yt-lockup-content")
video_id = inner_html.find("a")["href"][-11:]
video_title = inner_html.find("a")["title"]
video_details = {
"url": "https://www.youtube.com/watch?v=" + video_id,
"title": video_title,
"duration": video_time,
}
return video_details
def _fetch_search_results(self, html, limit=10):
result_source = html.find_all(
"div", {"class": "yt-lockup-dismissable yt-uix-tile"}
)
return results
videos = []
for result in result_source:
if not self._is_video(result):
continue
video = self._extract_video_details_from_result(result)
videos.append(video)
if len(videos) >= limit:
break
return videos
def _is_video(self, result):
# ensure result is not a channel
@@ -47,32 +90,14 @@ class YouTubeSearch:
video = not not_video
return video
def _parse_video_id(self, result):
details = result.find("div", class_="yt-lockup-content")
video_id = details.find("a")["href"][-11:]
return video_id
def search(self, query, limit=10, tries_remaining=5):
def search(self, query, limit=10):
""" Search and scrape YouTube to return a list of matching videos. """
# prevents an infinite loop but allows for a few retries
if tries_remaining == 0:
# log.debug("No tries left. I quit.")
return
search_url = self.generate_search_url(query)
# log.debug("Opening URL: {0}".format(search_url))
html = self._fetch_response_html(search_url)
videos = []
for result in self._fetch_search_results(html):
if not self._is_video(result):
continue
if len(videos) >= limit:
break
video_id = self._parse_video_id(result)
videos.append("https://www.youtube.com/watch?v=" + video_id)
return videos
videos = self._fetch_search_results(html)
return YouTubeVideos(videos)
class YouTubeStreams(StreamsBase):

View File

@@ -29,14 +29,16 @@ class ThreadWithReturnValue(threading.Thread):
def __init__(self, target=lambda: None, args=()):
super().__init__(target=target, args=args)
self._return = None
def run(self):
    """Invoke the stored target and remember what it returns.

    Does nothing when no target is set; otherwise the target is called
    with the stored positional and keyword arguments and its result is
    saved on ``self._return``.
    """
    target = self._target
    if target is None:
        return
    self._return = target(*self._args, **self._kwargs)
def join(self, *args):
super().join(*args)
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return