13 Commits

Author SHA1 Message Date
snyk-bot
2a2fa1a9b5 fix: requirements.txt to reduce vulnerabilities
The following vulnerabilities are fixed by pinning transitive dependencies:
- https://snyk.io/vuln/SNYK-PYTHON-WEBSOCKETS-1582792
2021-11-14 05:26:14 +00:00
c09d35670e Added pypi version badge. 2021-07-05 11:54:58 +02:00
39661ea3db Naming conflict, renaming pypi project name to suffixed w/ -kevin. 2021-07-04 16:41:30 +02:00
df8fdbd4fa Moved appending pacakge to path from setup.py to __init__ file. 2021-07-04 16:35:52 +02:00
177d73c516 Removed log file. 2021-07-04 16:35:12 +02:00
518b5b0e78 Restructured project files.
Moved all packages files to delugeClient folder.
Split contents of deluge_cli to __main__, utils, deluge & torrent.
Config default changed from config.ini --> delugeClient/default_config.ini.
Setup.py updated with new entry.
2021-07-04 16:28:23 +02:00
9d34802957 Removed unused, and updated packages. 2021-07-04 16:24:26 +02:00
e36ba428a7 Setup file for building package. 2021-07-03 21:04:58 +02:00
57b1b4e1f2 log file added to gitignore 2020-04-19 14:25:44 +02:00
fada382a32 Removed relative import of utils. 2020-04-19 14:25:32 +02:00
8d943cb1ad Destroy flag added to remove function (default false) and argv parameters 2020-04-19 14:25:08 +02:00
bd516f34d8 Updated cryptography to version 2.5 2020-04-19 14:24:08 +02:00
cfe14dd1ba Changed formatting encoding from ascii to utf-8 2020-04-19 14:23:45 +02:00
16 changed files with 534 additions and 375 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
deluge_cli.log
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

View File

@@ -5,6 +5,9 @@
<h4 align="center"> A easy to use Deluge CLI that can connect to Deluge RPC (even over ssh) written entirely in python.</h4>
<p align="center">
<a href="https://pypi.org/project/delugeClient-kevin/">
<img src="https://img.shields.io/pypi/v/delugeClient-kevin" />
</a>
<a href="https://snyk.io/test/github/kevinmidboe/delugeclient?targetFile=requirements.txt">
<img src="https://snyk.io/test/github/kevinmidboe/delugeclient/badge.svg?targetFile=requirements.txt" alt="Known Vulnerabilities" data-canonical-src="https://snyk.io/test/github/kevinmidboe/delugeclient?targetFile=requirements.txt" style="max-width:100%;">
</a>

View File

@@ -1,10 +0,0 @@
[Deluge]
HOST = YOUR_DELUGE_HOST
PORT = YOUR_DELUGE_PORT
USER = YOUR_DELUGE_USER
PASSWORD = YOUR_DELUGE_PASSWORD
[ssh]
HOST = YOUR_DELUGE_SERVER_IP
USER = YOUR_SSH_USER
PKEY = YOUR_SSH_PRIVATE_KEY_DIRECTORY

22
delugeClient/__init__.py Normal file
View File

