simple queue, parallel router fetch as_completed , config autoupdate fixes

This commit is contained in:
Arseniy Kuznetsov
2022-12-30 19:11:07 +01:00
parent 27c71ee843
commit 660dd33369
6 changed files with 80 additions and 64 deletions

View File

@@ -247,13 +247,14 @@ class MKTXPConfigHandler:
def _config_entry_reader(self, entry_name):
config_entry_reader = {}
write_needed = False
new_keys = []
for key in MKTXPConfigKeys.BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.BOOLEAN_KEYS_YES):
if self.config[entry_name].get(key):
config_entry_reader[key] = self.config[entry_name].as_bool(key)
else:
config_entry_reader[key] = True if key in MKTXPConfigKeys.BOOLEAN_KEYS_YES else False
write_needed = True # read from disk next time
new_keys.append(key) # read from disk next time
for key in MKTXPConfigKeys.STR_KEYS:
config_entry_reader[key] = self.config[entry_name][key]
@@ -267,25 +268,31 @@ class MKTXPConfigHandler:
else:
config_entry_reader[MKTXPConfigKeys.PORT_KEY] = self._default_value_for_key(
MKTXPConfigKeys.SSL_KEY, config_entry_reader[MKTXPConfigKeys.SSL_KEY])
write_needed = True # read from disk next time
if write_needed:
new_keys.append(MKTXPConfigKeys.PORT_KEY) # read from disk next time
if new_keys:
self.config[entry_name] = config_entry_reader
self.config.write()
try:
self.config.write()
if self._config[MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE):
print(f'Updated router entry {entry_name} with new feature keys {new_keys}')
except Exception as exc:
print(f'Error updating router entry {entry_name} with new feature keys {new_keys}: {exc}')
print('Please update mktxp.conf to its latest version manually')
return config_entry_reader
def _system_entry_reader(self):
system_entry_reader = {}
entry_name = MKTXPConfigKeys.MKTXP_CONFIG_ENTRY_NAME
write_needed = False
new_keys = []
for key in MKTXPConfigKeys.MKTXP_INT_KEYS:
if self._config[entry_name].get(key):
system_entry_reader[key] = self._config[entry_name].as_int(key)
else:
system_entry_reader[key] = self._default_value_for_key(key)
write_needed = True # read from disk next time
new_keys.append(key) # read from disk next time
for key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_NO.union(MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES):
if self._config[entry_name].get(key):
@@ -293,11 +300,17 @@ class MKTXPConfigHandler:
key)
else:
system_entry_reader[key] = True if key in MKTXPConfigKeys.SYSTEM_BOOLEAN_KEYS_YES else False
write_needed = True # read from disk next time
new_keys.append(key) # read from disk next time
if write_needed:
if new_keys:
self._config[entry_name] = system_entry_reader
self._config.write()
try:
self._config.write()
if self._config[entry_name].as_bool(MKTXPConfigKeys.MKTXP_VERBOSE_MODE):
print(f'Updated system entry {entry_name} with new system keys {new_keys}')
except Exception as exc:
print(f'Error updating system entry {entry_name} with new system keys {new_keys}: {exc}')
print('Please update _mktxp.conf to its latest version manually')
return system_entry_reader

View File

@@ -13,75 +13,71 @@
from mktxp.collector.base_collector import BaseCollector
from mktxp.datasource.queue_ds import QueueTreeMetricsDataSource
from mktxp.datasource.queue_ds import QueueMetricsDataSource
class QueueTreeCollector(BaseCollector):
'''Queue Tree collector'''
@staticmethod
def collect(router_entry):
if not router_entry.config_entry.installed_packages:
if not router_entry.config_entry.queue:
return
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled']
qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels)
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'queued_bytes', 'dropped', 'rate', 'disabled']
qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'tree')
if qt_records:
qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name'])
yield qt_rate_metric
qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name'])
yield qt_packet_rate_metric
qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name'])
yield qt_byte_metric
qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name'])
yield qt_packet_metric
qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name'])
yield qt_queued_metric
qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name'])
yield qt_queued_packets_metric
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name'])
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped bytes', qt_records, 'dropped', ['name'])
yield qt_drop_metric
class SimpleCollector(BaseCollector):
class QueueSimpleCollector(BaseCollector):
'''Simple Queue collector'''
@staticmethod
def collect(router_entry):
if not router_entry.config_entry.installed_packages:
if not router_entry.config_entry.queue:
return
qt_labels = ['name', 'parent', 'packet_mark', 'limit_at', 'max_limit', 'priority', 'bytes', 'packets', 'queued_bytes', 'queued_packets','dropped', 'rate', 'packet_rate', 'disabled']
qt_records = QueueTreeMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels)
qt_records = QueueMetricsDataSource.metric_records(router_entry, metric_labels=qt_labels, kind = 'simple')
if qt_records:
qt_rate_metric = BaseCollector.counter_collector('queue_tree_rates', 'Average passing data rate in bytes per second', qt_records, 'rate', ['name'])
qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_upload', 'Average passing upload data rate in bytes per second', qt_records, 'rate_up', ['name'])
yield qt_rate_metric
qt_packet_rate_metric = BaseCollector.counter_collector('queue_tree_packet_rates', 'Average passing data rate in packets per second', qt_records, 'packet_rate', ['name'])
yield qt_packet_rate_metric
qt_rate_metric = BaseCollector.counter_collector('queue_simple_rates_download', 'Average passing download data rate in bytes per second', qt_records, 'rate_down', ['name'])
yield qt_rate_metric
qt_byte_metric = BaseCollector.counter_collector('queue_tree_bytes', 'Number of processed bytes', qt_records, 'bytes', ['name'])
qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_upload', 'Number of upload processed bytes', qt_records, 'bytes_up', ['name'])
yield qt_byte_metric
qt_packet_metric = BaseCollector.counter_collector('queue_tree_pakets', 'Number of processed packets', qt_records, 'packets', ['name'])
yield qt_packet_metric
qt_byte_metric = BaseCollector.counter_collector('queue_simple_bytes_download', 'Number of download processed bytes', qt_records, 'bytes_down', ['name'])
yield qt_byte_metric
qt_queued_metric = BaseCollector.counter_collector('queue_tree_queued_bytes', 'Number of queued bytes', qt_records, 'queued_bytes', ['name'])
qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_upload', 'Number of upload queued bytes', qt_records, 'queued_bytes_up', ['name'])
yield qt_queued_metric
qt_queued_packets_metric = BaseCollector.counter_collector('queue_tree_queued_packets', 'Number of queued packets', qt_records, 'queued_packets', ['name'])
yield qt_queued_packets_metric
qt_queued_metric = BaseCollector.counter_collector('queue_simple_queued_bytes_downloadd', 'Number of download queued bytes', qt_records, 'queued_bytes_down', ['name'])
yield qt_queued_metric
qt_drop_metric = BaseCollector.counter_collector('queue_tree_dropped', 'Number of dropped packets', qt_records, 'dropped', ['name'])
qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_upload', 'Number of upload dropped bytes', qt_records, 'dropped_up', ['name'])
yield qt_drop_metric
qt_drop_metric = BaseCollector.counter_collector('queue_simple_dropped_download', 'Number of download dropped bytes', qt_records, 'dropped_down', ['name'])
yield qt_drop_metric

