mirror of
				https://github.com/KevinMidboe/transatlanticTorrentExpress.git
				synced 2025-10-29 18:00:19 +00:00 
			
		
		
		
	Merge pull request #4 from KevinMidboe/refactor/classified-and-simplified
Refactor: Classified and simplified
This commit is contained in:
		| @@ -1 +1 @@ | |||||||
| 135 |  | ||||||
|   | |||||||
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,5 +1,6 @@ | |||||||
| # Project config file | # Project config file | ||||||
| config.ini | config.ini | ||||||
|  | .avgspeed.txt | ||||||
|  |  | ||||||
|  |  | ||||||
| # Byte-compiled / optimized / DLL files | # Byte-compiled / optimized / DLL files | ||||||
|   | |||||||
							
								
								
									
										22
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								README.md
									
									
									
									
									
								
							| @@ -21,28 +21,8 @@ Create copy of config and edit following values: | |||||||
| cp config.ini.default config.ini | cp config.ini.default config.ini | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| ```ini |  | ||||||
| [SSH] |  | ||||||
| host= |  | ||||||
| user= |  | ||||||
|  |  | ||||||
| [FILES] |  | ||||||
| remote= |  | ||||||
| local= |  | ||||||
|  |  | ||||||
| [LOGGER] |  | ||||||
| CH_LEVEL=INFO |  | ||||||
|  |  | ||||||
| [ELASTIC] |  | ||||||
| host= |  | ||||||
| port= |  | ||||||
| ssl= |  | ||||||
| api_key= |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ## Run | ## Run | ||||||
|  |  | ||||||
| ```bash | ```bash | ||||||
| python3 src/transatlanticTorrentExpress.py | python3 transatlanticTorrentExpress.py | ||||||
| ``` | ``` | ||||||
| @@ -1,15 +1,17 @@ | |||||||
| [SSH] | [SATELLITE] | ||||||
|  | path= | ||||||
| host= | host= | ||||||
| user= | user= | ||||||
|  | remove=True | ||||||
|  |  | ||||||
| [FILES] | [LOCAL] | ||||||
| remote= | path= | ||||||
| local= |  | ||||||
|  |  | ||||||
| [LOGGER] | [LOGGER] | ||||||
| CH_LEVEL=INFO | CH_LEVEL=INFO | ||||||
|  |  | ||||||
| [ELASTIC] | [ELASTIC] | ||||||
|  | enabled=False | ||||||
| host= | host= | ||||||
| port= | port= | ||||||
| ssl= | ssl= | ||||||
|   | |||||||
							
								
								
									
										15
									
								
								logger.py
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								logger.py
									
									
									
									
									
								
							| @@ -59,6 +59,10 @@ class ESHandler(logging.Handler): | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     if hasattr(record, 'es'): |     if hasattr(record, 'es'): | ||||||
|  |       for key in record.es.keys(): | ||||||
|  |         if key == 'files': | ||||||
|  |           record.es[key] = [ file.__repr__() for file in record.es[key] ] | ||||||
|  |  | ||||||
|       for param in record.es.values(): |       for param in record.es.values(): | ||||||
|         if ': {}'.format(param) in record.message: |         if ': {}'.format(param) in record.message: | ||||||
|           doc['message'] = record.message.replace(': {}'.format(str(param)), '') |           doc['message'] = record.message.replace(': {}'.format(str(param)), '') | ||||||
| @@ -74,7 +78,7 @@ class ESHandler(logging.Handler): | |||||||
|         return response |         return response | ||||||
|     except urllib.error.HTTPError as e: |     except urllib.error.HTTPError as e: | ||||||
|         print('Unable to reach elastic, error:', e) |         print('Unable to reach elastic, error:', e) | ||||||
|         return asdf |         return | ||||||
|  |  | ||||||
| class ElasticFieldParameterAdapter(logging.LoggerAdapter): | class ElasticFieldParameterAdapter(logging.LoggerAdapter): | ||||||
|   def __init__(self, logger, extra={}): |   def __init__(self, logger, extra={}): | ||||||
| @@ -93,12 +97,15 @@ esHost = config['ELASTIC']['host'] | |||||||
| esPort = config['ELASTIC']['port'] | esPort = config['ELASTIC']['port'] | ||||||
| esSSL = config['ELASTIC']['ssl'] | esSSL = config['ELASTIC']['ssl'] | ||||||
| esApiKey = config['ELASTIC']['api_key'] | esApiKey = config['ELASTIC']['api_key'] | ||||||
| eh = ESHandler(host=esHost, port=esPort, ssl=esSSL, apiKey=esApiKey) | esEnabled = config['ELASTIC']['enabled'] | ||||||
| eh.setLevel(logging.DEBUG) | if esEnabled == 'True': | ||||||
|  |   eh = ESHandler(host=esHost, port=esPort, ssl=esSSL, apiKey=esApiKey) | ||||||
|  |   eh.setLevel(logging.DEBUG) | ||||||
|  |  | ||||||
| formatter = logging.Formatter('%(asctime)s %(levelname)8s | %(message)s') | formatter = logging.Formatter('%(asctime)s %(levelname)8s | %(message)s') | ||||||
| fh.setFormatter(formatter) | fh.setFormatter(formatter) | ||||||
| logger.addHandler(fh) | logger.addHandler(fh) | ||||||
| logger.addHandler(ch) | logger.addHandler(ch) | ||||||
| logger.addHandler(eh) | if esEnabled == 'True': | ||||||
|  |   logger.addHandler(eh) | ||||||
| logger = ElasticFieldParameterAdapter(logger) | logger = ElasticFieldParameterAdapter(logger) | ||||||
|   | |||||||
| @@ -1,5 +1,5 @@ | |||||||
| #!/usr/bin/python3 | #!/usr/bin/python3 | ||||||
| import os, sys | import os, sys, math | ||||||
| import shutil | import shutil | ||||||
| from subprocess import check_output, Popen, PIPE | from subprocess import check_output, Popen, PIPE | ||||||
| from datetime import timedelta | from datetime import timedelta | ||||||
| @@ -14,214 +14,274 @@ except Exception: | |||||||
| from logger import logger | from logger import logger | ||||||
| from utils import getConfig, readAvgSpeedFromDisk, writeAvgSpeedToDisk, VIDEO_EXTENSIONS | from utils import getConfig, readAvgSpeedFromDisk, writeAvgSpeedToDisk, VIDEO_EXTENSIONS | ||||||
|  |  | ||||||
| ESTIMATED_TRANSFER_SPEED=readAvgSpeedFromDisk() | LAST_TRANSFER_SPEED=readAvgSpeedFromDisk() | ||||||
| TRANSFER_SPEED_UNIT="Mb/s" | TRANSFER_SPEED_UNIT="Mb/s" | ||||||
| LAST_FILE_TRANSFER_SPEED=None | LAST_FILE_TRANSFER_SPEED=None | ||||||
|  |  | ||||||
| def fileSizeByPath(path): |  | ||||||
|     filename = path.split('/')[-1] |  | ||||||
|     config = getConfig() |  | ||||||
|     host = config['SSH']['host'] |  | ||||||
|     user = config['SSH']['user'] |  | ||||||
|     remotePath = config['FILES']['remote'] |  | ||||||
|  |  | ||||||
|     diskUsageCmd = 'du -hs' | def parseFileListResponse(filesString): | ||||||
|     if (remotePath in path): |   if filesString == None: | ||||||
|       cmd = 'ssh {}@{} {} "\'{}\'"'.format(user, host, diskUsageCmd, path) |     return [] | ||||||
|     else: |  | ||||||
|       cmd = '{} "{}"'.format(diskUsageCmd, path) |  | ||||||
|  |  | ||||||
|  |   files = filesString.decode('utf-8').split('\n') | ||||||
|  |   return list(filter(lambda x: len(x) > 0, files)) # remove any empty newline from list | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def filesNotShared(remote, local): | ||||||
|  |   c = set(remote.files) - set(local.files) | ||||||
|  |   files = list(c) | ||||||
|  |   if len(files) == 0: | ||||||
|  |     return False | ||||||
|  |  | ||||||
|  |   return list(filter(lambda file: not local.findFile(file), files)) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class File(): | ||||||
|  |   def __init__(self, file, system): | ||||||
|  |     self.name = file | ||||||
|  |     self.system = system | ||||||
|  |     self.size = self.fileSize() | ||||||
|  |     self.sizeInBytes = self.fileSizeInBytes() | ||||||
|  |  | ||||||
|  |   def fileSize(self): | ||||||
|  |     filePath = self.system.buildFilePath(self) | ||||||
|  |     cmd = "du -hs '{}'".format(filePath) | ||||||
|  |  | ||||||
|  |     if self.system.remote: | ||||||
|  |       cmd = 'ssh {}@{} "{}"'.format(self.system.user, self.system.host, cmd) | ||||||
|  |      | ||||||
|     diskusageOutput = check_output(cmd, shell=True) |     diskusageOutput = check_output(cmd, shell=True) | ||||||
|  |  | ||||||
|     diskusageOutput = diskusageOutput.decode('utf-8').split('\t') |     diskusageOutput = diskusageOutput.decode('utf-8').split('\t') | ||||||
|     return diskusageOutput[0] + 'B' |     return diskusageOutput[0] + 'B' | ||||||
|  |  | ||||||
| def fileSizeInBytes(fileSize, blockSize=1024): |   def fileSizeInBytes(self, blockSize=1024): | ||||||
|   try: |     fileSizeBytes = blockSize | ||||||
|     if fileSize[-2] == 'G': |  | ||||||
|       fileSizeInBytes = float(fileSize[:-2]) * 1024 * 1024 * 1024 |  | ||||||
|     elif fileSize[-2] == 'M': |  | ||||||
|       fileSizeInBytes = float(fileSize[:-2]) * 1024 * 1024 |  | ||||||
|     elif fileSize[-2] == 'K': |  | ||||||
|       fileSizeInBytes = float(fileSize[:-2]) * 1024 |  | ||||||
|   except: |  | ||||||
|     logger.error('Filesize to float. Filesize:', es={'output': fileSize}) |  | ||||||
|     return |  | ||||||
|  |  | ||||||
|   return fileSizeInBytes |     try: | ||||||
|  |       if self.size[-2] == 'G': | ||||||
| def estimateFileTransferTime(fileSize, filename): |         fileSizeBytes = float(self.size[:-2]) * 1024 * 1024 * 1024 | ||||||
|     global ESTIMATED_TRANSFER_SPEED,TRANSFER_SPEED_UNIT,LAST_FILE_TRANSFER_SPEED |       elif self.size[-2] == 'M': | ||||||
|  |         fileSizeBytes = float(self.size[:-2]) * 1024 * 1024 | ||||||
|     fileSizeBytes = fileSizeInBytes(fileSize) |       elif self.size[-2] == 'K': | ||||||
|     if fileSizeBytes == None: |         fileSizeBytes = float(self.size[:-2]) * 1024 | ||||||
|       logger.info('Unable to calculate transfer time for file', es={'filename': filename}) |     except: | ||||||
|  |       logger.error('Filesize to float. Filesize:', es={'output': self.size}) | ||||||
|       return |       return | ||||||
|  |  | ||||||
|     if (LAST_FILE_TRANSFER_SPEED): |     return fileSizeBytes | ||||||
|       estimatedTransferSpeed = LAST_FILE_TRANSFER_SPEED |  | ||||||
|     else: |  | ||||||
|       estimatedTransferSpeed = ESTIMATED_TRANSFER_SPEED |  | ||||||
|       logger.debug('Guessing transfer speed with static speed variable', es={'transferSpeed': ESTIMATED_TRANSFER_SPEED, |  | ||||||
|                                                                             'transferSpeedUnit': TRANSFER_SPEED_UNIT}) |  | ||||||
|  |  | ||||||
|     elapsedTimeInSeconds = (fileSizeBytes / 1000 / 1000 * 8) / estimatedTransferSpeed |   @property | ||||||
|     estimatedTransferTime = str(timedelta(seconds=elapsedTimeInSeconds)).split('.')[0] |   def telemetry(self): | ||||||
|  |     return { | ||||||
|  |       'filename': self.name, | ||||||
|  |       'filesize': self.size, | ||||||
|  |       'bytes': self.sizeInBytes | ||||||
|  |     } | ||||||
|  |  | ||||||
|     # trying to find the speed we average transfer at |   def __str__(self): | ||||||
|     logger.info('Estimated transfer time'.format(estimatedTransferTime), es={'filename': filename, |     return str(self.name) | ||||||
|                                                                              'filesize': fileSize, |  | ||||||
|                                                                              'bytes': fileSizeBytes, |  | ||||||
|                                                                              'seconds': elapsedTimeInSeconds, |  | ||||||
|                                                                              'transferTime': estimatedTransferTime, |  | ||||||
|                                                                              'transferSpeed': estimatedTransferSpeed, |  | ||||||
|                                                                              'transferSpeedUnit': TRANSFER_SPEED_UNIT}) |  | ||||||
|     return estimatedTransferSpeed |  | ||||||
|  |  | ||||||
| def getFiles(path, host=None, user=None): |   def __repr__(self): | ||||||
|   logger.debug('Getting filenames from path: {}'.format(path), es={'path': path}) |     return repr(self.name) | ||||||
|   if (host and user): |  | ||||||
|     cmd = "ssh {}@{} ls '{}'".format(user, host, path) |  | ||||||
|   else: |  | ||||||
|     cmd = "ls '{}'".format(path) |  | ||||||
|  |  | ||||||
|   contents = check_output(cmd, shell=True) |  | ||||||
|   if contents != None: |  | ||||||
|     contents = contents.decode('utf-8').split('\n') |  | ||||||
|     contents = list(filter(lambda x: len(x) > 0, contents)) |  | ||||||
|  |  | ||||||
|   return contents | class System(): | ||||||
|  |   def __init__(self, path, host=None, user=None): | ||||||
|  |     self.path = path | ||||||
|  |     self.files = [] | ||||||
|  |     self.host = host | ||||||
|  |     self.user = user | ||||||
|  |     self.remote = host or user | ||||||
|  |  | ||||||
| def filesNotShared(remote, local): |   def getFiles(self): | ||||||
|   c = set(remote) - set(local) |     logger.debug('Getting files from path', es={'path': self.path}) | ||||||
|   if c == set(): |  | ||||||
|     return False |  | ||||||
|    |  | ||||||
|   return list(c) |  | ||||||
|  |  | ||||||
| def transferFiles(files, localPath, remotePath, host=None, user=None): |     cmd = "ls '{}'".format(self.path) | ||||||
|   transferedFiles = [] |     if self.remote: | ||||||
|  |       cmd = 'ssh {}@{} {}'.format(self.user, self.host, cmd) | ||||||
|  |  | ||||||
|   for file in files: |     contents = check_output(cmd, shell=True) | ||||||
|     if file in getFiles(localPath): |     files = parseFileListResponse(contents) | ||||||
|       logger.debug('File already exists at remote path. Skipping.') |     for file in files: | ||||||
|       continue |       self.files.append(File(file, self)) | ||||||
|  |  | ||||||
|     remoteFile = os.path.join(remotePath, file) |     logger.debug('Files found', es={'files': self.files}) | ||||||
|     fileSize = fileSizeByPath(remoteFile) |     return self.files | ||||||
|     fileSizeBytes = fileSizeInBytes(fileSize) |  | ||||||
|  |  | ||||||
|     logger.info('Moving file: {}'.format(file), es={'filename': file, |   def findFile(self, file): | ||||||
|                                                     'filesize': fileSize, |     cmd = "find {} -type f -name '{}'".format(self.path, file.name) | ||||||
|                                                     'bytes': fileSizeBytes}) |     if self.remote: | ||||||
|  |       cmd = 'ssh {}@{} {}'.format(self.user, self.path, cmd) | ||||||
|  |  | ||||||
|     file = os.path.join(remotePath, file) |     fileMatch = check_output(cmd, shell=True) | ||||||
|     spaceEscapedFile = file.replace(' ', '\\ ') |     return fileMatch != b'' | ||||||
|  |  | ||||||
|     # check if file is folder-less, if so create folder and update localPath |   def buildFilePath(self, file): | ||||||
|     folderedLocalPath = None |     return os.path.join(self.path, file.name) | ||||||
|     filename, fileExtension = os.path.splitext(file) |  | ||||||
|     if fileExtension in VIDEO_EXTENSIONS: |  | ||||||
|       folderedLocalPath = os.path.join(localPath, filename) |  | ||||||
|       os.makedirs(folderedLocalPath) |  | ||||||
|  |  | ||||||
|     # Build rsync command |   def rsyncFilePath(self, file): | ||||||
|     if host and user: |     filePath = self.buildFilePath(file) | ||||||
|       cmd = "rsync -rz {}@{}:'{}' '{}'".format(user, host, spaceEscapedFile, folderedLocalPath or localPath) |  | ||||||
|     else: |  | ||||||
|       cmd = "rsync -rz '{}' '{}'".format(spaceEscapedFile, localPath) |  | ||||||
|  |  | ||||||
|     estimatedTransferSpeed = estimateFileTransferTime(fileSize, file) |     if not self.remote: | ||||||
|     start = time() |       return "'{}'".format(filePath) | ||||||
|  |  | ||||||
|     rsyncProcess = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) |     return "{}@{}:'{}'".format(self.user, self.host, filePath) | ||||||
|     stdout, stderr = rsyncProcess.communicate() |  | ||||||
|  |  | ||||||
|     if stderr: |  | ||||||
|       stderr = stderr.decode('utf-8') |  | ||||||
|       logger.error('Rsync error', es={'filename':file, 'output':stderr}) |  | ||||||
|  |  | ||||||
|     stdout = stdout.decode('utf-8') | class Transport(): | ||||||
|     logger.debug('Rsync output', es={'filename': file, 'output': stdout}) |   def __init__(self, files, satelliteSystem, localSystem, downloadClient): | ||||||
|  |     self.files = files | ||||||
|  |     self.transferedFiles = [] | ||||||
|  |     self.satelliteSystem = satelliteSystem | ||||||
|  |     self.localSystem = localSystem | ||||||
|  |     self.avgTransferSpeed = LAST_TRANSFER_SPEED # in MegaBits / Mb | ||||||
|  |     self.downloadClient = downloadClient | ||||||
|  |  | ||||||
|     global LAST_FILE_TRANSFER_SPEED,TRANSFER_SPEED_UNIT |   def setTransferSpeed(self, file, elapsedTransferTime): | ||||||
|     transferTime = int(time() - start) |     elapsedTransferTime = math.ceil(elapsedTransferTime) | ||||||
|     if transferTime == 0: |     transferSpeed = math.ceil(file.sizeInBytes / 1000 / 1000 * 8 / elapsedTransferTime) | ||||||
|       transferTime = 1 |  | ||||||
|     calculatedTransferSpeed = int(fileSizeBytes / 1000 / 1000 * 8) / transferTime |  | ||||||
|  |  | ||||||
|     if calculatedTransferSpeed / estimatedTransferSpeed < 10: |     transferTime = str(timedelta(seconds=elapsedTransferTime)) | ||||||
|       transferSpeed = calculatedTransferSpeed |  | ||||||
|       logger.info('Actual recorded transfer time', es={'filename': file, |  | ||||||
|                                                       'filesize': fileSize, |  | ||||||
|                                                        'bytes': fileSizeBytes, |  | ||||||
|                                                        'transferTime': str(timedelta(seconds=transferTime)), |  | ||||||
|                                                        'transferSpeed': transferSpeed, |  | ||||||
|                                                        'transferSpeedUnit': TRANSFER_SPEED_UNIT, |  | ||||||
|                                                        'seconds': transferTime}) |  | ||||||
|     else: |  | ||||||
|      transferSpeed = LAST_FILE_TRANSFER_SPEED |  | ||||||
|      logger.warning('Fishy transferspeed, using previous speed calculation', es={'filename': file, |  | ||||||
|                     'filesize': fileSize, 'bytes': fileSizeBytes,'transferTime': str(timedelta(seconds=transferTime)), |  | ||||||
|                     'transferSpeed': calculatedTransferSpeed, 'transferSpeedUnit': TRANSFER_SPEED_UNIT, 'seconds': transferTime}) |  | ||||||
|  |  | ||||||
|     if LAST_FILE_TRANSFER_SPEED: |     esData = { | ||||||
|       # Calculate to average of all transfers this instance |       'filename': file.name, | ||||||
|       LAST_FILE_TRANSFER_SPEED = (LAST_FILE_TRANSFER_SPEED + transferSpeed) / 2 |       'filesize': file.size, | ||||||
|     else: |       'bytes': file.sizeInBytes, | ||||||
|       LAST_FILE_TRANSFER_SPEED = transferSpeed |       'transferTime': transferTime, | ||||||
|  |       'transferSpeed': transferSpeed, | ||||||
|  |       'transferSpeedUnit': TRANSFER_SPEED_UNIT, | ||||||
|  |       'seconds': elapsedTransferTime | ||||||
|  |     } | ||||||
|  |     logger.info('Transfer finished', es=esData) | ||||||
|  |  | ||||||
|     writeAvgSpeedToDisk(LAST_FILE_TRANSFER_SPEED) |     self.avgTransferSpeed = (transferSpeed + self.avgTransferSpeed) / 2 if self.avgTransferSpeed else transferSpeed | ||||||
|  |     writeAvgSpeedToDisk(self.avgTransferSpeed) | ||||||
|  |  | ||||||
|     transferedFiles.append(file) |   def ensureLocalParentFolder(self, file): | ||||||
|  |     folderName, fileExtension = os.path.splitext(file.name) | ||||||
|  |     if fileExtension not in VIDEO_EXTENSIONS: | ||||||
|  |       return False | ||||||
|  |  | ||||||
|   return transferedFiles |  | ||||||
|  |  | ||||||
| def removeFromDeluge(files): |  | ||||||
|   deluge = Deluge() |  | ||||||
|  |  | ||||||
|   for file in files: |  | ||||||
|     file = file.split('/')[-1] |  | ||||||
|  |  | ||||||
|     logger.info('Removing {} from deluge'.format(file), es={"filename": file}) |  | ||||||
|     try: |     try: | ||||||
|       response = deluge.removeByName(file, True) |       folderedLocalPath = os.path.join(self.localSystem.path, folderName) | ||||||
|       if response == None: |       os.makedirs(folderedLocalPath) | ||||||
|         raise Exception('No torrent with that name found') |       logger.info("Created local parent folder for folder-less file", es=file.telemetry) | ||||||
|  |       return folderName | ||||||
|  |     except FileExistsError: | ||||||
|  |       logger.warning("Error creating local parent folder, already exists", es=file.telemetry) | ||||||
|  |       return folderName | ||||||
|  |     except Exception as error: | ||||||
|  |       msg = str(error) | ||||||
|  |       logger.error("Unexpected error while creating local folder", es={**file.telemetry, 'error': msg}) | ||||||
|  |       logger.error(msg) | ||||||
|  |  | ||||||
|       logger.info('Successfully removed: {}'.format(file), es={'filename': file}) |   def estimateFileTransferTime(self, file): | ||||||
|  |     fileSizeInMegabit = file.sizeInBytes / 1000 / 1000 * 8 | ||||||
|  |     estimatedTransferTimeInSeconds = math.floor(fileSizeInMegabit / self.avgTransferSpeed) | ||||||
|  |     estimatedTransferTime = str(timedelta(seconds=estimatedTransferTimeInSeconds)) | ||||||
|  |  | ||||||
|  |     telemetry = { | ||||||
|  |       **file.telemetry, | ||||||
|  |       'seconds': estimatedTransferTimeInSeconds, | ||||||
|  |       'transferTime': estimatedTransferTime, | ||||||
|  |       'transferSpeed': self.avgTransferSpeed, | ||||||
|  |       'transferSpeedUnit': TRANSFER_SPEED_UNIT | ||||||
|  |     } | ||||||
|  |     logger.info('Estimate transfer speed', es=telemetry) | ||||||
|  |  | ||||||
|  |   def start(self): | ||||||
|  |     for file in self.files: | ||||||
|  |       try: | ||||||
|  |         localFolder = self.localSystem.path | ||||||
|  |         newParentFolder = self.ensureLocalParentFolder(file) | ||||||
|  |  | ||||||
|  |         if newParentFolder: | ||||||
|  |           localFolder = os.path.join(localFolder, newParentFolder) | ||||||
|  |  | ||||||
|  |         transferStartTime = time() | ||||||
|  |         logger.info('Transfer starting', es=file.telemetry) | ||||||
|  |         if self.avgTransferSpeed is not None: | ||||||
|  |           self.estimateFileTransferTime(file) | ||||||
|  |  | ||||||
|  |         cmd = "rsync -ra {} '{}'".format(self.satelliteSystem.rsyncFilePath(file), localFolder) | ||||||
|  |         rsyncProcess = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) | ||||||
|  |         stdout, stderr = rsyncProcess.communicate() | ||||||
|  |         stdout = stdout.decode('utf-8') | ||||||
|  |         stderr = stderr.decode('utf-8') | ||||||
|  |  | ||||||
|  |         if stderr: | ||||||
|  |           print(stderr) | ||||||
|  |           logger.error('Rsync error', es={**file.telemetry, 'error':stderr}) | ||||||
|  |           continue | ||||||
|  |  | ||||||
|  |         if len(stdout): | ||||||
|  |           logger.debug('Rsync output', es={**file.telemetry, 'output': stdout}) | ||||||
|  |  | ||||||
|  |         elapsedTransferTime = time() - transferStartTime | ||||||
|  |         self.setTransferSpeed(file, elapsedTransferTime) | ||||||
|  |  | ||||||
|  |         if self.downloadClient.enabled is True: | ||||||
|  |           self.downloadClient.remove(file) | ||||||
|  |  | ||||||
|  |       except Exception as err: | ||||||
|  |         logger.error('Unexpected error when transfering file', es={**file.telemetry, 'error': str(err)}) | ||||||
|  |         continue | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class DownloadClient(): | ||||||
|  |   def __init__(self, enabled=True): | ||||||
|  |     try: | ||||||
|  |       self.enabled = enabled | ||||||
|  |       self.deluge = None | ||||||
|  |  | ||||||
|  |       if enabled is True: | ||||||
|  |         self.deluge = Deluge() | ||||||
|  |     except Exception as err: | ||||||
|  |       logger.error("Unexpected error from deluge", es={**file.telemetry, 'error': str(err)}) | ||||||
|  |  | ||||||
|  |   def remove(self, file): | ||||||
|  |     logger.info('Removing file from deluge', es=file.telemetry) | ||||||
|  |  | ||||||
|  |     try: | ||||||
|  |       response = self.deluge.removeByName(file.name, True) | ||||||
|  |       if response is not None: | ||||||
|  |         logger.info('Successfully removed file from deluge', es=file.telemetry) | ||||||
|  |         return | ||||||
|  |        | ||||||
|  |       raise Exception('Deluge item not found') | ||||||
|  |  | ||||||
|     except Exception as err: |     except Exception as err: | ||||||
|       logger.error('Deluge error: {}'.format(err), es={'filename': file}) |       logger.error('Unexpected deluge error', es={**file.telemetry, 'error': str(err)}) | ||||||
|  |  | ||||||
|  |  | ||||||
| def main(): | def main(): | ||||||
|   config = getConfig() |   config = getConfig() | ||||||
|   host = config['SSH']['host'] |   host = config['SATELLITE']['host'] | ||||||
|   user = config['SSH']['user'] |   user = config['SATELLITE']['user'] | ||||||
|   remotePath = config['FILES']['remote'] |   remotePath = config['SATELLITE']['path'] | ||||||
|   localPath = config['FILES']['local'] |   remove = config['SATELLITE']['remove'] | ||||||
|  |   localPath = config['LOCAL']['path'] | ||||||
|  |  | ||||||
|   remoteFiles = getFiles(remotePath, host, user) |   satelliteSystem = System(remotePath, host, user) | ||||||
|   if len(remoteFiles) > 0: |   localSystem = System(localPath) | ||||||
|     logger.debug('Remote files found: {}'.format(remoteFiles), es={'files': remoteFiles}) |   downloadClient = DownloadClient(remove == 'True') | ||||||
|   else: |  | ||||||
|  |   satelliteSystem.getFiles() | ||||||
|  |   localSystem.getFiles() | ||||||
|  |  | ||||||
|  |   if len(satelliteSystem.files) == 0: | ||||||
|     logger.debug('No remote files found') |     logger.debug('No remote files found') | ||||||
|    |     return | ||||||
|   localFiles = getFiles(localPath) |  | ||||||
|   if len(localFiles) > 0: |  | ||||||
|     logger.debug('Local files found: {}'.format(localFiles), es={'files': localFiles}) |  | ||||||
|   else: |  | ||||||
|     logger.debug('No local files found') |  | ||||||
|  |  | ||||||
|   newFiles = filesNotShared(remoteFiles, localFiles) |   newFiles = filesNotShared(satelliteSystem, localSystem) | ||||||
|   if newFiles: |   if not newFiles: | ||||||
|     logger.info('New files: {}'.format(newFiles), es={'files': newFiles}) |  | ||||||
|  |  | ||||||
|     transferedFiles = transferFiles(newFiles, localPath, remotePath, host, user) |  | ||||||
|     removeFromDeluge(transferedFiles) |  | ||||||
|  |  | ||||||
|   else: |  | ||||||
|     logger.debug('No new files found to travel on the great transatlantic express') |     logger.debug('No new files found to travel on the great transatlantic express') | ||||||
|  |     return | ||||||
|  |  | ||||||
|  |   logger.info('New files found to travel transatlantic express', es={'files': newFiles}) | ||||||
|  |   transport = Transport(newFiles, satelliteSystem, localSystem, downloadClient) | ||||||
|  |   transport.start() | ||||||
|  |  | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|   main() |   main() | ||||||
|   | |||||||
							
								
								
									
										16
									
								
								utils.py
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								utils.py
									
									
									
									
									
								
							| @@ -4,6 +4,7 @@ from configparser import RawConfigParser, NoOptionError | |||||||
|  |  | ||||||
| pwd = os.path.dirname(os.path.abspath(__file__)) | pwd = os.path.dirname(os.path.abspath(__file__)) | ||||||
|  |  | ||||||
|  | AVG_SPEED_FILE = '.avgspeed.txt' | ||||||
| VIDEO_EXTENSIONS = ('.3g2', '.3gp', '.3gp2', '.3gpp', '.60d', '.ajp', '.asf', '.asx', '.avchd', '.avi', '.bik', | VIDEO_EXTENSIONS = ('.3g2', '.3gp', '.3gp2', '.3gpp', '.60d', '.ajp', '.asf', '.asx', '.avchd', '.avi', '.bik', | ||||||
|                     '.bix', '.box', '.cam', '.dat', '.divx', '.dmf', '.dv', '.dvr-ms', '.evo', '.flc', '.fli', |                     '.bix', '.box', '.cam', '.dat', '.divx', '.dmf', '.dv', '.dvr-ms', '.evo', '.flc', '.fli', | ||||||
|                     '.flic', '.flv', '.flx', '.gvi', '.gvp', '.h264', '.m1v', '.m2p', '.m2v', '.m4e', |                     '.flic', '.flv', '.flx', '.gvi', '.gvp', '.h264', '.m1v', '.m2p', '.m2v', '.m4e', | ||||||
| @@ -35,20 +36,23 @@ def getConfig(): | |||||||
|   return config |   return config | ||||||
|  |  | ||||||
| def writeAvgSpeedToDisk(speed): | def writeAvgSpeedToDisk(speed): | ||||||
|   path = os.path.join(pwd, '.avgspeed.txt') |   path = os.path.join(pwd, AVG_SPEED_FILE) | ||||||
|  |  | ||||||
|   with open(path, 'w') as f: |   with open(path, 'w') as f: | ||||||
|     f.write(str(int(speed or 100))) |     f.write(str(int(speed))) | ||||||
|     f.close() |     f.close() | ||||||
|  |  | ||||||
| def readAvgSpeedFromDisk(): | def readAvgSpeedFromDisk(): | ||||||
|   path = os.path.join(pwd, '.avgspeed.txt') |   path = os.path.join(pwd, AVG_SPEED_FILE) | ||||||
|  |  | ||||||
|   with open(path, 'r') as f: |   with open(path, 'r') as f: | ||||||
|     data = f.readline() |     data = f.readline() | ||||||
|     f.close() |     f.close() | ||||||
|  |  | ||||||
|   if data == '': |   speed = None | ||||||
|     data = '1' |   try: | ||||||
|  |     speed = int(data) | ||||||
|  |   except: | ||||||
|  |     pass | ||||||
|  |  | ||||||
|   return int(data) |   return speed | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user