mirror of
				https://github.com/KevinMidboe/spotify-downloader.git
				synced 2025-10-29 18:00:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			109 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			109 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import tqdm
 | 
						|
 | 
						|
import urllib.request
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
 | 
						|
from spotdl.encode.encoders import EncoderFFmpeg
 | 
						|
from spotdl.metadata.embedders import EmbedderDefault
 | 
						|
 | 
						|
import spotdl.util
 | 
						|
 | 
						|
CHUNK_SIZE = 16 * 1024
 | 
						|
 | 
						|
class Track:
 | 
						|
    def __init__(self, metadata, cache_albumart=False):
 | 
						|
        self.metadata = metadata
 | 
						|
        self._chunksize = CHUNK_SIZE
 | 
						|
 | 
						|
        if cache_albumart:
 | 
						|
            self._albumart_thread = self._cache_albumart()
 | 
						|
 | 
						|
        self._cache_albumart = cache_albumart
 | 
						|
 | 
						|
    def _cache_albumart(self):
 | 
						|
        albumart_thread = spotdl.util.ThreadWithReturnValue(
 | 
						|
            target=lambda url: urllib.request.urlopen(url).read(),
 | 
						|
            args=(self.metadata["album"]["images"][0]["url"],)
 | 
						|
        )
 | 
						|
        albumart_thread.start()
 | 
						|
        return albumart_thread
 | 
						|
 | 
						|
    def calculate_total_chunks(self, filesize):
 | 
						|
        return (filesize // self._chunksize) + 1
 | 
						|
 | 
						|
    def make_progress_bar(self, total_chunks):
 | 
						|
        progress_bar = tqdm.trange(
 | 
						|
            total_chunks,
 | 
						|
            unit_scale=(self._chunksize // 1024),
 | 
						|
            unit="KiB",
 | 
						|
            dynamic_ncols=True,
 | 
						|
            bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}KiB '
 | 
						|
                '[{elapsed}<{remaining}, {rate_fmt}{postfix}]',
 | 
						|
        )
 | 
						|
        return progress_bar
 | 
						|
 | 
						|
    def download_while_re_encoding(self, stream, target_path, target_encoding=None,
 | 
						|
                                   encoder=EncoderFFmpeg(), show_progress=True):
 | 
						|
        total_chunks = self.calculate_total_chunks(stream["filesize"])
 | 
						|
        process = encoder.re_encode_from_stdin(
 | 
						|
            stream["encoding"],
 | 
						|
            target_path,
 | 
						|
            target_encoding=target_encoding
 | 
						|
        )
 | 
						|
        response = stream["connection"]
 | 
						|
 | 
						|
        progress_bar = self.make_progress_bar(total_chunks)
 | 
						|
        for _ in progress_bar:
 | 
						|
            chunk = response.read(self._chunksize)
 | 
						|
            process.stdin.write(chunk)
 | 
						|
 | 
						|
        process.stdin.close()
 | 
						|
        process.wait()
 | 
						|
 | 
						|
    def download(self, stream, target_path, show_progress=True):
 | 
						|
        total_chunks = self.calculate_total_chunks(stream["filesize"])
 | 
						|
        progress_bar = self.make_progress_bar(total_chunks)
 | 
						|
        response = stream["connection"]
 | 
						|
        if target_path == "-":
 | 
						|
            # Target is STDOUT
 | 
						|
            for _ in progress_bar:
 | 
						|
                chunk = response.read(self._chunksize)
 | 
						|
                sys.stdout.buffer.write(chunk)
 | 
						|
        else:
 | 
						|
            with open(target_path, "wb") as fout:
 | 
						|
                for _ in progress_bar:
 | 
						|
                    chunk = response.read(self._chunksize)
 | 
						|
                    fout.write(chunk)
 | 
						|
 | 
						|
    def re_encode(self, input_path, target_path, target_encoding=None,
 | 
						|
                  encoder=EncoderFFmpeg(), show_progress=True):
 | 
						|
        stream = self.metadata["streams"].getbest()
 | 
						|
        total_chunks = self.calculate_total_chunks(stream["filesize"])
 | 
						|
        process = encoder.re_encode_from_stdin(
 | 
						|
            stream["encoding"],
 | 
						|
            target_path,
 | 
						|
            target_encoding=target_encoding
 | 
						|
        )
 | 
						|
        with open(input_path, "rb") as fin:
 | 
						|
            for _ in tqdm.trange(total_chunks):
 | 
						|
                chunk = fin.read(self._chunksize)
 | 
						|
                process.stdin.write(chunk)
 | 
						|
 | 
						|
        process.stdin.close()
 | 
						|
        process.wait()
 | 
						|
 | 
						|
    def apply_metadata(self, input_path, encoding=None, embedder=EmbedderDefault()):
 | 
						|
        if self._cache_albumart:
 | 
						|
            albumart = self._albumart_thread.join()
 | 
						|
        else:
 | 
						|
            albumart = None
 | 
						|
 | 
						|
        embedder.apply_metadata(
 | 
						|
            input_path,
 | 
						|
            self.metadata,
 | 
						|
            cached_albumart=albumart,
 | 
						|
            encoding=encoding,
 | 
						|
        )
 | 
						|
 |