Queue tree / Active Users collectors, fixes

This commit is contained in:
Arseniy Kuznetsov
2022-12-10 17:47:17 +01:00
parent f5c43a9dd0
commit a93ec2939d
14 changed files with 245 additions and 14 deletions

View File

@@ -21,6 +21,7 @@
bandwidth = True # Turns metrics bandwidth metrics collection on / off
bandwidth_test_interval = 420 # Interval for colllecting bandwidth metrics
minimal_collect_interval = 5 # Minimal metric collection interval
verbose_mode = False # Set it on for troubleshooting

View File

@@ -57,6 +57,9 @@ class MKTXPConfigKeys:
FE_PUBLIC_IP_KEY = 'public_ip'
FE_NETWATCH_KEY = 'netwatch'
FE_USER_KEY = 'user'
FE_QUEUE_KEY = 'queue'
MKTXP_SOCKET_TIMEOUT = 'socket_timeout'
MKTXP_INITIAL_DELAY = 'initial_delay_on_failure'
MKTXP_MAX_DELAY = 'max_delay_on_failure'
@@ -64,6 +67,7 @@ class MKTXPConfigKeys:
MKTXP_BANDWIDTH_KEY = 'bandwidth'
MKTXP_BANDWIDTH_TEST_INTERVAL = 'bandwidth_test_interval'
MKTXP_VERBOSE_MODE = 'verbose_mode'
MKTXP_MIN_COLLECT_INTERVAL = 'minimal_collect_interval'
# UnRegistered entries placeholder
NO_ENTRIES_REGISTERED = 'NoEntriesRegistered'
@@ -84,6 +88,7 @@ class MKTXPConfigKeys:
DEFAULT_MKTXP_MAX_DELAY = 900
DEFAULT_MKTXP_INC_DIV = 5
DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL = 420
DEFAULT_MKTXP_MIN_COLLECT_INTERVAL = 5
BOOLEAN_KEYS_NO = {ENABLED_KEY, SSL_KEY, NO_SSL_CERTIFICATE, SSL_CERTIFICATE_VERIFY, FE_IPV6_FIREWALL_KEY, FE_IPV6_NEIGHBOR_KEY}
@@ -91,13 +96,13 @@ class MKTXPConfigKeys:
BOOLEAN_KEYS_YES = {FE_DHCP_KEY, FE_PACKAGE_KEY, FE_DHCP_LEASE_KEY, FE_DHCP_POOL_KEY, FE_IP_CONNECTIONS_KEY, FE_INTERFACE_KEY, FE_FIREWALL_KEY,
FE_MONITOR_KEY, FE_ROUTE_KEY, MKTXP_USE_COMMENTS_OVER_NAMES,
FE_WIRELESS_KEY, FE_WIRELESS_CLIENTS_KEY, FE_CAPSMAN_KEY, FE_CAPSMAN_CLIENTS_KEY, FE_POE_KEY,
FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY}
FE_NETWATCH_KEY, FE_PUBLIC_IP_KEY, FE_USER_KEY, FE_QUEUE_KEY}
SYSTEM_BOOLEAN_KEYS_YES = {MKTXP_BANDWIDTH_KEY}
SYSTEM_BOOLEAN_KEYS_NO = {MKTXP_VERBOSE_MODE}
STR_KEYS = (HOST_KEY, USER_KEY, PASSWD_KEY)
MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL)
MKTXP_INT_KEYS = (PORT_KEY, MKTXP_SOCKET_TIMEOUT, MKTXP_INITIAL_DELAY, MKTXP_MAX_DELAY, MKTXP_INC_DIV, MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXP_MIN_COLLECT_INTERVAL)
# MKTXP config entry nane
MKTXP_CONFIG_ENTRY_NAME = 'MKTXP'
@@ -110,12 +115,13 @@ class ConfigEntry:
MKTXPConfigKeys.FE_DHCP_KEY, MKTXPConfigKeys.FE_PACKAGE_KEY, MKTXPConfigKeys.FE_DHCP_LEASE_KEY, MKTXPConfigKeys.FE_DHCP_POOL_KEY, MKTXPConfigKeys.FE_INTERFACE_KEY,
MKTXPConfigKeys.FE_FIREWALL_KEY, MKTXPConfigKeys.FE_MONITOR_KEY, MKTXPConfigKeys.FE_ROUTE_KEY, MKTXPConfigKeys.FE_WIRELESS_KEY, MKTXPConfigKeys.FE_WIRELESS_CLIENTS_KEY,
MKTXPConfigKeys.FE_IP_CONNECTIONS_KEY, MKTXPConfigKeys.FE_CAPSMAN_KEY, MKTXPConfigKeys.FE_CAPSMAN_CLIENTS_KEY, MKTXPConfigKeys.FE_POE_KEY, MKTXPConfigKeys.FE_NETWATCH_KEY,
MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY
MKTXPConfigKeys.MKTXP_USE_COMMENTS_OVER_NAMES, MKTXPConfigKeys.FE_PUBLIC_IP_KEY, MKTXPConfigKeys.FE_IPV6_FIREWALL_KEY, MKTXPConfigKeys.FE_IPV6_NEIGHBOR_KEY,
MKTXPConfigKeys.FE_USER_KEY, MKTXPConfigKeys.FE_QUEUE_KEY
])
MKTXPSystemEntry = namedtuple('MKTXPSystemEntry', [MKTXPConfigKeys.PORT_KEY, MKTXPConfigKeys.MKTXP_SOCKET_TIMEOUT,
MKTXPConfigKeys.MKTXP_INITIAL_DELAY, MKTXPConfigKeys.MKTXP_MAX_DELAY,
MKTXPConfigKeys.MKTXP_INC_DIV, MKTXPConfigKeys.MKTXP_BANDWIDTH_KEY,
MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL])
MKTXPConfigKeys.MKTXP_VERBOSE_MODE, MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL, MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL])
class OSConfig(metaclass = ABCMeta):
@@ -273,7 +279,8 @@ class MKTXPConfigHandler:
MKTXPConfigKeys.MKTXP_INITIAL_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INITIAL_DELAY,
MKTXPConfigKeys.MKTXP_MAX_DELAY: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MAX_DELAY,
MKTXPConfigKeys.MKTXP_INC_DIV: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_INC_DIV,
MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL
MKTXPConfigKeys.MKTXP_BANDWIDTH_TEST_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_BANDWIDTH_TEST_INTERVAL,
MKTXPConfigKeys.MKTXP_MIN_COLLECT_INTERVAL: lambda value: MKTXPConfigKeys.DEFAULT_MKTXP_MIN_COLLECT_INTERVAL
}[key](value)
# Simplest possible Singleton impl

