fetch routers in parallel (optionally)

This commit is contained in:
Leon Morten Richter
2022-12-28 14:18:42 +01:00
parent 7bdd6a98ed
commit 866751ad64
3 changed files with 157 additions and 88 deletions

View File

@@ -25,3 +25,5 @@
verbose_mode = False # Set it on for troubleshooting verbose_mode = False # Set it on for troubleshooting
fetch_routers_in_parallel = False # Set to True if you want to fetch multiple routers parallel
max_worker_threads = 5 # Max number of worker threads that can fetch routers. Meaningless if fetch_routers_in_parallel is set to False

View File

@@ -1,17 +1,19 @@
# coding=utf8 # coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov # Copyright (c) 2020 Arseniy Kuznetsov
## ##
## This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2 # as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version. # of the License, or (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. # GNU General Public License for more details.
import os, sys, shutil import os
import sys
import shutil
from collections import namedtuple from collections import namedtuple
from configobj import ConfigObj from configobj import ConfigObj
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
@@ -22,6 +24,7 @@ from mktxp.utils.utils import FSHelper
''' MKTXP conf file handling ''' MKTXP conf file handling
''' '''
class MKTXPConfigKeys: class MKTXPConfigKeys:
''' MKTXP config file keys ''' MKTXP config file keys
''' '''
@@ -38,12 +41,12 @@ class MKTXPConfigKeys:
FE_PACKAGE_KEY = 'installed_packages' FE_PACKAGE_KEY = 'installed_packages'
FE_DHCP_KEY = 'dhcp' FE_DHCP_KEY = 'dhcp'
FE_DHCP_LEASE_KEY = 'dhcp_lease' FE_DHCP_LEASE_KEY = 'dhcp_lease'
FE_DHCP_POOL_KEY = 'pool' FE_DHCP_POOL_KEY = 'pool'
FE_IP_CONNECTIONS_KEY = 'connections' FE_IP_CONNECTIONS_KEY = 'connections'
FE_INTERFACE_KEY = 'interface' FE_INTERFACE_KEY = 'interface'
FE_FIREWALL_KEY = 'firewall' FE_FIREWALL_KEY = 'firewall'
FE_IPV6_FIREWALL_KEY = 'ipv6_firewall' FE_IPV6_FIREWALL_KEY = 'ipv6_firewall'
FE_IPV6_NEIGHBOR_KEY = 'ipv6_neighbor' FE_IPV6_NEIGHBOR_KEY = 'ipv6_neighbor'
@@ -60,7 +63,7 @@ class MKTXPConfigKeys:
FE_USER_KEY = 'user' FE_USER_KEY = 'user'
FE_QUEUE_KEY = 'queue' FE_QUEUE_KEY = 'queue'
MKTXP_SOCKET_TIMEOUT = 'socket_timeout' MKTXP_SOCKET_TIMEOUT = 'socket_timeout'
MKTXP_INITIAL_DELAY = 'initial_delay_on_failure' MKTXP_INITIAL_DELAY = 'initial_delay_on_failure'
MKTXP_MAX_DELAY = 'max_delay_on_failure' MKTXP_MAX_DELAY = 'max_delay_on_failure'
MKTXP_INC_DIV = 'delay_inc_div' MKTXP_INC_DIV = 'delay_inc_div'
@@ -68,13 +71,14 @@ class MKTXPConfigKeys:
MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval' MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval'
MKTXP_VERBOSE_MODE = 'verbose_mode' MKTXP_VERBOSE_MODE = 'verbose_mode'
MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval' MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval'
MKTXP_FETCH_IN_PARALLEL = 'fetch_routers_in_parallel'
MKTXP_MAX_WORKER_THREADS = 'max_worker_threads'
# UnRegistered entries placeholder # UnRegistered entries placeholder
NO_ENTRIES_REGISTERED = 'NoEntriesRegistered' NO_ENTRIES_REGISTERED = 'NoEntriesRegistered'
MKTXP_USE_COMMENTS_OVER_NAMES = 'use_comments_over_names' MKTXP_USE_COMMENTS_OVER_NAMES = 'use_comments_over_names'
# Base router id labels # Base router id labels
ROUTERBOARD_NAME = 'routerboard_name' ROUTERBOARD_NAME = 'routerboard_name'
ROUTERBOARD_ADDRESS = 'routerboard_address' ROUTERBOARD_ADDRESS = 'routerboard_address'
@@ -89,20 +93,26 @@ class MKTXPConfigKeys:
DEFAULT_MKTXP_INC_DIV = 5 DEFAULT_MKTXP_INC_DIV = 5
DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420 DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420
DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5 DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5
DEFAULT_MKTXP_FETCH_IN_PARALLEL = False
DEFAULT_MKTXP_MAX_WORKER_THREADS = 5
BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE,
SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY}
BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY}
# Feature keys enabled by default # Feature keys enabled by default
BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY, BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY,
FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES, FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES,
FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY, FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY,
FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY} FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY}
SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY} SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY}
SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE} SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE, MKTXP_FETCH_IN_PARALLEL}
STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY) STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY)
MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL) MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY,
MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL,
MKTXP_MAX_WORKER_THREADS,)
# MKTXP config entry nane # MKTXP config entry nane
MKTXP_CONFIG_ENTRY_NAME = 'MKTXP' MKTXP_CONFIG_ENTRY_NAME = 'MKTXP'
@@ -110,21 +120,23 @@ class MKTXPConfigKeys:
class ConfigEntry: class ConfigEntry:
MKTXPConfigEntry = namedtuple('MKTXPConfigEntry', [MKTXPConfigKeys.ENABLED_KEY, MKTXPConfigKeys.HOST_KEY, MKTXPConfigKeys.PORT_KEY, MKTXPConfigEntry = namedtuple('MKTXPConfigEntry', [MKTXPConfigKeys.ENABLED_KEY, MKTXPConfigKeys.HOST_KEY, MKTXPConfigKeys.PORT_KEY,
MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY, MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY,
MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY,
MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY, MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY,
MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY,
MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY,
MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY,
MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY
]) ])
MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT, MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT,
MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY, MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY,
MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY,
MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL]) MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL,
MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL,
MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS])
class OSConfig(metaclass = ABCMeta): class OSConfig(metaclass=ABCMeta):
''' OS-related config ''' OS-related config
''' '''
@staticmethod @staticmethod
@@ -169,7 +181,7 @@ class LinuxConfig(OSConfig):
@property @property
def mktxp_user_dir_path(self): def mktxp_user_dir_path(self):
return FSHelper.full_path('~/mktxp') return FSHelper.full_path('~/mktxp')
#return FSHelper.full_path('/etc/mktxp') # return FSHelper.full_path('/etc/mktxp')
class MKTXPConfigHandler: class MKTXPConfigHandler:
@@ -183,11 +195,15 @@ class MKTXPConfigHandler:
os.makedirs(self.os_config.mktxp_user_dir_path) os.makedirs(self.os_config.mktxp_user_dir_path)
# if needed, stage the user config data # if needed, stage the user config data
self.usr_conf_data_path = os.path.join(self.os_config.mktxp_user_dir_path, 'mktxp.conf') self.usr_conf_data_path = os.path.join(
self.mktxp_conf_path = os.path.join(self.os_config.mktxp_user_dir_path, '_mktxp.conf') self.os_config.mktxp_user_dir_path, 'mktxp.conf')
self.mktxp_conf_path = os.path.join(
self.os_config.mktxp_user_dir_path, '_mktxp.conf')
self._create_os_path(self.usr_conf_data_path, 'mktxp/cli/config/mktxp.conf') self._create_os_path(self.usr_conf_data_path,
self._create_os_path(self.mktxp_conf_path, 'mktxp/cli/config/_mktxp.conf') 'mktxp/cli/config/mktxp.conf')
self._create_os_path(self.mktxp_conf_path,
'mktxp/cli/config/_mktxp.conf')
self.re_compiled = {} self.re_compiled = {}
@@ -225,7 +241,8 @@ class MKTXPConfigHandler:
def _create_os_path(self, os_path, resource_path): def _create_os_path(self, os_path, resource_path):
if not os.path.exists(os_path): if not os.path.exists(os_path):
# stage from the conf templates # stage from the conf templates
lookup_path = resource_filename(Requirement.parse("mktxp"), resource_path) lookup_path = resource_filename(
Requirement.parse("mktxp"), resource_path)
shutil.copy(lookup_path, os_path) shutil.copy(lookup_path, os_path)
def _config_entry_reader(self, entry_name): def _config_entry_reader(self, entry_name):
@@ -236,7 +253,7 @@ class MKTXPConfigHandler:
config_entry_reader[key] = self.config[entry_name].as_bool(key) config_entry_reader[key] = self.config[entry_name].as_bool(key)
else: else:
config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False
write_needed = True # read from disk next time write_needed = True # read from disk next time
for key in MKTXPConfigKeys.STR_KEYS: for key in MKTXPConfigKeys.STR_KEYS:
config_entry_reader[key] = self.config[entry_name][key] config_entry_reader[key] = self.config[entry_name][key]
@@ -245,10 +262,12 @@ class MKTXPConfigHandler:
# port # port
if self.config[entry_name].get(MKTXPConfigKeys.PORT_KEY): if self.config[entry_name].get(MKTXPConfigKeys.PORT_KEY):
config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int(MKTXPConfigKeys.PORT_KEY) config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int(
MKTXPConfigKeys.PORT_KEY)
else: else:
config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(
write_needed = True # read from disk next time MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY])
write_needed = True # read from disk next time
if write_needed: if write_needed:
self.config[entry_name] = config_entry_reader self.config[entry_name] = config_entry_reader
@@ -266,33 +285,36 @@ class MKTXPConfigHandler:
system_entry_reader[key] = self._config[entry_name].as_int(key) system_entry_reader[key] = self._config[entry_name].as_int(key)
else: else:
system_entry_reader[key] = self._default_value_for_key(key) system_entry_reader[key] = self._default_value_for_key(key)
write_needed = True # read from disk next time write_needed = True # read from disk next time
for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES): for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES):
if self._config[entry_name].get(key): if self._config[entry_name].get(key):
system_entry_reader[key] = self._config[entry_name].as_bool(key) system_entry_reader[key] = self._config[entry_name].as_bool(
key)
else: else:
system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False
write_needed = True # read from disk next time write_needed = True # read from disk next time
if write_needed: if write_needed:
self._config[entry_name] = system_entry_reader self._config[entry_name] = system_entry_reader
self._config.write() self._config.write()
return system_entry_reader return system_entry_reader
def _default_value_for_key(self, key, value = None): def _default_value_for_key(self, key, value=None):
return { return {
MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT, MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT,
MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT, MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT,
MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT,
MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY,
MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY,
MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV,
MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL,
MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL,
}[key](value) MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_FETCH_IN_PARALLEL,
MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_WORKER_THREADS,
}[key](value)
# Simplest possible Singleton impl # Simplest possible Singleton impl
config_handler = MKTXPConfigHandler() config_handler = MKTXPConfigHandler()

