Files
spotify-downloader/spotdl/metadata/providers/youtube.py
Ritiek Malhotra ec765119fa Setup coloredlogs
2020-05-03 09:06:03 +05:30

226 lines
7.2 KiB
Python

import pytube
from bs4 import BeautifulSoup
import urllib.request
import threading
from collections.abc import Sequence
from spotdl.metadata import StreamsBase
from spotdl.metadata import ProviderBase
from spotdl.metadata.exceptions import YouTubeMetadataNotFoundError
import spotdl.util
import logging
logger = logging.getLogger(__name__)
BASE_SEARCH_URL = "https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={}"
HEADERS = [('Range', 'bytes=0-'),]
class YouTubeVideos(Sequence):
def __init__(self, videos):
self.videos = videos
super().__init__()
def __repr__(self):
return "YouTubeVideos({})".format(self.videos)
def __len__(self):
return len(self.videos)
def __getitem__(self, index):
return self.videos[index]
def bestmatch(self):
return self.videos[0]
class YouTubeSearch:
def __init__(self):
self.base_search_url = BASE_SEARCH_URL
def generate_search_url(self, query):
quoted_query = urllib.request.quote(query)
return self.base_search_url.format(quoted_query)
def _fetch_response_html(self, url):
response = urllib.request.urlopen(url)
soup = BeautifulSoup(response.read(), "html.parser")
return soup
def _extract_video_details_from_result(self, html):
video_time = html.find("span", class_="video-time").get_text()
inner_html = html.find("div", class_="yt-lockup-content")
video_id = inner_html.find("a")["href"][-11:]
video_title = inner_html.find("a")["title"]
video_details = {
"url": "https://www.youtube.com/watch?v=" + video_id,
"title": video_title,
"duration": video_time,
}
return video_details
def _fetch_search_results(self, html, limit=10):
result_source = html.find_all(
"div", {"class": "yt-lockup-dismissable yt-uix-tile"}
)
videos = []
for result in result_source:
if not self._is_video(result):
continue
video = self._extract_video_details_from_result(result)
videos.append(video)
if len(videos) >= limit:
break
return videos
def _is_video(self, result):
# ensure result is not a channel
not_video = (
result.find("channel") is not None
or "yt-lockup-channel" in result.parent.attrs["class"]
or "yt-lockup-channel" in result.attrs["class"]
)
# ensure result is not a mix/playlist
not_video = not_video or "yt-lockup-playlist" in result.parent.attrs["class"]
# ensure video result is not an advertisement
not_video = not_video or result.find("googleads") is not None
video = not not_video
return video
def search(self, query, limit=10):
""" Search and scrape YouTube to return a list of matching videos. """
search_url = self.generate_search_url(query)
logger.debug("Opening URL: {0}".format(search_url))
html = self._fetch_response_html(search_url)
videos = self._fetch_search_results(html)
return YouTubeVideos(videos)
class YouTubeStreams(StreamsBase):
def __init__(self, streams):
self.network_headers = HEADERS
audiostreams = streams.filter(only_audio=True).order_by("abr").desc()
thread_pool = []
self.all = []
for stream in audiostreams:
standard_stream = {
# Store only the integer part for bitrate. For example
# the given bitrate would be "192kbps", we store only
# the integer part (192) here and drop the rest.
"bitrate": int(stream.abr[:-4]),
"connection": None,
"download_url": stream.url,
"encoding": stream.audio_codec,
"filesize": None,
}
establish_connection = threading.Thread(
target=self._store_connection,
args=(standard_stream,),
)
thread_pool.append(establish_connection)
establish_connection.start()
self.all.append(standard_stream)
for thread in thread_pool:
thread.join()
def _store_connection(self, stream):
response = self._make_request(stream["download_url"])
stream["connection"] = response
stream["filesize"] = int(response.headers["Content-Length"])
def _make_request(self, url):
request = urllib.request.Request(url)
for header in self.network_headers:
request.add_header(*header)
return urllib.request.urlopen(request)
def get(self, quality="best", preftype="automatic"):
if quality == "best":
return self.getbest(preftype=preftype)
elif quality == "worst":
return self.getworst(preftype=preftype)
else:
return None
def getbest(self, preftype="automatic"):
if preftype == "automatic":
return self.all[0]
for stream in self.all:
if stream["encoding"] == preftype:
return stream
def getworst(self, preftype="automatic"):
if preftype == "automatic":
return self.all[-1]
for stream in self.all[::-1]:
if stream["encoding"] == preftype:
return stream
class ProviderYouTube(ProviderBase):
def from_query(self, query):
watch_urls = YouTubeSearch().search(query)
if not watch_urls:
raise YouTubeMetadataNotFoundError(
'YouTube returned nothing for the given search '
'query ("{}")'.format(query)
)
return self.from_url(watch_urls[0])
def from_url(self, url):
content = pytube.YouTube(url)
return self.from_pytube_object(content)
def from_pytube_object(self, content):
return self.metadata_to_standard_form(content)
def _fetch_publish_date(self, content):
# FIXME: This needs to be supported in PyTube itself
# See https://github.com/nficano/pytube/issues/595
position = content.watch_html.find("publishDate")
publish_date = content.watch_html[position+16:position+25]
return publish_date
def metadata_to_standard_form(self, content):
""" Fetch a song's metadata from YouTube. """
publish_date = self._fetch_publish_date(content)
metadata = {
"name": content.title,
"artists": [{"name": content.author}],
"duration": content.length,
"external_urls": {"youtube": content.watch_url},
"album": {
"images": [{"url": content.thumbnail_url}],
"artists": [{"name": None}],
"name": None,
},
"year": publish_date.split("-")[0],
"release_date": publish_date,
"type": "track",
"disc_number": 1,
"track_number": 1,
"total_tracks": 1,
"publisher": None,
"external_ids": {"isrc": None},
"lyrics": None,
"copyright": None,
"genre": None,
"streams": YouTubeStreams(content.streams),
"provider": "youtube",
}
return metadata