Merge branch 'main' of github.com:nioram/mktxp into main

This commit is contained in:
nioram
2023-01-03 11:57:26 +00:00
7 changed files with 284 additions and 137 deletions

View File

@@ -25,3 +25,8 @@
verbose_mode = False # Set it on for troubleshooting verbose_mode = False # Set it on for troubleshooting
fetch_routers_in_parallel = False # Set to True if you want to fetch multiple routers parallel
max_worker_threads = 5 # Max number of worker threads that can fetch routers. Meaningless if fetch_routers_in_parallel is set to False
max_scrape_duration = 10 # Max duration of individual routers' metrics collection
total_max_scrape_duration = 30 # Max overall duration of all metrics collection

View File

@@ -1,17 +1,19 @@
# coding=utf8 # coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov # Copyright (c) 2020 Arseniy Kuznetsov
## ##
## This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2 # as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version. # of the License, or (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. # GNU General Public License for more details.
import os, sys, shutil import os
import sys
import shutil
from collections import namedtuple from collections import namedtuple
from configobj import ConfigObj from configobj import ConfigObj
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
@@ -22,6 +24,7 @@ from mktxp.utils.utils import FSHelper
''' MKTXP conf file handling ''' MKTXP conf file handling
''' '''
class MKTXPConfigKeys: class MKTXPConfigKeys:
''' MKTXP config file keys ''' MKTXP config file keys
''' '''
@@ -38,12 +41,12 @@ class MKTXPConfigKeys:
FE_PACKAGE_KEY = 'installed_packages' FE_PACKAGE_KEY = 'installed_packages'
FE_DHCP_KEY = 'dhcp' FE_DHCP_KEY = 'dhcp'
FE_DHCP_LEASE_KEY = 'dhcp_lease' FE_DHCP_LEASE_KEY = 'dhcp_lease'
FE_DHCP_POOL_KEY = 'pool' FE_DHCP_POOL_KEY = 'pool'
FE_IP_CONNECTIONS_KEY = 'connections' FE_IP_CONNECTIONS_KEY = 'connections'
FE_INTERFACE_KEY = 'interface' FE_INTERFACE_KEY = 'interface'
FE_FIREWALL_KEY = 'firewall' FE_FIREWALL_KEY = 'firewall'
FE_IPV6_FIREWALL_KEY = 'ipv6_firewall' FE_IPV6_FIREWALL_KEY = 'ipv6_firewall'
FE_IPV6_NEIGHBOR_KEY = 'ipv6_neighbor' FE_IPV6_NEIGHBOR_KEY = 'ipv6_neighbor'
@@ -60,7 +63,7 @@ class MKTXPConfigKeys:
FE_USER_KEY = 'user' FE_USER_KEY = 'user'
FE_QUEUE_KEY = 'queue' FE_QUEUE_KEY = 'queue'
MKTXP_SOCKET_TIMEOUT = 'socket_timeout' MKTXP_SOCKET_TIMEOUT = 'socket_timeout'
MKTXP_INITIAL_DELAY = 'initial_delay_on_failure' MKTXP_INITIAL_DELAY = 'initial_delay_on_failure'
MKTXP_MAX_DELAY = 'max_delay_on_failure' MKTXP_MAX_DELAY = 'max_delay_on_failure'
MKTXP_INC_DIV = 'delay_inc_div' MKTXP_INC_DIV = 'delay_inc_div'
@@ -68,13 +71,16 @@ class MKTXPConfigKeys:
MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval' MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval'
MKTXP_VERBOSE_MODE = 'verbose_mode' MKTXP_VERBOSE_MODE = 'verbose_mode'
MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval' MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval'
MKTXP_FETCH_IN_PARALLEL = 'fetch_routers_in_parallel'
MKTXP_MAX_WORKER_THREADS = 'max_worker_threads'
MKTXP_MAX_SCRAPE_DURATION = 'max_scrape_duration'
MKTXP_TOTAL_MAX_SCRAPE_DURATION = 'total_max_scrape_duration'
# UnRegistered entries placeholder # UnRegistered entries placeholder
NO_ENTRIES_REGISTERED = 'NoEntriesRegistered' NO_ENTRIES_REGISTERED = 'NoEntriesRegistered'
MKTXP_USE_COMMENTS_OVER_NAMES = 'use_comments_over_names' MKTXP_USE_COMMENTS_OVER_NAMES = 'use_comments_over_names'
# Base router id labels # Base router id labels
ROUTERBOARD_NAME = 'routerboard_name' ROUTERBOARD_NAME = 'routerboard_name'
ROUTERBOARD_ADDRESS = 'routerboard_address' ROUTERBOARD_ADDRESS = 'routerboard_address'
@@ -89,20 +95,28 @@ class MKTXPConfigKeys:
DEFAULT_MKTXP_INC_DIV = 5 DEFAULT_MKTXP_INC_DIV = 5
DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420 DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420
DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5 DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5
DEFAULT_MKTXP_FETCH_IN_PARALLEL = False
DEFAULT_MKTXP_MAX_WORKER_THREADS = 5
DEFAULT_MKTXP_MAX_SCRAPE_DURATION = 10
DEFAULT_MKTXP_TOTAL_MAX_SCRAPE_DURATION = 30
BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE,
SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY}
BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY}
# Feature keys enabled by default # Feature keys enabled by default
BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY, BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY,
FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES, FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES,
FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY, FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY,
FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY} FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY}
SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY} SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY}
SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE} SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE, MKTXP_FETCH_IN_PARALLEL}
STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY) STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY)
MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL) MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY,
MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL,
MKTXP_MAX_WORKER_THREADS, MKTXP_MAX_SCRAPE_DURATION, MKTXP_TOTAL_MAX_SCRAPE_DURATION)
# MKTXP config entry nane # MKTXP config entry nane
MKTXP_CONFIG_ENTRY_NAME = 'MKTXP' MKTXP_CONFIG_ENTRY_NAME = 'MKTXP'
@@ -110,21 +124,24 @@ class MKTXPConfigKeys:
class ConfigEntry: class ConfigEntry:
MKTXPConfigEntry = namedtuple('MKTXPConfigEntry', [MKTXPConfigKeys.ENABLED_KEY, MKTXPConfigKeys.HOST_KEY, MKTXPConfigKeys.PORT_KEY, MKTXPConfigEntry = namedtuple('MKTXPConfigEntry', [MKTXPConfigKeys.ENABLED_KEY, MKTXPConfigKeys.HOST_KEY, MKTXPConfigKeys.PORT_KEY,
MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY, MKTXPConfigKeys.USER_KEY, MKTXPConfigKeys.PASSWD_KEY,
MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY, MKTXPConfigKeys.SSL_KEY, MKTXPConfigKeys.NO_SSL_CERTIFICATE, MKTXPConfigKeys.SSL_CERTIFICATE_VERIFY,
MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY, MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY,
MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY, MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY,
MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY, MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY,
MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY, MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY,
MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY
]) ])
MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT, MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT,
MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY, MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY,
MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY, MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY,
MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL]) MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL,
MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL, MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL,
MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS, MKTXPConfigKeys.MKTXP_MAX_SCRAPE_DURATION,
MKTXPConfigKeys.MKTXP_TOTAL_MAX_SCRAPE_DURATION])
class OSConfig(metaclass = ABCMeta): class OSConfig(metaclass=ABCMeta):
''' OS-related config ''' OS-related config
''' '''
@staticmethod @staticmethod
@@ -169,7 +186,7 @@ class LinuxConfig(OSConfig):
@property @property
def mktxp_user_dir_path(self): def mktxp_user_dir_path(self):
return FSHelper.full_path('~/mktxp') return FSHelper.full_path('~/mktxp')
#return FSHelper.full_path('/etc/mktxp') # return FSHelper.full_path('/etc/mktxp')
class MKTXPConfigHandler: class MKTXPConfigHandler:
@@ -183,11 +200,15 @@ class MKTXPConfigHandler:
os.makedirs(self.os_config.mktxp_user_dir_path) os.makedirs(self.os_config.mktxp_user_dir_path)
# if needed, stage the user config data # if needed, stage the user config data
self.usr_conf_data_path = os.path.join(self.os_config.mktxp_user_dir_path, 'mktxp.conf') self.usr_conf_data_path = os.path.join(
self.mktxp_conf_path = os.path.join(self.os_config.mktxp_user_dir_path, '_mktxp.conf') self.os_config.mktxp_user_dir_path, 'mktxp.conf')
self.mktxp_conf_path = os.path.join(
self.os_config.mktxp_user_dir_path, '_mktxp.conf')
self._create_os_path(self.usr_conf_data_path, 'mktxp/cli/config/mktxp.conf') self._create_os_path(self.usr_conf_data_path,
self._create_os_path(self.mktxp_conf_path, 'mktxp/cli/config/_mktxp.conf') 'mktxp/cli/config/mktxp.conf')
self._create_os_path(self.mktxp_conf_path,
'mktxp/cli/config/_mktxp.conf')
self.re_compiled = {} self.re_compiled = {}
@@ -225,18 +246,20 @@ class MKTXPConfigHandler:
def _create_os_path(self, os_path, resource_path): def _create_os_path(self, os_path, resource_path):
if not os.path.exists(os_path): if not os.path.exists(os_path):
# stage from the conf templates # stage from the conf templates
lookup_path = resource_filename(Requirement.parse("mktxp"), resource_path) lookup_path = resource_filename(
Requirement.parse("mktxp"), resource_path)
shutil.copy(lookup_path, os_path) shutil.copy(lookup_path, os_path)
def _config_entry_reader(self, entry_name): def _config_entry_reader(self, entry_name):
config_entry_reader = {} config_entry_reader = {}
write_needed = False new_keys = []
for key in MKTXPConfigKeys.BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.BOOLEAN_KEYS_YES): for key in MKTXPConfigKeys.BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.BOOLEAN_KEYS_YES):
if self.config[entry_name].get(key): if self.config[entry_name].get(key):
config_entry_reader[key] = self.config[entry_name].as_bool(key) config_entry_reader[key] = self.config[entry_name].as_bool(key)
else: else:
config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False
write_needed = True # read from disk next time new_keys.append(key) # read from disk next time
for key in MKTXPConfigKeys.STR_KEYS: for key in MKTXPConfigKeys.STR_KEYS:
config_entry_reader[key] = self.config[entry_name][key] config_entry_reader[key] = self.config[entry_name][key]
@@ -245,54 +268,73 @@ class MKTXPConfigHandler:
# port # port
if self.config[entry_name].get(MKTXPConfigKeys.PORT_KEY): if self.config[entry_name].get(MKTXPConfigKeys.PORT_KEY):
config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int(MKTXPConfigKeys.PORT_KEY) config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self.config[entry_name].as_int(
MKTXPConfigKeys.PORT_KEY)
else: else:
config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY]) config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(
write_needed = True # read from disk next time MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY])
new_keys.append(MKTXPConfigKeys.PORT_KEY) # read from disk next time
if write_needed:
if new_keys:
self.config[entry_name] = config_entry_reader self.config[entry_name] = config_entry_reader
self.config.write() try:
self.config.write()
if self._config[MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE):
print(f'Updated router entry {entry_name} with new feature keys {new_keys}')
except Exception as exc:
print(f'Error updating router entry {entry_name} with new feature keys {new_keys}: {exc}')
print('Please update mktxp.conf to its latest version manually')
return config_entry_reader return config_entry_reader
def _system_entry_reader(self): def _system_entry_reader(self):
system_entry_reader = {} system_entry_reader = {}
entry_name = MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME entry_name = MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME
write_needed = False new_keys = []
for key in MKTXPConfigKeys.MKTXP_INT_KEYS: for key in MKTXPConfigKeys.MKTXP_INT_KEYS:
if self._config[entry_name].get(key): if self._config[entry_name].get(key):
system_entry_reader[key] = self._config[entry_name].as_int(key) system_entry_reader[key] = self._config[entry_name].as_int(key)
else: else:
system_entry_reader[key] = self._default_value_for_key(key) system_entry_reader[key] = self._default_value_for_key(key)
write_needed = True # read from disk next time new_keys.append(key) # read from disk next time
for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES): for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES):
if self._config[entry_name].get(key): if self._config[entry_name].get(key):
system_entry_reader[key] = self._config[entry_name].as_bool(key) system_entry_reader[key] = self._config[entry_name].as_bool(
key)
else: else:
system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False
write_needed = True # read from disk next time new_keys.append(key) # read from disk next time
if write_needed: if new_keys:
self._config[entry_name] = system_entry_reader self._config[entry_name] = system_entry_reader
self._config.write() try:
self._config.write()
if self._config[entry_name].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE):
print(f'Updated system entry {entry_name} with new system keys {new_keys}')
except Exception as exc:
print(f'Error updating system entry {entry_name} with new system keys {new_keys}: {exc}')
print('Please update _mktxp.conf to its latest version manually')
return system_entry_reader return system_entry_reader
def _default_value_for_key(self, key, value = None): def _default_value_for_key(self, key, value=None):
return { return {
MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT, MKTXPConfigKeys.SSL_KEY: lambda value: MKTXPConfigKeys.DEFAULT_API_SSL_PORT if value else MKTXPConfigKeys.DEFAULT_API_PORT,
MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT, MKTXPConfigKeys.PORT_KEY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_PORT,
MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_SOCKET_TIMEOUT,
MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY,
MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY,
MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV,
MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL,
MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL,
}[key](value) MKTXPConfigKeys.MKTXP_FETCH_IN_PARALLEL: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_FETCH_IN_PARALLEL,
MKTXPConfigKeys.MKTXP_MAX_WORKER_THREADS: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_WORKER_THREADS,
MKTXPConfigKeys.MKTXP_MAX_SCRAPE_DURATION: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_SCRAPE_DURATION,
MKTXPConfigKeys.MKTXP_TOTAL_MAX_SCRAPE_DURATION: lambda _: MKTXPConfigKeys.DEFAULT_MKTXP_TOTAL_MAX_SCRAPE_DURATION,
}[key](value)
# Simplest possible Singleton impl # Simplest possible Singleton impl
config_handler = MKTXPConfigHandler() config_handler = MKTXPConfigHandler()