View File

@@ -1,32 +1,84 @@
# coding=utf8 # coding=utf8
## Copyright (c) 2020 Arseniy Kuznenowov # Copyright (c) 2020 Arseniy Kuznenowov
## ##
## This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2 # as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version. # of the License, or (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. # GNU General Public License for more details.
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer from timeit import default_timer
from datetime import datetime from datetime import datetime
from mktxp.cli.config.config import config_handler from mktxp.cli.config.config import config_handler
class CollectorHandler: class CollectorHandler:
''' MKTXP Collectors Handler ''' MKTXP Collectors Handler
''' '''
def __init__(self, entries_handler, collector_registry): def __init__(self, entries_handler, collector_registry):
self.entries_handler = entries_handler self.entries_handler = entries_handler
self.collector_registry = collector_registry self.collector_registry = collector_registry
self.last_collect_timestamp = 0 self.last_collect_timestamp = 0
def collect_synchronous(self):
"""
Collect the metrics of all router entries defined in the current users configuration synchronously.
This function iterates over each router entry one-by-one.
Thus, the total runtime of this function scales linearly with the number of registered routers.
"""
for router_entry in self.entries_handler.router_entries:
if not router_entry.api_connection.is_connected():
# let's pick up on things in the next run
router_entry.api_connection.connect()
continue
for collector_ID, collect_func in self.collector_registry.registered_collectors.items():
start = default_timer()
yield from collect_func(router_entry)
router_entry.time_spent[collector_ID] += default_timer() - start
def collect_single(self, router_entry):
results = []
for collector_ID, collect_func in self.collector_registry.registered_collectors.items():
start = default_timer()
result = list(collect_func(router_entry))
results += result
router_entry.time_spent[collector_ID] += default_timer() - start
return results
def collect_parallel(self, max_worker_threads=5):
"""
Collect the metrics of all router entries defined in the current users configuration in parallel.
This function iterates over multiple routers in parallel (depending on the value of max_worker_threads).
Thus, the total runtime scales sub linearly (number_of_routers / max_worker_threads).
"""
with ThreadPoolExecutor(max_workers=max_worker_threads) as executor:
futures = []
for router_entry in self.entries_handler.router_entries:
if not router_entry.api_connection.is_connected():
# let's pick up on things in the next run
router_entry.api_connection.connect()
continue
# Publish the collection function as a future
future = executor.submit(self.collect_single, router_entry)
futures.append(future)
# Join all futures and collect their results
for future in futures:
results = future.result()
yield from results
def collect(self): def collect(self):
now = datetime.now().timestamp() now = datetime.now().timestamp()
diff = now - self.last_collect_timestamp diff = now - self.last_collect_timestamp
if diff < config_handler.system_entry().minimal_collect_interval: if diff < config_handler.system_entry().minimal_collect_interval:
if config_handler.system_entry().verbose_mode: if config_handler.system_entry().verbose_mode:
print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}') print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
@@ -36,17 +88,10 @@ class CollectorHandler:
yield from self.collector_registry.bandwidthCollector.collect() yield from self.collector_registry.bandwidthCollector.collect()
for router_entry in self.entries_handler.router_entries: # Check whether to run in parallel by looking at the mktxp system configuration
if not router_entry.api_connection.is_connected(): parallel = config_handler.system_entry().fetch_routers_in_parallel
# let's pick up on things in the next run max_worker_threads = config_handler.system_entry().max_worker_threads
router_entry.api_connection.connect() if parallel:
continue yield from self.collect_parallel(max_worker_threads=max_worker_threads)
else:
for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): yield from self.collect_synchronous()
start = default_timer()
yield from collect_func(router_entry)
router_entry.time_spent[collector_ID] += default_timer() - start