From 866751ad64089a6ea0c3d3b58c1f049d3bbf1d67 Mon Sep 17 00:00:00 2001 From: Leon Morten Richter Date: Wed, 28 Dec 2022 14:18:42 +0100 Subject: [PATCH] fetch routers in parallel (optionally) --- mktxp/cli/config/_mktxp.conf | 2 + mktxp/cli/config/config.py | 144 ++++++++++++++++++-------------- mktxp/flow/collector_handler.py | 99 ++++++++++++++++------ 3 files changed, 157 insertions(+), 88 deletions(-) diff --git a/mktxp/cli/config/_mktxp.conf b/mktxp/cli/config/_mktxp.conf index cdefe21..7131b05 100644 --- a/mktxp/cli/config/_mktxp.conf +++ b/mktxp/cli/config/_mktxp.conf @@ -25,3 +25,5 @@ verbose_mode = False # Set it on for troubleshooting + fetch_routers_in_parallel = False # Set to True if you want to fetch multiple routers parallel + max_worker_threads = 5 # Max number of worker threads that can fetch routers. Meaningless if fetch_routers_in_parallel is set to False diff --git a/mktxp/cli/config/config.py b/mktxp/cli/config/config.py index 2b320c6..ad943e6 100755 --- a/mktxp/cli/config/config.py +++ b/mktxp/cli/config/config.py @@ -1,17 +1,19 @@ # coding=utf8 -## Copyright (c) 2020 Arseniy Kuznetsov +# Copyright (c) 2020 Arseniy Kuznetsov ## -## This program is free software; you can redistribute it and/or -## modify it under the terms of the GNU General Public License -## as published by the Free Software Foundation; either version 2 -## of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. ## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. -import os, sys, shutil +import os +import sys +import shutil from collections import namedtuple from configobj import ConfigObj from abc import ABCMeta, abstractmethod @@ -22,6 +24,7 @@ from mktxp.utils.utils import FSHelper ''' MKTXP conf file handling ''' + class MKTXPConfigKeys: ''' MKTXP config file keys ''' @@ -38,12 +41,12 @@ class MKTXPConfigKeys: FE_PACKAGE_KEY = 'installed_packages' FE_DHCP_KEY = 'dhcp' - FE_DHCP_LEASE_KEY = 'dhcp_lease' - FE_DHCP_POOL_KEY = 'pool' - FE_IP_CONNECTIONS_KEY = 'connections' + FE_DHCP_LEASE_KEY = 'dhcp_lease' + FE_DHCP_POOL_KEY = 'pool' + FE_IP_CONNECTIONS_KEY = 'connections' FE_INTERFACE_KEY = 'interface' FE_FIREWALL_KEY = 'firewall' - + FE_IPV6_FIREWALL_KEY = 'ipv6_firewall' FE_IPV6_NEIGHBOR_KEY = 'ipv6_neighbor' @@ -60,7 +63,7 @@ class MKTXPConfigKeys: FE_USER_KEY = 'user' FE_QUEUE_KEY = 'queue' - MKTXP_SOCKET_TIMEOUT = 'socket_timeout' + MKTXP_SOCKET_TIMEOUT = 'socket_timeout' MKTXP_INITIAL_DELAY = 'initial_delay_on_failure' MKTXP_MAX_DELAY = 'max_delay_on_failure' MKTXP_INC_DIV = 'delay_inc_div' @@ -68,13 +71,14 @@ class MKTXPConfigKeys: MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval' MKTXP_VERBOSE_MODE = 'verbose_mode' MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval' + MKTXP_FETCH_IN_PARALLEL = 'fetch_routers_in_parallel' + MKTXP_MAX_WORKER_THREADS = 'max_worker_threads' # UnRegistered entries placeholder NO_ENTRIES_REGISTERED = 'NoEntriesRegistered' MKTXP_USE_COMMENTS_OVER_NAMES = 'use_comments_over_names' - # Base router id labels ROUTERBOARD_NAME = 'routerboard_name' ROUTERBOARD_ADDRESS = 'routerboard_address' @@ -89,20 +93,26 @@ class MKTXPConfigKeys: DEFAULT_MKTXP_INC_DIV = 5 DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420 DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5 + DEFAULT_MKTXP_FETCH_IN_PARALLEL = False + DEFAULT_MKTXP_MAX_WORKER_THREADS = 5 + + + BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, + SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY} - BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY} - # Feature keys enabled by default BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY, - FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES, - FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY, - FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY} + FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES, + FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY, + FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY} SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY} - SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE} + SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE, MKTXP_FETCH_IN_PARALLEL} STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY) - MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL) + MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, + MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL, + MKTXP_MAX_WORKER_THREADS,) # MKTXP config entry nane MKTXP_CONFIG_ENTRY_NAME = 'MKTXP' @@ -110,21 +120,23 @@ class MKTXPConfigKeys: class ConfigEntry: MKTXPConfigEntry = namedtuple('MKTXPConfigEntry', [MKTXPConfigKeys.ENABLED_KEY, MKTXPConfigKeys.HOST_KEY, MKTXPConfigKeys.PORT_KEY, - MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY, - MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, - MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY, - MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, - MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, - MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, - MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY - ]) + MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY, + MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, + MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY, + MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, + MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, + MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, + MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY + ]) MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT, - MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY, - MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, - MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL]) + MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY, + MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, + MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, + MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL, + MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS]) -class OSConfig(metaclass = ABCMeta): +class OSConfig(metaclass=ABCMeta): ''' OS-related config ''' @staticmethod @@ -169,7 +181,7 @@ class LinuxConfig(OSConfig): @property def mktxp_user_dir_path(self): return FSHelper.full_path('~/mktxp') - #return FSHelper.full_path('/etc/mktxp') + # return FSHelper.full_path('/etc/mktxp') class MKTXPConfigHandler: @@ -183,11 +195,15 @@ class MKTXPConfigHandler: os.makedirs(self.os_config.mktxp_user_dir_path) # if needed, stage the user config data - self.usr_conf_data_path = os.path.join(self.os_config.mktxp_user_dir_path, 'mktxp.conf') - self.mktxp_conf_path = os.path.join(self.os_config.mktxp_user_dir_path, '_mktxp.conf') + self.usr_conf_data_path = os.path.join( + self.os_config.mktxp_user_dir_path, 'mktxp.conf') + self.mktxp_conf_path = os.path.join( + self.os_config.mktxp_user_dir_path, '_mktxp.conf') - self._create_os_path(self.usr_conf_data_path, 'mktxp/cli/config/mktxp.conf') - self._create_os_path(self.mktxp_conf_path, 'mktxp/cli/config/_mktxp.conf') + self._create_os_path(self.usr_conf_data_path, + 'mktxp/cli/config/mktxp.conf') + self._create_os_path(self.mktxp_conf_path, + 'mktxp/cli/config/_mktxp.conf') self.re_compiled = {} @@ -225,7 +241,8 @@ class MKTXPConfigHandler: def _create_os_path(self, os_path, resource_path): if not os.path.exists(os_path): # stage from the conf templates - lookup_path = resource_filename(Requirement.parse("mktxp"), resource_path) + lookup_path = resource_filename( + Requirement.parse("mktxp"), resource_path) shutil.copy(lookup_path, os_path) def _config_entry_reader(self, entry_name): @@ -236,7 +253,7 @@ class MKTXPConfigHandler: config_entry_reader[key] = self.config[entry_name].as_bool(key) else: config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time + write_needed = True # read from disk next time for key in MKTXPConfigKeys.STR_KEYS: config_entry_reader[key] = self.config[entry_name][key] @@ -245,10 +262,12 @@ class MKTXPConfigHandler: # port if self.config[entry_name].get(MKTXPConfigKeys.PORT_KEY): - config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int(MKTXPConfigKeys.PORT_KEY) + config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int( + MKTXPConfigKeys.PORT_KEY) else: - config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) - write_needed = True # read from disk next time + config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key( + MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) + write_needed = True # read from disk next time if write_needed: self.config[entry_name] = config_entry_reader @@ -266,33 +285,36 @@ class MKTXPConfigHandler: system_entry_reader[key] = self._config[entry_name].as_int(key) else: system_entry_reader[key] = self._default_value_for_key(key) - write_needed = True # read from disk next time + write_needed = True # read from disk next time for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES): if self._config[entry_name].get(key): - system_entry_reader[key] = self._config[entry_name].as_bool(key) + system_entry_reader[key] = self._config[entry_name].as_bool( + key) else: system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False - write_needed = True # read from disk next time - + write_needed = True # read from disk next time + if write_needed: self._config[entry_name] = system_entry_reader self._config.write() - + return system_entry_reader - def _default_value_for_key(self, key, value = None): + def _default_value_for_key(self, key, value=None): return { - MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT, - MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT, - MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT, - MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY, - MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY, - MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV, - MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL, - MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL - }[key](value) + MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT, + MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT, + MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT, + MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY, + MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY, + MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV, + MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL, + MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL, + MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_FETCH_IN_PARALLEL, + MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_WORKER_THREADS, + }[key](value) + # Simplest possible Singleton impl config_handler = MKTXPConfigHandler() - diff --git a/mktxp/flow/collector_handler.py b/mktxp/flow/collector_handler.py index 78eafea..834ab41 100644 --- a/mktxp/flow/collector_handler.py +++ b/mktxp/flow/collector_handler.py @@ -1,32 +1,84 @@ # coding=utf8 -## Copyright (c) 2020 Arseniy Kuznenowov +# Copyright (c) 2020 Arseniy Kuznenowov ## -## This program is free software; you can redistribute it and/or -## modify it under the terms of the GNU General Public License -## as published by the Free Software Foundation; either version 2 -## of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. ## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. - +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +from concurrent.futures import ThreadPoolExecutor from timeit import default_timer from datetime import datetime from mktxp.cli.config.config import config_handler + class CollectorHandler: ''' MKTXP Collectors Handler ''' + def __init__(self, entries_handler, collector_registry): self.entries_handler = entries_handler self.collector_registry = collector_registry - self.last_collect_timestamp = 0 + self.last_collect_timestamp = 0 + + def collect_synchronous(self): + """ + Collect the metrics of all router entries defined in the current users configuration synchronously. + This function iterates over each router entry one-by-one. + Thus, the total runtime of this function scales linearly with the number of registered routers. + """ + for router_entry in self.entries_handler.router_entries: + if not router_entry.api_connection.is_connected(): + # let's pick up on things in the next run + router_entry.api_connection.connect() + continue + + for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): + start = default_timer() + yield from collect_func(router_entry) + router_entry.time_spent[collector_ID] += default_timer() - start + + def collect_single(self, router_entry): + results = [] + for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): + start = default_timer() + result = list(collect_func(router_entry)) + results += result + router_entry.time_spent[collector_ID] += default_timer() - start + return results + + def collect_parallel(self, max_worker_threads=5): + """ + Collect the metrics of all router entries defined in the current users configuration in parallel. + This function iterates over multiple routers in parallel (depending on the value of max_worker_threads). + Thus, the total runtime scales sub linearly (number_of_routers / max_worker_threads). + """ + with ThreadPoolExecutor(max_workers=max_worker_threads) as executor: + futures = [] + + for router_entry in self.entries_handler.router_entries: + if not router_entry.api_connection.is_connected(): + # let's pick up on things in the next run + router_entry.api_connection.connect() + continue + + # Publish the collection function as a future + future = executor.submit(self.collect_single, router_entry) + futures.append(future) + + # Join all futures and collect their results + for future in futures: + results = future.result() + yield from results def collect(self): - now = datetime.now().timestamp() - diff = now - self.last_collect_timestamp + now = datetime.now().timestamp() + diff = now - self.last_collect_timestamp if diff < config_handler.system_entry().minimal_collect_interval: if config_handler.system_entry().verbose_mode: print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}') @@ -36,17 +88,10 @@ class CollectorHandler: yield from self.collector_registry.bandwidthCollector.collect() - for router_entry in self.entries_handler.router_entries: - if not router_entry.api_connection.is_connected(): - # let's pick up on things in the next run - router_entry.api_connection.connect() - continue - - for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): - start = default_timer() - yield from collect_func(router_entry) - router_entry.time_spent[collector_ID] += default_timer() - start - - - - + # Check whether to run in parallel by looking at the mktxp system configuration + parallel = config_handler.system_entry().fetch_routers_in_parallel + max_worker_threads = config_handler.system_entry().max_worker_threads + if parallel: + yield from self.collect_parallel(max_worker_threads=max_worker_threads) + else: + yield from self.collect_synchronous()