From cedefbc42b1f7798a6050a5dedc42fee7f6fa5fe Mon Sep 17 00:00:00 2001 From: Kevin Midboe Date: Fri, 25 Aug 2023 19:31:40 +0200 Subject: [PATCH] Re-wrote everything to be defined in Classes, better logging & exec Abstracted remote and local filesystems into a System class. Transporter does the file transfering, simplified this code. Removes a lot of the error handling that was there in replacement of state that will be more resilient, predictable and less error-prone. --- transatlanticTorrentExpress.py | 384 +++++++++++++++++++-------------- 1 file changed, 222 insertions(+), 162 deletions(-) diff --git a/transatlanticTorrentExpress.py b/transatlanticTorrentExpress.py index 39cae73..a969b64 100755 --- a/transatlanticTorrentExpress.py +++ b/transatlanticTorrentExpress.py @@ -1,5 +1,5 @@ #!/usr/bin/python3 -import os, sys +import os, sys, math import shutil from subprocess import check_output, Popen, PIPE from datetime import timedelta @@ -14,214 +14,274 @@ except Exception: from logger import logger from utils import getConfig, readAvgSpeedFromDisk, writeAvgSpeedToDisk, VIDEO_EXTENSIONS -ESTIMATED_TRANSFER_SPEED=readAvgSpeedFromDisk() +LAST_TRANSFER_SPEED=readAvgSpeedFromDisk() TRANSFER_SPEED_UNIT="Mb/s" LAST_FILE_TRANSFER_SPEED=None -def fileSizeByPath(path): - filename = path.split('/')[-1] - config = getConfig() - host = config['SSH']['host'] - user = config['SSH']['user'] - remotePath = config['FILES']['remote'] - diskUsageCmd = 'du -hs' - if (remotePath in path): - cmd = 'ssh {}@{} {} "\'{}\'"'.format(user, host, diskUsageCmd, path) - else: - cmd = '{} "{}"'.format(diskUsageCmd, path) +def parseFileListResponse(filesString): + if filesString == None: + return [] + files = filesString.decode('utf-8').split('\n') + return list(filter(lambda x: len(x) > 0, files)) # remove any empty newline from list + + +def filesNotShared(remote, local): + c = set(remote.files) - set(local.files) + files = list(c) + if len(files) == 0: + return False + + return list(filter(lambda file: not local.findFile(file), files)) + + +class File(): + def __init__(self, file, system): + self.name = file + self.system = system + self.size = self.fileSize() + self.sizeInBytes = self.fileSizeInBytes() + + def fileSize(self): + filePath = self.system.buildFilePath(self) + cmd = "du -hs '{}'".format(filePath) + + if self.system.remote: + cmd = 'ssh {}@{} {}'.format(self.system.user, self.system.path, cmd) + diskusageOutput = check_output(cmd, shell=True) diskusageOutput = diskusageOutput.decode('utf-8').split('\t') return diskusageOutput[0] + 'B' -def fileSizeInBytes(fileSize, blockSize=1024): - try: - if fileSize[-2] == 'G': - fileSizeInBytes = float(fileSize[:-2]) * 1024 * 1024 * 1024 - elif fileSize[-2] == 'M': - fileSizeInBytes = float(fileSize[:-2]) * 1024 * 1024 - elif fileSize[-2] == 'K': - fileSizeInBytes = float(fileSize[:-2]) * 1024 - except: - logger.error('Filesize to float. Filesize:', es={'output': fileSize}) - return + def fileSizeInBytes(self, blockSize=1024): + fileSizeBytes = blockSize - return fileSizeInBytes - -def estimateFileTransferTime(fileSize, filename): - global ESTIMATED_TRANSFER_SPEED,TRANSFER_SPEED_UNIT,LAST_FILE_TRANSFER_SPEED - - fileSizeBytes = fileSizeInBytes(fileSize) - if fileSizeBytes == None: - logger.info('Unable to calculate transfer time for file', es={'filename': filename}) + try: + if self.size[-2] == 'G': + fileSizeBytes = float(self.size[:-2]) * 1024 * 1024 * 1024 + elif self.size[-2] == 'M': + fileSizeBytes = float(self.size[:-2]) * 1024 * 1024 + elif self.size[-2] == 'K': + fileSizeBytes = float(self.size[:-2]) * 1024 + except: + logger.error('Filesize to float. Filesize:', es={'output': self.size}) return - if (LAST_FILE_TRANSFER_SPEED): - estimatedTransferSpeed = LAST_FILE_TRANSFER_SPEED - else: - estimatedTransferSpeed = ESTIMATED_TRANSFER_SPEED - logger.debug('Guessing transfer speed with static speed variable', es={'transferSpeed': ESTIMATED_TRANSFER_SPEED, - 'transferSpeedUnit': TRANSFER_SPEED_UNIT}) + return fileSizeBytes - elapsedTimeInSeconds = (fileSizeBytes / 1000 / 1000 * 8) / estimatedTransferSpeed - estimatedTransferTime = str(timedelta(seconds=elapsedTimeInSeconds)).split('.')[0] + @property + def telemetry(self): + return { + 'filename': self.name, + 'filesize': self.size, + 'bytes': self.sizeInBytes + } - # trying to find the speed we average transfer at - logger.info('Estimated transfer time'.format(estimatedTransferTime), es={'filename': filename, - 'filesize': fileSize, - 'bytes': fileSizeBytes, - 'seconds': elapsedTimeInSeconds, - 'transferTime': estimatedTransferTime, - 'transferSpeed': estimatedTransferSpeed, - 'transferSpeedUnit': TRANSFER_SPEED_UNIT}) - return estimatedTransferSpeed + def __str__(self): + return str(self.name) -def getFiles(path, host=None, user=None): - logger.debug('Getting filenames from path: {}'.format(path), es={'path': path}) - if (host and user): - cmd = "ssh {}@{} ls '{}'".format(user, host, path) - else: - cmd = "ls '{}'".format(path) + def __repr__(self): + return repr(self.name) - contents = check_output(cmd, shell=True) - if contents != None: - contents = contents.decode('utf-8').split('\n') - contents = list(filter(lambda x: len(x) > 0, contents)) - return contents +class System(): + def __init__(self, path, host=None, user=None): + self.path = path + self.files = [] + self.host = host + self.user = user + self.remote = host or user -def filesNotShared(remote, local): - c = set(remote) - set(local) - if c == set(): - return False - - return list(c) + def getFiles(self): + logger.debug('Getting files from path', es={'path': self.path}) -def transferFiles(files, localPath, remotePath, host=None, user=None): - transferedFiles = [] + cmd = "ls '{}'".format(self.path) + if self.remote: + cmd = 'ssh {}@{} {}'.format(self.user, self.host, cmd) - for file in files: - if file in getFiles(localPath): - logger.debug('File already exists at remote path. Skipping.') - continue + contents = check_output(cmd, shell=True) + files = parseFileListResponse(contents) + for file in files: + self.files.append(File(file, self)) - remoteFile = os.path.join(remotePath, file) - fileSize = fileSizeByPath(remoteFile) - fileSizeBytes = fileSizeInBytes(fileSize) + logger.debug('Files found', es={'files': self.files}) + return self.files - logger.info('Moving file: {}'.format(file), es={'filename': file, - 'filesize': fileSize, - 'bytes': fileSizeBytes}) + def findFile(self, file): + cmd = "find {} -type f -name '{}'".format(self.path, file.name) + if self.remote: + cmd = 'ssh {}@{} {}'.format(self.user, self.path, cmd) - file = os.path.join(remotePath, file) - spaceEscapedFile = file.replace(' ', '\\ ') + fileMatch = check_output(cmd, shell=True) + return fileMatch != b'' - # check if file is folder-less, if so create folder and update localPath - folderedLocalPath = None - filename, fileExtension = os.path.splitext(file) - if fileExtension in VIDEO_EXTENSIONS: - folderedLocalPath = os.path.join(localPath, filename) - os.makedirs(folderedLocalPath) + def buildFilePath(self, file): + return os.path.join(self.path, file.name) - # Build rsync command - if host and user: - cmd = "rsync -rz {}@{}:'{}' '{}'".format(user, host, spaceEscapedFile, folderedLocalPath or localPath) - else: - cmd = "rsync -rz '{}' '{}'".format(spaceEscapedFile, localPath) + def rsyncFilePath(self, file): + filePath = self.buildFilePath(file) - estimatedTransferSpeed = estimateFileTransferTime(fileSize, file) - start = time() + if not self.remote: + return "'{}'".format(filePath) - rsyncProcess = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = rsyncProcess.communicate() + return "{}@{}:'{}'".format(self.user, self.host, filePath) - if stderr: - stderr = stderr.decode('utf-8') - logger.error('Rsync error', es={'filename':file, 'output':stderr}) - stdout = stdout.decode('utf-8') - logger.debug('Rsync output', es={'filename': file, 'output': stdout}) +class Transport(): + def __init__(self, files, satelliteSystem, localSystem, downloadClient): + self.files = files + self.transferedFiles = [] + self.satelliteSystem = satelliteSystem + self.localSystem = localSystem + self.avgTransferSpeed = LAST_TRANSFER_SPEED # in MegaBits / Mb + self.downloadClient = downloadClient - global LAST_FILE_TRANSFER_SPEED,TRANSFER_SPEED_UNIT - transferTime = int(time() - start) - if transferTime == 0: - transferTime = 1 - calculatedTransferSpeed = int(fileSizeBytes / 1000 / 1000 * 8) / transferTime + def setTransferSpeed(self, file, elapsedTransferTime): + elapsedTransferTime = math.ceil(elapsedTransferTime) + transferSpeed = math.ceil(file.sizeInBytes / 1000 / 1000 * 8 / elapsedTransferTime) - if calculatedTransferSpeed / estimatedTransferSpeed < 10: - transferSpeed = calculatedTransferSpeed - logger.info('Actual recorded transfer time', es={'filename': file, - 'filesize': fileSize, - 'bytes': fileSizeBytes, - 'transferTime': str(timedelta(seconds=transferTime)), - 'transferSpeed': transferSpeed, - 'transferSpeedUnit': TRANSFER_SPEED_UNIT, - 'seconds': transferTime}) - else: - transferSpeed = LAST_FILE_TRANSFER_SPEED - logger.warning('Fishy transferspeed, using previous speed calculation', es={'filename': file, - 'filesize': fileSize, 'bytes': fileSizeBytes,'transferTime': str(timedelta(seconds=transferTime)), - 'transferSpeed': calculatedTransferSpeed, 'transferSpeedUnit': TRANSFER_SPEED_UNIT, 'seconds': transferTime}) + transferTime = str(timedelta(seconds=elapsedTransferTime)) - if LAST_FILE_TRANSFER_SPEED: - # Calculate to average of all transfers this instance - LAST_FILE_TRANSFER_SPEED = (LAST_FILE_TRANSFER_SPEED + transferSpeed) / 2 - else: - LAST_FILE_TRANSFER_SPEED = transferSpeed + esData = { + 'filename': file.name, + 'filesize': file.size, + 'bytes': file.sizeInBytes, + 'transferTime': transferTime, + 'transferSpeed': transferSpeed, + 'transferSpeedUnit': TRANSFER_SPEED_UNIT, + 'seconds': elapsedTransferTime + } + logger.info('Transfer finished', es=esData) - writeAvgSpeedToDisk(LAST_FILE_TRANSFER_SPEED) + self.avgTransferSpeed = (transferSpeed + self.avgTransferSpeed) / 2 if self.avgTransferSpeed else transferSpeed + writeAvgSpeedToDisk(self.avgTransferSpeed) - transferedFiles.append(file) + def ensureLocalParentFolder(self, file): + folderName, fileExtension = os.path.splitext(file.name) + if fileExtension not in VIDEO_EXTENSIONS: + return False - return transferedFiles - -def removeFromDeluge(files): - deluge = Deluge() - - for file in files: - file = file.split('/')[-1] - - logger.info('Removing {} from deluge'.format(file), es={"filename": file}) try: - response = deluge.removeByName(file, True) - if response == None: - raise Exception('No torrent with that name found') + folderedLocalPath = os.path.join(self.localSystem.path, folderName) + os.makedirs(folderedLocalPath) + logger.info("Created local parent folder for folder-less file", es=file.telemetry) + return folderName + except FileExistsError: + logger.warning("Error creating local parent folder, already exists", es=file.telemetry) + return folderName + except Exception as error: + msg = str(error) + logger.error("Unexpected error while creating local folder", es={**file.telemetry, 'error': msg}) + logger.error(msg) - logger.info('Successfully removed: {}'.format(file), es={'filename': file}) + def estimateFileTransferTime(self, file): + fileSizeInMegabit = file.sizeInBytes / 1000 / 1000 * 8 + estimatedTransferTimeInSeconds = math.floor(fileSizeInMegabit / self.avgTransferSpeed) + estimatedTransferTime = str(timedelta(seconds=estimatedTransferTimeInSeconds)) + + telemetry = { + **file.telemetry, + 'seconds': estimatedTransferTimeInSeconds, + 'transferTime': estimatedTransferTime, + 'transferSpeed': self.avgTransferSpeed, + 'transferSpeedUnit': TRANSFER_SPEED_UNIT + } + logger.info('Estimate transfer speed', es=telemetry) + + def start(self): + for file in self.files: + try: + localFolder = self.localSystem.path + newParentFolder = self.ensureLocalParentFolder(file) + + if newParentFolder: + localFolder = os.path.join(localFolder, newParentFolder) + + transferStartTime = time() + logger.info('Transfer starting', es=file.telemetry) + if self.avgTransferSpeed is not None: + self.estimateFileTransferTime(file) + + cmd = "rsync -ra {} '{}'".format(self.satelliteSystem.rsyncFilePath(file), localFolder) + rsyncProcess = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = rsyncProcess.communicate() + stdout = stdout.decode('utf-8') + stderr = stderr.decode('utf-8') + + if stderr: + print(stderr) + logger.error('Rsync error', es={**file.telemetry, 'error':stderr}) + continue + + if len(stdout): + logger.debug('Rsync output', es={**file.telemetry, 'output': stdout}) + + elapsedTransferTime = time() - transferStartTime + self.setTransferSpeed(file, elapsedTransferTime) + + if self.downloadClient.enabled is True: + self.downloadClient.remove(file) + + except Exception as err: + logger.error('Unexpected error when transfering file', es={**file.telemetry, 'error': str(err)}) + continue + + +class DownloadClient(): + def __init__(self, enabled=True): + try: + self.enabled = enabled + self.deluge = None + + if enabled is True: + self.deluge = Deluge() + except Exception as err: + logger.error("Unexpected error from deluge", es={**file.telemetry, 'error': str(err)}) + + def remove(self, file): + logger.info('Removing file from deluge', es=file.telemetry) + + try: + response = self.deluge.removeByName(file.name, True) + if response is not None: + logger.info('Successfully removed file from deluge', es=file.telemetry) + return + + raise Exception('Deluge item not found') except Exception as err: - logger.error('Deluge error: {}'.format(err), es={'filename': file}) + logger.error('Unexpected deluge error', es={**file.telemetry, 'error': str(err)}) + def main(): config = getConfig() - host = config['SSH']['host'] - user = config['SSH']['user'] - remotePath = config['FILES']['remote'] - localPath = config['FILES']['local'] + host = config['SATELLITE']['host'] + user = config['SATELLITE']['user'] + remotePath = config['SATELLITE']['path'] + remove = config['SATELLITE']['remove'] + localPath = config['LOCAL']['path'] - remoteFiles = getFiles(remotePath, host, user) - if len(remoteFiles) > 0: - logger.debug('Remote files found: {}'.format(remoteFiles), es={'files': remoteFiles}) - else: + satelliteSystem = System(remotePath, host, user) + localSystem = System(localPath) + downloadClient = DownloadClient(remove) + + satelliteSystem.getFiles() + localSystem.getFiles() + + if len(satelliteSystem.files) == 0: logger.debug('No remote files found') - - localFiles = getFiles(localPath) - if len(localFiles) > 0: - logger.debug('Local files found: {}'.format(localFiles), es={'files': localFiles}) - else: - logger.debug('No local files found') + return - newFiles = filesNotShared(remoteFiles, localFiles) - if newFiles: - logger.info('New files: {}'.format(newFiles), es={'files': newFiles}) - - transferedFiles = transferFiles(newFiles, localPath, remotePath, host, user) - removeFromDeluge(transferedFiles) - - else: + newFiles = filesNotShared(satelliteSystem, localSystem) + if not newFiles: logger.debug('No new files found to travel on the great transatlantic express') + return + + logger.info('New files found to travel transatlantic express', es={'files': newFiles}) + transport = Transport(newFiles, satelliteSystem, localSystem, downloadClient) + transport.start() if __name__ == '__main__': main()