View File

@@ -44,4 +44,7 @@
capsman = True # CAPsMAN general metrics
capsman_clients = True # CAPsMAN clients metrics
user = True # Active Users metrics
queue = True # Queues metrics
use_comments_over_names = True # when available, forces using comments over the interfaces names

View File

@@ -33,7 +33,9 @@ class WirelessOutput:
dhcp_lease_records = DHCPMetricsDataSource.metric_records(router_entry, metric_labels = dhcp_lease_labels, add_router_id = False)
dhcp_rt_by_interface = {}
for registration_record in sorted(registration_records, key = lambda rt_record: rt_record['signal_strength'], reverse=True):
key = lambda rt_record: rt_record['signal_strength'] if rt_record.get('signal_strength') else rt_record['interface']
for registration_record in sorted(registration_records, key = key, reverse=True):
BaseOutputProcessor.augment_record(router_entry, registration_record, dhcp_lease_records)
interface = registration_record['interface']

View File

@@ -23,9 +23,14 @@ class PublicIPAddressCollector(BaseCollector):
if not router_entry.config_entry.public_ip:
return
address_labels = ['public_address', ]
address_labels = ['public_address', 'dns_name']
address_records = PublicIPAddressDatasource.metric_records(router_entry, metric_labels=address_labels)
if address_records:
for address_record in address_records:
if not 'dns_name' in address_record:
address_record['dns_name'] = 'ddns disabled'
address_metrics = BaseCollector.info_collector('public_ip_address', 'Public IP address', address_records, address_labels)
yield address_metrics

View File

@@ -0,0 +1,87 @@
# coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
from mktxp.collector.base_collector import BaseCollector
from mktxp.datasource.queue_ds import QueueTreeMetricsDataSource
class QueueTreeCollector(BaseCollector):
'''Queue Tree collector'''
@staticmethod
def collect(router_entry):
if not router_entry.config_entry.installed_packages:
return
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled']
qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels)
if qt_records:
qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name'])
yield qt_rate_metric
qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name'])
yield qt_packet_rate_metric
qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name'])
yield qt_byte_metric
qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name'])
yield qt_packet_metric
qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name'])
yield qt_queued_metric
qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name'])
yield qt_queued_packets_metric
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name'])
yield qt_drop_metric
class SimpleCollector(BaseCollector):
'''Simple Queue collector'''
@staticmethod
def collect(router_entry):
if not router_entry.config_entry.installed_packages:
return
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled']
qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels)
if qt_records:
qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name'])
yield qt_rate_metric
qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name'])
yield qt_packet_rate_metric
qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name'])
yield qt_byte_metric
qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name'])
yield qt_packet_metric
qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name'])
yield qt_queued_metric
qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name'])
yield qt_queued_packets_metric
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name'])
yield qt_drop_metric

View File

@@ -0,0 +1,31 @@
# coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
from mktxp.collector.base_collector import BaseCollector
from mktxp.datasource.user_ds import UserMetricsDataSource
class UserCollector(BaseCollector):
'''Active Users collector'''
@staticmethod
def collect(router_entry):
if not router_entry.config_entry.installed_packages:
return
user_labels = ['name', 'when', 'address', 'via', 'group']
user_records = UserMetricsDataSource.metric_records(router_entry, metric_labels=user_labels)
if user_records:
user_metrics = BaseCollector.info_collector('active_users', 'Active Users', user_records, user_labels)
yield user_metrics