View File

@@ -15,31 +15,37 @@
from mktxp.datasource.base_ds import BaseDSProcessor
class QueueTreeMetricsDataSource:
''' Queue Tree Metrics data provider
class QueueMetricsDataSource:
''' Queue Metrics data provider
'''
@staticmethod
def metric_records(router_entry, *, metric_labels = None):
def metric_records(router_entry, *, metric_labels = None, kind = 'tree'):
if metric_labels is None:
metric_labels = []
try:
queue_tree_records = router_entry.api_connection.router_api().get_resource('/queue/tree/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = queue_tree_records, metric_labels = metric_labels)
queue_records = router_entry.api_connection.router_api().get_resource(f'/queue/{kind}/').get()
queue_records = BaseDSProcessor.trimmed_records(router_entry, router_records = queue_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None
if kind == 'tree':
return queue_records
# simple queue records need splitting upload/download values
splitted_queue_records = []
for queue_record in queue_records:
splitted_queue_record = {}
for key, value in queue_record.items():
split_values = value.split('/')
if split_values and len(split_values) > 1:
splitted_queue_record[f'{key}_up'] = split_values[0]
splitted_queue_record[f'{key}_down'] = split_values[1]
else:
splitted_queue_record[key] = value
splitted_queue_records.append(splitted_queue_record)
return splitted_queue_records
class SimpleQueueMetricsDataSource:
''' Simple Queue Metrics data provider
'''
@staticmethod
def metric_records(router_entry, *, metric_labels = None):
if metric_labels is None:
metric_labels = []
try:
simple_queue_records = router_entry.api_connection.router_api().get_resource('/queue/simple/').get()
return BaseDSProcessor.trimmed_records(router_entry, router_records = simple_queue_records, metric_labels = metric_labels)
except Exception as exc:
print(f'Error getting system resource info from router{router_entry.router_name}@{router_entry.config_entry.hostname}: {exc}')
return None

View File

@@ -11,7 +11,7 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
from timeit import default_timer
from datetime import datetime
from mktxp.cli.config.config import config_handler
@@ -68,13 +68,11 @@ class CollectorHandler:
continue
# Publish the collection function as a future
future = executor.submit(self.collect_single, router_entry)
futures.append(future)
futures.append(executor.submit(self.collect_single, router_entry))
for future in as_completed(futures):
yield from future.result()
# Join all futures and collect their results
for future in futures:
results = future.result()
yield from results
def collect(self):
now = datetime.now().timestamp()

View File

@@ -34,6 +34,7 @@ from mktxp.collector.firewall_collector import FirewallCollector
from mktxp.collector.mktxp_collector import MKTXPCollector
from mktxp.collector.user_collector import UserCollector
from mktxp.collector.queue_collector import QueueTreeCollector
from mktxp.collector.queue_collector import QueueSimpleCollector
class CollectorRegistry:
@@ -69,6 +70,7 @@ class CollectorRegistry:
self.register('UserCollector', UserCollector.collect)
self.register('QueueTreeCollector', QueueTreeCollector.collect)
self.register('QueueSimpleCollector', QueueSimpleCollector.collect)
self.register('MKTXPCollector', MKTXPCollector.collect)

View File

@@ -45,6 +45,7 @@ class RouterEntry:
'WLANCollector': 0,
'CapsmanCollector': 0,
'QueueTreeCollector': 0,
'QueueSimpleCollector': 0,
'UserCollector': 0,
'MKTXPCollector': 0
}