Better validation of args and logging of non success responses

This commit is contained in:
2022-11-24 00:20:16 +01:00
parent b076b1274b
commit 7ef58745e1
2 changed files with 70 additions and 15 deletions

View File

@@ -9,7 +9,7 @@ import typer
from pprint import pprint
from deluge import Deluge
from utils import ColorizeFilter, BASE_DIR
from utils import ColorizeFilter, BASE_DIR, validHash, convertFilesize
from __version__ import __version__
from __init__ import addHandler
@@ -19,16 +19,17 @@ addHandler(ch)
logger = logging.getLogger('deluge_cli')
app = typer.Typer()
deluge = Deluge()
deluge = None
def signal_handler(signal, frame):
"""
Handle exit by Keyboardinterrupt
"""
global deluge
del deluge
logger.info('\nGood bye!')
sys.exit(0)
sys.exit(1)
def handleKeyboardInterrupt():
signal.signal(signal.SIGINT, signal_handler)
@@ -53,22 +54,30 @@ def printResponse(response, json=False):
raise error
@app.command()
def add(magnet: str):
def add(magnet: str, json: bool = typer.Option(False, help="Print as json")):
'''
Add magnet torrent
'''
logger.debug('Add command selected')
logger.info('Add command selected')
logger.debug(magnet)
response = deluge.add(magnet)
printResponse(response)
if validHash(response):
torrent = deluge.get(response)
printResponse(torrent, json)
else:
logger.info('Unable to add torrent')
@app.command()
def ls(json: bool = typer.Option(False, help="Print as json")):
'''
List all torrents
'''
logger.debug('List command selected')
logger.info('List command selected')
response = deluge.get_all()
if response is None:
logger.info('No torrents found')
return
printResponse(response, json)
@app.command()
@@ -76,7 +85,9 @@ def get(id: str, json: bool = typer.Option(False, help="Print as json")):
'''
Get torrent by id or hash
'''
logger.debug('Get command selected for id {}'.format(id))
logger.info('Get command selected for id: {}'.format(id))
if not validHash(id):
return logger.info("Id is not valid")
response = deluge.get(id)
printResponse(response, json)
@@ -85,27 +96,50 @@ def toggle(id: str):
'''
Toggle torrent download state
'''
logger.debug('Toggle command selected for id {}'.format(id))
response = deluge.toggle(id)
printResponse(response)
logger.info('Toggle command selected for id: {}'.format(id))
if not validHash(id):
return logger.info("Id is not valid")
deluge.toggle(id)
torrent = deluge.get(id)
printResponse(torrent)
@app.command()
def search(query: str, json: bool = typer.Option(False, help="Print as json")):
'''
Search for string segment in torrent name
'''
logger.debug('Search command selected with query: {}'.format(query))
logger.info('Search command selected with query: {}'.format(query))
response = deluge.search(query)
printResponse(response, json)
@app.command()
def remove(id: str, destroy: bool = typer.Option(False, help="Remove torrent data")):
def rm(name: str, destroy: bool = typer.Option(False, help="Remove torrent by name")):
'''
Remove torrent by name
'''
logger.info('Removing torrent with name: {}, destroy flag: {}'.format(name, destroy))
response = deluge.removeByName(name, destroy)
@app.command()
def remove(id: str, destroy: bool = typer.Option(False, help="Remove torrent by id")):
'''
Remove torrent by id or hash
'''
logger.debug('Remove command selected for id: {} with destroy: {}'.format(id, destroy))
logger.info('Removing torrent with id: {}, destroy flag: {}'.format(id, destroy))
if not validHash(id):
return logger.info("Id is not valid")
response = deluge.remove(id, destroy)
printResponse(response)
@app.command()
def disk():
'''
Get free disk space
'''
response = deluge.freeSpace()
if response == None or not isinstance(response, int):
logger.error("Unable to get available disk space")
return
print(convertFilesize(response))
@app.command()
def version():
@@ -130,6 +164,10 @@ def defaultOptions(debug: bool = typer.Option(False, '--debug', help='Set log le
elif debug == True:
ch.setLevel(logging.DEBUG)
# Initiate deluge
global deluge
deluge = Deluge()
def main():
app()
del deluge

View File

@@ -79,3 +79,20 @@ def convert(data):
if isinstance(data, tuple): return map(convert, data)
json_data = json.dumps(data)
return json_data
def validHash(hash: str):
try:
return hash and len(hash) == 40 and int(hash, 16)
except ValueError:
return False
import math
def convertFilesize(size_bytes):
if size_bytes == None or size_bytes == 0:
return "0B"
size_name = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])