@@ -0,0 +1,22 @@
import os
from sys import path
path.append(os.path.dirname(__file__))
__version__=0.1
import logging
from delugeUtils import BASE_DIR
logger = logging.getLogger('deluge_cli')
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(os.path.join(BASE_DIR, 'deluge_cli.log'))
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
formatter = logging.Formatter('%(asctime)s %(levelname)8s %(name)s | %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)

148
delugeClient/__main__.py Normal file
View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3.6
"""Custom delugeRPC client
Usage:
deluge_cli add MAGNET [DIR] [--json | --debug | --warning | --error]
deluge_cli search NAME [--json]
deluge_cli get TORRENT [--json | --debug | --warning | --error]
deluge_cli ls [--downloading | --seeding | --paused | --json]
deluge_cli toggle TORRENT
deluge_cli progress [--json]
deluge_cli rm NAME [--destroy] [--debug | --warning | --error]
deluge_cli (-h | --help)
deluge_cli --version
Arguments:
MAGNET Magnet link to add
DIR Directory to save to
TORRENT A selected torrent
Options:
-h --help Show this screen
--version Show version
--print Print response from commands
--json Print response as JSON
--debug Print all debug log
--warning Print only logged warnings
--error Print error messages (Error/Warning)
"""
import os
import sys
import signal
import logging
from docopt import docopt
from pprint import pprint
from deluge import Deluge
from utils import ColorizeFilter, BASE_DIR
from __init__ import __version__
logger = logging.getLogger('deluge_cli')
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(os.path.join(BASE_DIR, 'deluge_cli.log'))
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
formatter = logging.Formatter('%(asctime)s %(levelname)8s %(name)s | %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
logger.addFilter(ColorizeFilter())
def signal_handler(signal, frame):
"""
Handle exit by Keyboardinterrupt
"""
logger.info('\nGood bye!')
sys.exit(0)
def main():
"""
Main function, parse the input
"""
signal.signal(signal.SIGINT, signal_handler)
arguments = docopt(__doc__, version=__version__)
# Set logging level for streamHandler
if arguments['--debug']:
ch.setLevel(logging.DEBUG)
elif arguments['--warning']:
ch.setLevel(logging.WARNING)
elif arguments['--error']:
ch.setLevel(logging.ERROR)
logger.info('Deluge client')
logger.debug(arguments)
# Get config settings
deluge = Deluge()
_id = arguments['TORRENT']
query = arguments['NAME']
magnet = arguments['MAGNET']
name = arguments['NAME']
_filter = [ a[2:] for a in ['--downloading', '--seeding', '--paused'] if arguments[a] ]
response = None
if arguments['add']:
logger.info('Add cmd selected with link {}'.format(magnet))
response = deluge.add(magnet)
if response is not None:
logger.info('Successfully added torrent.\nResponse from deluge: {}'.format(response))
else:
logger.warning('Add response returned empty: {}'.format(response))
elif arguments['search']:
logger.info('Search cmd selected for query: {}'.format(query))
response = deluge.search(query)
if response is not None or response != '[]':
logger.info('Search found {} torrents'.format(len(response)))
else:
logger.info('Empty response for search query.')
elif arguments['progress']:
logger.info('Progress cmd selected.')
response = deluge.progress()
elif arguments['get']:
logger.info('Get cmd selected for id: {}'.format(_id))
response = deluge.get(_id)
elif arguments['ls']:
logger.info('List cmd selected')
response = deluge.get_all(_filter=_filter)
elif arguments['toggle']:
logger.info('Toggling id: {}'.format(_id))
deluge.togglePaused(_id)
elif arguments['rm']:
destroy = arguments['--destroy']
logger.info('Remove by name: {}.'.format(name))
if destroy:
logger.info('Destroy set, removing files')
deluge.remove(name, destroy)
try:
if arguments['--json']:
if len(response) > 1:
print('[{}]'.format(','.join([t.toJSON() for t in response])))
else:
print(response[0].toJSON())
except KeyError as error:
logger.error('Unexpected error while trying to print')
raise error
return response
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,11 @@
[deluge]
host=
port=58846
user=
password=
[ssh]
host=
user=
password=
pkey=

167
delugeClient/deluge.py Normal file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3.6
import os
import re
import sys
import logging
import requests
import logging.config
from deluge_client import DelugeRPCClient
from sshtunnel import SSHTunnelForwarder
from delugeUtils import getConfig, BASE_DIR
from torrent import Torrent
logger = logging.getLogger('deluge_cli')
def split_words(string):
logger.debug('Splitting input: {} (type: {}) with split_words'.format(string, type(string)))
return re.findall(r"[\w\d']+", string.lower())
class Deluge(object):
"""docstring for ClassName"""
def __init__(self):
config = getConfig()
self.host = config['deluge']['host']
self.port = int(config['deluge']['port'])
self.user = config['deluge']['user']
self.password = config['deluge']['password']
self.ssh_host = config['ssh']['host']
self.ssh_user = config['ssh']['user']
self.ssh_pkey = config['ssh']['pkey']
self.ssh_password = config['ssh']['password']
self._connect()
def freeSpace(self):
return self.client.call('core.get_free_space')
def parseResponse(self, response):
torrents = []
for key in response:
torrent = response[key]
torrents.append(Torrent.fromDeluge(torrent))
return torrents
def _connect(self):
logger.info('Checking if script on same server as deluge RPC')
if self.host != 'localhost' and self.host is not None:
try:
if self.password:
self.tunnel = SSHTunnelForwarder(self.ssh_host, ssh_username=self.ssh_user, ssh_password=self.ssh_password,
local_bind_address=('localhost', self.port), remote_bind_address=('localhost', self.port))
elif self.pkey is not None:
self.tunnel = SSHTunnelForwarder(self.ssh_host, ssh_username=self.ssh_user, ssh_pkey=self.ssh_pkey,
local_bind_address=('localhost', self.port), remote_bind_address=('localhost', self.port))
except ValueError as error:
logger.error("Either password or private key path must be set in config.")
raise error
self.tunnel.start()
self.client = DelugeRPCClient(self.host, self.port, self.user, self.password)
self.client.connect()
def add(self, url):
logger.info('Adding magnet with url: {}.'.format(url))
if (url.startswith('magnet')):
return self.client.call('core.add_torrent_magnet', url, {})
elif url.startswith('http'):
magnet = self.getMagnetFromFile(url)
return self.client.call('core.add_torrent_magnet', magnet, {})
def get_all(self, _filter=None):
if (type(_filter) is list and len(_filter)):
if ('seeding' in _filter):
response = self.client.call('core.get_torrents_status', {'state': 'Seeding'}, [])
elif ('downloading' in _filter):
response = self.client.call('core.get_torrents_status', {'state': 'Downloading'}, [])
elif ('paused' in _filter):
response = self.client.call('core.get_torrents_status', {'paused': 'true'}, [])
else:
response = self.client.call('core.get_torrents_status', {}, [])
return self.parseResponse(response)
def search(self, query):
allTorrents = self.get_all()
torrentNamesMatchingQuery = []
if len(allTorrents):
for torrent in allTorrents:
if query in torrent.name:
torrentNamesMatchingQuery.append(torrent)
allTorrents = torrentNamesMatchingQuery
return allTorrents
q_list = split_words(query)
return [ t for t in self.get_all() if (set(q_list) <= set(split_words(t.name))) ]
def get(self, id):
response = self.client.call('core.get_torrent_status', id, {})
return Torrent.fromDeluge(response)
def togglePaused(self, id):
torrent = self.get(id)
if (torrent.paused):
response = self.client.call('core.resume_torrent', [id])
else:
response = self.client.call('core.pause_torrent', [id])
return response
def remove(self, name, destroy=False):
matches = list(filter(lambda t: t.name == name, self.get_all()))
logger.info('Matches for {}: {}'.format(name, matches))
if (len(matches) > 1):
raise ValueError('Multiple files found matching key. Unable to remove.')
elif (len(matches) == 1):
torrent = matches[0]
response = self.client.call('core.remove_torrent', torrent.key, destroy)
logger.info('Response: {}'.format(str(response)))
if (response == False):
raise AttributeError('Unable to remove torrent.')
return response
else:
logger.error('ERROR. No torrent found with that name.')
def filterOnValue(self, torrents, value):
filteredTorrents = []
for t in torrents:
value_template = {'key': None, 'name': None, value: None}
value_template['key'] = t.key
value_template['name'] = t.name
value_template[value] = getattr(t, value)
filteredTorrents.append(value_template)
return filteredTorrents
def progress(self):
attributes = ['progress', 'eta', 'state', 'finished']
all_torrents = self.get_all()
torrents = []
for i, attribute in enumerate(attributes):
if i < 1:
torrents = self.filterOnValue(all_torrents, attribute)
continue
torrents = [dict(e, **v) for e,v in zip(torrents, self.filterOnValue(all_torrents, attribute))]
return torrents
def __del__(self):
if hasattr(self, 'tunnel'):
logger.info('Closing ssh tunnel')
self.tunnel.stop()
def getMagnetFromFile(self, url):
logger.info('File url found, fetching magnet.')
r = requests.get(url, allow_redirects=False)
magnet = r.headers['Location']
logger.info('Found magnet: {}.'.format(magnet))
return magnet

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: kevinmidboe
# @Date: 2018-04-17 19:55:38
# @Last Modified by: KevinMidboe
# @Last Modified time: 2018-05-04 00:04:25
import os
import sys
import json
import shutil
import logging
import colored
import configparser
from pprint import pprint
from colored import stylize
__all__ = ('ColorizeFilter', )
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
logger = logging.getLogger('deluge_cli')
def checkConfigExists():
user_config_dir = os.path.expanduser("~") + "/.config/delugeClient"
config_dir = os.path.join(user_config_dir, 'config.ini')
def getConfig():
"""
Read path and get configuartion file with site settings
:return: config settings read from 'config.ini'
:rtype: configparser.ConfigParser
"""
config = configparser.ConfigParser()
user_config_dir = os.path.expanduser("~") + "/.config/delugeClient"
config_dir = os.path.join(user_config_dir, 'config.ini')
if not os.path.isfile(config_dir):
defaultConfig = os.path.join(BASE_DIR, 'default_config.ini')
logger.error('Missing config! Moved default.config.ini to {}.\nOpen this file and set all varaibles!'.format(config_dir))
os.makedirs(user_config_dir, exist_ok=True)
shutil.copyfile(defaultConfig, config_dir)
config.read(config_dir)
requiredParameters = [('deluge host', config['deluge']['host']), ('deluge port', config['deluge']['port']),
('deluge user', config['deluge']['user']), ('deluge password', config['deluge']['password']),
('ssh password', config['ssh']['user'])]
for key, value in requiredParameters:
if value == '':
logger.error('Missing value for variable: "{}" in config: \
"$HOME/.config/delugeClient/config.ini".'.format(key))
exit(1)
return config
class ColorizeFilter(logging.Filter):
"""
Class for setting specific colors to levels of severity for log output
"""
color_by_level = {
10: 'chartreuse_3b',
20: 'white',
30: 'orange_1',
40: 'red'
}
def filter(self, record):
record.raw_msg = record.msg
color = self.color_by_level.get(record.levelno)
if color:
record.msg = stylize(record.msg, colored.fg(color))
return True
def convert(data):
if isinstance(data, bytes): return data.decode('utf-8')
if isinstance(data, dict): return dict(map(convert, data.items()))
if isinstance(data, tuple): return map(convert, data)
json_data = json.dumps(data)
return json_data

48
delugeClient/torrent.py Normal file
View File

@@ -0,0 +1,48 @@
import json
import logging
from distutils.util import strtobool
from utils import convert
logger = logging.getLogger('deluge_cli')
class Torrent(object):
def __init__(self, key, name, progress, eta, save_path, state, paused, finished, files):
super(Torrent, self).__init__()
self.key = key
self.name = name
self.progress = "{0:.2f}".format(float(progress))
self.eta = eta
self.save_path = save_path
self.state = state
self.paused = paused
self.finished = finished
self.files = list(files)
def isFolder(self):
return len(self.files) > 1
def toBool(self, value):
return True if strtobool(value) else False
@classmethod
def fromDeluge(cls, d):
# Receive a dict with byte values, convert all elements to string values
d = convert(d)
toBool = lambda val: True if strtobool(val) else False
return cls(d['hash'], d['name'], d['progress'], d['eta'], d['save_path'], d['state'],
toBool(d['paused']), toBool(d['is_finished']), d['files'])
def toJSON(self, files=False):
torrentDict = {'key': self.key, 'name': self.name, 'progress': self.progress, 'eta': self.eta,
'save_path': self.save_path, 'state': self.state, 'paused': self.paused,
'finished': self.finished, 'files': self.files, 'is_folder': self.isFolder()}
if (files is False):
del torrentDict['files']
return json.dumps(torrentDict)
def __str__(self):
return "Name: {}, Progress: {}%, ETA: {}, State: {}, Paused: {}".format(
self.name, self.progress, self.eta, self.state, self.paused)

View File

@@ -1 +0,0 @@

View File

@@ -1,308 +0,0 @@
#!/usr/bin/env python3.6
"""Custom delugeRPC client
Usage:
deluge_cli add MAGNET [DIR] [--debug | --warning | --error]
deluge_cli search NAME
deluge_cli get TORRENT
deluge_cli ls [--downloading | --seeding | --paused]
deluge_cli toggle TORRENT
deluge_cli progress
deluge_cli rm NAME [--destroy | --debug | --warning | --error]
deluge_cli (-h | --help)
deluge_cli --version
Arguments:
MAGNET Magnet link to add
DIR Directory to save to
TORRENT A selected torrent
Options:
-h --help Show this screen
--version Show version
--debug Print all debug log
--warning Print only logged warnings
--error Print error messages (Error/Warning)
--destroy When removing choose to remove file on disk
"""
import argparse
import os
import sys
import re
import signal
import socket
import logging
import logging.config
import configparser
from distutils.util import strtobool
from pprint import pprint
from deluge_client import DelugeRPCClient
from sshtunnel import SSHTunnelForwarder
from docopt import docopt
from utils import ColorizeFilter, convert
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
logger = logging.getLogger('deluge_cli')
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(os.path.join(BASE_DIR, 'deluge_cli.log'))
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
formatter = logging.Formatter('%(asctime)s %(levelname)8s %(name)s | %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
logger.addFilter(ColorizeFilter())
def getConfig():
"""
Read path and get configuartion file with site settings
:return: config settings read from 'config.ini'
:rtype: configparser.ConfigParser
"""
config = configparser.ConfigParser()
config_dir = os.path.join(BASE_DIR, 'config.ini')
config.read(config_dir)
config_values = list(dict(config.items('Deluge')).values())
config_values.extend(list(dict(config.items('ssh')).values()))
if any(value.startswith('YOUR') for value in config_values):
raise ValueError('Please set variables in config.ini file.')
return config
def split_words(string):
logger.debug('Splitting input: {} (type: {}) with split_words'.format(string, type(string)))
return re.findall(r"[\w\d']+", string.lower())
class Deluge(object):
"""docstring for ClassName"""
def __init__(self):
config = getConfig()
self.host = config['Deluge']['HOST']
self.port = int(config['Deluge']['PORT'])
self.user = config['Deluge']['USER']
self.password = config['Deluge']['PASSWORD']
self.ssh_host = config['ssh']['HOST']
self.ssh_user = config['ssh']['USER']
self.ssh_pkey = config['ssh']['PKEY']
self._connect()
def parseResponse(self, response):
torrents = []
for key in response:
torrent = response[key]
torrents.append(Torrent.fromDeluge(torrent))
return torrents
def _connect(self):
logger.info('Checking if script on same server as deluge RPC')
if (socket.gethostbyname(socket.gethostname()) != self.host):
self.tunnel = SSHTunnelForwarder(self.ssh_host, ssh_username=self.ssh_user, ssh_pkey=self.ssh_pkey,
local_bind_address=('localhost', self.port), remote_bind_address=('localhost', self.port))
self.tunnel.start()
self.client = DelugeRPCClient(self.host, self.port, self.user, self.password)
self.client.connect()
def add(self, url):
if (url.startswith('magnet')):
return self.client.call('core.add_torrent_magnet', url, {})
def get_all(self, _filter=None):
if (type(_filter) is list and len(_filter)):
if ('seeding' in _filter):
response = self.client.call('core.get_torrents_status', {'state': 'Seeding'}, [])
elif ('downloading' in _filter):
response = self.client.call('core.get_torrents_status', {'state': 'Downloading'}, [])
elif ('paused' in _filter):
response = self.client.call('core.get_torrents_status', {'paused': 'true'}, [])
else:
response = self.client.call('core.get_torrents_status', {}, [])
return self.parseResponse(response)
def search(self, query):
q_list = split_words(query)
return [ t for t in self.get_all() if (set(q_list) <= set(split_words(t.name))) ]
def get(self, id):
response = self.client.call('core.get_torrent_status', id, {})
return Torrent.fromDeluge(response)
def togglePaused(self, id):
torrent = self.get(id)
if (torrent.paused):
response = self.client.call('core.resume_torrent', [id])
else:
response = self.client.call('core.pause_torrent', [id])
print('Response:', response)
def remove(self, name, destroyFiles=False):
matches = list(filter(lambda t: t.name == name, self.get_all()))
logger.info('Matches for {}: {}'.format(name, matches))
if (len(matches) > 1):
raise ValueError('Multiple files found matching key. Unable to remove.')
elif (len(matches) == 1):
torrent = matches[0]
response = self.client.call('core.remove_torrent', torrent.key, destroyFiles)
logger.info('Response: {}'.format(str(response)))
if (response == False):
raise AttributeError('Unable to remove torrent.')
return response
else:
logger.error('ERROR. No torrent found with that name.')
def filterOnValue(self, torrents, value):
filteredTorrents = []
for t in torrents:
value_template = {'key': None, 'name': None, value: None}
value_template['key'] = t.key
value_template['name'] = t.name
value_template[value] = getattr(t, value)
filteredTorrents.append(value_template)
return filteredTorrents
def progress(self):
attributes = ['progress', 'eta', 'state', 'finished']
all_torrents = self.get_all()
torrents = []
for i, attribute in enumerate(attributes):
if i < 1:
torrents = self.filterOnValue(all_torrents, attribute)
continue
torrents = [dict(e, **v) for e,v in zip(torrents, self.filterOnValue(all_torrents, attribute))]
return torrents
def __del__(self):
if hasattr(self, 'tunnel'):
logger.info('Closing ssh tunnel')
self.tunnel.stop()
class Torrent(object):
def __init__(self, key, name, progress, eta, save_path, state, paused, finished, files):
super(Torrent, self).__init__()
self.key = key
self.name = name
self.progress = "{0:.2f}".format(float(progress))
self.eta = eta
self.save_path = save_path
self.state = state
self.paused = paused
self.finished = finished
self.files = list(files)
def isFolder(self):
return len(self.files) > 1
def toBool(self, value):
return True if strtobool(value) else False
@classmethod
def fromDeluge(cls, d):
# Receive a dict with byte values, convert all elements to string values
d = convert(d)
toBool = lambda val: True if strtobool(val) else False
return cls(d['hash'], d['name'], d['progress'], d['eta'], d['save_path'], d['state'],
toBool(d['paused']), toBool(d['is_finished']), d['files'])
def toJSON(self):
return {'key': self.key, 'name': self.name, 'progress': self.progress, 'eta': self.eta,
'save_path': self.save_path, 'state': self.state, 'paused': self.paused,
'finished': self.finished, 'files': self.files, 'is_folder': self.isFolder()}
def __str__(self):
return "Name: {}, Progress: {}%, ETA: {}, State: {}, Paused: {}".format(
self.name, self.progress, self.eta, self.state, self.paused)
def signal_handler(signal, frame):
"""
Handle exit by Keyboardinterrupt
"""
logger.info('\nGood bye!')
sys.exit(0)
def main():
"""
Main function, parse the input
"""
signal.signal(signal.SIGINT, signal_handler)
arguments = docopt(__doc__, version='1')
# Set logging level for streamHandler
if arguments['--debug']:
ch.setLevel(logging.DEBUG)
elif arguments['--warning']:
ch.setLevel(logging.WARNING)
elif arguments['--error']:
ch.setLevel(logging.ERROR)
logger.info('Deluge client')
logger.debug(arguments)
# Get config settings
deluge = Deluge()
_id = arguments['TORRENT']
query = arguments['NAME']
magnet = arguments['MAGNET']
name = arguments['NAME']
_filter = [ a[2:] for a in ['--downloading', '--seeding', '--paused'] if arguments[a] ]
print(_id, query, _filter)
if arguments['add']:
logger.info('Add cmd selected with link {}'.format(magnet))
response = deluge.add(magnet)
print('Add response: ', response)
elif arguments['search']:
logger.info('Search cmd selected for query: {}'.format(query))
response = deluge.search(query)
[ pprint(t.toJSON()) for t in response ]
elif arguments['progress']:
logger.info('Progress cmd selected.')
pprint(deluge.progress())
exit(0)
[ pprint(t.toJSON()) for t in deluge.progress() ]
elif arguments['get']:
logger.info('Get cmd selected for id: {}'.format(_id))
response = deluge.get(_id)
pprint(response.toJSON())
elif arguments['ls']:
logger.info('List cmd selected')
[ pprint(t.toJSON()) for t in deluge.get_all(_filter=_filter) ]
elif arguments['toggle']:
logger.info('Toggling id: {}'.format(_id))
deluge.togglePaused(_id)
elif arguments['rm']:
destroy = arguments['--destroy']
logger.info('Remove by name: {}. Destroy files: {}'.format(name, destroy))
deluge.remove(name, destroy)
if __name__ == '__main__':
main()

7
pyproject.toml Normal file
View File

@@ -0,0 +1,7 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

View File

@@ -1,14 +1,6 @@
asn1crypto==0.24.0
bcrypt==3.1.4
cffi==1.11.5
colored==1.3.5
cryptography==2.3
deluge-client==1.6.0
colored==1.4.2
deluge-client==1.9.0
docopt==0.6.2
idna==2.7
pyasn1==0.4.4
pycparser==2.18
PyNaCl==1.2.1
six==1.11.0
sshtunnel==0.1.4
websockets==6.0
requests==2.25.1
sshtunnel==0.4.0
websockets==10.0

39
setup.py Normal file
View File

@@ -0,0 +1,39 @@
from setuptools import setup, find_packages
import delugeClient
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
setup(
name="delugeClient-kevin",
version=delugeClient.__version__,
author="KevinMidboe",
description="Deluge client with custom functions written in python",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/kevinmidboe/delugeClient",
install_requires=[
'colored==1.4.2',
'deluge-client==1.9.0',
'docopt==0.6.2',
'requests==2.25.1',
'sshtunnel==0.4.0',
'websockets==9.1'
],
classifiers=[
'Programming Language :: Python',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3.6',
],
entry_points={
'console_scripts': [
'delugeclient = delugeClient.__main__:main',
],
},
packages=find_packages(),
package_data={
'delugeClient': ['default_config.ini'],
},
python_requires=">=3.6",
)

View File

@@ -1,42 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: kevinmidboe
# @Date: 2018-04-17 19:55:38
# @Last Modified by: KevinMidboe
# @Last Modified time: 2018-05-04 00:04:25
import logging
import colored
import json
from pprint import pprint
from colored import stylize
__all__ = ('ColorizeFilter', )
class ColorizeFilter(logging.Filter):
"""
Class for setting specific colors to levels of severity for log output
"""
color_by_level = {
10: 'chartreuse_3b',
20: 'white',
30: 'orange_1',
40: 'red'
}
logger = logging.getLogger('deluge_cli')
def filter(self, record):
record.raw_msg = record.msg
color = self.color_by_level.get(record.levelno)
if color:
record.msg = stylize(record.msg, colored.fg(color))
return True
def convert(data):
if isinstance(data, bytes): return data.decode('utf-8')
if isinstance(data, dict): return dict(map(convert, data.items()))
if isinstance(data, tuple): return map(convert, data)
json_data = json.dumps(data)
return json_data