Re-wrote everything to be defined in Classes, better logging & exec

Abstracted remote and local filesystems into a System class.

Transporter does the file transfering, simplified this code. Removes a
lot of the error handling that was there in replacement of state that
will be more resilient, predictable and less error-prone.
This commit is contained in:
2023-08-25 19:31:40 +02:00
parent 9d77dd210b
commit cedefbc42b

View File

@@ -1,5 +1,5 @@
#!/usr/bin/python3 #!/usr/bin/python3
import os, sys import os, sys, math
import shutil import shutil
from subprocess import check_output, Popen, PIPE from subprocess import check_output, Popen, PIPE
from datetime import timedelta from datetime import timedelta
@@ -14,214 +14,274 @@ except Exception:
from logger import logger from logger import logger
from utils import getConfig, readAvgSpeedFromDisk, writeAvgSpeedToDisk, VIDEO_EXTENSIONS from utils import getConfig, readAvgSpeedFromDisk, writeAvgSpeedToDisk, VIDEO_EXTENSIONS
ESTIMATED_TRANSFER_SPEED=readAvgSpeedFromDisk() LAST_TRANSFER_SPEED=readAvgSpeedFromDisk()
TRANSFER_SPEED_UNIT="Mb/s" TRANSFER_SPEED_UNIT="Mb/s"
LAST_FILE_TRANSFER_SPEED=None LAST_FILE_TRANSFER_SPEED=None
def fileSizeByPath(path):
filename = path.split('/')[-1]
config = getConfig()
host = config['SSH']['host']
user = config['SSH']['user']
remotePath = config['FILES']['remote']
diskUsageCmd = 'du -hs' def parseFileListResponse(filesString):
if (remotePath in path): if filesString == None:
cmd = 'ssh {}@{} {} "\'{}\'"'.format(user, host, diskUsageCmd, path) return []
else:
cmd = '{} "{}"'.format(diskUsageCmd, path) files = filesString.decode('utf-8').split('\n')
return list(filter(lambda x: len(x) > 0, files)) # remove any empty newline from list
def filesNotShared(remote, local):
c = set(remote.files) - set(local.files)
files = list(c)
if len(files) == 0:
return False
return list(filter(lambda file: not local.findFile(file), files))
class File():
def __init__(self, file, system):
self.name = file
self.system = system
self.size = self.fileSize()
self.sizeInBytes = self.fileSizeInBytes()
def fileSize(self):
filePath = self.system.buildFilePath(self)
cmd = "du -hs '{}'".format(filePath)
if self.system.remote:
cmd = 'ssh {}@{} {}'.format(self.system.user, self.system.path, cmd)
diskusageOutput = check_output(cmd, shell=True) diskusageOutput = check_output(cmd, shell=True)
diskusageOutput = diskusageOutput.decode('utf-8').split('\t') diskusageOutput = diskusageOutput.decode('utf-8').split('\t')
return diskusageOutput[0] + 'B' return diskusageOutput[0] + 'B'
def fileSizeInBytes(fileSize, blockSize=1024): def fileSizeInBytes(self, blockSize=1024):
try: fileSizeBytes = blockSize
if fileSize[-2] == 'G':
fileSizeInBytes = float(fileSize[:-2]) * 1024 * 1024 * 1024
elif fileSize[-2] == 'M':
fileSizeInBytes = float(fileSize[:-2]) * 1024 * 1024
elif fileSize[-2] == 'K':
fileSizeInBytes = float(fileSize[:-2]) * 1024
except:
logger.error('Filesize to float. Filesize:', es={'output': fileSize})
return
return fileSizeInBytes try:
if self.size[-2] == 'G':
def estimateFileTransferTime(fileSize, filename): fileSizeBytes = float(self.size[:-2]) * 1024 * 1024 * 1024
global ESTIMATED_TRANSFER_SPEED,TRANSFER_SPEED_UNIT,LAST_FILE_TRANSFER_SPEED elif self.size[-2] == 'M':
fileSizeBytes = float(self.size[:-2]) * 1024 * 1024
fileSizeBytes = fileSizeInBytes(fileSize) elif self.size[-2] == 'K':
if fileSizeBytes == None: fileSizeBytes = float(self.size[:-2]) * 1024
logger.info('Unable to calculate transfer time for file', es={'filename': filename}) except:
logger.error('Filesize to float. Filesize:', es={'output': self.size})
return return
if (LAST_FILE_TRANSFER_SPEED): return fileSizeBytes
estimatedTransferSpeed = LAST_FILE_TRANSFER_SPEED
else:
estimatedTransferSpeed = ESTIMATED_TRANSFER_SPEED
logger.debug('Guessing transfer speed with static speed variable', es={'transferSpeed': ESTIMATED_TRANSFER_SPEED,
'transferSpeedUnit': TRANSFER_SPEED_UNIT})
elapsedTimeInSeconds = (fileSizeBytes / 1000 / 1000 * 8) / estimatedTransferSpeed @property
estimatedTransferTime = str(timedelta(seconds=elapsedTimeInSeconds)).split('.')[0] def telemetry(self):
return {
'filename': self.name,
'filesize': self.size,
'bytes': self.sizeInBytes
}
# trying to find the speed we average transfer at def __str__(self):
logger.info('Estimated transfer time'.format(estimatedTransferTime), es={'filename': filename, return str(self.name)
'filesize': fileSize,
'bytes': fileSizeBytes,
'seconds': elapsedTimeInSeconds,
'transferTime': estimatedTransferTime,
'transferSpeed': estimatedTransferSpeed,
'transferSpeedUnit': TRANSFER_SPEED_UNIT})
return estimatedTransferSpeed
def getFiles(path, host=None, user=None): def __repr__(self):
logger.debug('Getting filenames from path: {}'.format(path), es={'path': path}) return repr(self.name)
if (host and user):
cmd = "ssh {}@{} ls '{}'".format(user, host, path)
else:
cmd = "ls '{}'".format(path)
contents = check_output(cmd, shell=True)
if contents != None:
contents = contents.decode('utf-8').split('\n')
contents = list(filter(lambda x: len(x) > 0, contents))
return contents class System():
def __init__(self, path, host=None, user=None):
self.path = path
self.files = []
self.host = host
self.user = user
self.remote = host or user
def filesNotShared(remote, local): def getFiles(self):
c = set(remote) - set(local) logger.debug('Getting files from path', es={'path': self.path})
if c == set():
return False
return list(c) cmd = "ls '{}'".format(self.path)
if self.remote:
cmd = 'ssh {}@{} {}'.format(self.user, self.host, cmd)
def transferFiles(files, localPath, remotePath, host=None, user=None): contents = check_output(cmd, shell=True)
transferedFiles = [] files = parseFileListResponse(contents)
for file in files:
self.files.append(File(file, self))
for file in files: logger.debug('Files found', es={'files': self.files})
if file in getFiles(localPath): return self.files
logger.debug('File already exists at remote path. Skipping.')
continue
remoteFile = os.path.join(remotePath, file) def findFile(self, file):
fileSize = fileSizeByPath(remoteFile) cmd = "find {} -type f -name '{}'".format(self.path, file.name)
fileSizeBytes = fileSizeInBytes(fileSize) if self.remote:
cmd = 'ssh {}@{} {}'.format(self.user, self.path, cmd)
logger.info('Moving file: {}'.format(file), es={'filename': file, fileMatch = check_output(cmd, shell=True)
'filesize': fileSize, return fileMatch != b''
'bytes': fileSizeBytes})
file = os.path.join(remotePath, file) def buildFilePath(self, file):
spaceEscapedFile = file.replace(' ', '\\ ') return os.path.join(self.path, file.name)
# check if file is folder-less, if so create folder and update localPath def rsyncFilePath(self, file):
folderedLocalPath = None filePath = self.buildFilePath(file)
filename, fileExtension = os.path.splitext(file)
if fileExtension in VIDEO_EXTENSIONS:
folderedLocalPath = os.path.join(localPath, filename)
os.makedirs(folderedLocalPath)
# Build rsync command if not self.remote:
if host and user: return "'{}'".format(filePath)
cmd = "rsync -rz {}@{}:'{}' '{}'".format(user, host, spaceEscapedFile, folderedLocalPath or localPath)
else:
cmd = "rsync -rz '{}' '{}'".format(spaceEscapedFile, localPath)
estimatedTransferSpeed = estimateFileTransferTime(fileSize, file) return "{}@{}:'{}'".format(self.user, self.host, filePath)
start = time()
rsyncProcess = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = rsyncProcess.communicate()
if stderr: class Transport():
stderr = stderr.decode('utf-8') def __init__(self, files, satelliteSystem, localSystem, downloadClient):
logger.error('Rsync error', es={'filename':file, 'output':stderr}) self.files = files
self.transferedFiles = []
self.satelliteSystem = satelliteSystem
self.localSystem = localSystem
self.avgTransferSpeed = LAST_TRANSFER_SPEED # in MegaBits / Mb
self.downloadClient = downloadClient
stdout = stdout.decode('utf-8') def setTransferSpeed(self, file, elapsedTransferTime):
logger.debug('Rsync output', es={'filename': file, 'output': stdout}) elapsedTransferTime = math.ceil(elapsedTransferTime)
transferSpeed = math.ceil(file.sizeInBytes / 1000 / 1000 * 8 / elapsedTransferTime)
global LAST_FILE_TRANSFER_SPEED,TRANSFER_SPEED_UNIT transferTime = str(timedelta(seconds=elapsedTransferTime))
transferTime = int(time() - start)
if transferTime == 0:
transferTime = 1
calculatedTransferSpeed = int(fileSizeBytes / 1000 / 1000 * 8) / transferTime
if calculatedTransferSpeed / estimatedTransferSpeed < 10: esData = {
transferSpeed = calculatedTransferSpeed 'filename': file.name,
logger.info('Actual recorded transfer time', es={'filename': file, 'filesize': file.size,
'filesize': fileSize, 'bytes': file.sizeInBytes,
'bytes': fileSizeBytes, 'transferTime': transferTime,
'transferTime': str(timedelta(seconds=transferTime)), 'transferSpeed': transferSpeed,
'transferSpeed': transferSpeed, 'transferSpeedUnit': TRANSFER_SPEED_UNIT,
'transferSpeedUnit': TRANSFER_SPEED_UNIT, 'seconds': elapsedTransferTime
'seconds': transferTime}) }
else: logger.info('Transfer finished', es=esData)
transferSpeed = LAST_FILE_TRANSFER_SPEED
logger.warning('Fishy transferspeed, using previous speed calculation', es={'filename': file,
'filesize': fileSize, 'bytes': fileSizeBytes,'transferTime': str(timedelta(seconds=transferTime)),
'transferSpeed': calculatedTransferSpeed, 'transferSpeedUnit': TRANSFER_SPEED_UNIT, 'seconds': transferTime})
if LAST_FILE_TRANSFER_SPEED: self.avgTransferSpeed = (transferSpeed + self.avgTransferSpeed) / 2 if self.avgTransferSpeed else transferSpeed
# Calculate to average of all transfers this instance writeAvgSpeedToDisk(self.avgTransferSpeed)
LAST_FILE_TRANSFER_SPEED = (LAST_FILE_TRANSFER_SPEED + transferSpeed) / 2
else:
LAST_FILE_TRANSFER_SPEED = transferSpeed
writeAvgSpeedToDisk(LAST_FILE_TRANSFER_SPEED) def ensureLocalParentFolder(self, file):
folderName, fileExtension = os.path.splitext(file.name)
if fileExtension not in VIDEO_EXTENSIONS:
return False
transferedFiles.append(file)
return transferedFiles
def removeFromDeluge(files):
deluge = Deluge()
for file in files:
file = file.split('/')[-1]
logger.info('Removing {} from deluge'.format(file), es={"filename": file})
try: try:
response = deluge.removeByName(file, True) folderedLocalPath = os.path.join(self.localSystem.path, folderName)
if response == None: os.makedirs(folderedLocalPath)
raise Exception('No torrent with that name found') logger.info("Created local parent folder for folder-less file", es=file.telemetry)
return folderName
except FileExistsError:
logger.warning("Error creating local parent folder, already exists", es=file.telemetry)
return folderName
except Exception as error:
msg = str(error)
logger.error("Unexpected error while creating local folder", es={**file.telemetry, 'error': msg})
logger.error(msg)
logger.info('Successfully removed: {}'.format(file), es={'filename': file}) def estimateFileTransferTime(self, file):
fileSizeInMegabit = file.sizeInBytes / 1000 / 1000 * 8
estimatedTransferTimeInSeconds = math.floor(fileSizeInMegabit / self.avgTransferSpeed)
estimatedTransferTime = str(timedelta(seconds=estimatedTransferTimeInSeconds))
telemetry = {
**file.telemetry,
'seconds': estimatedTransferTimeInSeconds,
'transferTime': estimatedTransferTime,
'transferSpeed': self.avgTransferSpeed,
'transferSpeedUnit': TRANSFER_SPEED_UNIT
}
logger.info('Estimate transfer speed', es=telemetry)
def start(self):
for file in self.files:
try:
localFolder = self.localSystem.path
newParentFolder = self.ensureLocalParentFolder(file)
if newParentFolder:
localFolder = os.path.join(localFolder, newParentFolder)
transferStartTime = time()
logger.info('Transfer starting', es=file.telemetry)
if self.avgTransferSpeed is not None:
self.estimateFileTransferTime(file)
cmd = "rsync -ra {} '{}'".format(self.satelliteSystem.rsyncFilePath(file), localFolder)
rsyncProcess = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = rsyncProcess.communicate()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
if stderr:
print(stderr)
logger.error('Rsync error', es={**file.telemetry, 'error':stderr})
continue
if len(stdout):
logger.debug('Rsync output', es={**file.telemetry, 'output': stdout})
elapsedTransferTime = time() - transferStartTime
self.setTransferSpeed(file, elapsedTransferTime)
if self.downloadClient.enabled is True:
self.downloadClient.remove(file)
except Exception as err:
logger.error('Unexpected error when transfering file', es={**file.telemetry, 'error': str(err)})
continue
class DownloadClient():
def __init__(self, enabled=True):
try:
self.enabled = enabled
self.deluge = None
if enabled is True:
self.deluge = Deluge()
except Exception as err:
logger.error("Unexpected error from deluge", es={**file.telemetry, 'error': str(err)})
def remove(self, file):
logger.info('Removing file from deluge', es=file.telemetry)
try:
response = self.deluge.removeByName(file.name, True)
if response is not None:
logger.info('Successfully removed file from deluge', es=file.telemetry)
return
raise Exception('Deluge item not found')
except Exception as err: except Exception as err:
logger.error('Deluge error: {}'.format(err), es={'filename': file}) logger.error('Unexpected deluge error', es={**file.telemetry, 'error': str(err)})
def main(): def main():
config = getConfig() config = getConfig()
host = config['SSH']['host'] host = config['SATELLITE']['host']
user = config['SSH']['user'] user = config['SATELLITE']['user']
remotePath = config['FILES']['remote'] remotePath = config['SATELLITE']['path']
localPath = config['FILES']['local'] remove = config['SATELLITE']['remove']
localPath = config['LOCAL']['path']
remoteFiles = getFiles(remotePath, host, user) satelliteSystem = System(remotePath, host, user)
if len(remoteFiles) > 0: localSystem = System(localPath)
logger.debug('Remote files found: {}'.format(remoteFiles), es={'files': remoteFiles}) downloadClient = DownloadClient(remove)
else:
satelliteSystem.getFiles()
localSystem.getFiles()
if len(satelliteSystem.files) == 0:
logger.debug('No remote files found') logger.debug('No remote files found')
return
localFiles = getFiles(localPath) newFiles = filesNotShared(satelliteSystem, localSystem)
if len(localFiles) > 0: if not newFiles:
logger.debug('Local files found: {}'.format(localFiles), es={'files': localFiles})
else:
logger.debug('No local files found')
newFiles = filesNotShared(remoteFiles, localFiles)
if newFiles:
logger.info('New files: {}'.format(newFiles), es={'files': newFiles})
transferedFiles = transferFiles(newFiles, localPath, remotePath, host, user)
removeFromDeluge(transferedFiles)
else:
logger.debug('No new files found to travel on the great transatlantic express') logger.debug('No new files found to travel on the great transatlantic express')
return
logger.info('New files found to travel transatlantic express', es={'files': newFiles})
transport = Transport(newFiles, satelliteSystem, localSystem, downloadClient)
transport.start()
if __name__ == '__main__': if __name__ == '__main__':
main() main()