From 866751ad64089a6ea0c3d3b58c1f049d3bbf1d67 Mon Sep 17 00:00:00 2001 From: Leon Morten Richter Date: Wed, 28 Dec 2022 14:18:42 +0100 Subject: [PATCH 1/3] fetch routers in parallel (optionally) --- mktxp/cli/config/_mktxp.conf | 2 + mktxp/cli/config/config.py | 144 ++++++++++++++++++-------------- mktxp/flow/collector_handler.py | 99 ++++++++++++++++------ 3 files changed, 157 insertions(+), 88 deletions(-) diff --git a/mktxp/cli/config/_mktxp.conf b/mktxp/cli/config/_mktxp.conf index cdefe21..7131b05 100644 --- a/mktxp/cli/config/_mktxp.conf +++ b/mktxp/cli/config/_mktxp.conf @@ -25,3 +25,5 @@ verbose_mode = False # Set it on for troubleshooting + fetch_routers_in_parallel = False # Set to True if you want to fetch multiple routers parallel + max_worker_threads = 5 # Max number of worker threads that can fetch routers. Meaningless if fetch_routers_in_parallel is set to False diff --git a/mktxp/cli/config/config.py b/mktxp/cli/config/config.py index 2b320c6..ad943e6 100755 --- a/mktxp/cli/config/config.py +++ b/mktxp/cli/config/config.py @@ -1,17 +1,19 @@ # coding=utf8 -## Copyright (c) 2020 Arseniy Kuznetsov +# Copyright (c) 2020 Arseniy Kuznetsov ## -## This program is free software; you can redistribute it and/or -## modify it under the terms of the GNU General Public License -## as published by the Free Software Foundation; either version 2 -## of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. ## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
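With the two new _mktxp.conf options above, a system entry that enables parallel fetching could look like the excerpt below. This is a sketch only: the key names and defaults are the ones added in this patch, while the surrounding [MKTXP] section layout is assumed.

    [MKTXP]
        fetch_routers_in_parallel = True        # fetch routers concurrently instead of one-by-one
        max_worker_threads = 5                  # upper bound on concurrently fetched routers
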
-import os, sys, shutil +import os +import sys +import shutil from collections import namedtuple from configobj import ConfigObj from abc import ABCMeta, abstractmethod @@ -22,6 +24,7 @@ from mktxp.utils.utils import FSHelper ''' MKTXP conf file handling ''' + class MKTXPConfigKeys: ''' MKTXP config file keys ''' @@ -38,12 +41,12 @@ class MKTXPConfigKeys: FE_PACKAGE_KEY = 'installed_packages' FE_DHCP_KEY = 'dhcp' - FE_DHCP_LEASE_KEY = 'dhcp_lease' - FE_DHCP_POOL_KEY = 'pool' - FE_IP_CONNECTIONS_KEY = 'connections' + FE_DHCP_LEASE_KEY = 'dhcp_lease' + FE_DHCP_POOL_KEY = 'pool' + FE_IP_CONNECTIONS_KEY = 'connections' FE_INTERFACE_KEY = 'interface' FE_FIREWALL_KEY = 'firewall' - + FE_IPV6_FIREWALL_KEY = 'ipv6_firewall' FE_IPV6_NEIGHBOR_KEY = 'ipv6_neighbor' @@ -60,7 +63,7 @@ class MKTXPConfigKeys: FE_USER_KEY = 'user' FE_QUEUE_KEY = 'queue' - MKTXP_SOCKET_TIMEOUT = 'socket_timeout' + MKTXP_SOCKET_TIMEOUT = 'socket_timeout' MKTXP_INITIAL_DELAY = 'initial_delay_on_failure' MKTXP_MAX_DELAY = 'max_delay_on_failure' MKTXP_INC_DIV = 'delay_inc_div' @@ -68,13 +71,14 @@ class MKTXPConfigKeys: MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval' MKTXP_VERBOSE_MODE = 'verbose_mode' MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval' + MKTXP_FETCH_IN_PARALLEL = 'fetch_routers_in_parallel' + MKTXP_MAX_WORKER_THREADS = 'max_worker_threads' # UnRegistered entries placeholder NO_ENTRIES_REGISTERED = 'NoEntriesRegistered' MKTXP_USE_COMMENTS_OVER_NAMES = 'use_comments_over_names' - # Base router id labels ROUTERBOARD_NAME = 'routerboard_name' ROUTERBOARD_ADDRESS = 'routerboard_address' @@ -89,20 +93,26 @@ class MKTXPConfigKeys: DEFAULT_MKTXP_INC_DIV = 5 DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420 DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5 + DEFAULT_MKTXP_FETCH_IN_PARALLEL = False + DEFAULT_MKTXP_MAX_WORKER_THREADS = 5 + + + BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, + SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY} - BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY} - # Feature keys enabled by default BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY, - FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES, - FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY, - FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY} + FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES, + FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY, + FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY} SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY} - SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE} + SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE, MKTXP_FETCH_IN_PARALLEL} STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY) - MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL) + MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, + MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL, + MKTXP_MAX_WORKER_THREADS,) # MKTXP config entry nane MKTXP_CONFIG_ENTRY_NAME = 'MKTXP' @@ -110,21 +120,23 @@ class MKTXPConfigKeys: class ConfigEntry: MKTXPConfigEntry = namedtuple('MKTXPConfigEntry', [MKTXPConfigKeys.ENABLED_KEY, MKTXPConfigKeys.HOST_KEY, 
MKTXPConfigKeys.PORT_KEY, - MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY, - MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, - MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY, - MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, - MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, - MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, - MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY - ]) + MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY, + MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, + MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY, + MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, + MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, + MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, + MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY + ]) MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT, - MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY, - MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, - MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL]) + MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY, + MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, + MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, + MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL, + MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS]) -class OSConfig(metaclass = ABCMeta): +class OSConfig(metaclass=ABCMeta): ''' OS-related config ''' @staticmethod @@ -169,7 +181,7 @@ class LinuxConfig(OSConfig): @property def mktxp_user_dir_path(self): return FSHelper.full_path('~/mktxp') - #return FSHelper.full_path('/etc/mktxp') + # return FSHelper.full_path('/etc/mktxp') class MKTXPConfigHandler: @@ -183,11 +195,15 @@ class MKTXPConfigHandler: os.makedirs(self.os_config.mktxp_user_dir_path) # if needed, stage the user config data - self.usr_conf_data_path = os.path.join(self.os_config.mktxp_user_dir_path, 'mktxp.conf') - self.mktxp_conf_path = os.path.join(self.os_config.mktxp_user_dir_path, '_mktxp.conf') + self.usr_conf_data_path = os.path.join( + self.os_config.mktxp_user_dir_path, 'mktxp.conf') + self.mktxp_conf_path = os.path.join( + self.os_config.mktxp_user_dir_path, '_mktxp.conf') - self._create_os_path(self.usr_conf_data_path, 'mktxp/cli/config/mktxp.conf') - self._create_os_path(self.mktxp_conf_path, 
'mktxp/cli/config/_mktxp.conf') + self._create_os_path(self.usr_conf_data_path, + 'mktxp/cli/config/mktxp.conf') + self._create_os_path(self.mktxp_conf_path, + 'mktxp/cli/config/_mktxp.conf') self.re_compiled = {} @@ -225,7 +241,8 @@ class MKTXPConfigHandler: def _create_os_path(self, os_path, resource_path): if not os.path.exists(os_path): # stage from the conf templates - lookup_path = resource_filename(Requirement.parse("mktxp"), resource_path) + lookup_path = resource_filename( + Requirement.parse("mktxp"), resource_path) shutil.copy(lookup_path, os_path) def _config_entry_reader(self, entry_name): @@ -236,7 +253,7 @@ class MKTXPConfigHandler: config_entry_reader[key] = self.config[entry_name].as_bool(key) else: config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time + write_needed = True # read from disk next time for key in MKTXPConfigKeys.STR_KEYS: config_entry_reader[key] = self.config[entry_name][key] @@ -245,10 +262,12 @@ class MKTXPConfigHandler: # port if self.config[entry_name].get(MKTXPConfigKeys.PORT_KEY): - config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int(MKTXPConfigKeys.PORT_KEY) + config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int( + MKTXPConfigKeys.PORT_KEY) else: - config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) - write_needed = True # read from disk next time + config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key( + MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) + write_needed = True # read from disk next time if write_needed: self.config[entry_name] = config_entry_reader @@ -266,33 +285,36 @@ class MKTXPConfigHandler: system_entry_reader[key] = self._config[entry_name].as_int(key) else: system_entry_reader[key] = self._default_value_for_key(key) - write_needed = True # read from disk next time + write_needed = True # read from disk next time for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES): if self._config[entry_name].get(key): - system_entry_reader[key] = self._config[entry_name].as_bool(key) + system_entry_reader[key] = self._config[entry_name].as_bool( + key) else: system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time - + write_needed = True # read from disk next time + if write_needed: self._config[entry_name] = system_entry_reader self._config.write() - + return system_entry_reader - def _default_value_for_key(self, key, value = None): + def _default_value_for_key(self, key, value=None): return { - MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT, - MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT, - MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT, - MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY, - MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY, - MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV, - MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL, - MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: 
MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL - }[key](value) + MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT, + MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT, + MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT, + MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY, + MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY, + MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV, + MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL, + MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL, + MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_FETCH_IN_PARALLEL, + MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_WORKER_THREADS, + }[key](value) + # Simplest possible Singleton impl config_handler = MKTXPConfigHandler() - diff --git a/mktxp/flow/collector_handler.py b/mktxp/flow/collector_handler.py index 78eafea..834ab41 100644 --- a/mktxp/flow/collector_handler.py +++ b/mktxp/flow/collector_handler.py @@ -1,32 +1,84 @@ # coding=utf8 -## Copyright (c) 2020 Arseniy Kuznenowov +# Copyright (c) 2020 Arseniy Kuznenowov ## -## This program is free software; you can redistribute it and/or -## modify it under the terms of the GNU General Public License -## as published by the Free Software Foundation; either version 2 -## of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. ## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. - +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +from concurrent.futures import ThreadPoolExecutor from timeit import default_timer from datetime import datetime from mktxp.cli.config.config import config_handler + class CollectorHandler: ''' MKTXP Collectors Handler ''' + def __init__(self, entries_handler, collector_registry): self.entries_handler = entries_handler self.collector_registry = collector_registry - self.last_collect_timestamp = 0 + self.last_collect_timestamp = 0 + + def collect_synchronous(self): + """ + Collect the metrics of all router entries defined in the current users configuration synchronously. + This function iterates over each router entry one-by-one. + Thus, the total runtime of this function scales linearly with the number of registered routers. 
+ """ + for router_entry in self.entries_handler.router_entries: + if not router_entry.api_connection.is_connected(): + # let's pick up on things in the next run + router_entry.api_connection.connect() + continue + + for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): + start = default_timer() + yield from collect_func(router_entry) + router_entry.time_spent[collector_ID] += default_timer() - start + + def collect_single(self, router_entry): + results = [] + for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): + start = default_timer() + result = list(collect_func(router_entry)) + results += result + router_entry.time_spent[collector_ID] += default_timer() - start + return results + + def collect_parallel(self, max_worker_threads=5): + """ + Collect the metrics of all router entries defined in the current users configuration in parallel. + This function iterates over multiple routers in parallel (depending on the value of max_worker_threads). + Thus, the total runtime scales sub linearly (number_of_routers / max_worker_threads). + """ + with ThreadPoolExecutor(max_workers=max_worker_threads) as executor: + futures = [] + + for router_entry in self.entries_handler.router_entries: + if not router_entry.api_connection.is_connected(): + # let's pick up on things in the next run + router_entry.api_connection.connect() + continue + + # Publish the collection function as a future + future = executor.submit(self.collect_single, router_entry) + futures.append(future) + + # Join all futures and collect their results + for future in futures: + results = future.result() + yield from results def collect(self): - now = datetime.now().timestamp() - diff = now - self.last_collect_timestamp + now = datetime.now().timestamp() + diff = now - self.last_collect_timestamp if diff < config_handler.system_entry().minimal_collect_interval: if config_handler.system_entry().verbose_mode: print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}') @@ -36,17 +88,10 @@ class CollectorHandler: yield from self.collector_registry.bandwidthCollector.collect() - for router_entry in self.entries_handler.router_entries: - if not router_entry.api_connection.is_connected(): - # let's pick up on things in the next run - router_entry.api_connection.connect() - continue - - for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): - start = default_timer() - yield from collect_func(router_entry) - router_entry.time_spent[collector_ID] += default_timer() - start - - - - + # Check whether to run in parallel by looking at the mktxp system configuration + parallel = config_handler.system_entry().fetch_routers_in_parallel + max_worker_threads = config_handler.system_entry().max_worker_threads + if parallel: + yield from self.collect_parallel(max_worker_threads=max_worker_threads) + else: + yield from self.collect_synchronous() From 660dd33369b27dae3c1b5ddfc08fd5527f1d994f Mon Sep 17 00:00:00 2001 From: Arseniy Kuznetsov Date: Fri, 30 Dec 2022 19:11:07 +0100 Subject: [PATCH 2/3] simple queue, parallel router fetch as_completed , config autoupdate fixes --- mktxp/cli/config/config.py | 35 +++++++++++++------- mktxp/collector/queue_collector.py | 52 ++++++++++++++---------------- mktxp/datasource/queue_ds.py | 42 +++++++++++++----------- mktxp/flow/collector_handler.py | 12 +++---- mktxp/flow/collector_registry.py | 2 ++ mktxp/flow/router_entry.py | 1 + 6 
files changed, 80 insertions(+), 64 deletions(-) diff --git a/mktxp/cli/config/config.py b/mktxp/cli/config/config.py index ad943e6..1b4e677 100755 --- a/mktxp/cli/config/config.py +++ b/mktxp/cli/config/config.py @@ -247,13 +247,14 @@ class MKTXPConfigHandler: def _config_entry_reader(self, entry_name): config_entry_reader = {} - write_needed = False + new_keys = [] + for key in MKTXPConfigKeys.BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.BOOLEAN_KEYS_YES): if self.config[entry_name].get(key): config_entry_reader[key] = self.config[entry_name].as_bool(key) else: config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time + new_keys.append(key) # read from disk next time for key in MKTXPConfigKeys.STR_KEYS: config_entry_reader[key] = self.config[entry_name][key] @@ -267,25 +268,31 @@ class MKTXPConfigHandler: else: config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key( MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) - write_needed = True # read from disk next time - - if write_needed: + new_keys.append(MKTXPConfigKeys.PORT_KEY) # read from disk next time + + if new_keys: self.config[entry_name] = config_entry_reader - self.config.write() + try: + self.config.write() + if self._config[MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE): + print(f'Updated router entry {entry_name} with new feature keys {new_keys}') + except Exception as exc: + print(f'Error updating router entry {entry_name} with new feature keys {new_keys}: {exc}') + print('Please update mktxp.conf to its latest version manually') return config_entry_reader def _system_entry_reader(self): system_entry_reader = {} entry_name = MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME - write_needed = False + new_keys = [] for key in MKTXPConfigKeys.MKTXP_INT_KEYS: if self._config[entry_name].get(key): system_entry_reader[key] = self._config[entry_name].as_int(key) else: system_entry_reader[key] = self._default_value_for_key(key) - write_needed = True # read from disk next time + new_keys.append(key) # read from disk next time for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES): if self._config[entry_name].get(key): @@ -293,11 +300,17 @@ class MKTXPConfigHandler: key) else: system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time + new_keys.append(key) # read from disk next time - if write_needed: + if new_keys: self._config[entry_name] = system_entry_reader - self._config.write() + try: + self._config.write() + if self._config[entry_name].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE): + print(f'Updated system entry {entry_name} with new system keys {new_keys}') + except Exception as exc: + print(f'Error updating system entry {entry_name} with new system keys {new_keys}: {exc}') + print('Please update _mktxp.conf to its latest version manually') return system_entry_reader diff --git a/mktxp/collector/queue_collector.py b/mktxp/collector/queue_collector.py index 46344af..f680880 100644 --- a/mktxp/collector/queue_collector.py +++ b/mktxp/collector/queue_collector.py @@ -13,75 +13,71 @@ from mktxp.collector.base_collector import BaseCollector -from mktxp.datasource.queue_ds import QueueTreeMetricsDataSource +from mktxp.datasource.queue_ds import QueueMetricsDataSource class QueueTreeCollector(BaseCollector): '''Queue Tree collector''' @staticmethod def collect(router_entry): - 
if not router_entry.config_entry.installed_packages: + if not router_entry.config_entry.queue: return - qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled'] - qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels) + qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'queued_bytes', 'dropped', 'rate', 'disabled'] + qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'tree') if qt_records: qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name']) yield qt_rate_metric - qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name']) - yield qt_packet_rate_metric - qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name']) yield qt_byte_metric - qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name']) - yield qt_packet_metric - qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name']) yield qt_queued_metric - qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name']) - yield qt_queued_packets_metric - - - qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name']) + qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped bytes', qt_records, 'dropped', ['name']) yield qt_drop_metric -class SimpleCollector(BaseCollector): +class QueueSimpleCollector(BaseCollector): '''Simple Queue collector''' @staticmethod def collect(router_entry): - if not router_entry.config_entry.installed_packages: + if not router_entry.config_entry.queue: return qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled'] - qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels) + qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'simple') if qt_records: - qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name']) + qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_upload', 'Average passing upload data rate in bytes per second', qt_records, 'rate_up', ['name']) yield qt_rate_metric - qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name']) - yield qt_packet_rate_metric + qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_download', 'Average passing download data rate in bytes per second', qt_records, 'rate_down', ['name']) + yield qt_rate_metric - qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name']) + + qt_byte_metric = 
BaseCollector.counter_collector('queue_simple_bytes_upload', 'Number of upload processed bytes', qt_records, 'bytes_up', ['name']) yield qt_byte_metric - qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name']) - yield qt_packet_metric + qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_download', 'Number of download processed bytes', qt_records, 'bytes_down', ['name']) + yield qt_byte_metric - qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name']) + + qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_upload', 'Number of upload queued bytes', qt_records, 'queued_bytes_up', ['name']) yield qt_queued_metric - qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name']) - yield qt_queued_packets_metric + qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_downloadd', 'Number of download queued bytes', qt_records, 'queued_bytes_down', ['name']) + yield qt_queued_metric - qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name']) + qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_upload', 'Number of upload dropped bytes', qt_records, 'dropped_up', ['name']) + yield qt_drop_metric + + + qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_download', 'Number of download dropped bytes', qt_records, 'dropped_down', ['name']) yield qt_drop_metric diff --git a/mktxp/datasource/queue_ds.py b/mktxp/datasource/queue_ds.py index 8634936..632a80b 100644 --- a/mktxp/datasource/queue_ds.py +++ b/mktxp/datasource/queue_ds.py @@ -15,31 +15,37 @@ from mktxp.datasource.base_ds import BaseDSProcessor -class QueueTreeMetricsDataSource: - ''' Queue Tree Metrics data provider +class QueueMetricsDataSource: + ''' Queue Metrics data provider ''' @staticmethod - def metric_records(router_entry, *, metric_labels = None): + def metric_records(router_entry, *, metric_labels = None, kind = 'tree'): if metric_labels is None: metric_labels = [] try: - queue_tree_records = router_entry.api_connection.router_api().get_resource('/queue/tree/').get() - return BaseDSProcessor.trimmed_records(router_entry, router_records = queue_tree_records, metric_labels = metric_labels) + queue_records = router_entry.api_connection.router_api().get_resource(f'/queue/{kind}/').get() + queue_records = BaseDSProcessor.trimmed_records(router_entry, router_records = queue_records, metric_labels = metric_labels) except Exception as exc: print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') return None + if kind == 'tree': + return queue_records + + # simple queue records need splitting upload/download values + splitted_queue_records = [] + for queue_record in queue_records: + splitted_queue_record = {} + for key, value in queue_record.items(): + split_values = value.split('/') + if split_values and len(split_values) > 1: + splitted_queue_record[f'{key}_up'] = split_values[0] + splitted_queue_record[f'{key}_down'] = split_values[1] + else: + splitted_queue_record[key] = value + splitted_queue_records.append(splitted_queue_record) + return splitted_queue_records + + + -class SimpleQueueMetricsDataSource: - ''' Simple Queue Metrics data 
provider - ''' - @staticmethod - def metric_records(router_entry, *, metric_labels = None): - if metric_labels is None: - metric_labels = [] - try: - simple_queue_records = router_entry.api_connection.router_api().get_resource('/queue/simple/').get() - return BaseDSProcessor.trimmed_records(router_entry, router_records = simple_queue_records, metric_labels = metric_labels) - except Exception as exc: - print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') - return None diff --git a/mktxp/flow/collector_handler.py b/mktxp/flow/collector_handler.py index 834ab41..c97a08f 100644 --- a/mktxp/flow/collector_handler.py +++ b/mktxp/flow/collector_handler.py @@ -11,7 +11,7 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from timeit import default_timer from datetime import datetime from mktxp.cli.config.config import config_handler @@ -68,13 +68,11 @@ class CollectorHandler: continue # Publish the collection function as a future - future = executor.submit(self.collect_single, router_entry) - futures.append(future) + futures.append(executor.submit(self.collect_single, router_entry)) + + for future in as_completed(futures): + yield from future.result() - # Join all futures and collect their results - for future in futures: - results = future.result() - yield from results def collect(self): now = datetime.now().timestamp() diff --git a/mktxp/flow/collector_registry.py b/mktxp/flow/collector_registry.py index f9a0d5f..e4e9d34 100644 --- a/mktxp/flow/collector_registry.py +++ b/mktxp/flow/collector_registry.py @@ -34,6 +34,7 @@ from mktxp.collector.firewall_collector import FirewallCollector from mktxp.collector.mktxp_collector import MKTXPCollector from mktxp.collector.user_collector import UserCollector from mktxp.collector.queue_collector import QueueTreeCollector +from mktxp.collector.queue_collector import QueueSimpleCollector class CollectorRegistry: @@ -69,6 +70,7 @@ class CollectorRegistry: self.register('UserCollector', UserCollector.collect) self.register('QueueTreeCollector', QueueTreeCollector.collect) + self.register('QueueSimpleCollector', QueueSimpleCollector.collect) self.register('MKTXPCollector', MKTXPCollector.collect) diff --git a/mktxp/flow/router_entry.py b/mktxp/flow/router_entry.py index c572df9..cdd2f29 100644 --- a/mktxp/flow/router_entry.py +++ b/mktxp/flow/router_entry.py @@ -45,6 +45,7 @@ class RouterEntry: 'WLANCollector': 0, 'CapsmanCollector': 0, 'QueueTreeCollector': 0, + 'QueueSimpleCollector': 0, 'UserCollector': 0, 'MKTXPCollector': 0 } From 0159c47fb3f8701047265a6044065fa25bd9ca79 Mon Sep 17 00:00:00 2001 From: Arseniy Kuznetsov Date: Sun, 1 Jan 2023 10:30:09 +0100 Subject: [PATCH 3/3] async router fetch, configurable timeouts --- mktxp/cli/config/_mktxp.conf | 3 ++ mktxp/cli/config/config.py | 11 ++++- mktxp/flow/collector_handler.py | 84 ++++++++++++++++++++++++++------- 3 files changed, 80 insertions(+), 18 deletions(-) diff --git a/mktxp/cli/config/_mktxp.conf b/mktxp/cli/config/_mktxp.conf index 7131b05..add84b0 100644 --- a/mktxp/cli/config/_mktxp.conf +++ b/mktxp/cli/config/_mktxp.conf @@ -27,3 +27,6 @@ fetch_routers_in_parallel = False # Set to True if you want to fetch multiple routers parallel max_worker_threads = 5 # Max number of worker threads that can fetch routers. 
Meaningless if fetch_routers_in_parallel is set to False + + max_scrape_duration = 10 # Max duration of individual routers' metrics collection + total_max_scrape_duration = 30 # Max overall duration of all metrics collection \ No newline at end of file diff --git a/mktxp/cli/config/config.py b/mktxp/cli/config/config.py index 1b4e677..05549ea 100755 --- a/mktxp/cli/config/config.py +++ b/mktxp/cli/config/config.py @@ -73,6 +73,8 @@ class MKTXPConfigKeys: MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval' MKTXP_FETCH_IN_PARALLEL = 'fetch_routers_in_parallel' MKTXP_MAX_WORKER_THREADS = 'max_worker_threads' + MKTXP_MAX_SCRAPE_DURATION = 'max_scrape_duration' + MKTXP_TOTAL_MAX_SCRAPE_DURATION = 'total_max_scrape_duration' # UnRegistered entries placeholder NO_ENTRIES_REGISTERED = 'NoEntriesRegistered' @@ -95,6 +97,8 @@ class MKTXPConfigKeys: DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5 DEFAULT_MKTXP_FETCH_IN_PARALLEL = False DEFAULT_MKTXP_MAX_WORKER_THREADS = 5 + DEFAULT_MKTXP_MAX_SCRAPE_DURATION = 10 + DEFAULT_MKTXP_TOTAL_MAX_SCRAPE_DURATION = 30 BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, @@ -112,7 +116,7 @@ class MKTXPConfigKeys: STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY) MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL, - MKTXP_MAX_WORKER_THREADS,) + MKTXP_MAX_WORKER_THREADS, MKTXP_MAX_SCRAPE_DURATION, MKTXP_TOTAL_MAX_SCRAPE_DURATION) # MKTXP config entry nane MKTXP_CONFIG_ENTRY_NAME = 'MKTXP' @@ -133,7 +137,8 @@ class ConfigEntry: MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL, - MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS]) + MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS, MKTXPConfigKeys.MKTXP_MAX_SCRAPE_DURATION, + MKTXPConfigKeys.MKTXP_TOTAL_MAX_SCRAPE_DURATION]) class OSConfig(metaclass=ABCMeta): @@ -326,6 +331,8 @@ class MKTXPConfigHandler: MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_FETCH_IN_PARALLEL, MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_WORKER_THREADS, + MKTXPConfigKeys.MKTXP_MAX_SCRAPE_DURATION: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_SCRAPE_DURATION, + MKTXPConfigKeys.MKTXP_TOTAL_MAX_SCRAPE_DURATION: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_TOTAL_MAX_SCRAPE_DURATION, }[key](value) diff --git a/mktxp/flow/collector_handler.py b/mktxp/flow/collector_handler.py index c97a08f..b56688a 100644 --- a/mktxp/flow/collector_handler.py +++ b/mktxp/flow/collector_handler.py @@ -14,8 +14,9 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from timeit import default_timer from datetime import datetime +from threading import Event, Timer from mktxp.cli.config.config import config_handler - +from mktxp.cli.config.config import MKTXPConfigKeys class CollectorHandler: ''' MKTXP Collectors Handler @@ -26,7 +27,8 @@ class CollectorHandler: self.collector_registry = collector_registry self.last_collect_timestamp = 0 - def collect_synchronous(self): + + def collect_sync(self): """ Collect the metrics of all router entries defined in the current users configuration synchronously. This function iterates over each router entry one-by-one. 
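Before the next hunk, it helps to see the timeout mechanism it introduces in isolation: a threading.Timer sets an Event once the configured budget elapses, and the collection loop checks that event between steps instead of being forcibly interrupted. A minimal standalone sketch of this pattern follows; the function name, the step list and the three-second budget are illustrative, not part of the patch.

    from threading import Event, Timer
    import time

    def run_with_budget(steps, budget_seconds=3):
        timed_out = Event()
        timer = Timer(budget_seconds, timed_out.set)    # sets the event once the budget elapses
        timer.start()
        try:
            results = []
            for step in steps:
                if timed_out.is_set():                  # cooperative check between steps
                    print('budget exhausted, returning partial results')
                    break
                results.append(step())
            return results
        finally:
            timer.cancel()                              # harmless if the timer has already fired

    # three one-second steps against a three-second budget
    print(run_with_budget([lambda: time.sleep(1) or 'ok'] * 3))
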
@@ -43,53 +45,103 @@ class CollectorHandler: yield from collect_func(router_entry) router_entry.time_spent[collector_ID] += default_timer() - start - def collect_single(self, router_entry): + + def collect_router_entry_async(self, router_entry, scrape_timeout_event, total_scrape_timeout_event): results = [] for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): + if scrape_timeout_event.is_set(): + print(f'Hit timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}') + break + + if total_scrape_timeout_event.is_set(): + print(f'Hit overall timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}') + break + start = default_timer() result = list(collect_func(router_entry)) results += result router_entry.time_spent[collector_ID] += default_timer() - start + return results - def collect_parallel(self, max_worker_threads=5): + + def collect_async(self, max_worker_threads=5): """ Collect the metrics of all router entries defined in the current users configuration in parallel. This function iterates over multiple routers in parallel (depending on the value of max_worker_threads). Thus, the total runtime scales sub linearly (number_of_routers / max_worker_threads). """ + + def timeout(timeout_event): + timeout_event.set() + + # overall scrape duration + total_scrape_timeout_event = Event() + total_scrape_timer = Timer(config_handler.system_entry().total_max_scrape_duration, timeout, args=(total_scrape_timeout_event,)) + total_scrape_timer.start() + with ThreadPoolExecutor(max_workers=max_worker_threads) as executor: - futures = [] + futures = {} for router_entry in self.entries_handler.router_entries: + if total_scrape_timeout_event.is_set(): + print(f'Hit overall timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}') + break + if not router_entry.api_connection.is_connected(): # let's pick up on things in the next run router_entry.api_connection.connect() continue - # Publish the collection function as a future - futures.append(executor.submit(self.collect_single, router_entry)) + # Duration of individual scrapes + scrape_timeout_event = Event() + scrape_timer = Timer(config_handler.system_entry().max_scrape_duration, timeout, args=(scrape_timeout_event,)) + scrape_timer.start() + + futures[executor.submit(self.collect_router_entry_async, router_entry, scrape_timeout_event, total_scrape_timeout_event)] = scrape_timer for future in as_completed(futures): + # cancel unused timers for scrapes finished regularly (within set duration) + futures[future].cancel() yield from future.result() + + # in case collection finished without timeouts, cancel the overall scrape duration timer + total_scrape_timer.cancel() def collect(self): - now = datetime.now().timestamp() - diff = now - self.last_collect_timestamp - if diff < config_handler.system_entry().minimal_collect_interval: - if config_handler.system_entry().verbose_mode: - print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}') - print('deferring..') + if not self._valid_collect_interval(): return - self.last_collect_timestamp = now + # bandwidth collector yield from self.collector_registry.bandwidthCollector.collect() + # all other collectors # Check whether to run in parallel by looking at the mktxp system configuration parallel = config_handler.system_entry().fetch_routers_in_parallel max_worker_threads = 
config_handler.system_entry().max_worker_threads
         if parallel:
-            yield from self.collect_parallel(max_worker_threads=max_worker_threads)
+            yield from self.collect_async(max_worker_threads=max_worker_threads)
         else:
-            yield from self.collect_synchronous()
+            yield from self.collect_sync()
+
+
+    def _valid_collect_interval(self):
+        now = datetime.now().timestamp()
+        diff = now - self.last_collect_timestamp
+        if diff < config_handler.system_entry().minimal_collect_interval:
+            if config_handler.system_entry().verbose_mode:
+                print(f'An attempt to collect metrics within minimal metrics collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
+                print('deferring..')
+            return False
+
+        self.last_collect_timestamp = now
+        return True
+
+
+
+
+
+
+
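A note on the simple-queue handling introduced in PATCH 2/3: RouterOS reports simple-queue counters as combined 'upload/download' strings, and QueueMetricsDataSource splits each of them into *_up and *_down fields (first component up, second down) before the collectors turn them into per-direction metrics. A small standalone sketch of that transformation, using made-up sample values:

    def split_simple_queue_record(record):
        # Turn 'upload/download' composite values into separate *_up / *_down fields,
        # mirroring the splitting done in QueueMetricsDataSource.metric_records().
        split_record = {}
        for key, value in record.items():
            parts = value.split('/')
            if len(parts) > 1:
                split_record[f'{key}_up'] = parts[0]
                split_record[f'{key}_down'] = parts[1]
            else:
                split_record[key] = value
        return split_record

    # hypothetical record, for illustration only
    sample = {'name': 'guest-wifi', 'rate': '1000/2000', 'bytes': '123456/654321'}
    print(split_simple_queue_record(sample))
    # -> {'name': 'guest-wifi', 'rate_up': '1000', 'rate_down': '2000', 'bytes_up': '123456', 'bytes_down': '654321'}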