View File

@@ -41,15 +41,14 @@ class POEMetricsDataSource:
poe_record['poe_out_power'] = poe_monitor_records[0]['poe_out_power']
if include_comments:
interfaces = router_entry.api_connection.router_api().get_resource('/interface/ethernet').get()
comment = lambda interface: interface['comment'] if interface.get('comment') else ''
interfaces = router_entry.api_connection.router_api().get_resource('/interface/ethernet').call('print', {'proplist':'name,comment'})
comment_fn = lambda interface: interface['comment'] if interface.get('comment') else ''
for poe_record in poe_records:
comment = [comment(interface) for interface in interfaces if interface['name'] == poe_record['name']][0]
comment = [comment_fn(interface) for interface in interfaces if interface['name'] == poe_record['name']][0]
if comment:
# combines name with comment
poe_record['name'] = comment if router_entry.config_entry.use_comments_over_names else \
f"{poe_record['name']} ({comment})"
return BaseDSProcessor.trimmed_records(router_entry, router_records = poe_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting PoE info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')

View File

@@ -0,0 +1,45 @@
# coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
from mktxp.datasource.base_ds import BaseDSProcessor
class QueueTreeMetricsDataSource:
''' Queue Tree Metrics data provider
'''
@staticmethod
def metric_records(router_entry, *, metric_labels = None):
if metric_labels is None:
metric_labels = []
try:
queue_tree_records = router_entry.api_connection.router_api().get_resource('/queue/tree/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = queue_tree_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None
class SimpleQueueMetricsDataSource:
''' Simple Queue Metrics data provider
'''
@staticmethod
def metric_records(router_entry, *, metric_labels = None):
if metric_labels is None:
metric_labels = []
try:
simple_queue_records = router_entry.api_connection.router_api().get_resource('/queue/simple/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = simple_queue_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None

View File

@@ -0,0 +1,30 @@
# coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
from mktxp.datasource.base_ds import BaseDSProcessor
class UserMetricsDataSource:
''' Active Users Metrics data provider
'''
@staticmethod
def metric_records(router_entry, *, metric_labels = None):
if metric_labels is None:
metric_labels = []
try:
active_users_records = router_entry.api_connection.router_api().get_resource('/user/active/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = active_users_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None

View File

@@ -1,5 +1,5 @@
# coding=utf8
## Copyright (c) 2020 Arseniy Kuznetsov
## Copyright (c) 2020 Arseniy Kuznenowov
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License
@@ -13,7 +13,8 @@
from timeit import default_timer
from datetime import datetime
from mktxp.cli.config.config import config_handler
class CollectorHandler:
''' MKTXP Collectors Handler
@@ -21,8 +22,18 @@ class CollectorHandler:
def __init__(self, entries_handler, collector_registry):
self.entries_handler = entries_handler
self.collector_registry = collector_registry
self.last_collect_timestamp = 0
def collect(self):
now = datetime.now().timestamp()
diff = now - self.last_collect_timestamp
if diff < config_handler.system_entry().minimal_collect_interval:
if config_handler.system_entry().verbose_mode:
print(f'An attemp to collect metrics within minimal collection interval: {diff} < {config_handler.system_entry().minimal_collect_interval}')
print('deferring..')
return
self.last_collect_timestamp = now
yield from self.collector_registry.bandwidthCollector.collect()
for router_entry in self.entries_handler.router_entries:
@@ -36,3 +47,6 @@ class CollectorHandler:
yield from collect_func(router_entry)
router_entry.time_spent[collector_ID] += default_timer() - start

View File

@@ -32,6 +32,8 @@ from mktxp.collector.capsman_collector import CapsmanCollector
from mktxp.collector.bandwidth_collector import BandwidthCollector
from mktxp.collector.firewall_collector import FirewallCollector
from mktxp.collector.mktxp_collector import MKTXPCollector
from mktxp.collector.user_collector import UserCollector
from mktxp.collector.queue_collector import QueueTreeCollector
class CollectorRegistry:
@@ -65,6 +67,9 @@ class CollectorRegistry:
self.register('WLANCollector', WLANCollector.collect)
self.register('CapsmanCollector', CapsmanCollector.collect)
self.register('UserCollector', UserCollector.collect)
self.register('QueueTreeCollector', QueueTreeCollector.collect)
self.register('MKTXPCollector', MKTXPCollector.collect)
def register(self, collector_ID, collect_func):

View File

@@ -44,5 +44,7 @@ class RouterEntry:
'RouteCollector': 0,
'WLANCollector': 0,
'CapsmanCollector': 0,
'QueueTreeCollector': 0,
'UserCollector': 0,
'MKTXPCollector': 0
}

View File

@@ -20,7 +20,7 @@ with open(path.join(pkg_dir, 'README.md'), encoding='utf-8') as f:
setup(
name='mktxp',
version='0.40',
version='0.41',
url='https://github.com/akpw/mktxp',