View File

@@ -13,75 +13,71 @@
from mktxp.collector.base_collector import BaseCollector from mktxp.collector.base_collector import BaseCollector
from mktxp.datasource.queue_ds import QueueTreeMetricsDataSource from mktxp.datasource.queue_ds import QueueMetricsDataSource
class QueueTreeCollector(BaseCollector): class QueueTreeCollector(BaseCollector):
'''Queue Tree collector''' '''Queue Tree collector'''
@staticmethod @staticmethod
def collect(router_entry): def collect(router_entry):
if not router_entry.config_entry.installed_packages: if not router_entry.config_entry.queue:
return return
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled'] qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'queued_bytes', 'dropped', 'rate', 'disabled']
qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels) qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'tree')
if qt_records: if qt_records:
qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name']) qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name'])
yield qt_rate_metric yield qt_rate_metric
qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name'])
yield qt_packet_rate_metric
qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name']) qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name'])
yield qt_byte_metric yield qt_byte_metric
qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name'])
yield qt_packet_metric
qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name']) qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name'])
yield qt_queued_metric yield qt_queued_metric
qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name']) qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped bytes', qt_records, 'dropped', ['name'])
yield qt_queued_packets_metric
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name'])
yield qt_drop_metric yield qt_drop_metric
class SimpleCollector(BaseCollector): class QueueSimpleCollector(BaseCollector):
'''Simple Queue collector''' '''Simple Queue collector'''
@staticmethod @staticmethod
def collect(router_entry): def collect(router_entry):
if not router_entry.config_entry.installed_packages: if not router_entry.config_entry.queue:
return return
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled'] qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled']
qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels) qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'simple')
if qt_records: if qt_records:
qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name']) qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_upload', 'Average passing upload data rate in bytes per second', qt_records, 'rate_up', ['name'])
yield qt_rate_metric yield qt_rate_metric
qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name']) qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_download', 'Average passing download data rate in bytes per second', qt_records, 'rate_down', ['name'])
yield qt_packet_rate_metric yield qt_rate_metric
qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name'])
qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_upload', 'Number of upload processed bytes', qt_records, 'bytes_up', ['name'])
yield qt_byte_metric yield qt_byte_metric
qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name']) qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_download', 'Number of download processed bytes', qt_records, 'bytes_down', ['name'])
yield qt_packet_metric yield qt_byte_metric
qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name'])
qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_upload', 'Number of upload queued bytes', qt_records, 'queued_bytes_up', ['name'])
yield qt_queued_metric yield qt_queued_metric
qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name']) qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_downloadd', 'Number of download queued bytes', qt_records, 'queued_bytes_down', ['name'])
yield qt_queued_packets_metric yield qt_queued_metric
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name']) qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_upload', 'Number of upload dropped bytes', qt_records, 'dropped_up', ['name'])
yield qt_drop_metric
qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_download', 'Number of download dropped bytes', qt_records, 'dropped_down', ['name'])
yield qt_drop_metric yield qt_drop_metric

