async router fetch, configurable timeouts

Arseniy Kuznetsov
2023-01-01 10:30:09 +01:00
parent 660dd33369
commit 0159c47fb3
3 changed files with 80 additions and 18 deletions

View File

@@ -27,3 +27,6 @@
 fetch_routers_in_parallel = False # Set to True if you want to fetch multiple routers parallel
 max_worker_threads = 5 # Max number of worker threads that can fetch routers. Meaningless if fetch_routers_in_parallel is set to False
+max_scrape_duration = 10 # Max duration of individual routers' metrics collection
+total_max_scrape_duration = 30 # Max overall duration of all metrics collection
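
The two new options cap, respectively, the metrics collection time for a single router and for the whole scrape run; collector_handler.py below reads them via config_handler.system_entry().max_scrape_duration and .total_max_scrape_duration. For orientation only, a minimal standalone sketch of a sanity check one could run on these values (hypothetical helper, not part of this commit):

def check_scrape_durations(system_entry):
    # the per-router cap should not exceed the overall cap,
    # otherwise the total timeout would always fire first
    max_scrape = system_entry.max_scrape_duration               # e.g. 10 (seconds)
    total_max_scrape = system_entry.total_max_scrape_duration   # e.g. 30 (seconds)
    if max_scrape > total_max_scrape:
        print(f'max_scrape_duration ({max_scrape}s) exceeds total_max_scrape_duration ({total_max_scrape}s)')
    return max_scrape, total_max_scrape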

View File

@@ -73,6 +73,8 @@ class MKTXPConfigKeys:
     MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval'
     MKTXP_FETCH_IN_PARALLEL = 'fetch_routers_in_parallel'
     MKTXP_MAX_WORKER_THREADS = 'max_worker_threads'
+    MKTXP_MAX_SCRAPE_DURATION = 'max_scrape_duration'
+    MKTXP_TOTAL_MAX_SCRAPE_DURATION = 'total_max_scrape_duration'

     # UnRegistered entries placeholder
     NO_ENTRIES_REGISTERED = 'NoEntriesRegistered'
@@ -95,6 +97,8 @@ class MKTXPConfigKeys:
     DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5
     DEFAULT_MKTXP_FETCH_IN_PARALLEL = False
     DEFAULT_MKTXP_MAX_WORKER_THREADS = 5
+    DEFAULT_MKTXP_MAX_SCRAPE_DURATION = 10
+    DEFAULT_MKTXP_TOTAL_MAX_SCRAPE_DURATION = 30

     BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE,
@@ -112,7 +116,7 @@ class MKTXPConfigKeys:
     STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY)
     MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY,
                       MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL,
-                      MKTXP_MAX_WORKER_THREADS,)
+                      MKTXP_MAX_WORKER_THREADS, MKTXP_MAX_SCRAPE_DURATION, MKTXP_TOTAL_MAX_SCRAPE_DURATION)

     # MKTXP config entry nane
     MKTXP_CONFIG_ENTRY_NAME = 'MKTXP'
@@ -133,7 +137,8 @@ class ConfigEntry:
                 MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY,
                 MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL,
                 MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL,
-                MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS])
+                MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS, MKTXPConfigKeys.MKTXP_MAX_SCRAPE_DURATION,
+                MKTXPConfigKeys.MKTXP_TOTAL_MAX_SCRAPE_DURATION])

 class OSConfig(metaclass=ABCMeta):
@@ -326,6 +331,8 @@ class MKTXPConfigHandler:
             MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL,
             MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_FETCH_IN_PARALLEL,
             MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_WORKER_THREADS,
+            MKTXPConfigKeys.MKTXP_MAX_SCRAPE_DURATION: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_SCRAPE_DURATION,
+            MKTXPConfigKeys.MKTXP_TOTAL_MAX_SCRAPE_DURATION: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_TOTAL_MAX_SCRAPE_DURATION,
             }[key](value)
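
The hunk above extends the handler's default-value dispatch: when one of these options is missing from an existing user config, the key is looked up in a dict of lambdas and the matching DEFAULT_* constant is returned. A minimal standalone sketch of that pattern with simplified, hypothetical names (not the project's actual code):

DEFAULTS = {
    'max_scrape_duration': lambda _: 10,
    'total_max_scrape_duration': lambda _: 30,
}

def default_for(key, value=None):
    # ignores the (missing) value and returns the hard-coded default for the key
    return DEFAULTS[key](value)

print(default_for('total_max_scrape_duration'))   # prints: 30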

View File

@@ -14,8 +14,9 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from timeit import default_timer
 from datetime import datetime
+from threading import Event, Timer
 from mktxp.cli.config.config import config_handler
+from mktxp.cli.config.config import MKTXPConfigKeys

 class CollectorHandler:
     ''' MKTXP Collectors Handler
@@ -26,7 +27,8 @@ class CollectorHandler:
         self.collector_registry = collector_registry
         self.last_collect_timestamp = 0

-    def collect_synchronous(self):
+    def collect_sync(self):
         """
         Collect the metrics of all router entries defined in the current users configuration synchronously.
         This function iterates over each router entry one-by-one.
@@ -43,53 +45,103 @@ class CollectorHandler:
                 yield from collect_func(router_entry)
                 router_entry.time_spent[collector_ID] += default_timer() - start

-    def collect_single(self, router_entry):
+    def collect_router_entry_async(self, router_entry, scrape_timeout_event, total_scrape_timeout_event):
         results = []
         for collector_ID, collect_func in self.collector_registry.registered_collectors.items():
+            if scrape_timeout_event.is_set():
+                print(f'Hit timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}')
+                break
+            if total_scrape_timeout_event.is_set():
+                print(f'Hit overall timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}')
+                break
             start = default_timer()
             result = list(collect_func(router_entry))
             results += result
             router_entry.time_spent[collector_ID] += default_timer() - start
         return results

-    def collect_parallel(self, max_worker_threads=5):
+    def collect_async(self, max_worker_threads=5):
         """
         Collect the metrics of all router entries defined in the current users configuration in parallel.
         This function iterates over multiple routers in parallel (depending on the value of max_worker_threads).
         Thus, the total runtime scales sub linearly (number_of_routers / max_worker_threads).
         """
+        def timeout(timeout_event):
+            timeout_event.set()
+
+        # overall scrape duration
+        total_scrape_timeout_event = Event()
+        total_scrape_timer = Timer(config_handler.system_entry().total_max_scrape_duration, timeout, args=(total_scrape_timeout_event,))
+        total_scrape_timer.start()
+
         with ThreadPoolExecutor(max_workers=max_worker_threads) as executor:
-            futures = []
+            futures = {}
             for router_entry in self.entries_handler.router_entries:
+                if total_scrape_timeout_event.is_set():
+                    print(f'Hit overall timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}')
+                    break
                 if not router_entry.api_connection.is_connected():
                     # let's pick up on things in the next run
                     router_entry.api_connection.connect()
                     continue
-                # Publish the collection function as a future
-                futures.append(executor.submit(self.collect_single, router_entry))
+                # Duration of individual scrapes
+                scrape_timeout_event = Event()
+                scrape_timer = Timer(config_handler.system_entry().max_scrape_duration, timeout, args=(scrape_timeout_event,))
+                scrape_timer.start()
+                futures[executor.submit(self.collect_router_entry_async, router_entry, scrape_timeout_event, total_scrape_timeout_event)] = scrape_timer

             for future in as_completed(futures):
+                # cancel unused timers for scrapes finished regularly (within set duration)
+                futures[future].cancel()
                 yield from future.result()

+        # in case collection finished without timeouts, cancel the overall scrape duration timer
+        total_scrape_timer.cancel()

     def collect(self):
-        now = datetime.now().timestamp()
-        diff = now - self.last_collect_timestamp
-        if diff < config_handler.system_entry().minimal_collect_interval:
-            if config_handler.system_entry().verbose_mode:
-                print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
-                print('deferring..')
-            return
-        self.last_collect_timestamp = now
+        if not self._valid_collect_interval():
+            return

+        # bandwidth collector
         yield from self.collector_registry.bandwidthCollector.collect()

+        # all other collectors
         # Check whether to run in parallel by looking at the mktxp system configuration
         parallel = config_handler.system_entry().fetch_routers_in_parallel
         max_worker_threads = config_handler.system_entry().max_worker_threads
         if parallel:
-            yield from self.collect_parallel(max_worker_threads=max_worker_threads)
+            yield from self.collect_async(max_worker_threads=max_worker_threads)
         else:
-            yield from self.collect_synchronous()
+            yield from self.collect_sync()
+
+    def _valid_collect_interval(self):
+        now = datetime.now().timestamp()
+        diff = now - self.last_collect_timestamp
+        if diff < config_handler.system_entry().minimal_collect_interval:
+            if config_handler.system_entry().verbose_mode:
+                print(f'An attemp to collect metrics within minimal metrics collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
+                print('deferring..')
+            return False
+
+        self.last_collect_timestamp = now
+        return True
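
The timeouts introduced here are cooperative: a threading.Timer sets an Event once the configured duration elapses, and the collection loops check that event between collectors (or between routers) rather than interrupting a call that is already in flight. A minimal standalone sketch of the same Timer-plus-Event pattern with generic, hypothetical names (not the project's code):

from threading import Event, Timer
import time

def run_with_deadline(tasks, deadline_seconds):
    timeout_event = Event()
    timer = Timer(deadline_seconds, timeout_event.set)   # fires once, after the deadline
    timer.start()
    results = []
    for task in tasks:
        if timeout_event.is_set():
            print('Hit deadline, skipping remaining tasks')
            break
        results.append(task())
    timer.cancel()   # no-op if the timer has already fired
    return results

# three half-second tasks against a 0.8s deadline: roughly two complete,
# since the deadline is only checked between tasks
print(len(run_with_deadline([lambda: time.sleep(0.5) or 'ok'] * 3, 0.8)))

The same trade-off applies in collect_router_entry_async above: a collector call that has already started runs to completion, and only the remaining collectors are skipped. Mapping each future to its per-router Timer in the futures dict is what lets scrapes that finish within max_scrape_duration cancel their timers in the as_completed loop.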