View File

@@ -15,31 +15,37 @@
from mktxp.datasource.base_ds import BaseDSProcessor from mktxp.datasource.base_ds import BaseDSProcessor
class QueueTreeMetricsDataSource: class QueueMetricsDataSource:
''' Queue Tree Metrics data provider ''' Queue Metrics data provider
''' '''
@staticmethod @staticmethod
def metric_records(router_entry, *, metric_labels = None): def metric_records(router_entry, *, metric_labels = None, kind = 'tree'):
if metric_labels is None: if metric_labels is None:
metric_labels = [] metric_labels = []
try: try:
queue_tree_records = router_entry.api_connection.router_api().get_resource('/queue/tree/').get() queue_records = router_entry.api_connection.router_api().get_resource(f'/queue/{kind}/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = queue_tree_records, metric_labels = metric_labels) queue_records = BaseDSProcessor.trimmed_records(router_entry, router_records = queue_records, metric_labels = metric_labels)
except Exception as exc: except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}') print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None return None
if kind == 'tree':
return queue_records
# simple queue records need splitting upload/download values
splitted_queue_records = []
for queue_record in queue_records:
splitted_queue_record = {}
for key, value in queue_record.items():
split_values = value.split('/')
if split_values and len(split_values) > 1:
splitted_queue_record[f'{key}_up'] = split_values[0]
splitted_queue_record[f'{key}_down'] = split_values[1]
else:
splitted_queue_record[key] = value
splitted_queue_records.append(splitted_queue_record)
return splitted_queue_records
class SimpleQueueMetricsDataSource:
''' Simple Queue Metrics data provider
'''
@staticmethod
def metric_records(router_entry, *, metric_labels = None):
if metric_labels is None:
metric_labels = []
try:
simple_queue_records = router_entry.api_connection.router_api().get_resource('/queue/simple/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = simple_queue_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None

View File

@@ -1,52 +1,147 @@
# coding=utf8 # coding=utf8
## Copyright (c) 2020 Arseniy Kuznenowov # Copyright (c) 2020 Arseniy Kuznenowov
## ##
## This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2 # as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version. # of the License, or (at your option) any later version.
## ##
## This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details. # GNU General Public License for more details.
from concurrent.futures import ThreadPoolExecutor, as_completed
from timeit import default_timer from timeit import default_timer
from datetime import datetime from datetime import datetime
from threading import Event, Timer
from mktxp.cli.config.config import config_handler from mktxp.cli.config.config import config_handler
from mktxp.cli.config.config import MKTXPConfigKeys
class CollectorHandler: class CollectorHandler:
''' MKTXP Collectors Handler ''' MKTXP Collectors Handler
''' '''
def __init__(self, entries_handler, collector_registry): def __init__(self, entries_handler, collector_registry):
self.entries_handler = entries_handler self.entries_handler = entries_handler
self.collector_registry = collector_registry self.collector_registry = collector_registry
self.last_collect_timestamp = 0 self.last_collect_timestamp = 0
def collect(self):
now = datetime.now().timestamp()
diff = now - self.last_collect_timestamp
if diff < config_handler.system_entry().minimal_collect_interval:
if config_handler.system_entry().verbose_mode:
print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
print('deferring..')
return
self.last_collect_timestamp = now
yield from self.collector_registry.bandwidthCollector.collect()
def collect_sync(self):
"""
Collect the metrics of all router entries defined in the current users configuration synchronously.
This function iterates over each router entry one-by-one.
Thus, the total runtime of this function scales linearly with the number of registered routers.
"""
for router_entry in self.entries_handler.router_entries: for router_entry in self.entries_handler.router_entries:
if not router_entry.api_connection.is_connected(): if not router_entry.api_connection.is_connected():
# let's pick up on things in the next run # let's pick up on things in the next run
router_entry.api_connection.connect() router_entry.api_connection.connect()
continue continue
for collector_ID, collect_func in self.collector_registry.registered_collectors.items(): for collector_ID, collect_func in self.collector_registry.registered_collectors.items():
start = default_timer() start = default_timer()
yield from collect_func(router_entry) yield from collect_func(router_entry)
router_entry.time_spent[collector_ID] += default_timer() - start router_entry.time_spent[collector_ID] += default_timer() - start
def collect_router_entry_async(self, router_entry, scrape_timeout_event, total_scrape_timeout_event):
results = []
for collector_ID, collect_func in self.collector_registry.registered_collectors.items():
if scrape_timeout_event.is_set():
print(f'Hit timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}')
break
if total_scrape_timeout_event.is_set():
print(f'Hit overall timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}')
break
start = default_timer()
result = list(collect_func(router_entry))
results += result
router_entry.time_spent[collector_ID] += default_timer() - start
return results
def collect_async(self, max_worker_threads=5):
"""
Collect the metrics of all router entries defined in the current users configuration in parallel.
This function iterates over multiple routers in parallel (depending on the value of max_worker_threads).
Thus, the total runtime scales sub linearly (number_of_routers / max_worker_threads).
"""
def timeout(timeout_event):
timeout_event.set()
# overall scrape duration
total_scrape_timeout_event = Event()
total_scrape_timer = Timer(config_handler.system_entry().total_max_scrape_duration, timeout, args=(total_scrape_timeout_event,))
total_scrape_timer.start()
with ThreadPoolExecutor(max_workers=max_worker_threads) as executor:
futures = {}
for router_entry in self.entries_handler.router_entries:
if total_scrape_timeout_event.is_set():
print(f'Hit overall timeout while scraping router entry: {router_entry.router_id[MKTXPConfigKeys.ROUTERBOARD_NAME]}')
break
if not router_entry.api_connection.is_connected():
# let's pick up on things in the next run
router_entry.api_connection.connect()
continue
# Duration of individual scrapes
scrape_timeout_event = Event()
scrape_timer = Timer(config_handler.system_entry().max_scrape_duration, timeout, args=(scrape_timeout_event,))
scrape_timer.start()
futures[executor.submit(self.collect_router_entry_async, router_entry, scrape_timeout_event, total_scrape_timeout_event)] = scrape_timer
for future in as_completed(futures):
# cancel unused timers for scrapes finished regularly (within set duration)
futures[future].cancel()
yield from future.result()
# in case collection finished without timeouts, cancel the overall scrape duration timer
total_scrape_timer.cancel()
def collect(self):
if not self._valid_collect_interval():
return
# bandwidth collector
yield from self.collector_registry.bandwidthCollector.collect()
# all other collectors
# Check whether to run in parallel by looking at the mktxp system configuration
parallel = config_handler.system_entry().fetch_routers_in_parallel
max_worker_threads = config_handler.system_entry().max_worker_threads
if parallel:
yield from self.collect_async(max_worker_threads=max_worker_threads)
else:
yield from self.collect_sync()
def _valid_collect_interval(self):
now = datetime.now().timestamp()
diff = now - self.last_collect_timestamp
if diff < config_handler.system_entry().minimal_collect_interval:
if config_handler.system_entry().verbose_mode:
print(f'An attemp to collect metrics within minimal metrics collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
print('deferring..')
return False
self.last_collect_timestamp = now
return True

View File

@@ -34,6 +34,7 @@ from mktxp.collector.firewall_collector import FirewallCollector
from mktxp.collector.mktxp_collector import MKTXPCollector from mktxp.collector.mktxp_collector import MKTXPCollector
from mktxp.collector.user_collector import UserCollector from mktxp.collector.user_collector import UserCollector
from mktxp.collector.queue_collector import QueueTreeCollector from mktxp.collector.queue_collector import QueueTreeCollector
from mktxp.collector.queue_collector import QueueSimpleCollector
class CollectorRegistry: class CollectorRegistry:
@@ -69,6 +70,7 @@ class CollectorRegistry:
self.register('UserCollector', UserCollector.collect) self.register('UserCollector', UserCollector.collect)
self.register('QueueTreeCollector', QueueTreeCollector.collect) self.register('QueueTreeCollector', QueueTreeCollector.collect)
self.register('QueueSimpleCollector', QueueSimpleCollector.collect)
self.register('MKTXPCollector', MKTXPCollector.collect) self.register('MKTXPCollector', MKTXPCollector.collect)

View File

@@ -45,6 +45,7 @@ class RouterEntry:
'WLANCollector': 0, 'WLANCollector': 0,
'CapsmanCollector': 0, 'CapsmanCollector': 0,
'QueueTreeCollector': 0, 'QueueTreeCollector': 0,
'QueueSimpleCollector': 0,
'UserCollector': 0, 'UserCollector': 0,
'MKTXPCollector': 0 'MKTXPCollector': 0
} }